# Step 1.1.2 Stage Dataset

In this step we will:
 - Upload the data into a stage
 - Load it into Snowflake
 - Perform data/feature engineering to handle
     - missing data
     - null records


In [16]:
from IPython.display import display, HTML, Image , Markdown
from snowflake.snowpark.session import Session
# from snowflake.snowpark.types import * 
# from snowflake.snowpark.functions import *
import configparser

PROJECT_HOME_DIR = '../../..'
CONFIG_FL = f'{PROJECT_HOME_DIR}/config.ini'
LOCAL_TEMP_DIR = f'{PROJECT_HOME_DIR}/temp'

%run ./scripts/notebook_helpers.py

In [17]:
# Initialization: read config.ini and open a Snowpark session from the
# JSON connection file it points to.
set_cell_background('#EAE3D2')

import json

config = configparser.ConfigParser()
sflk_session = None

print(" Initialize Snowpark session")
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it did read, so an empty result means the config file is missing.
if not config.read(CONFIG_FL):
    raise FileNotFoundError(f'Unable to read configuration file: {CONFIG_FL}')

snow_conn_flpath = f"{PROJECT_HOME_DIR}/{config['DEFAULT']['connection_fl']}"

# ------------
# Connect to snowflake. The connection file is closed before the session is
# created; only its parsed contents are needed.
with open(snow_conn_flpath) as conn_f:
    snow_conn_info = json.load(conn_f)
sflk_session = Session.builder.configs(snow_conn_info).create()

# Raise a real exception object (raising a plain str is itself a TypeError).
if sflk_session is None:
    raise ConnectionError(f'Unable to connect to snowflake. Validate connection information in file: {snow_conn_flpath} ')

df = sflk_session.sql('select current_warehouse(), current_user(), current_role();').to_pandas()
display(df)

 Initialize Snowpark session


Unnamed: 0,CURRENT_WAREHOUSE(),CURRENT_USER(),CURRENT_ROLE()
0,LAB_WH,VSEKAR,DEV_BLOGGER


---
## Dataset

