# ETL or ELT

This file is intended to run the ETL or ELT pipeline once. It is only to understand the extrations, transformation and load.

In [1]:
import os
from os import walk
import sys
import logging
import time
import datetime
import json
import requests
import numpy as np
import pandas as pd
from pandas.errors import EmptyDataError
from sqlalchemy import create_engine
USER = ""
PASSWORD = ""
URL = f"mysql+pymysql://{USER}:{PASSWORD}@localhost/etl?charset=utf8mb4"

# ETL

In [2]:
def extract(path):
    building_metadata = None
    building_metadata_path = ""
    meter_data = None
    weather_data = None

    file_paths = []
    for (dirpath, dirnames, filenames) in walk(path):
        for filename in filenames:
            file_path = os.path.join(dirpath, filename)
            file_paths.append(file_path)
    file_paths = [file_path for file_path in file_paths if ".DS_Store" not in file_path and ".ipynb_checkpoints" not in file_path]
    
    pop_index = None
    if 'dso_data/building_metadata.csv' in file_paths:
        pop_index = file_paths.index('dso_data/building_metadata.csv')
    if pop_index != None:
        building_metadata_path = file_paths.pop(pop_index)
    

    meter_data = pd.DataFrame(columns=["timestamp", "site_id", "building_id", "meter_id" ,"meter_reading"])
    meter_data_list = []
    for file_path in file_paths:
        data = pd.DataFrame(columns=["timestamp", "site_id", "building_id", "meter_id" ,"meter_reading"])
        if file_path.endswith(".csv") and "meter_reading" not in file_path:
            try:
                data = pd.read_csv(file_path)
                if len(data) > 0: 
                    data[["site_id", "building_id", "meter_id"]] = file_path.replace(".csv", "").split("dso_data/")[-1].split("/")
                    data = data[["timestamp", "site_id", "building_id", "meter_id", "meter_reading",]].copy()
                    data.timestamp = data.timestamp.astype(str)
                #display(data)
            except EmptyDataError as e:
                print(e)
        elif file_path.endswith(".csv") and "meter_reading" in file_path:
            try:
                data = pd.read_csv(file_path)
                if len(data) > 0: 
                    row = file_path.replace(".csv", "").split("dso_data/")[-1].split("/")
                    row.remove("meter_reading")
                    data[["site_id", "building_id"]] = row
                    #dfs = []
                    #for meter_id in ["meter_0", "meter_1", "meter_2", "meter_3", ]:
                    #    df = data[["timestamp", "site_id", "building_id", meter_id]].copy()
                    #    df.columns = ["timestamp", "site_id", "building_id", "meter_reading"]
                    #    df["meter_id"] = meter_id
                    #    dfs.append(df)
                    #data = pd.concat(dfs)
                    data = data.melt(
                        id_vars=["timestamp", "site_id", "building_id", ], 
                        var_name="meter_id", 
                        value_name="meter_reading",
                    )
                    data["meter_id"] = data["meter_id"].apply(lambda x: int(x.split("_")[-1]))
                    data = data[["timestamp", "site_id", "building_id", "meter_id", "meter_reading",]].copy()
                    data.timestamp = data.timestamp.astype(str)
                #display(data)
            except EmptyDataError as e:
                print(e)
        elif file_path.endswith(".json"):
            try:
                data = pd.read_json(file_path, orient="index")
                if len(data) > 0:    
                    data[["site_id", "building_id", "meter_id"]] = file_path.replace(".json", "").split("dso_data/")[-1].split("/")
                    data = data.reset_index()
                    data = data.rename(columns={"index": "timestamp"})
                    data = data[["timestamp", "site_id", "building_id", "meter_id", "meter_reading",]].copy()
                    data.timestamp = data.timestamp.astype(str)
                #display(data)
            except Exception as e:
                print(e)
        if len(data) > 0:
            meter_data_list.append(data)
            #print("#" * 25)
    meter_data = pd.concat(meter_data_list).sort_values(by=["timestamp", "site_id", "building_id", "meter_id"])
    meter_data["meter_id"] = meter_data["meter_id"].astype(int)
    meter_data["building_id"] = meter_data["building_id"].astype(int)
    meter_data = meter_data.rename(columns={"meter_id": "meter"})
    weather_data_list = []
    for site_id in meter_data.site_id.unique():
        res_hist = requests.get("https://weatherfakeapi.de/history/", params={'site': site_id, 'start': meter_data.timestamp.min(), 'end': meter_data.timestamp.max()})
        content = json.loads(res_hist.content)["results"]
        weather = pd.DataFrame.from_dict(content)
        weather_data_list.append(weather)
    weather_data = pd.concat(weather_data_list).reset_index(drop=True)
    building_metadata = pd.read_csv(building_metadata_path)
    building_metadata["building_id"] = building_metadata["building_id"].astype(int)
    return meter_data, weather_data, building_metadata

