# ETL or ELT

This file is intended to run the ETL or ELT pipeline once.

In our machine learning use case we can apply ETL to derive a harmonized SQL database that hase multiple tables that can be joined in the MLOps pipeline.

## ELT

In [131]:
import os
from os import walk
import sys
import logging
import time
import datetime
import json
import requests
import numpy as np
import pandas as pd
from pandas.errors import EmptyDataError
from sqlalchemy import create_engine

URL = f"sqlite:///ELT.db"
with open("ELT.db", "w") as f:
    pass

path = "dso_data"

### Extract and Load

Usually in practical applications data is extracted and load directly into a database.

In [132]:
def extract_and_load(path):
    building_metadata = None
    building_metadata_path = ""
    meter_data = None
    weather_data = None

    file_paths = []
    for (dirpath, dirnames, filenames) in walk(path):
        for filename in filenames:
            file_path = os.path.join(dirpath, filename)
            file_paths.append(file_path)
    file_paths = [file_path for file_path in file_paths if ".DS_Store" not in file_path and ".ipynb_checkpoints" not in file_path]
    
    pop_index = None
    if 'dso_data/building_metadata.csv' in file_paths:
        pop_index = file_paths.index('dso_data/building_metadata.csv')
    if pop_index != None:
        building_metadata_path = file_paths.pop(pop_index)
    

    meter_data = pd.DataFrame(columns=["timestamp", "site_id", "building_id", "meter_id" ,"meter_reading"])
    meter_data_list = []
    for file_path in file_paths:
        data = pd.DataFrame(columns=["timestamp", "site_id", "building_id", "meter_id" ,"meter_reading"])
        if file_path.endswith(".csv") and "meter_reading" in file_path:
            try:
                data = pd.read_csv(file_path)
                if len(data) > 0: 
                    row = file_path.replace(".csv", "").split("dso_data/")[-1].split("/")
                    row.remove("meter_reading")
                    data[["site_id", "building_id"]] = row
                    data = data.melt(
                        id_vars=["timestamp", "site_id", "building_id", ], 
                        var_name="meter_id", 
                        value_name="meter_reading",
                    )
                    data["meter_id"] = data["meter_id"].apply(lambda x: int(x.split("_")[-1]))
                    data = data[["timestamp", "site_id", "building_id", "meter_id", "meter_reading",]].copy()
                    data.timestamp = data.timestamp.astype(str)
            except EmptyDataError as e:
                print("csv", e)
        elif file_path.endswith(".jsonl"):
            try:
                data = pd.read_json(file_path,  lines=True)
                if len(data) > 0:    
                    data[["site_id", "building_id", "meter_id"]] = file_path.replace(".jsonl", "").split("dso_data/")[-1].split("/")
                    data = data[["timestamp", "site_id", "building_id", "meter_id", "meter_reading",]].copy()
                    data.timestamp = data.timestamp.astype(str)
            except Exception as e:
                print("json", e)
        if len(data) > 0:
            meter_data_list.append(data)
    meter_data = pd.concat(meter_data_list).sort_values(by=["timestamp", "site_id", "building_id", "meter_id"])
    meter_data["meter_id"] = meter_data["meter_id"].astype(int)
    meter_data["building_id"] = meter_data["building_id"].astype(int)
    meter_data = meter_data.rename(columns={"meter_id": "meter"})
    
    # retrieve weather data
    weather_data_list = []
    for site_id in meter_data.site_id.unique():
        res_hist = requests.get("http://131.234.154.105:33100/history/", params={'site': site_id, 'start': meter_data.timestamp.min(), 'end': meter_data.timestamp.max()})
        content = json.loads(res_hist.content)["results"]
        weather = pd.DataFrame.from_dict(content)
        weather_data_list.append(weather)
    weather_data = pd.concat(weather_data_list).reset_index(drop=True)
    building_metadata = pd.read_csv(building_metadata_path)
    building_metadata["building_id"] = building_metadata["building_id"].astype(int)
    
    site_buildings = building_metadata[["site_id", "building_id"]].reset_index(drop=True).copy()
    print("Table Sites and Buildings")
    display(site_buildings)
    meter_data = meter_data[["timestamp", "building_id", "meter", "meter_reading"]].reset_index(drop=True).copy()
    meter_data = meter_data.merge(site_buildings, how="left", on=["building_id"])
    print("Table Meter Data")
    display(meter_data)
    print("Table Weather Data")
    display(weather_data)
    building_metadata = building_metadata.drop(columns=["site_id"]).reset_index(drop=True).copy()
    building_metadata = building_metadata.merge(site_buildings, how="left", on=["building_id"])
    print("Table Building Metadata")
    display(building_metadata)
    table_names_dfs = [["sites_buildings", site_buildings], ["meter_data",  meter_data], ["weather_data", weather_data], ["building_metadata", building_metadata], ]

    engine = create_engine(URL)
    for [table_name, df] in table_names_dfs:    
        with engine.begin() as connection:
            df.to_sql(name=table_name, con=connection, if_exists='replace', index=False)
    return [table_name for table_name, _ in table_names_dfs]

