Create cache files of the input data for the seasonal baseline computation. The data for each month and each home is stored separately. This makes it possible to later read the complete data for an individual month without having to read the full set of readings for each home.

In [1]:
# NOTE(review): %pylab populates the interactive namespace from numpy and matplotlib (see the cell output).
# The plotting-styles script further down is run with `%run -i` and may rely on those injected names --
# confirm before replacing this magic with the narrower `%matplotlib inline`.
%pylab inline

Populating the interactive namespace from numpy and matplotlib


In [2]:
import sys
sys.path.insert(0, '../src')

import pickle
import datetime

import numpy as np
import pandas as pd

import seaborn as sns

from pathlib import Path
from multiprocessing import Pool
from functools import partial

from IdealDataInterface import IdealDataInterface

from config import SENSOR_DATA_FOLDER, CACHE_FOLDER, CPU_HIGH_MEMMORY, CPU_LOW_MEMMORY
from config import EVALUATION_PERIOD, FFILL_LIMIT, SAMPLING_WINDOW_BASELINE
from utils import treatment_control, load_mains
from sampling import data_to_sample_array

In [3]:
# Run the shared plotting-styles script. The -i flag executes it in this notebook's interactive namespace
# rather than in an empty one.
%run -i '../src/sns_styles.py'

# Color palette returned by seaborn when called without arguments (i.e. the currently active palette).
cmap = sns.color_palette()

In [4]:
# Table with one row per home: the group it was assigned to (control / treatment / enhanced) and the
# associated start_date / end_date. The start_date column marks the end of the baseline period.
# NOTE: this is currently hard-coded into the analysis pipeline. To use the estimator with other data,
# treatment_control() must be modified to return a table with the same layout.
df_group = treatment_control()

# Show the last five rows as a quick sanity check
df_group.tail(5)

Unnamed: 0,homeid,group,start_date,end_date
263,331,treatment,2018-05-15,2018-06-30
264,332,treatment,2018-05-15,2018-06-30
265,334,control,2018-05-15,2018-06-30
266,335,treatment,2018-05-15,2018-06-30
267,333,control,2018-05-15,2018-06-30


In [5]:
# Collect the home ids belonging to each of the three study groups
homeid_control = df_group.loc[df_group['group'].eq('control'), 'homeid']
homeid_treatment = df_group.loc[df_group['group'].eq('treatment'), 'homeid']
homeid_enhanced = df_group.loc[df_group['group'].eq('enhanced'), 'homeid']

# Report the group sizes
print('Found {} homes in the control group'.format(homeid_control.size))
print('Found {} homes in the treatment group'.format(homeid_treatment.size))
print('Found {} homes in the enhanced group'.format(homeid_enhanced.size))

Found 107 homes in the control group
Found 107 homes in the treatment group
Found 39 homes in the enhanced group


In [6]:
# This is where the data will be stored
fpath = CACHE_FOLDER / Path('seasonal_background_estimate/')

# Create the folder if needed. parents=True also creates a missing CACHE_FOLDER (a plain mkdir() would raise
# FileNotFoundError on a fresh setup), and exist_ok=True makes the call safe if the folder already exists.
fpath.mkdir(parents=True, exist_ok=True)


def fname_func(homeid, month):
    """Return the path of the cache file for one home and one month.

    The data for each home and each month is stored in a separate file. This way it is possible to only
    load the data for an individual month.

    Parameters
    ----------
    homeid : identifier of the home, inserted into the file name.
    month : month number, inserted into the file name.

    Returns
    -------
    pathlib.Path pointing to the ``.npz`` file inside ``fpath``.
    """
    return fpath / Path('sample_array_homeid{}_month{}.npz'.format(homeid, month))


print('Skipping files which are already found in {}'.format(fpath))

# Load the electricity readings, create the array to sample from (see Section 4.1. of the documentation on
# Overleaf), and store the result to disk.
def load_sampling_array(homeid):
    """Build and cache the per-month sampling arrays for a single home.

    Loads the mains electricity readings of ``homeid`` with ``load_mains()`` and, for every calendar
    month (1-12), passes the readings of that month to ``data_to_sample_array()``. The returned ``arr``
    and ``idx`` are stored in the compressed ``.npz`` file given by ``fname_func(homeid, month)``.

    A month is skipped if
      * its cache file already exists,
      * it contains no valid reading, or
      * it covers less than seven full days (missing readings included).

    Errors raised while computing or writing a month are reported on stdout and the remaining months
    are still processed (best effort).

    Parameters
    ----------
    homeid : identifier of the home, passed on to ``load_mains()`` and ``fname_func()``.

    Returns
    -------
    None. The results are written to disk instead of being returned, so that no large arrays have to
    be passed back from the worker processes.
    """
    # Load the mains electricity readings
    ts = load_mains(homeid)

    # Iterate over each month and create the sampling array
    for month in range(1, 13):
        target = fname_func(homeid, month)

        # Skip months that were already cached by a previous run
        if target.is_file():
            continue

        # Select the respective month and resample to (re-)introduce the missing periods as NaN.
        # This is needed for data_to_sample_array()
        ts_month = ts[ts.index.month == month].copy().asfreq('1s')

        # If no reading is available during the month skip it (also covers an empty selection)
        if ts_month.isna().all():
            continue

        # Only compute the array if at least seven full days worth of data are available (including missing)
        if (ts_month.index.floor('D').max() - ts_month.index.ceil('D').min()) < pd.Timedelta(days=7):
            continue

        # Write to a temporary '.part' file first and rename it afterwards. An interrupted write can then never
        # leave a truncated file under the final name, which later runs would skip as "already loaded".
        part_path = target.with_name(target.name + '.part')

        try:
            # No windowing is applied here (window_width=None) as this can be done more efficiently once all
            # the arrays are loaded and combined (see next notebook).
            arr, idx = data_to_sample_array(ts_month, window_width=None, copy=False, force_freq=False)

            # A file object is passed so that numpy does not append '.npz' to the temporary name
            with open(part_path, 'wb') as fh:
                np.savez_compressed(fh, arr=arr, idx=idx)
            part_path.replace(target)
        except Exception as exc:
            # Best effort: report the failure (including its cause) and carry on with the next month.
            # Catching Exception instead of using a bare except keeps KeyboardInterrupt/SystemExit working.
            print('Error in home {} for month {}: {!r}'.format(homeid, month, exc))
            if part_path.exists():
                part_path.unlink()

Skipping files which are already found in /disk/scratch/nberline/impact/cache/seasonal_background_estimate


In [7]:
# Homes to process: all homes of the control, treatment and enhanced groups.
homes_to_load = [*homeid_control, *homeid_treatment, *homeid_enhanced]

# Distribute the homes over a multiprocessing pool with CPU_HIGH_MEMMORY workers; each worker calls
# load_sampling_array() with one element of homes_to_load at a time.
# Note: load_sampling_array() deliberately writes its result to disk and returns nothing. pool.map() has to
# send every return value back to the parent process, which reportedly may not work for very large arrays.
# Writing inside the worker avoids passing large arrays around.
print('Computing the arrays to sample from. This may take a while..')
with Pool(CPU_HIGH_MEMMORY) as pool:
    pool.map(load_sampling_array, homes_to_load)
print('Done.')

Computing the arrays to sample from. This may take a while..
Done.
