In [1]:
import logging
from pathlib import Path

import pandas as pd
from pandas.api.types import CategoricalDtype
from tqdm import tqdm


def datadir(*args):
    """Return the absolute path of an entry inside the sibling ``data`` directory.

    The base is ``<parent of the current working directory>/data``; every
    positional argument is appended to it as a further path component.
    Calling it without arguments yields the ``data`` directory itself.
    """
    data_root = Path.cwd().parent / "data"
    return data_root.joinpath(*args).resolve()


# Raw CSV input (61.49GB).
path_heeten_data = datadir("GridFlexHeetenDataset.csv")

# Two back-to-back one-year windows, each stored as a [start, end] pair.
# The data is from 2018-08-01T01:59:00+02:00 to 2020-08-31T23:58:00+02:00
_window_edges = (
    "2018-08-01 00:00:00+00:00",
    "2019-08-01 00:00:00+00:00",
    "2020-08-01 00:00:00+00:00",
)
target_time_range = [
    [start, end] for start, end in zip(_window_edges, _window_edges[1:])
]

# Values of the `measurement` column that are kept when filtering.
target_measurement = ["UNC_KW", "TOTAL_KW", "EXPORT_KW", "IMPORT_KW", "PV_KW"]

In [23]:
# Ordered categoricals for the three label columns.
# "HouseTest" is intentionally absent from the house categories, so
# "HouseTest" will resolve to NaN.
_cat_house = CategoricalDtype(
    [f"House{number}" for number in range(1, 78)], ordered=True
)
_cat_appliance = CategoricalDtype(["SMARTMETER", "PVMETER", "BATTERY"], ordered=True)
_cat_measurement = CategoricalDtype(
    [
        "BATTERY_EXPORT_KW", "BATTERY_IMPORT_KW", "BATTERY_KW",
        "BATTERY_TARGET_KW", "BATTERY_TARGET_MODE",
        "CHARGE_MODE",
        "CURRENT_PHASE_1", "CURRENT_PHASE_2", "CURRENT_PHASE_3",
        "EXPORT_KW", "EXPORT_KWH",
        "GAS_USAGE_M3",
        "IMPORT_KW", "IMPORT_KWH",
        "MAX_BATTERY_KW", "MIN_BATTERY_KW",
        "MOMENTARY_EXPORT_KW", "MOMENTARY_IMPORT_KW", "MOMENTARY_PV_KW",
        "OPERATIONAL_STATE",
        "PV_KW", "PV_KWH",
        "REQ_CHARGE_MODE",
        "STATE_OF_CHARGE",
        "TOTAL_KW", "TOTAL_KWH",
        "UNC_KW",
    ],
    ordered=True,
)

# Column names in file order, and the dtype each one is read with.
names = ["timestamp-local", "house", "appliance", "measurement", "value"]
dtype = dict(
    zip(names, [str, _cat_house, _cat_appliance, _cat_measurement, "float64"])
)
# The local timestamps carry mixed time offsets:
# CET: +01:00 (https://en.wikipedia.org/wiki/Central_European_Time)
# CEST: +02:00 (https://en.wikipedia.org/wiki/Central_European_Summer_Time)

# Preview: load only the first ten data rows, using `names` as column labels.
small_df = pd.read_csv(
    path_heeten_data, header=0, names=names, dtype=dtype, nrows=10
)
# Index the preview by the timestamps converted to UTC; the original local
# string remains available in the `timestamp-local` column.
small_index_utc = pd.to_datetime(small_df["timestamp-local"], utc=True).rename(
    "timestamp"
)
small_df = small_df.set_index(small_index_utc)
# Place the (UTC) calendar year right after `timestamp-local`.
small_df.insert(1, "year", small_df.index.year)
small_df
# Ad-hoc checks kept for reference:
# small_df.groupby("house", observed=True).describe()
# small_df.loc["2018-08-01 00:00:00+00:00":"2018-08-01 12:08:00+00:00"]
# small_df.query("'2018-08-01 02:00:00+02:00' <= index <= '2018-08-01 02:05:00+02:00'")  # " <= index <= ".join(map(lambda x: "'" + x + "'", target_time_range[0]))