In [133]:
table_names = extract_and_load(path)

Table Sites and Buildings


Unnamed: 0,site_id,building_id
0,0,0
1,0,1
2,0,2
3,0,3
4,0,4
...,...,...
1444,15,1444
1445,15,1445
1446,15,1446
1447,15,1447


Table Meter Data


Unnamed: 0,timestamp,building_id,meter,meter_reading,site_id
0,2016-01-01 00:00:00,0,0,0.000,0
1,2016-01-01 00:00:00,1,0,0.000,0
2,2016-01-01 00:00:00,1,1,,0
3,2016-01-01 00:00:00,1,2,,0
4,2016-01-01 00:00:00,1,3,,0
...,...,...,...,...,...
340522,2016-09-30 23:00:00,8,0,332.748,0
340523,2016-09-30 23:00:00,9,0,147.433,0
340524,2016-09-30 23:00:00,9,1,886.418,0
340525,2016-09-30 23:00:00,9,2,,0


Table Weather Data


Unnamed: 0,site_id,timestamp,air_temperature,cloud_coverage,dew_temperature,precip_depth_1_hr,sea_level_pressure,wind_direction,wind_speed
0,0,2016-01-01 00:00:00,25.0,6.0,20.0,-9999.0,1019.7,0.0,0.0
1,0,2016-01-01 01:00:00,24.4,-9999.0,21.1,-1.0,1020.2,70.0,1.5
2,0,2016-01-01 02:00:00,22.8,2.0,21.1,0.0,1020.2,0.0,0.0
3,0,2016-01-01 03:00:00,21.1,2.0,20.6,0.0,1020.1,0.0,0.0
4,0,2016-01-01 04:00:00,20.0,2.0,20.0,-1.0,1020.0,250.0,2.6
...,...,...,...,...,...,...,...,...,...
6571,0,2016-09-30 19:00:00,31.7,-9999.0,21.7,0.0,1014.9,180.0,3.6
6572,0,2016-09-30 20:00:00,32.2,-9999.0,22.2,0.0,1014.3,170.0,4.6
6573,0,2016-09-30 21:00:00,31.1,-9999.0,21.7,0.0,1013.7,170.0,2.6
6574,0,2016-09-30 22:00:00,23.0,-9999.0,22.0,236.0,1015.4,160.0,4.6


Table Building Metadata


Unnamed: 0,building_id,primary_use,square_feet,year_built,floor_count,site_id
0,0,Education,7432,2008.0,,0
1,1,Education,2720,2004.0,,0
2,2,Education,5376,1991.0,,0
3,3,Education,23685,2002.0,,0
4,4,Education,116607,1975.0,,0
...,...,...,...,...,...,...
1444,1444,Entertainment/public assembly,19619,1914.0,,15
1445,1445,Education,4298,,,15
1446,1446,Entertainment/public assembly,11265,1997.0,,15
1447,1447,Lodging/residential,29775,2001.0,,15


