In [1]:
# !pip uninstall -y azureml-opendatasets
# !pip install azureml-opendatasets

In [None]:
from datetime import datetime


start_date = datetime(2012, 1, 1, 0, 0)
end_date = datetime(2017, 8, 12, 23, 59)

start_date, end_date

(datetime.datetime(2012, 1, 1, 0, 0), datetime.datetime(2017, 8, 12, 23, 59))

In [None]:
from datetime import timedelta
from dateutil.relativedelta import relativedelta

import math


r = relativedelta(end_date, start_date)
months = r.years * 12 + r.months + math.floor((r.days + 30)/31)

In [None]:
import pandas as pd
from datetime import date

month_dict = dict(enumerate([date(dt.year,dt.month, 1) for dt in pd.date_range(start=start_date, periods=68, freq='M')]))

In [None]:
lat, long = 40.701, -74.009
lat, long

(40.701, -74.009)

In [None]:
from pandas import read_csv


df = read_csv('./nyc_energy.csv').drop(columns=['precip', 'temp'], axis=1)
df['lat'] = lat
df['long'] = long
df.head(5)

Unnamed: 0,timeStamp,demand,lat,long
0,2012-01-01 00:00:00,4937.5,40.701,-74.009
1,2012-01-01 01:00:00,4752.1,40.701,-74.009
2,2012-01-01 02:00:00,4542.6,40.701,-74.009
3,2012-01-01 03:00:00,4357.7,40.701,-74.009
4,2012-01-01 04:00:00,4275.5,40.701,-74.009


In [None]:
from dateutil import parser


df['new_datetime'] = df['timeStamp'].apply(parser.parse)
raw_columns = list(df.columns)
df.head(5)

Unnamed: 0,timeStamp,demand,lat,long,new_datetime
0,2012-01-01 00:00:00,4937.5,40.701,-74.009,2012-01-01 00:00:00
1,2012-01-01 01:00:00,4752.1,40.701,-74.009,2012-01-01 01:00:00
2,2012-01-01 02:00:00,4542.6,40.701,-74.009,2012-01-01 02:00:00
3,2012-01-01 03:00:00,4357.7,40.701,-74.009,2012-01-01 03:00:00
4,2012-01-01 04:00:00,4275.5,40.701,-74.009,2012-01-01 04:00:00


In [None]:
import os.path
import pandas as pd
import numpy as np
from azureml.opendatasets.accessories.location_data import LatLongColumn
from azureml.opendatasets.accessories.location_time_customer_data \
    import LocationTimeCustomerData
from azureml.opendatasets import NoaaIsdWeather
from azureml.opendatasets.environ import PandasEnv

In [None]:
# !if [ ! -d "./temp" ]; then mkdir temp; fi

In [None]:
import glob
temp_csvs = glob.glob(os.path.join("./temp", "*.csv"))

In [None]:
temp_csvs.sort()

In [None]:
m_dict = {'i_date': [], 'j_date':[]}

i_date = start_date
for m in range(months):
    m_dict['i_date'].append(i_date)
    j_date = i_date + relativedelta(months=1) - timedelta(milliseconds=1)
    i_date += relativedelta(months=1)
    m_dict['j_date'].append(j_date)

In [None]:
len(m_dict['i_date']), len(m_dict['j_date'])

(68, 68)

In [None]:
m_df = pd.DataFrame(m_dict)

In [None]:
len(temp_csvs)

67

In [None]:
if len(temp_csvs)!=0:
    temp_dates = [pd.to_datetime(t[-23:-13]).date() for t in temp_csvs]
    print(temp_dates)
else:
    temp_dates = []