Unnamed: 0_level_0,timestamp-local,year,house,appliance,measurement,value
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2018-07-31 23:59:00+00:00,2018-08-01T01:59:00+02:00,2018,House6,BATTERY,BATTERY_KW,-0.037
2018-08-01 00:00:00+00:00,2018-08-01T02:00:00+02:00,2018,House6,BATTERY,BATTERY_KW,-0.037
2018-08-01 00:01:00+00:00,2018-08-01T02:01:00+02:00,2018,House6,BATTERY,BATTERY_KW,-0.033
2018-08-01 00:02:00+00:00,2018-08-01T02:02:00+02:00,2018,House6,BATTERY,BATTERY_KW,-0.041
2018-08-01 00:03:00+00:00,2018-08-01T02:03:00+02:00,2018,House6,BATTERY,BATTERY_KW,-0.034
2018-08-01 00:04:00+00:00,2018-08-01T02:04:00+02:00,2018,House6,BATTERY,BATTERY_KW,-0.033
2018-08-01 00:05:00+00:00,2018-08-01T02:05:00+02:00,2018,House6,BATTERY,BATTERY_KW,-0.025
2018-08-01 00:06:00+00:00,2018-08-01T02:06:00+02:00,2018,House6,BATTERY,BATTERY_KW,-0.028
2018-08-01 00:07:00+00:00,2018-08-01T02:07:00+02:00,2018,House6,BATTERY,BATTERY_KW,-0.028
2018-08-01 00:08:00+00:00,2018-08-01T02:08:00+02:00,2018,House6,BATTERY,BATTERY_KW,-0.028


In [24]:
# Smoke test: write the preview rows that fall inside the first target window
# to a partitioned parquet dataset in the local "test" directory.
small_df_sorted = small_df.sort_index()
try:
    small_df_sorted.loc[
        target_time_range[0][0] : target_time_range[0][1]
    ].to_parquet(
        "test",
        engine="pyarrow",
        partition_cols=["year", "house", "measurement"],
    )
except Exception:
    # Best effort: report the failure and keep the notebook running.
    # The frame is passed through a %s placeholder; handing it to logging as a
    # bare extra argument makes the log call itself fail during formatting.
    logging.exception(
        "Failed to write the preview frame to parquet:\n%s", small_df_sorted
    )

## Filter and save to parquet

In [None]:
output_path_2018 = datadir("Heeten_2018")
output_path_2019 = datadir("Heeten_2019")

# Each output dataset is paired with its half-open UTC window [start, end).
# The end is exclusive so that a row stamped exactly on the shared boundary
# (2019-08-01 00:00:00+00:00) is written to one dataset only.
yearly_outputs = [
    (output_path, pd.Timestamp(window_start), pd.Timestamp(window_end))
    for output_path, (window_start, window_end) in zip(
        (output_path_2018, output_path_2019), target_time_range
    )
]

# NOTE(review): every `to_parquet` call below adds files under the dataset
# directory; presumably re-running this cell appends duplicate rows, so clear
# the output directories first — confirm against the installed pyarrow version.
# TODO:
# - async?
with pd.read_csv(
    path_heeten_data,
    chunksize=10_000,
    index_col=["timestamp"],
    dtype=dtype,
) as reader:
    for chunk in tqdm(reader):
        # Keep rows of a known house (NaN means "HouseTest") whose measurement
        # is one of `target_measurement`.
        keep = chunk["house"].notna() & chunk["measurement"].isin(target_measurement)
        if not keep.any():
            continue
        selected = chunk.loc[keep]

        # The file mixes +01:00 and +02:00 offsets, so the timestamps are
        # converted to UTC explicitly; this gives every chunk the same
        # tz-aware index, comparable with the UTC window bounds.
        selected = selected.set_index(
            pd.to_datetime(selected.index, utc=True).rename("timestamp")
        ).sort_index()

        for output_path, window_start, window_end in yearly_outputs:
            in_window = (selected.index >= window_start) & (
                selected.index < window_end
            )
            if not in_window.any():
                continue  # nothing from this chunk belongs to this dataset
            window_rows = selected.loc[in_window]
            try:
                window_rows.to_parquet(
                    output_path,
                    engine="pyarrow",
                    partition_cols=["house", "measurement"],
                )
            except Exception:
                # Best effort: log the failure (with traceback) and carry on
                # with the remaining windows and chunks.
                logging.exception(
                    "Failed to write %d rows (%s .. %s) to %s",
                    len(window_rows),
                    window_rows.index[0],
                    window_rows.index[-1],
                    output_path,
                )

## Read parquet

In [None]:
# Load the filtered 2018 dataset back so that `df` exists on a fresh kernel
# (previously `df` was only defined in commented-out lines, so `df.info()`
# raised NameError on Restart & Run All).
output_path_2018 = datadir("Heeten_2018")
df = pd.read_parquet(
    output_path_2018, columns=["house", "appliance", "measurement", "value"]
)
df.info()