## Transform

Then all transformation steps for machine learning is applied (usually during the MLOPs pipeline).

In [134]:
def transform(table_names):
    engine = create_engine(URL)
    table_names_dfs = []
    for table_name in table_names:    
        with engine.begin() as connection:
            df = pd.read_sql_table(table_name, con=connection)
            table_names_dfs.append([table_name, df])
    table_names_dfs = dict(table_names_dfs)

    #table_names_dfs["building_metadata"] = table_names_dfs["building_metadata"].merge(table_names_dfs["sites_buildings"], on=["building_id", ]).reset_index(drop=True).copy()
    #table_names_dfs["meter_data"] = table_names_dfs["meter_data"].merge(table_names_dfs["sites_buildings"], on=["building_id", ]).reset_index(drop=True).copy()
    
    all_df = table_names_dfs["meter_data"].merge(table_names_dfs["building_metadata"], on=["site_id", "building_id", ]).merge(table_names_dfs["weather_data"], on=["site_id", "timestamp"])
    all_df = all_df.reset_index(drop=True).copy()
    return all_df

In [135]:
transform(table_names)

Unnamed: 0,timestamp,building_id,meter,meter_reading,site_id,primary_use,square_feet,year_built,floor_count,air_temperature,cloud_coverage,dew_temperature,precip_depth_1_hr,sea_level_pressure,wind_direction,wind_speed
0,2016-01-01 00:00:00,0,0,0.000,0,Education,7432,2008.0,,25.0,6.0,20.0,-9999.0,1019.7,0.0,0.0
1,2016-01-01 00:00:00,1,0,0.000,0,Education,2720,2004.0,,25.0,6.0,20.0,-9999.0,1019.7,0.0,0.0
2,2016-01-01 00:00:00,1,1,,0,Education,2720,2004.0,,25.0,6.0,20.0,-9999.0,1019.7,0.0,0.0
3,2016-01-01 00:00:00,1,2,,0,Education,2720,2004.0,,25.0,6.0,20.0,-9999.0,1019.7,0.0,0.0
4,2016-01-01 00:00:00,1,3,,0,Education,2720,2004.0,,25.0,6.0,20.0,-9999.0,1019.7,0.0,0.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
340522,2016-09-30 23:00:00,8,0,332.748,0,Education,60809,2003.0,,23.9,-9999.0,20.6,3.0,1014.8,0.0,0.0
340523,2016-09-30 23:00:00,9,0,147.433,0,Office,27000,2010.0,,23.9,-9999.0,20.6,3.0,1014.8,0.0,0.0
340524,2016-09-30 23:00:00,9,1,886.418,0,Office,27000,2010.0,,23.9,-9999.0,20.6,3.0,1014.8,0.0,0.0
340525,2016-09-30 23:00:00,9,2,,0,Office,27000,2010.0,,23.9,-9999.0,20.6,3.0,1014.8,0.0,0.0


## ETL

In [136]:
import os
from os import walk
import sys
import logging
import time
import datetime
import json
import requests
import numpy as np
import pandas as pd
from pandas.errors import EmptyDataError
from sqlalchemy import create_engine

URL = f"sqlite:///ETL.db"
with open("ETL.db", "w") as f:
    pass

path = "dso_data"

### Extract and Transform

In ETL, we extract and directly transform the data into a machine learning ready format.

