<table style="width:100%; background-color: #EBF5FB">
  <tr>
    <td style="border: 1px solid #CFCFCF">
      <b>Household data: Processing Notebook</b>
      <ul>
        <li><a href="main.ipynb">Main Notebook</a></li>
        <li>Processing Notebook</li>
      </ul>
      <br>This Notebook is part of the <a href="http://data.open-power-system-data.org/household_data">Household Data Package</a> of <a href="http://open-power-system-data.org">Open Power System Data</a>.
    </td>
  </tr>
</table>

# Table of Contents
* [1. Introductory Notes](#1.-Introductory-Notes)
* [2. Settings](#2.-Settings)
	* [2.1 Set version number and recent changes](#2.1-Set-version-number-and-recent-changes)
	* [2.2 Import Python libraries](#2.2-Import-Python-libraries)
	* [2.3 Set directories](#2.3-Set-directories)
	* [2.4 Set up a log](#2.4-Set-up-a-log)
	* [2.5 Select timerange](#2.5-Select-timerange)
	* [2.6 Select download source](#2.6-Select-download-source)
	* [2.7 Select household subset](#2.7-Select-household-subset)
* [3. Download](#3.-Download)
* [4. Read](#4.-Read)
	* [4.1 Preparations](#4.1-Preparations)
	* [4.2 Reading loop](#4.2-Reading-loop)
	* [4.3 Save raw data](#4.3-Save-raw-data)
* [5. Processing](#5.-Processing)
	* [5.1 Validate Data](#5.1-Validate-Data)
    * [5.2 Aggregate Index equidistantly](#5.2-Aggregate-Index-equidistantly)
    * [5.3 Missing Data Handling](#5.3-Missing-Data-Handling)
    * [5.4 Aggregate hourly and 15min interval data](#5.4-Aggregate-hourly-and-15min-interval-data)
    * [5.5 Insert a column with Central European Time](#5.5-Insert-a-column-with-Central-European-Time)
* [6. Create metadata](#6.-Create-metadata)
* [7. Write data to disk](#7.-Write-data-to-disk)
	* [7.1 Limit time range](#7.1-Limit-time-range)
    * [7.2 Convert the Central European Time column to ISO-8601](#7.2-Convert-the-Central-European-Time-column-to-ISO-8601)
    * [7.3 Different shapes](#7.3-Different-shapes)
    * [7.4 Write to SQL-database](#7.4-Write-to-SQL-database)
    * [7.5 Write to Excel](#7.5-Write-to-Excel)
    * [7.6 Write to CSV](#7.6-Write-to-CSV)
    * [7.7 Write checksums.txt](#7.7-Write-checksums.txt)

# 1. Introductory Notes

This Notebook handles missing data, performs calculations and aggragations and creates the output files.

# 2. Settings

## 2.1 Set version number and recent changes
Executing this script till the end will create a new version of the data package.
The Version number specifies the local directory for the data <br>
We include a note on what has been changed.

In [None]:
version = '2020-04-15'
changes = 'Second release of all CoSSMic households'

## 2.2 Import Python libraries

In [None]:
# Python modules
import os
import numpy as np
import pandas as pd
import json
import yaml
import sqlite3
import itertools
import hashlib
import pytz
from shutil import copyfile
from datetime import datetime, date, timedelta, time

# Reload modules with execution of any code, to avoid having to restart
# the kernel after editing timeseries_scripts
%load_ext autoreload
%autoreload 2

## 2.3 Set directories

In [None]:
# make sure the working directory is this file's directory
try:
    os.chdir(home_path)
except NameError:
    home_path = os.getcwd()

config_path = os.path.join(home_path, 'conf')
data_path = os.path.join(home_path, 'household_data', version, 'original_data')
out_path = os.path.join(home_path, 'household_data', version)
temp_path = os.path.join(home_path, 'household_data', 'temp')
os.makedirs(out_path, exist_ok=True)
os.makedirs(temp_path, exist_ok=True)
os.chdir(temp_path)

## 2.4 Setup logging and scripts

In [None]:
import logging.config

logging_file = os.path.join(config_path, 'logging.cfg')
logging.config.fileConfig(logging_file)

# Scripts from household repository package
from household.download import download
from household.read import read
from household.tools import update_sets
from household.validation import validate
from household.visualization import visualize
from household.imputation import make_equidistant, fill_nan, resample_markers
from household.make_json import make_json

# Additional verbosity like printing out additional CSV files to verify feed integrity
verbose = False

## 2.5 Select timerange

Select the time range to read and process data. <br>
*Default: all data.*

Type `None` to process all available data.

In [None]:
start_from_user = None  # i.e. date(2017, 1, 1)
end_from_user = None  # i.e. date(2017, 1, 31)

## 2.6 Select download source

The raw data can be downloaded as a zip file from the OPSD Server. To do this, specify an archive version to use, that has been cached on the OPSD server as input.

In [None]:
archive_version = '2020-04-15'

## 2.7 Select household subset

Optionally, specify a subset of households to process. <br>
The next cell prints the available sources and datasets.

In [None]:
with open(os.path.join(config_path, 'households.yml'), 'r') as f:
    households = yaml.load(f.read(), Loader=yaml.FullLoader)
for household_name, household_data in households.items():
    household_id = household_name.replace(' ', '').lower()
    household_data['id'] = household_id
    household_data['name'] = household_name
    print(yaml.dump({household_name: list(household_data['series'].keys())}, default_flow_style=False))

Copy from its output and paste to following cell to get the right format.

Type `subset = None` to ignore any subset selection and include all data.

In [None]:
subset = yaml.load('''
Example household:
- example_series
''', Loader=yaml.FullLoader)

subset = None

Now eliminate households and feeds not in subset.

In [None]:
if subset:  # eliminate households and feeds not in subset
    households = {household_name: {k: v for k, v in households[household_name].items()}
                  for household_name, series_list in subset.items()}
    
    for household_name in subset.keys():
        series_ignore = []
        for series_name in households[household_name]['series']:
            if not series_name in subset[household_name]:
                series_ignore.append(series_name)
    
    for series_name in series_ignore:
        del households[household_name]['series'][series_name]

# 3. Download

This section: download raw data to process.

If the original data does not exist, it will be downloaded from the OPSD Server and extracted in a local directory

In [None]:
download(out_path, version=archive_version)

# 4. Read

This section: Read each downloaded file into a pandas-DataFrame and merge data from different sources if it has the same time resolution.

## 4.1 Preparations
Set the title of the rows at the top of the data used to store metadata internally. The order of this list determines the order of the levels in the resulting output.

In [None]:
headers = ['region', 'household', 'type', 'unit', 'feed']

## 4.2 Reading loop

Loop through households and feeds to do the reading

In [None]:
# For each source in the household dictionary
household_data = {}
for household in households.values():
    data = read(household['name'], household['dir'], household['region'], household['type'], 
                household['series'], headers, 
                start_from_user=start_from_user,
                end_from_user=end_from_user)
    
    household_data[household['id']] = data


## 4.3 Validate and save raw data

With the raw data being the unvalidated measurements of a research prototype under development, certain steps should be taken, to remove clearly incorrect measured data points or react to other events, such as the counter reset of an energy meter.

Save the validated DataFrames to disk. This way you have the raw data to fall back to if something goes wrong in the remainder of this notebook without having to repeat the previos steps.

In [None]:
os.makedirs('raw_data', exist_ok=True)
for household in households.values():
    data = validate(household, household_data[household['id']], config_dir=config_path, verbose=verbose)
    data.columns.names = headers
    data.to_pickle(os.path.join('raw_data', household['id']+'.pickle'))


Load the data series saved above

In [None]:
household_data = {}
for household in households.values():
    household_data[household['id']] = pd.read_pickle(os.path.join('raw_data', household['id']+'.pickle'))


# 5. Processing

This section: Handle missing data and aggregation to a regular, equidistant index, as well as 15 and 60 minute intervals.


## 5.1 Aggregate Index equidistantly

Most of the acquired household data comes in irregular time intervals and is highly unsynchronized for different data series.

To improve readability and comparability, regular intervals will be assumed and interpolated according to existing values. Gaps greater than 15 minutes will be left untouched.

In [None]:
os.makedirs('fixed_data', exist_ok=True)
for household in households.values():
    data = make_equidistant(household, household_data[household['id']], 1)
    data.to_pickle(os.path.join('fixed_data', household['id']+'.pickle'))


Load the equidistant data series

In [None]:
household_data = {}
for household in households.values():
    household_data[household['id']] = pd.read_pickle(os.path.join('fixed_data', household['id']+'.pickle'))

## 5.2 Missing Data Handling

Patch missing data. At this stage, only small gaps (up to 1 hour) are filled by linear interpolation. This catched most of the missing data due to daylight savings time transitions or failed radio transmissions, while filling bigger gaps with values from the prior day or prior week.

The exact locations of missing data are stored in the `data_nan` DataFrames.

Where data has been interpolated, it is marked in a new column `comment`. For eaxample the comment `residential_004_pv;` means that in the original data, there is a gap in the solar generation timeseries from the Resident 4 in the time period where the marker appears.

Patch the datasets and display the location of missing Data in the original data.

In [None]:
os.makedirs('filled_data', exist_ok=True)
for household in households.values():
    data, data_nan = fill_nan(household_data[household['id']], household['name'], headers, config_dir=config_path)
    data.to_pickle(os.path.join('filled_data', household['id']+'.pickle'))
    
    writer = pd.ExcelWriter(os.path.join('filled_data', household['id']+'_NaN.xlsx'))
    data_nan.to_excel(writer, 'NaN')
    writer.save()
    
    if verbose:
        visualize(data)

Load all patched data sets

In [None]:
data_sets = {}
with open(os.path.join(config_path, 'households.yml'), 'r') as f:
    households_full = yaml.load(f.read(), Loader=yaml.FullLoader)

for household_name in households_full:
    household_id = household_name.replace(' ', '').lower()

    #data_file = os.path.join('raw_data', household_id+'.pickle')
    #if os.path.isfile(data_file):
    #    data = pd.read_pickle(data_file)
    #    update_sets('raw', data, data_sets)

    data_file = os.path.join('filled_data', household_id+'.pickle')
    if os.path.isfile(data_file):
        data = pd.read_pickle(data_file)
        update_sets('1min', data, data_sets)

## 5.3 Aggregate hourly and 15min interval data

As some data comes in 1-minute, other (older series) in 3-minute intervals, a harmonization can be done. With most billing intervals being at 15-minute and 60-minute ranges, a resampling to those intervals can be done to improve the overview over the data.

The marker column is resampled separately in such a way that all information on where data has been interpolated is preserved.

To do this, condense the marker column to both 15 and 60 minutes resolution, resample the series and update the condensed markers

In [None]:
data = data_sets['1min']
minute = data.index[0].minute + (15 - data.index[0].minute % 15)
hour = data.index[0].hour
if(minute > 59):
    minute = 0
    hour += 1
start_15 = data.index[0].replace(hour=hour, minute=minute, second=0)

minute = data.index[-1].minute + (15 - data.index[-1].minute % 15)
hour = data.index[-1].hour
if(minute > 59):
    minute = 0
    hour += 1
end_15 = data.index[-1].replace(hour=hour, minute=minute, second=0)

index_15 = pd.date_range(start=start_15, end=end_15, freq='15min')
marker_15 = data['interpolated'].groupby(
    pd.Grouper(freq='15min', closed='left', label='left')
    ).agg(resample_markers).reindex(index_15)

data_sets['15min'] = data.resample('15min').last()
data_sets['15min']['interpolated'] = marker_15

In [None]:
start_60 = data.index[0].replace(minute=0, second=0)
end_60 = data.index[-1].replace(minute=0, second=0)
index_60 = pd.date_range(start=start_60, end=end_60, freq='60min')
marker_60 = data['interpolated'].groupby(
    pd.Grouper(freq='60min', closed='left', label='left')
    ).agg(resample_markers).reindex(index_60)

data_sets['60min'] = data.resample('60min').last()
data_sets['60min']['interpolated'] = marker_60

## 5.4 Insert a column with Central European Time

The index column of th data sets defines the start of the timeperiod represented by each row of that data set in **UTC** time. We include an additional column for the **CE(S)T** Central European (Summer-) Time, as this might help aligning the output data with other data sources.

In [None]:
info_cols = {'utc': 'utc_timestamp',
             'cet': 'cet_cest_timestamp',
             'marker': 'interpolated'}

In [None]:
for res_key, df in data_sets.items():
    if df.empty:
        continue
    if df.index.tzinfo is None or df.index.tzinfo.utcoffset(df.index) is None:
        df.index = df.index.tz_localize('UTC')
    df.index.rename(info_cols['utc'], inplace=True)
    df.insert(0, info_cols['cet'], df.index.tz_convert('Europe/Berlin'))

Create a final savepoint

In [None]:
os.makedirs('final_data', exist_ok=True)
for res_key, data_set in data_sets.items():
    data_set.to_pickle(os.path.join('final_data', res_key+'.pickle'))

In [None]:
data_sets = {}
#data_sets['raw'] = pd.read_pickle(os.path.join('final_data', 'raw.pickle'))
data_sets['1min'] = pd.read_pickle(os.path.join('final_data', '1min.pickle'))
data_sets['15min'] = pd.read_pickle(os.path.join('final_data', '15min.pickle'))
data_sets['60min'] = pd.read_pickle(os.path.join('final_data', '60min.pickle'))

## 6. Create metadata

This section: create the metadata, both general and column-specific. All metadata we be stored as a JSON file.

In [None]:
# change to out_path directory
os.chdir(out_path)
os.getcwd()

In [None]:
make_json(data_sets, info_cols, version, changes, headers)

# 7. Write data to disk

This section: Save as [Data Package](http://data.okfn.org/doc/tabular-data-package) (data in CSV, metadata in JSON file). All files are saved in the directory of this notebook. Alternative file formats (SQL, XLSX) are also exported. Takes about 1 hour to run.

## 7.1 Limit time range

Cut off the data outside of [start_from_user:end_from_user]

In [None]:
for res_key, df in data_sets.items():
    # First, convert userinput to UTC time to conform with data_set.index
    if start_from_user:
        start_from_user = (
            pytz.timezone('Europe/Berlin')
            .localize(datetime.combine(start_from_user, time()))
            .astimezone(pytz.timezone('UTC')))
    if end_from_user and 'min' in res_key:
        end_from_user = (
            pytz.timezone('Europe/Berlin')
            .localize(datetime.combine(end_from_user, time()))
            .astimezone(pytz.timezone('UTC'))
            # appropriate offset to inlude the end of period
            + timedelta(days=1, minutes=-int(res_key[:res_key.index('min')])))
    # Then cut off the data_set
    data_sets[res_key] = df.loc[start_from_user:end_from_user, :]

## 7.2 Convert the Central European Time column to ISO-8601

Convert the UTC, as well as the Central European Time indexes to ISO-8601

In [None]:
for res_key, df in data_sets.items():
    df.iloc[:,0] = df.iloc[:,0].dt.strftime('%Y-%m-%dT%H:%M:%S%z')

## 7.3 Different shapes

Data are provided in three different "shapes": 
- SingleIndex (easy to read for humans, compatible with datapackage standard, small file size)
  - Fileformat: CSV, SQLite
- MultiIndex (easy to read into GAMS, not compatible with datapackage standard, small file size)
  - Fileformat: CSV, Excel
- Stacked (compatible with data package standard, large file size, many rows, too many for Excel) 
  - Fileformat: CSV

The different shapes need to be created internally befor they can be saved to files. Takes about 1 minute to run.

In [None]:
data_sets_singleindex = {}
data_sets_multiindex = {}
data_sets_stacked = {}
for res_key, df in data_sets.items():
    # MultIndex
    data_sets_multiindex[res_key + '_multiindex'] = df
    
    # SingleIndex
    df_singleindex = df.copy()
    # use first 3 levels of multiindex to create singleindex
    df_singleindex.columns = [
        col[0] if col[0] in info_cols.values()
        else next(iter([l for l in col if l in df.columns.get_level_values('region')] or []), None) + \
                '_' + next(iter([l for l in col if l in df.columns.get_level_values('household')] or []), None) + \
                '_' + next(iter([l for l in col if l in df.columns.get_level_values('feed')] or []), None)
        for col in df.columns.values]
    
    data_sets_singleindex[res_key + '_singleindex'] = df_singleindex
    
    # Stacked
    stacked = df.copy()
    stacked.drop(info_cols['cet'], axis=1, inplace=True)
    stacked.columns = stacked.columns.droplevel(['region', 'type', 'unit'])
    stacked = stacked.transpose().stack(dropna=True).to_frame(name='data')
    data_sets_stacked[res_key + '_stacked'] = stacked

## 7.4 Write to SQL-database

This file format is required for the filtering function on the OPSD website. This takes about 30 seconds to complete.

In [None]:
for res_key, df in data_sets_singleindex.items():
    if res_key.startswith('raw'):
        continue
    
    df_sql = df.copy()
    df_sql.index = df_sql.index.strftime('%Y-%m-%dT%H:%M:%SZ')
    
    table = 'household_data_' + res_key
    df_sql.to_sql(table, sqlite3.connect('household_data.sqlite'),
                  if_exists='replace', index_label=info_cols['utc'])

## 7.5 Write to Excel

Writing the full tables to Excel takes extremely long. As a workaround, only the first 5 rows are exported. The rest of the data can than be inserted manually from the `_multindex.csv` files.

In [None]:
writer = pd.ExcelWriter('household_data.xlsx')
for res_key, df in data_sets_multiindex.items():
    if res_key.startswith('raw') or res_key.startswith('1min'):
        # Excel max sheet size is 1048576 rows, while raw and 1min resolution data has a lot more
        continue
    
    df_csv = df.copy()
    df_csv.index = df_csv.index.strftime('%Y-%m-%dT%H:%M:%SZ')
    
    df_csv.to_excel(writer, res_key.split('_')[0], float_format='%.3f',
                    merge_cells=True)
writer.save()

## 7.6 Write to CSV

In [None]:
# itertoools.chain() allows iterating over multiple dicts at once
for res_key, df in itertools.chain(
        data_sets_singleindex.items(),
        data_sets_multiindex.items(),
        data_sets_stacked.items()
    ):
    filename = 'household_data_' + res_key + '.csv'
    df.to_csv(filename, float_format='%.3f',
              date_format='%Y-%m-%dT%H:%M:%SZ')

## 7.7 Write checksums.txt

We publish SHA-checksums for the outputfiles on GitHub to allow verifying the integrity of outputfiles on the OPSD server.

In [None]:
def get_sha_hash(path, blocksize=65536):
    sha_hasher = hashlib.sha256()
    with open(path, 'rb') as f:
        buffer = f.read(blocksize)
        while len(buffer) > 0:
            sha_hasher.update(buffer)
            buffer = f.read(blocksize)
        return sha_hasher.hexdigest()

files = os.listdir(out_path)

# Create checksums.txt in the output directory
with open('checksums.txt', 'w') as f:
    for file_name in files:
        if file_name.split('.')[-1] in ['csv', 'sqlite', 'xlsx']:
            file_hash = get_sha_hash(file_name)
            f.write('{},{}\n'.format(file_name, file_hash))

# Copy the file to root directory from where it will be pushed to GitHub,
# leaving a copy in the version directory for reference
copyfile('checksums.txt', os.path.join(home_path, 'checksums.txt'))