In [3]:
def transform_first(meter_data, weather_data, building_metadata):
    #display(meter_data)
    site_buildings = building_metadata[["site_id", "building_id"]].reset_index(drop=True).copy()
    print("Table Sites and Buildings")
    display(site_buildings)
    meter_data = meter_data[["timestamp", "building_id", "meter", "meter_reading"]].reset_index(drop=True).copy()
    print("Table Meter Data")
    display(meter_data)
    print("Table Weather Data")
    display(weather_data)
    building_metadata = building_metadata.drop(columns=["site_id"]).reset_index(drop=True).copy()
    print("Table Building Metadata")
    display(building_metadata)
    return [["sites_buildings", site_buildings], ["meter_data",  meter_data[meter_data["timestamp"] >= "2016-09-30 00:00:00"]], ["weather_data", weather_data[weather_data["timestamp"] >= "2016-09-30 00:00:00"]], ["building_metadata", building_metadata], ]

In [4]:
def load(table_names_dfs):
    engine = create_engine("mysql+pymysql://root:Schlaujamin-sbk94@localhost/etl?charset=utf8mb4")
    for [table_name, df] in table_names_dfs:    
        with engine.begin() as connection:
            df.to_sql(name=table_name, con=connection, if_exists='replace', index=False)

In [5]:
path = "dso_data"
meter_data, weather_data, building_metadata = extract(path)

In [6]:
tables_dfs = transform_first(meter_data, weather_data, building_metadata)

Table Sites and Buildings


Unnamed: 0,site_id,building_id
0,0,0
1,0,1
2,0,2
3,0,3
4,0,4
...,...,...
1444,15,1444
1445,15,1445
1446,15,1446
1447,15,1447


Table Meter Data


Unnamed: 0,timestamp,building_id,meter,meter_reading
0,2016-01-01 00:00:00,0,0,0.000
1,2016-01-01 00:00:00,1,0,0.000
2,2016-01-01 00:00:00,1,1,
3,2016-01-01 00:00:00,1,2,
4,2016-01-01 00:00:00,1,3,
...,...,...,...,...
25508946,2016-09-30 23:00:00,996,2,0.000
25508947,2016-09-30 23:00:00,997,0,40.000
25508948,2016-09-30 23:00:00,997,1,69.915
25508949,2016-09-30 23:00:00,997,2,57.950


Table Weather Data


Unnamed: 0,site_id,timestamp,air_temperature,cloud_coverage,dew_temperature,precip_depth_1_hr,sea_level_pressure,wind_direction,wind_speed
0,0,2016-01-01 00:00:00,25.0,6.0,20.0,-9999.0,1019.7,0.0,0.0
1,0,2016-01-01 01:00:00,24.4,-9999.0,21.1,-1.0,1020.2,70.0,1.5
2,0,2016-01-01 02:00:00,22.8,2.0,21.1,0.0,1020.2,0.0,0.0
3,0,2016-01-01 03:00:00,21.1,2.0,20.6,0.0,1020.1,0.0,0.0
4,0,2016-01-01 04:00:00,20.0,2.0,20.0,-1.0,1020.0,250.0,2.6
...,...,...,...,...,...,...,...,...,...
104554,9,2016-09-30 19:00:00,26.1,0.0,10.6,0.0,1021.0,0.0,0.0
104555,9,2016-09-30 20:00:00,26.7,0.0,10.6,0.0,1020.0,-9999.0,3.1
104556,9,2016-09-30 21:00:00,26.7,0.0,11.1,0.0,1019.1,0.0,0.0
104557,9,2016-09-30 22:00:00,27.8,0.0,11.1,0.0,1018.3,0.0,0.0


Table Building Metadata


Unnamed: 0,building_id,primary_use,square_feet,year_built,floor_count
0,0,Education,7432,2008.0,
1,1,Education,2720,2004.0,
2,2,Education,5376,1991.0,
3,3,Education,23685,2002.0,
4,4,Education,116607,1975.0,
...,...,...,...,...,...
1444,1444,Entertainment/public assembly,19619,1914.0,
1445,1445,Education,4298,,
1446,1446,Entertainment/public assembly,11265,1997.0,
1447,1447,Lodging/residential,29775,2001.0,