In [137]:
def extract_and_transform(path):
    building_metadata = None
    building_metadata_path = ""
    meter_data = None
    weather_data = None

    file_paths = []
    for (dirpath, dirnames, filenames) in walk(path):
        for filename in filenames:
            file_path = os.path.join(dirpath, filename)
            file_paths.append(file_path)
    file_paths = [file_path for file_path in file_paths if ".DS_Store" not in file_path and ".ipynb_checkpoints" not in file_path]
    
    pop_index = None
    if 'dso_data/building_metadata.csv' in file_paths:
        pop_index = file_paths.index('dso_data/building_metadata.csv')
    if pop_index != None:
        building_metadata_path = file_paths.pop(pop_index)

    meter_data = pd.DataFrame(columns=["timestamp", "site_id", "building_id", "meter_id" ,"meter_reading"])
    meter_data_list = []
    for file_path in file_paths:
        data = pd.DataFrame(columns=["timestamp", "site_id", "building_id", "meter_id" ,"meter_reading"])
        if file_path.endswith(".csv") and "meter_reading" in file_path:
            try:
                data = pd.read_csv(file_path)
                if len(data) > 0: 
                    row = file_path.replace(".csv", "").split("dso_data/")[-1].split("/")
                    row.remove("meter_reading")
                    data[["site_id", "building_id"]] = row
                    data = data.melt(
                        id_vars=["timestamp", "site_id", "building_id", ], 
                        var_name="meter_id", 
                        value_name="meter_reading",
                    )
                    data["meter_id"] = data["meter_id"].apply(lambda x: int(x.split("_")[-1]))
                    data = data[["timestamp", "site_id", "building_id", "meter_id", "meter_reading",]].copy()
                    data.timestamp = data.timestamp.astype(str)
            except EmptyDataError as e:
                print(e)
        elif file_path.endswith(".jsonl"):
            try:
                data = pd.read_json(file_path, lines=True)
                if len(data) > 0:    
                    data[["site_id", "building_id", "meter_id"]] = file_path.replace(".jsonl", "").split("dso_data/")[-1].split("/")
                    data = data[["timestamp", "site_id", "building_id", "meter_id", "meter_reading",]].copy()
                    data.timestamp = data.timestamp.astype(str)
            except Exception as e:
                print(e)
        if len(data) > 0:
            meter_data_list.append(data)
    meter_data = pd.concat(meter_data_list).sort_values(by=["timestamp", "site_id", "building_id", "meter_id"])
    meter_data["meter_id"] = meter_data["meter_id"].astype(int)
    meter_data["building_id"] = meter_data["building_id"].astype(int)
    meter_data = meter_data.rename(columns={"meter_id": "meter"})
    
    # retrieve weather data
    weather_data_list = []
    for site_id in meter_data.site_id.unique():
        res_hist = requests.get("http://131.234.154.105:33100/history/", params={'site': site_id, 'start': meter_data.timestamp.min(), 'end': meter_data.timestamp.max()})
        content = json.loads(res_hist.content)["results"]
        weather = pd.DataFrame.from_dict(content)
        weather_data_list.append(weather)
    weather_data = pd.concat(weather_data_list).reset_index(drop=True)
    building_metadata = pd.read_csv(building_metadata_path)
    
    meter_data["site_id"] = meter_data["site_id"].astype(int)
    weather_data["site_id"] = weather_data["site_id"].astype(int)
    building_metadata["building_id"] = building_metadata["building_id"].astype(int)
    
    # merge all dataframes
    all_df = meter_data.merge(building_metadata, on=["site_id", "building_id", ]).merge(weather_data, on=["site_id", "timestamp"])
    all_df = all_df.reset_index(drop=True).copy() # now ready for ML
    return [["ml_ready", all_df]]

In [138]:
tables_dfs = extract_and_transform(path)

### Load

Afterward, we load the machine learning ready data into our database.

In [139]:
def load(table_names_dfs):
    engine = create_engine(URL)
    for [table_name, df] in table_names_dfs:    
        with engine.begin() as connection:
            df.to_sql(name=table_name, con=connection, if_exists='replace')

In [140]:
load(tables_dfs)

## Reality

In reality, we would run a data warehouse not only for machine learning. Therefore, we would use a data warehouse to store the data into a harmonized structure and then load it into an additional machine learning pipeline. Independent of ETL or ELT, there would be a data warehouse not developed for ML as a sub stage.

Now let's continue with the watchdogs.