# Data Pipelines

In [25]:
import pandas as pd 
import time
from pandas.tseries.holiday import USFederalHolidayCalendar
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder

# Raw Data

In [26]:
# Read the raw CSV export into a dataframe and preview its first rows.
# The "power" column read here is the field that CleanSession later parses.
raw_data = pd.read_csv("data/slrpEV11052020-09222022.csv")
raw_data.head()

Unnamed: 0,dcosId,userId,vehicle_model,vehicle_maxChgRate_W,siteId,stationId,connectTime,startChargeTime,Deadline,energyReq_Wh,...,sch_centsPerOverstayHr,Duration,DurationHrs,choice,regular,scheduled,cumEnergy_Wh,peakPower_W,power,lastUpdate
0,24,605,500e,6600,23,7,2020-11-05T10:30:16,2020-11-05T10:31:09,,,...,200.0,0 days 03:43:57,3.73249,REGULAR,1,0,3281.0,6335,"[{'power_W': Decimal('6259'), 'timestamp': Dec...",2020-11-05T14:15:06
1,26,486,Model 3,24000,23,3,2020-11-11T07:39:55,2020-11-11T07:39:59,,,...,200.0,0 days 06:50:07,6.83527,REGULAR,1,0,33458.0,7005,"[{'power_W': Decimal('0'), 'timestamp': Decima...",2020-11-11T14:30:06
2,30,620,Volt,3600,25,12,2020-11-13T16:19:55,2020-11-13T16:20:06,2020-11-14T04:15:00,18400.0,...,300.0,0 days 20:40:02,20.66722,SCHEDULED,0,1,15216.0,3450,"[{'power_W': Decimal('0'), 'timestamp': Decima...",2020-11-14T13:00:08
3,31,618,Bolt EV,7200,23,6,2020-11-14T23:47:06,2020-11-14T23:47:16,,,...,400.0,0 days 02:12:51,2.21416,REGULAR,1,0,14378.0,6889,"[{'power_W': Decimal('6889'), 'timestamp': Dec...",2020-11-15T02:00:07
4,32,623,B-Class Electric Drive,6000,23,9,2020-11-16T11:38:44,2020-11-16T11:42:22,,,...,,0 days 03:12:45,3.21249,REGULAR,1,0,12484.0,6852,"[{'power_W': Decimal('6813'), 'timestamp': Dec...",2020-11-16T14:55:07


## Utility Functions

In [64]:
def round_format_UNIX_time(time_value):
    """
    Snap a UNIX timestamp (seconds) to the next 5-minute boundary and format it.

    The timestamp is floored to its 5-minute bucket and then moved forward by one
    full bucket, so a value that already sits exactly on a boundary is still pushed
    to the following one. The result is rendered in local time as
    'year-month-day hour:minute'.
    """
    bucket_seconds = 5 * 60

    # start of the 5-minute bucket that contains the timestamp
    bucket_start = time_value // bucket_seconds * bucket_seconds

    # always step forward to the end of that bucket
    rounded = bucket_start + bucket_seconds

    return time.strftime('%Y-%m-%d %H:%M', time.localtime(rounded))

def ohe_day_name(dataframe):
    """
    One-hot encode the day of the week of a datetime-indexed dataframe.

    Parameters
    ----------
    dataframe : pd.DataFrame
        Dataframe with a datetime-like index. It is NOT modified; the result is
        built on a copy.

    Returns
    -------
    pd.DataFrame
        Copy of `dataframe` with a column "day" holding the weekday name of each
        row, followed by one float indicator column (1.0 / 0.0) per weekday that
        occurs in the index, named "Monday", "Tuesday", "Wednesday", etc. and
        ordered alphabetically. Weekdays that never occur get no column.
    """
    # work on a copy so the caller's dataframe does not silently gain a "day" column
    result = dataframe.copy(deep=True)
    result["day"] = result.index.day_name()

    # get_dummies names each indicator column after the weekday itself and sorts the
    # columns alphabetically; dtype=float yields 0.0/1.0 values
    indicators = pd.get_dummies(result["day"], dtype=float)

    return pd.concat([result, indicators], axis=1)