[datetime.date(2012, 1, 1), datetime.date(2012, 2, 1), datetime.date(2012, 3, 1), datetime.date(2012, 4, 1), datetime.date(2012, 5, 1), datetime.date(2012, 6, 1), datetime.date(2012, 7, 1), datetime.date(2012, 8, 1), datetime.date(2012, 9, 1), datetime.date(2012, 10, 1), datetime.date(2012, 11, 1), datetime.date(2012, 12, 1), datetime.date(2013, 1, 1), datetime.date(2013, 2, 1), datetime.date(2013, 3, 1), datetime.date(2013, 4, 1), datetime.date(2013, 5, 1), datetime.date(2013, 6, 1), datetime.date(2013, 7, 1), datetime.date(2013, 8, 1), datetime.date(2013, 9, 1), datetime.date(2013, 10, 1), datetime.date(2013, 11, 1), datetime.date(2013, 12, 1), datetime.date(2014, 1, 1), datetime.date(2014, 2, 1), datetime.date(2014, 3, 1), datetime.date(2014, 4, 1), datetime.date(2014, 5, 1), datetime.date(2014, 6, 1), datetime.date(2014, 7, 1), datetime.date(2014, 8, 1), datetime.date(2014, 9, 1), datetime.date(2014, 10, 1), datetime.date(2014, 11, 1), datetime.date(2014, 12, 1), datetime.date(2015

In [None]:
import pickle

In [None]:
def start_data_enrichment(num1, num2):
    print('[%s] Start enriching...' % datetime.now())
    report_joined = {}
    for id, row in m_df.iloc[num1:num2].iterrows():
        i_date, j_date = row.i_date, row.j_date 
        if row.i_date in temp_dates:
            print(row.i_date)
        else:
            df1 = df[(df['new_datetime'] >= i_date) & (df['new_datetime'] <= j_date)].copy()
            df1['idx'] = list(range(len(df1.index)))
            df1 = df1.set_index('idx')

            energy = LocationTimeCustomerData(
                df1,
                LatLongColumn('lat', 'long'),
                'new_datetime')

            weather = NoaaIsdWeather(
                cols=["temperature", "precipTime", "precipDepth", "snowDepth"],
                start_date=i_date,
                end_date=j_date)

            weather_enricher = weather.get_enricher()
            new_energy, processed_weather = weather_enricher.enrich_customer_data_no_agg(
                customer_data_object=energy,
                location_match_granularity=5, # higher for high join success rate, lower for performance.
                time_round_granularity='hour')
            
            # ---=== Begin of cusomtized aggregation ===---
            
            processed_weather.data['precipDepth'] = processed_weather.data['precipDepth'].apply(
                lambda x: np.nan if x == 9999 else x)
            processed_weather.data['precipTime'] = processed_weather.data['precipTime'].apply(
                lambda x: np.nan if x == 99 else x)

            processed_weather.data['precipDepth/precipTime'] = \
            processed_weather.data[['precipDepth', 'precipTime']].apply(
                lambda x: np.nan if (
                    pd.isna(x[0]) or pd.isna(x[1]) or x[1] == 0.0) else (x[0] / x[1]), axis=1)
            
            aggregations = {
                "temperature": "mean",
                "snowDepth": "mean",
                "precipDepth/precipTime": "mean",
                "precipDepth": "max",
                "precipTime": "max"}
            
            public_rankgroup = processed_weather.id

            public_join_time = [
                s for s in list(processed_weather.data.columns)
                if s.startswith('ds_join_time')][0]
            print(public_join_time)
            customer_rankgroup = weather_enricher.location_selector.customer_rankgroup

            customer_join_time = [
                s for s in list(new_energy.data.columns)
                if s.startswith('customer_join_time')][0]
            
            # processed_weather.data[public_join_time].unique()
            # print(list(processed_weather.data[public_join_time].unique()))
            with open('date_list.pkl', 'wb') as f:
                pickle.dump(list(processed_weather.data[public_join_time].unique()), f) 

            weather_df_grouped = processed_weather.data.groupby(
                by=[public_rankgroup, public_join_time]).agg(aggregations)
            
            joined_dataset = new_energy.data.merge(
                weather_df_grouped,
                left_on=[customer_rankgroup, customer_join_time],
                right_on=[public_rankgroup, public_join_time],
                how='left')

            final_df = joined_dataset[raw_columns + [
                "temperature", "precipTime", "precipDepth", "snowDepth", "precipDepth/precipTime"]]

            report_joined[i_date] = final_df.describe()
            
            # ---=== End of customized aggregation ===---
            
            fn = './temp/nyc_energy_enriched_%s.csv' % i_date
            final_df.to_csv(fn)

In [None]:
start_data_enrichment(0, 5)

[2023-03-06 23:16:51.721686] Start enriching...
2012-01-01 00:00:00
2012-02-01 00:00:00
2012-03-01 00:00:00
2012-04-01 00:00:00
2012-05-01 00:00:00


  if row.i_date in temp_dates:


In [None]:
start_data_enrichment(5, 10)

[2023-03-06 23:16:53.640500] Start enriching...
2012-06-01 00:00:00
2012-07-01 00:00:00
2012-08-01 00:00:00
2012-09-01 00:00:00
2012-10-01 00:00:00


  if row.i_date in temp_dates:


In [None]:
start_data_enrichment(10, None)

[2023-03-06 23:18:56.236738] Start enriching...
2012-11-01 00:00:00
2012-12-01 00:00:00
2013-01-01 00:00:00
2013-02-01 00:00:00
2013-03-01 00:00:00
2013-04-01 00:00:00
2013-05-01 00:00:00
2013-06-01 00:00:00
2013-07-01 00:00:00
2013-08-01 00:00:00
2013-09-01 00:00:00
2013-10-01 00:00:00
2013-11-01 00:00:00
2013-12-01 00:00:00
2014-01-01 00:00:00
2014-02-01 00:00:00
2014-03-01 00:00:00
2014-04-01 00:00:00
2014-05-01 00:00:00
2014-06-01 00:00:00
2014-07-01 00:00:00
2014-08-01 00:00:00
2014-09-01 00:00:00
2014-10-01 00:00:00
2014-11-01 00:00:00
2014-12-01 00:00:00
2015-01-01 00:00:00
2015-02-01 00:00:00
2015-03-01 00:00:00
2015-04-01 00:00:00
2015-05-01 00:00:00
2015-06-01 00:00:00
2015-07-01 00:00:00
2015-08-01 00:00:00
2015-09-01 00:00:00
2015-10-01 00:00:00
2015-11-01 00:00:00
2015-12-01 00:00:00
2016-01-01 00:00:00
2016-02-01 00:00:00
2016-03-01 00:00:00
2016-04-01 00:00:00
2016-05-01 00:00:00
2016-06-01 00:00:00
2016-07-01 00:00:00
2016-08-01 00:00:00
2016-09-01 00:00:00
2016-10-01 0

  if row.i_date in temp_dates:


[Info] read from /tmp/tmplm5rhtgp/https%3A/%2Fazureopendatastorage.azurefd.net/isdweatherdatacontainer/ISDWeather/year=2017/month=8/part-00000-tid-5175975263333548477-7aeb615b-cca2-42d5-99cc-5022a6daac8d-2609-8.c000.snappy.parquet
[Info] read from /tmp/tmplm5rhtgp/https%3A/%2Fazureopendatastorage.azurefd.net/isdweatherdatacontainer/ISDWeather/year=2017/month=8/part-00002-tid-5175975263333548477-7aeb615b-cca2-42d5-99cc-5022a6daac8d-2607-8.c000.snappy.parquet
[Info] read from /tmp/tmplm5rhtgp/https%3A/%2Fazureopendatastorage.azurefd.net/isdweatherdatacontainer/ISDWeather/year=2017/month=8/part-00004-tid-5175975263333548477-7aeb615b-cca2-42d5-99cc-5022a6daac8d-2610-8.c000.snappy.parquet
[Info] read from /tmp/tmplm5rhtgp/https%3A/%2Fazureopendatastorage.azurefd.net/isdweatherdatacontainer/ISDWeather/year=2017/month=8/part-00006-tid-5175975263333548477-7aeb615b-cca2-42d5-99cc-5022a6daac8d-2611-8.c000.snappy.parquet
[Info] read from /tmp/tmplm5rhtgp/https%3A/%2Fazureopendatastorage.azurefd.n

In [None]:
all = pd.DataFrame([])

In [None]:
for tmp_csv in glob.glob(os.path.join("./temp", "*.csv")):
    final_df = pd.read_csv(tmp_csv)
    all = pd.concat([all, final_df])
    all.to_csv('./nyc_energy_enriched.csv')

print('[%s] End enriching...' % datetime.now())

[2023-03-06 23:21:30.647595] End enriching...


In [None]:
final_df_list = [pd.read_csv(tmp_csv) for tmp_csv in glob.glob(os.path.join("./temp", "*.csv"))]

In [None]:
fdf = pd.concat(final_df_list)

In [None]:
fdf.to_csv(os.path.join('data','nyc_energy_data.csv'))