For the demo, we are using the dataset that is available from [kaggle: smart-building-system](https://www.kaggle.com/datasets/ranakrc/smart-building-system).
We recommend reading the dataset description to understand its structure and metadata.

A sample portion of the data is present in the 'data' folder; this portion is used in the demo. If you prefer to run with
the entire dataset, download it into a local folder and then update the 'smart_building' setting (in the `[DATA]` section)
of the 'config.ini' file.


In [19]:
# Upload data: read every sensor CSV under DATA_DIR into one pandas dataframe per sensor type.
set_cell_background('#EAE3D2')

import pandas as pd
import os 
from pathlib import Path

print('  Loading sensor readings across rooms into corresponding dataframes ...')

co2_pddfs = []
humidity_pddfs = []
light_pddfs = []
pir_pddfs = []
temperature_pddfs = []

# Sensor type (CSV file name without extension, e.g. 'co2.csv' -> 'co2')
# -> list collecting that sensor's per-room dataframes.
pddfs_by_sensor = {
    'co2': co2_pddfs,
    'humidity': humidity_pddfs,
    'light': light_pddfs,
    'pir': pir_pddfs,
    'temperature': temperature_pddfs,
}

DATA_DIR = f'''{PROJECT_HOME_DIR}/{config['DATA']['smart_building']}/'''
for path, _subdirs, files in os.walk(DATA_DIR):
    # Directory relative to DATA_DIR; relpath/Path.parts work with any path
    # separator, unlike a string replace followed by split('/').
    rel_dir = os.path.relpath(path, DATA_DIR)

    # Ignore root level directory
    if rel_dir == os.curdir:
        continue

    # The first directory level below DATA_DIR is the room number.
    room_no = Path(rel_dir).parts[0]

    for data_fl in files:
        # Skip non-data files such as .DS_Store (created by the OS / IDE).
        if not data_fl.endswith('.csv'):
            continue

        sensor_type = data_fl.split('.')[0]
        sensor_pddfs = pddfs_by_sensor.get(sensor_type)
        if sensor_pddfs is None:
            # CSV that is not one of the known sensor types: ignore it.
            continue

        data_filepath = os.path.join(path, data_fl)
        tmp_df = pd.read_csv(data_filepath, header=None, names=['M_TS', sensor_type])
        tmp_df['room_no'] = room_no
        tmp_df.columns = [x.upper() for x in tmp_df.columns]
        sensor_pddfs.append(tmp_df)

# pd.concat raises an opaque "No objects to concatenate" on an empty list;
# fail with a message that points at the data folder instead.
missing_sensors = [name for name, pddfs in pddfs_by_sensor.items() if not pddfs]
if missing_sensors:
    raise FileNotFoundError(f'No CSV readings found under {DATA_DIR} for sensor(s): {missing_sensors}')

# ignore_index: each per-file frame starts its index at 0; renumber the combined frame.
co2_pddf = pd.concat(co2_pddfs, ignore_index=True)
humidity_pddf = pd.concat(humidity_pddfs, ignore_index=True)
light_pddf = pd.concat(light_pddfs, ignore_index=True)
pir_pddf = pd.concat(pir_pddfs, ignore_index=True)
temperature_pddf = pd.concat(temperature_pddfs, ignore_index=True)

  Loading sensor readings across rooms into corresponding dataframes ...


In [20]:
# Store into snowflake: recreate one table per sensor type in the target schema.
set_cell_background('#EAE3D2')

print(' Storing sensor readings into tables ...')

target_db = config['DEFAULT']['db']
target_schema = config['DEFAULT']['sch']

# Perform cleanup, useful in case of debugging: switch to the target schema,
# then drop any sensor tables left over from an earlier run.
sensor_table_names = ['CO2', 'HUMIDITY', 'LIGHT', 'PIR', 'TEMPERATURE']
cleanup_stmts = [f' use schema {target_db}.{target_schema}; '] + [
    f' drop table if exists {target_db}.{target_schema}.{table_name}; '
    for table_name in sensor_table_names
]
for cleanup_stmt in cleanup_stmts:
    sflk_session.sql(cleanup_stmt).collect()


def _save_sensor_table(sensor_pddf, table_name):
    """Write one pandas dataframe to a (newly created, permanent) table in the current schema."""
    print(f'  Saving table {table_name} ...')
    return sflk_session.write_pandas(sensor_pddf
        ,table_name
        , auto_create_table=True, create_temp_table=False)


co2_spdf = _save_sensor_table(co2_pddf, 'CO2')
humidity_sddf = _save_sensor_table(humidity_pddf, 'HUMIDITY')
light_sddf = _save_sensor_table(light_pddf, 'LIGHT')
pir_sddf = _save_sensor_table(pir_pddf, 'PIR')
temperature_sddf = _save_sensor_table(temperature_pddf, 'TEMPERATURE')

 Storing sensor readings into tables ...
  Saving table CO2 ...
  Saving table HUMIDITY ...
  Saving table LIGHT ...
  Saving table PIR ...
  Saving table TEMPERATURE ...


### Stitch all the sensors data

We need to create a single dataframe containing the measurements from all sensor types, joined on room number and measurement time.

We perform this operation using Snowpark. For those who prefer SQL, here is roughly what
the code does, expressed in SQL.

##### Effective SQL:
```sql
create or replace view ref_sensor_measurements_vw as
select
    case
        when c.m_ts is not null then c.m_ts
        when h.m_ts is not null then h.m_ts
        when l.m_ts is not null then l.m_ts
        when p.m_ts is not null then p.m_ts
        when t.m_ts is not null then t.m_ts
    end as upd_mts
    ,to_timestamp_ntz(upd_mts) as measured_at
    ,c.room_no as measured_room_no
    ,c.co2
    ,h.humidity
    ,l.light
    ,p.pir
    ,t.temperature
    
from co2 as c
    full outer join humidity as h 
        on h.room_no = c.room_no
            and h.m_ts = c.m_ts
    full outer join light as l
        on l.room_no = c.room_no
            and l.m_ts = c.m_ts
    full outer join pir as p
        on p.room_no = c.room_no
            and p.m_ts = c.m_ts
    full outer join temperature as t 
        on t.room_no = c.room_no
            and t.m_ts = c.m_ts
where measured_room_no is not null
;
```


In [21]:
# Load data into dataframe: reference each sensor table as a Snowpark dataframe.
set_cell_background('#EAE3D2')


def _sensor_table(table_name):
    """Return the Snowpark table `<target_db>.<target_schema>.<table_name>`."""
    qualified_name = f'{target_db}.{target_schema}.{table_name}'
    return sflk_session.table(qualified_name)


co2_spdf = _sensor_table('CO2')
humidity_sddf = _sensor_table('HUMIDITY')
light_sddf = _sensor_table('LIGHT')
temperature_sddf = _sensor_table('TEMPERATURE')
pir_sddf = _sensor_table('PIR')

In [22]:
# Stitch the per-sensor tables together, starting from co2
set_cell_background('#EAE3D2')


def _full_join_sensor(left_spdf, sensor_spdf, sensor_col, carried_cols):
    """Full-outer-join one sensor table onto the accumulated room readings.

    Rows are matched on (ROOM_NO, M_TS). Both keys are coalesced from the two
    sides so that a reading present only in `sensor_spdf` keeps its room number
    as well as its timestamp. Otherwise it would end up with a NULL ROOM_NO,
    could not match any later join, and would be removed by the room filter below.

    :param left_spdf: accumulated readings with M_TS, ROOM_NO and `carried_cols`
    :param sensor_spdf: sensor table with M_TS, ROOM_NO and `sensor_col`
    :param sensor_col: measurement column taken from `sensor_spdf`
    :param carried_cols: measurement columns already present in `left_spdf`
    :return: dataframe with columns M_TS, ROOM_NO, *carried_cols, sensor_col
    """
    joined_spdf = left_spdf.join(sensor_spdf
        ,(left_spdf.ROOM_NO == sensor_spdf.ROOM_NO) & (left_spdf.M_TS == sensor_spdf.M_TS)
        ,'full')
    return joined_spdf.select(
        coalesce(left_spdf.M_TS ,sensor_spdf.M_TS).alias('M_TS')
        ,coalesce(left_spdf.ROOM_NO ,sensor_spdf.ROOM_NO).alias('ROOM_NO')
        ,*[left_spdf[carried_col] for carried_col in carried_cols]
        ,sensor_spdf[sensor_col]
        )


room_spdf = co2_spdf
carried_cols = ['CO2']
# Join order fixes the final column order: CO2, HUMIDITY, LIGHT, TEMPERATURE, PIR.
for sensor_spdf, sensor_col in [
        (humidity_sddf, 'HUMIDITY')
        ,(light_sddf, 'LIGHT')
        ,(temperature_sddf, 'TEMPERATURE')
        ,(pir_sddf, 'PIR')]:
    room_spdf = _full_join_sensor(room_spdf, sensor_spdf, sensor_col, carried_cols)
    carried_cols = carried_cols + [sensor_col]

room_spdf = room_spdf.with_column("MEASURED_AT", to_timestamp(room_spdf.M_TS))

# Safety net: ROOM_NO is coalesced from both sides of every join, so this only
# removes rows whose ROOM_NO was NULL in the source tables themselves.
room_spdf = room_spdf.filter(room_spdf.ROOM_NO.is_not_null())

sensor_measurements_vw = config['DEFAULT']['sensor_measurements_view']
print(f'  Creating view: {sensor_measurements_vw} ...')
room_spdf.create_or_replace_view(f'{target_db}.{target_schema}.{sensor_measurements_vw}')
room_spdf.show(3)

  Creating view: sensor_measurements_vw ...
-------------------------------------------------------------------------------------------------------
|"M_TS"      |"ROOM_NO"  |"CO2"  |"HUMIDITY"  |"LIGHT"  |"TEMPERATURE"  |"PIR"  |"MEASURED_AT"        |
-------------------------------------------------------------------------------------------------------
|1377299107  |413        |488.0  |45.34       |97.0     |23.93          |NULL   |2013-08-23 23:05:07  |
|1377299108  |413        |495.0  |45.34       |97.0     |23.94          |0.0    |2013-08-23 23:05:08  |
|1377299110  |413        |487.0  |NULL        |NULL     |NULL           |NULL   |2013-08-23 23:05:10  |
-------------------------------------------------------------------------------------------------------



#### Impute missing values

There are many records where a measurement (for example, Light) is empty. Instead of dropping these rows, let's assume the measurement remains
the same as the last measured value. We therefore fill in each missing measurement with the previous (last known) measurement of that
sensor type in the same room; if there is no earlier measurement, the value is set to -1.

The imputed measurements for all sensor types, per room and measurement time, are then stored in a table.

##### Effective SQL:
```sql

create or replace transient table ref_sensor_measurements_imputed as
select
    upd_mts as m_ts
    ,measured_at ,measured_room_no as room_no
    ,nvl(co2
        ,LAG(co2, 1, -1) IGNORE NULLS OVER (PARTITION BY room_no ORDER BY measured_at ASC NULLS FIRST )
        ) AS co2
    ,nvl(humidity
        ,LAG(humidity, 1, -1) IGNORE NULLS OVER (PARTITION BY room_no ORDER BY measured_at ASC NULLS FIRST )
        ) AS humidity
    ,nvl(light
        ,LAG(light, 1, -1) IGNORE NULLS OVER (PARTITION BY room_no ORDER BY measured_at ASC NULLS FIRST )
        ) AS light
    ,nvl(temperature
        ,LAG(temperature, 1, -1) IGNORE NULLS OVER (PARTITION BY room_no ORDER BY measured_at ASC NULLS FIRST )
        ) AS temperature
    ,nvl(pir
        ,LAG(pir, 1, -1) IGNORE NULLS OVER (PARTITION BY room_no ORDER BY measured_at ASC NULLS FIRST )
        ) AS pir
    
from ref_sensor_measurements_vw
order by room_no ,m_ts asc
;
```

In [23]:
# Impute missing readings with the last known value per room, then persist them.
set_cell_background('#EAE3D2')

from snowflake.snowpark import Window ,WindowSpec

room_imputed_spdf = sflk_session.table(f'{target_db}.{target_schema}.{sensor_measurements_vw}')

# Readings within a room are ordered by measurement timestamp; one window
# specification is shared by every sensor column.
room_ts_window = Window.partition_by('ROOM_NO').order_by('M_TS')

# Replace each NULL reading with the previous non-NULL reading of the same
# sensor (IGNORE NULLS), or -1 when the room has no earlier reading.
# Columns are taken from room_imputed_spdf itself, not from the previous
# cell's room_spdf, so this cell does not depend on leftover kernel state.
for sensor_col in ['CO2', 'HUMIDITY', 'LIGHT', 'TEMPERATURE', 'PIR']:
    room_imputed_spdf = room_imputed_spdf.with_column(sensor_col,
        coalesce(room_imputed_spdf[sensor_col]
            ,lag(room_imputed_spdf[sensor_col] ,offset = 1 ,default_value = -1 ,ignore_nulls = True)
                .over(room_ts_window)
        )
    )

# Derived from the imputed PIR: -1 = no reading yet, 1 = PIR > 0, 0 otherwise.
room_imputed_spdf = room_imputed_spdf.with_column("OCCUPIED"
    ,iff(room_imputed_spdf.PIR < 0, -1
        ,iff(room_imputed_spdf.PIR > 0, 1 ,0)
    )
)

room_imputed_spdf = room_imputed_spdf.sort(room_imputed_spdf.ROOM_NO ,room_imputed_spdf.M_TS.asc())

imputed_table = config['DEFAULT']['sensor_measurements_imputed_table']
print(f' Storing result to table: {imputed_table} ...')
room_imputed_spdf.write.mode("overwrite").save_as_table(f'{target_db}.{target_schema}.{imputed_table}')


room_imputed_spdf.show(3)

 Storing result to table: sensor_measurements_imputed ...
--------------------------------------------------------------------------------------------------------------------
|"M_TS"      |"ROOM_NO"  |"MEASURED_AT"        |"CO2"  |"HUMIDITY"  |"LIGHT"  |"TEMPERATURE"  |"PIR"  |"OCCUPIED"  |
--------------------------------------------------------------------------------------------------------------------
|1377299107  |413        |2013-08-23 23:05:07  |488.0  |45.34       |97.0     |23.93          |-1.0   |-1          |
|1377299108  |413        |2013-08-23 23:05:08  |495.0  |45.34       |97.0     |23.94          |0.0    |0           |
|1377299110  |413        |2013-08-23 23:05:10  |487.0  |45.34       |97.0     |23.94          |0.0    |0           |
--------------------------------------------------------------------------------------------------------------------



---
### Close out

    With that, we have finished this section of the demo setup.

In [24]:
# Close out: end the Snowpark session opened in the initialization cell.
sflk_session.close()

print('Finished!!!')

Finished!!!