def ohe_federal_holiday(dataframe):
    """
    Flag the rows of a datetime-indexed dataframe that fall on a US federal holiday.

    Parameters
    ----------
    dataframe : pd.DataFrame
        Dataframe with a DatetimeIndex. It is not modified.

    Returns
    -------
    pd.DataFrame
        Deep copy of `dataframe` with an integer column "Federal Holiday" added:
        1 when the row's calendar date is a federal holiday, 0 otherwise.
    """
    copy = dataframe.copy(deep=True)

    # round every timestamp down to midnight so it can be compared to holiday dates
    days = dataframe.index.normalize()

    # an empty frame has no date range to look up; just add the (empty) column
    if len(days) == 0:
        copy["Federal Holiday"] = pd.Series(index=copy.index, dtype=int)
        return copy

    # The calendar only returns holidays that lie inside [start, end], and holidays
    # are stamped at midnight. Passing the raw first timestamp (e.g. 07:40) as start
    # would therefore drop a holiday that falls on the first day of the data, so the
    # bounds are taken from the normalized dates instead.
    calendar = USFederalHolidayCalendar()
    holidays = calendar.holidays(start=days.min(), end=days.max())

    # membership test against the holiday dates, Boolean values cast to 0/1
    copy["Federal Holiday"] = days.isin(holidays).astype(int)
    return copy

def resample_df(df, granularity):
    """
    Resample a datetime-indexed demand dataframe to a coarser granularity.

    `df` must contain the columns "power_demand", "energy_demand",
    "peak_power_demand", and "day". `granularity` is a pd.resample()-like code
    (ex. "5min", "1H", "24H", "1M", etc.).

    Within each resampled bin "power_demand" is averaged, "energy_demand" is summed,
    "peak_power_demand" is reduced with max, and "day" keeps its first value.
    The first two columns are renamed after their aggregate ("mean_power_demand",
    "total_energy_demand"); "peak_power_demand" and "day" keep their names.
    """
    # one aggregate per column; the dict order is also the output column order
    aggregations = dict(
        power_demand="mean",
        energy_demand="sum",
        peak_power_demand="max",
        day="first",
    )
    new_names = {
        "power_demand": "mean_power_demand",
        "energy_demand": "total_energy_demand",
    }

    return df.resample(granularity).agg(aggregations).rename(columns=new_names)

## Data Cleaning Pipelines

In [42]:
class CleanSession(BaseEstimator, TransformerMixin):
    """
    Transformer that unpacks the stringified "power" column into one row per reading.

    Each "power" cell is text of the form
    "[{'power_W': Decimal('...'), 'timestamp': Decimal('...')}, ...]". The wrapper
    text is stripped with `pattern`, leaving a comma-separated run of integers that
    alternate power, timestamp, power, timestamp, ...
    """

    # matches everything around the numbers: the dict/list openers with the
    # 'power_W' key, the 'timestamp' key, and the closing "')" / "')}" / "')}]"
    pattern = r"(\[?\{'power_W':\sDecimal\(')|('timestamp':\sDecimal\(')|('\)\}?\]?)"

    def fit(self, X, y=None):
        """Nothing to learn; returns the transformer itself."""
        return self

    def transform(self, X):
        """
        Return `X` joined with the columns "time" and "power_demand".

        The original rows are repeated once per reading, because the exploded
        readings keep the index label of the row they came from.
        """
        # strip the wrapper text, split into tokens, cast each token to int
        stripped = X["power"].str.replace(self.pattern, "", regex=True)
        readings = stripped.str.split(', ').apply(
            lambda tokens: [int(token) for token in tokens]
        )

        # even positions hold power, odd positions hold UNIX timestamps; unnest both
        # and round the timestamps to clean 5-minute intervals
        power_vals = readings.apply(lambda seq: seq[::2]).explode()
        time_vals = (
            readings.apply(lambda seq: seq[1::2])
            .explode()
            .apply(round_format_UNIX_time)
        )

        # one row per reading, still labelled with the originating row's index
        samples = pd.DataFrame({"time": time_vals, "power_demand": power_vals})

        # join w/ original dataframe on the index
        return X.join(samples)

In [43]:
class ExtractUpsampleGroupby5Min(BaseEstimator, TransformerMixin):
    """
    Transformer that collapses per-reading rows into a 5-minute power time series.

    Only the "time" and "power_demand" columns are kept; every other column of the
    input is dropped.
    """

    def fit(self, X, y=None):
        """Nothing to learn; returns the transformer itself."""
        return self

    def transform(self, X):
        """Return total "power_demand" per 5-minute bin, indexed by datetime."""
        # total power per timestamp, ordered by time
        per_timestamp = (
            X[["time", "power_demand"]]
            .groupby("time")
            .sum()
            .sort_values(by="time")
        )

        # the "time" labels are strings up to here; resample needs real datetimes
        per_timestamp.index = pd.to_datetime(per_timestamp.index)

        # upsample to 5-min bins, bins without any reading sum to 0
        return per_timestamp.resample("5min").sum()