In [7]:
load(tables_dfs)

# ELT

In [8]:
def extract(path):
    building_metadata = None
    building_metadata_path = ""
    meter_data = None
    weather_data = None

    file_paths = []
    for (dirpath, dirnames, filenames) in walk(path):
        for filename in filenames:
            file_path = os.path.join(dirpath, filename)
            file_paths.append(file_path)
    file_paths = [file_path for file_path in file_paths if ".DS_Store" not in file_path and ".ipynb_checkpoints" not in file_path]
    
    pop_index = None
    if 'dso_data/building_metadata.csv' in file_paths:
        pop_index = file_paths.index('dso_data/building_metadata.csv')
    if pop_index != None:
        building_metadata_path = file_paths.pop(pop_index)
    

    meter_data = pd.DataFrame(columns=["timestamp", "site_id", "building_id", "meter_id" ,"meter_reading"])
    meter_data_list = []
    for file_path in file_paths:
        data = pd.DataFrame(columns=["timestamp", "site_id", "building_id", "meter_id" ,"meter_reading"])
        if file_path.endswith(".csv") and "meter_reading" not in file_path:
            try:
                data = pd.read_csv(file_path)
                if len(data) > 0: 
                    data[["site_id", "building_id", "meter_id"]] = file_path.replace(".csv", "").split("dso_data/")[-1].split("/")
                    data = data[["timestamp", "site_id", "building_id", "meter_id", "meter_reading",]].copy()
                    data.timestamp = data.timestamp.astype(str)
                #display(data)
            except EmptyDataError as e:
                print(e)
        elif file_path.endswith(".csv") and "meter_reading" in file_path:
            try:
                data = pd.read_csv(file_path)
                if len(data) > 0: 
                    row = file_path.replace(".csv", "").split("dso_data/")[-1].split("/")
                    row.remove("meter_reading")
                    data[["site_id", "building_id"]] = row
                    #dfs = []
                    #for meter_id in ["meter_0", "meter_1", "meter_2", "meter_3", ]:
                    #    df = data[["timestamp", "site_id", "building_id", meter_id]].copy()
                    #    df.columns = ["timestamp", "site_id", "building_id", "meter_reading"]
                    #    df["meter_id"] = meter_id
                    #    dfs.append(df)
                    #data = pd.concat(dfs)
                    data = data.melt(
                        id_vars=["timestamp", "site_id", "building_id", ], 
                        var_name="meter_id", 
                        value_name="meter_reading",
                    )
                    data["meter_id"] = data["meter_id"].apply(lambda x: int(x.split("_")[-1]))
                    data = data[["timestamp", "site_id", "building_id", "meter_id", "meter_reading",]].copy()
                    data.timestamp = data.timestamp.astype(str)
                #display(data)
            except EmptyDataError as e:
                print(e)
        elif file_path.endswith(".json"):
            try:
                data = pd.read_json(file_path, orient="index")
                if len(data) > 0:    
                    data[["site_id", "building_id", "meter_id"]] = file_path.replace(".json", "").split("dso_data/")[-1].split("/")
                    data = data.reset_index()
                    data = data.rename(columns={"index": "timestamp"})
                    data = data[["timestamp", "site_id", "building_id", "meter_id", "meter_reading",]].copy()
                    data.timestamp = data.timestamp.astype(str)
                #display(data)
            except Exception as e:
                print(e)
        if len(data) > 0:
            meter_data_list.append(data)
            #print("#" * 25)
    meter_data = pd.concat(meter_data_list).sort_values(by=["timestamp", "site_id", "building_id", "meter_id"])
    meter_data["meter_id"] = meter_data["meter_id"].astype(int)
    meter_data["building_id"] = meter_data["building_id"].astype(int)
    meter_data = meter_data.rename(columns={"meter_id": "meter"})
    weather_data_list = []
    for site_id in meter_data.site_id.unique():
        res_hist = requests.get("https://weatherfakeapi.de/history/", params={'site': site_id, 'start': meter_data.timestamp.min(), 'end': meter_data.timestamp.max()})
        content = json.loads(res_hist.content)["results"]
        weather = pd.DataFrame.from_dict(content)
        weather_data_list.append(weather)
    weather_data = pd.concat(weather_data_list).reset_index(drop=True)
    building_metadata = pd.read_csv(building_metadata_path)
    meter_data["site_id"] = meter_data["site_id"].astype(int)
    weather_data["site_id"] = weather_data["site_id"].astype(int)
    building_metadata["building_id"] = building_metadata["building_id"].astype(int)
    all_df = meter_data.merge(building_metadata, on=["site_id", "building_id", ]).merge(weather_data, on=["site_id", "timestamp"])
    all_df = all_df[all_df["timestamp"] >= "2016-09-30 00:00:00"].reset_index(drop=True).copy()
    return [["ml_ready", all_df]]

