Testing Pipeline

In [50]:
import pandas as pd
from sklearn.pipeline import Pipeline
from sklearn.neighbors import KNeighborsRegressor

In [54]:
from dotenv import load_dotenv
import os 
import pandas as pd
import boto3
from boto3.dynamodb.conditions import Key, Attr

class FetchData:
    """Fetch every record from a DynamoDB table and cache it as a CSV.

    Credentials come from the ``access_key_id`` and ``secret_access_key``
    environment variables (populated via ``load_dotenv``); the region is
    fixed to ``us-east-2``.
    """

    def __init__(self, table_id):
        self.__scan_results = []
        self.raw_data = None
        self.table_id = table_id
        self.__table = self.__get_table(self.table_id)

    def __configure(self):
        # Load environment variables from a .env file so os.getenv can see them.
        load_dotenv()

    def __get_table(self, table_id):
        """Return a boto3 DynamoDB ``Table`` resource for *table_id*."""
        self.__configure()
        dynamodb = boto3.resource(
            "dynamodb",
            aws_access_key_id=os.getenv("access_key_id"),
            aws_secret_access_key=os.getenv("secret_access_key"),
            region_name="us-east-2",
        )
        return dynamodb.Table(table_id)

    # TODO: Only query for records I don't have. Currently don't know how, so I'm scanning for all records.
    def scan_save_all_records(self):
        """Scan the whole table into ``self.raw_data`` and write ``data/raw_data.csv``.

        Scan results arrive in pages: each response's ``LastEvaluatedKey`` is
        sent back as ``ExclusiveStartKey`` until a response carries no key.
        """
        # Reset the buffer: a full scan returns every item again, so keeping
        # results from a previous call would duplicate all records.
        self.__scan_results = []
        params = {}
        while True:
            response = self.__table.scan(**params)
            self.__scan_results.extend(response.get("Items", []))
            start_key = response.get("LastEvaluatedKey")
            if start_key is None:
                break
            params = {"ExclusiveStartKey": start_key}

        self.raw_data = pd.json_normalize(self.__scan_results)
        # to_csv does not create missing directories.
        os.makedirs("data", exist_ok=True)
        self.raw_data.to_csv("data/raw_data.csv")

In [69]:
class CleanData:
    """Run the raw session CSV through the cleaning / feature pipeline.

    Parameters
    ----------
    raw_data_path : str
        CSV to read the raw session records from (defaults to the file
        written by ``FetchData.scan_save_all_records``).
    """

    def __init__(self, raw_data_path="data/raw_data.csv"):
        self.cleaned_data = None
        self.__add_pipeline()
        self.__raw_data = pd.read_csv(raw_data_path)

    def __add_pipeline(self):
        """Build a fresh pipeline from newly constructed step instances."""
        self.__pipeline = Pipeline([
            ("sort_drop_cast", SortDropCast()),
            ("create_helpers", HelperFeatureCreation()),
            ("create_session_TS", CreateSessionTS()),
            ("create_features", FeatureCreation()),
            ("save_to_csv", SaveToCsv()),
        ])

    def clean_raw_data(self):
        """Fit-transform the raw data, store it in ``self.cleaned_data`` and return it."""
        # Pipeline.fit_transform reuses the step instances as-is, and steps such
        # as CreateSessionTS / SaveToCsv may keep per-instance state; rebuilding
        # keeps repeated calls independent of each other.
        self.__add_pipeline()
        self.cleaned_data = self.__pipeline.fit_transform(self.__raw_data)
        return self.cleaned_data

In [70]:
from sklearn.base import BaseEstimator, TransformerMixin
import pandas as pd 

class SortDropCast(BaseEstimator, TransformerMixin):
    """
    Order sessions chronologically and tidy up the raw columns.

    Rows are sorted by "connectTime" and renumbered from 0, the
    "user_email" and "slrpPaymentId" columns are removed, and
    "cumEnergy_Wh" / "peakPower_W" are converted to floats.
    """
    def fit(self, X, y=None):
        # Nothing to learn; this step is stateless.
        return self

    def transform(self, X) -> pd.DataFrame:
        ordered = X.sort_values(by="connectTime")
        trimmed = ordered.drop(columns=["user_email", "slrpPaymentId"])
        result = trimmed.reset_index(drop=True)
        for column in ("cumEnergy_Wh", "peakPower_W"):
            result[column] = result[column].astype(float)
        return result

class HelperFeatureCreation(BaseEstimator, TransformerMixin):
    """
    Keep only sessions with non-zero "peakPower_W" and "cumEnergy_Wh",
    then derive helper columns:

    - "reqChargeTime_h": cumEnergy_Wh / peakPower_W (hours at peak power),
    - "connectTime": parsed to datetimes,
    - "finishChargeTime": connectTime + reqChargeTime_h, rounded to seconds.
    """
    def fit(self, X, y=None):
        # Stateless step.
        return self

    def transform(self, X) -> pd.DataFrame:
        has_power = X["peakPower_W"].ne(0)
        has_energy = X["cumEnergy_Wh"].ne(0)
        sessions = X.loc[has_power & has_energy]
        # assign() evaluates the callables in order, so later columns can
        # use the ones computed just before them.
        return sessions.assign(
            reqChargeTime_h=lambda d: d["cumEnergy_Wh"] / d["peakPower_W"],
            connectTime=lambda d: pd.to_datetime(d["connectTime"]),
            finishChargeTime=lambda d: d["connectTime"]
            + pd.to_timedelta(d["reqChargeTime_h"], unit="hours").round("s"),
        )