In [44]:
class CreateDayName(BaseEstimator, TransformerMixin):
    """Transformer that adds the weekday name of each row as the column "day"."""

    def fit(self, X, y=None):
        """Nothing to learn; returns the transformer itself."""
        return self

    def transform(self, X):
        """Return a copy of `X` with a datetime index and a "day" column."""
        dated = X.copy()

        # make sure the index is datetime-like before asking for day names
        dated.index = pd.to_datetime(dated.index)

        return dated.assign(day=dated.index.day_name())

In [45]:
class OHEDaysAndHolidays(BaseEstimator, TransformerMixin):
    """
    Transformer that adds the "Federal Holiday" indicator and the one-hot encoded
    weekday columns by delegating to ohe_federal_holiday and ohe_day_name.
    """

    def fit(self, X, y=None):
        """Nothing to learn; returns the transformer itself."""
        return self

    def transform(self, X):
        """Return `X` with the holiday flag added first, then the weekday columns."""
        with_holidays = ohe_federal_holiday(X)
        return ohe_day_name(with_holidays)

In [46]:
class CreateEnergyAndPeakPowerDemand(BaseEstimator, TransformerMixin):
    """
    Transformer that derives "energy_demand" and "peak_power_demand" from the
    "power_demand" column.
    """

    def fit(self, X, y=None):
        """Nothing to learn; returns the transformer itself."""
        return self

    def transform(self, X):
        """Return a deep copy of `X` with the two derived columns appended."""
        return X.copy(deep=True).assign(
            # /1000 converts to kW, /12 converts to kWh
            energy_demand=lambda frame: frame["power_demand"] / 1000 / 12,
            # for 5-minute granularities, peak power equal to power demand
            peak_power_demand=lambda frame: frame["power_demand"],
        )

## Run Pipeline

In [59]:
# create pipelines
# Steps run in order, each one consuming the previous step's output:
#   clean_session              -> parses the "power" column into "time" / "power_demand" rows
#   clean_time                 -> sums power per timestamp and resamples to 5-minute bins
#   day_name                   -> adds the weekday name as column "day"
#   create_energy_&_peak_power -> derives "energy_demand" and "peak_power_demand"
pipe = Pipeline([
    ("clean_session", CleanSession()),
    ("clean_time", ExtractUpsampleGroupby5Min()),
    ("day_name", CreateDayName()),
    ("create_energy_&_peak_power", CreateEnergyAndPeakPowerDemand())
])

In [60]:
# create 5-min demand
# Every step's fit() only returns self, so this effectively chains the transform() calls
# on the raw data; the result is indexed by 5-minute timestamps.
fivemindemand = pipe.fit_transform(raw_data)
fivemindemand.head()

Unnamed: 0_level_0,power_demand,day,energy_demand,peak_power_demand
time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2020-11-05 10:40:00,6259,Thursday,0.521583,6259
2020-11-05 10:45:00,6269,Thursday,0.522417,6269
2020-11-05 10:50:00,6298,Thursday,0.524833,6298
2020-11-05 10:55:00,6318,Thursday,0.5265,6318
2020-11-05 11:00:00,6335,Thursday,0.527917,6335


In [67]:
# create different granularities
# resample_df averages power, sums energy, takes the max of peak power, and keeps the
# first "day" value inside each bin.
# NOTE(review): "1H" / "24H" / "1M" look like legacy pandas offset aliases; newer pandas
# releases prefer "h" / "ME" -- confirm against the installed pandas version.
hourlydemand = resample_df(fivemindemand, "1H",)
dailydemand = resample_df(fivemindemand, "24H") 
monthlydemand = resample_df(fivemindemand, "1M")

In [68]:
# save to csv
# One file per granularity, written into the same data/ directory the raw CSV was read from.
fivemindemand.to_csv("data/5mindemand.csv")
hourlydemand.to_csv("data/hourlydemand.csv")
dailydemand.to_csv("data/dailydemand.csv")
monthlydemand.to_csv("data/monthlydemand.csv")