In [9]:
def load_first(table_names_dfs):
    engine = create_engine(URL)
    for [table_name, df] in table_names_dfs:    
        with engine.begin() as connection:
            df.to_sql(name=table_name, con=connection, if_exists='replace')

In [10]:
def transform(table_name):
    #display(meter_data)
    engine = create_engine("mysql+pymysql://root:Schlaujamin-sbk94@localhost/etl?charset=utf8mb4")
    all_df = pd.read_sql(f"SELECT * FROM {table_name}", engine)
    site_buildings = all_df[["site_id", "building_id"]].drop_duplicates().reset_index(drop=True).copy()
    print("Table Sites and Buildings")
    display(site_buildings)
    meter_data = all_df[["timestamp", "building_id", "meter", "meter_reading"]].drop_duplicates().reset_index(drop=True).copy()
    print("Table Meter Data")
    display(meter_data)
    print("Table Weather Data")
    weather_data = all_df[["timestamp", "site_id", "air_temperature", "cloud_coverage", "dew_temperature", "precip_depth_1_hr", "sea_level_pressure", "wind_direction", "wind_speed", ]].drop_duplicates().reset_index(drop=True).copy()
    display(weather_data)
    building_metadata = all_df[["building_id", "primary_use", "square_feet", "year_built", "floor_count", ]].drop_duplicates().reset_index(drop=True).copy()
    print("Table Building Metadata")
    display(building_metadata)

In [11]:
path = "dso_data"
tables_dfs = extract(path)

In [12]:
load_first(tables_dfs)

In [13]:
transform("ml_ready")

Table Sites and Buildings


Unnamed: 0,site_id,building_id
0,0,0
1,0,1
2,0,10
3,0,100
4,0,101
...,...,...
1421,9,993
1422,9,994
1423,9,995
1424,9,996


Table Meter Data


Unnamed: 0,timestamp,building_id,meter,meter_reading
0,2016-09-30 00:00:00,0,0,249.135
1,2016-09-30 00:00:00,1,0,136.785
2,2016-09-30 00:00:00,1,1,
3,2016-09-30 00:00:00,1,2,
4,2016-09-30 00:00:00,1,3,
...,...,...,...,...
95668,2016-09-30 23:00:00,996,2,0.000
95669,2016-09-30 23:00:00,997,0,40.000
95670,2016-09-30 23:00:00,997,1,69.915
95671,2016-09-30 23:00:00,997,2,57.950


Table Weather Data


Unnamed: 0,timestamp,site_id,air_temperature,cloud_coverage,dew_temperature,precip_depth_1_hr,sea_level_pressure,wind_direction,wind_speed
0,2016-09-30 00:00:00,0,25.6,6.0,20.6,0.0,1016.9,210.0,4.6
1,2016-09-30 01:00:00,0,24.4,-9999.0,20.6,0.0,1017.1,210.0,2.1
2,2016-09-30 02:00:00,0,24.4,-9999.0,21.1,0.0,1017.4,230.0,2.6
3,2016-09-30 03:00:00,0,23.9,-9999.0,21.7,0.0,1017.8,190.0,2.6
4,2016-09-30 04:00:00,0,23.9,-9999.0,21.7,0.0,1018.0,190.0,1.5
...,...,...,...,...,...,...,...,...,...
379,2016-09-30 19:00:00,9,26.1,0.0,10.6,0.0,1021.0,0.0,0.0
380,2016-09-30 20:00:00,9,26.7,0.0,10.6,0.0,1020.0,-9999.0,3.1
381,2016-09-30 21:00:00,9,26.7,0.0,11.1,0.0,1019.1,0.0,0.0
382,2016-09-30 22:00:00,9,27.8,0.0,11.1,0.0,1018.3,0.0,0.0


Table Building Metadata


Unnamed: 0,building_id,primary_use,square_feet,year_built,floor_count
0,0,Education,7432,2008.0,
1,1,Education,2720,2004.0,
2,10,Entertainment/public assembly,370773,1991.0,
3,100,Lodging/residential,24456,1968.0,
4,101,Office,18860,1986.0,
...,...,...,...,...,...
1421,993,Education,428647,,
1422,994,Education,108392,,
1423,995,Education,46230,,
1424,996,Education,77632,,