class CreateSessionTS(BaseEstimator, TransformerMixin):
    """
    Turn sessions into one aggregate power time series.

    Each session contributes its "peakPower_W" at 5-minute steps from
    "connectTime" to "finishChargeTime"; all sessions are then summed into
    shared 5-minute bins. Returns a DatetimeIndex-ed frame with a single
    column, "power_demand_W".
    """
    def __createTS(self, start, end, power):
        """Return one session's frame: `power` at every 5-min step from `start` to `end`."""
        date_range = pd.date_range(start=start, end=end, freq="5min")
        return pd.DataFrame({"power_demand_W": power}, index=date_range)

    def __init__(self) -> None:
        # Per-session frames from the most recent transform() call.
        self.rows = []
        super().__init__()

    def fit(self, X, y=None):
        return self

    def transform(self, X) -> pd.DataFrame:
        # Build the list fresh on every call; appending to a list that outlives
        # the call would re-add (and re-sum) sessions from earlier calls.
        self.rows = [
            self.__createTS(start, end, power)
            for start, end, power in zip(
                X["connectTime"], X["finishChargeTime"], X["peakPower_W"]
            )
        ]
        if not self.rows:
            # pd.concat raises on an empty list; return an empty series instead.
            return pd.DataFrame(
                {"power_demand_W": []}, index=pd.DatetimeIndex([]), dtype=float
            )
        combined = pd.concat(self.rows, axis=0).sort_index()
        # Session timestamps start at each connectTime (not aligned to 5 min);
        # resampling bins them and sums overlapping sessions.
        return combined.resample("5min").sum()

class FeatureCreation(BaseEstimator, TransformerMixin):
    """
    This pipeline step will create an "energy_demand_kWh" and "peak_power_W" column. 
    The name of the dataframe's index will be set to "time", and a "day" column will 
    be created with the day of the week at each timestamp. 
    The input frame is left unmodified; a new frame is returned.
    """
    def fit(self, X, y=None):
        return self 

    def transform(self, X) -> pd.DataFrame:
        # Copy so the caller's frame (and its index name) is not mutated.
        X = X.copy()
        # Assumes one row per 5-minute interval (as produced by CreateSessionTS):
        # W -> kW (/1000), times 5 min = 1/12 h (/12) gives kWh per row.
        X["energy_demand_kWh"] = (X["power_demand_W"] / 1000) / 12
        # for the highest granularity, peak power is equal to the power demand (different for different granularities though)
        X["peak_power_W"] = X["power_demand_W"]
        X.index.name = "time"
        X["day"] = X.index.day_name()
        return X

class SaveToCsv(BaseEstimator, TransformerMixin):
    """
    Write the 5-minute frame plus hourly, daily and monthly aggregates
    to "data/<name>.csv".

    fit() builds the four frames; transform() writes them and returns its
    input unchanged.
    """
    def __init__(self) -> None:
        # How each column is aggregated when downsampling.
        self.agg_key = {
            "power_demand_W": "mean",
            "energy_demand_kWh": "sum",
            "peak_power_W": "max",
            "day": "first"
        }
        # Output file stems, in the same order fit() builds the frames.
        self.dataframe_names = [
            "fivemindemand", 
            "hourlydemand", 
            "dailydemand", 
            "monthlydemand"
        ]
        self.new_dataframes = []
        super().__init__()

    def fit(self, X, y=None):
        # power_demand_W becomes a mean after downsampling, so rename it.
        renamed = {"power_demand_W": "avg_power_demand_W"}
        # NOTE(review): newer pandas versions may warn about the "1H"/"1M"
        # aliases — confirm against the installed version before changing.
        # Assign (not extend) so refitting replaces the frames instead of
        # appending past the end of dataframe_names.
        self.new_dataframes = [X] + [
            X.resample(rule).agg(self.agg_key).rename(columns=renamed)
            for rule in ("1H", "24H", "1M")
        ]
        return self

    def transform(self, X):
        for name, dataframe in zip(self.dataframe_names, self.new_dataframes):
            dataframe.to_csv(f"data/{name}.csv")
        return X

In [90]:
# Step 1: pull every record of the "Sessions2" table into data/raw_data.csv.
data_grabber = FetchData("Sessions2")
data_grabber.scan_save_all_records()

# Step 2: clean it; the pipeline's SaveToCsv step also writes the
# 5-min / hourly / daily / monthly CSVs under data/.
data_cleaner = CleanData()
cleaned = data_cleaner.clean_raw_data()

# Raw snapshot for ad-hoc inspection.
df = pd.read_csv("data/raw_data.csv")

# Display the cleaned 5-minute frame.
cleaned

Unnamed: 0_level_0,power_demand_W,energy_demand_kWh,peak_power_W,day
time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2020-11-05 10:30:00,6335.0,0.527917,6335.0,Thursday
2020-11-05 10:35:00,6335.0,0.527917,6335.0,Thursday
2020-11-05 10:40:00,6335.0,0.527917,6335.0,Thursday
2020-11-05 10:45:00,6335.0,0.527917,6335.0,Thursday
2020-11-05 10:50:00,6335.0,0.527917,6335.0,Thursday
...,...,...,...,...
2023-01-07 23:55:00,3333.0,0.277750,3333.0,Saturday
2023-01-08 00:00:00,3333.0,0.277750,3333.0,Sunday
2023-01-08 00:05:00,3333.0,0.277750,3333.0,Sunday
2023-01-08 00:10:00,3333.0,0.277750,3333.0,Sunday


In [89]:
# Peek at the raw "power" field of one recent session (17th from the end,
# ordered by connectTime).
(
    pd.read_csv("data/raw_data.csv")
    .sort_values(by="connectTime")
    .tail(50)["power"]
    .iloc[-17]
)

"[{'power_W': Decimal('6214'), 'timestamp': Decimal('1672878308')}]"