# Prepare Train Data

In [1]:
from datetime import datetime, timezone
from datetime import timedelta
from pathlib import Path
import pandas as pd
import pyarrow as pa

In [2]:
# --- Consumption data locations -------------------------------------------
# The NE5/NE7 export files are only referenced by the commented-out Spark
# code in the cells shown; the pandas code downloads the readings instead.
CONSUMPTION_DATA_PATH = Path("../data/consumption")
CONSUMPTION_NE5_FILE = CONSUMPTION_DATA_PATH.joinpath("NE5_Export.csv")
CONSUMPTION_NE7_FILE = CONSUMPTION_DATA_PATH.joinpath("NE7_Export.csv")
# Output file written by the last cell of this notebook.
TRAIN_DATA_PATH = Path('../data/consumption/final_train.parquet')

# --- Weather data ---------------------------------------------------------
METEO_PATH = Path("../data/meteoswiss/reh_nzz.csv")
# Value matched against the `abbr` column of the meteo CSV.
METEO_STATION = "REH"
# Name of the meteo CSV column that is averaged into `Temperature`.
METEO_TEMP_PARAMETER = "tre200h0"

# --- Time handling --------------------------------------------------------
# Date pattern used by the legacy Spark `to_date` call (kept for reference).
TIME_FORMAT = "dd.MM.yyyy"
# NOTE(review): not referenced by any cell shown here; confirm it is still needed.
WINDOW_DAYS = 1
# Half-open training window [START_DATE, END_DATE), expressed in UTC.
START_DATE = datetime(year=2010, month=1, day=1, tzinfo=timezone.utc)
END_DATE = datetime(year=2022, month=1, day=1, tzinfo=timezone.utc)

In [3]:
#ne5File = spark.read.csv(CONSUMPTION_NE5_FILE, header=True, inferSchema=True, sep=";")
#ne7File = spark.read.csv(CONSUMPTION_NE7_FILE, header=True, inferSchema=True, sep=";")

# consumption = (ne5File.alias("ne5")
#                       .join(ne7File.alias("ne7"), "Date")
#                       .withColumn("NE5Consumption", f.expr("ne5.Value"))
#                       .withColumn("NE7Consumption", f.expr("ne7.Value"))
#                       .withColumn("Date", f.to_date(f.col("Date"), TIME_FORMAT))
#                       .filter((f.col("Date") >= START_DATE) & (f.col("Date") < END_DATE))
#                       .select("Date", "NE5Consumption", "NE7Consumption")
#               )

# Build the daily consumption table from the CSV published at the URL below.
# Steps: parse `Timestamp` as UTC, keep rows inside [START_DATE, END_DATE),
# rename the value columns, then sum every numeric column per UTC calendar day.
consumption_url = 'https://data.stadt-zuerich.ch/dataset/ewz_stromabgabe_netzebenen_stadt_zuerich/download/ewz_stromabgabe_netzebenen_stadt_zuerich.csv'
readings = pd.read_csv(consumption_url).assign(
    # errors='raise' makes an unparseable timestamp fail here instead of
    # silently turning into NaT.
    Date=lambda frame: pd.to_datetime(frame['Timestamp'], errors='raise', utc=True)
)
in_window = (readings['Date'] >= START_DATE) & (readings['Date'] < END_DATE)
readings = readings.loc[in_window].rename(
    columns={'Value_NE5': 'NE5Consumption', 'Value_NE7': 'NE7Consumption'}
)
# Grouping by the `Date` series (not a renamed copy) keeps "Date" as the name
# of the resulting index, which the later join relies on.
consumption = readings.groupby(readings['Date'].dt.date).sum(numeric_only=True)

In [4]:
"""
meteoData = (spark.read.format("delta")
                         .load("/MeteoSwiss/Measurement/Delta")
                         .filter((f.col("Station") == METEO_STATION) & (f.col("Parameter") == METEO_TEMP_PARAMETER))
                         .withColumn("Date", f.to_date(f.col("TimestampUtc")))
                         .groupBy("Date")
                         .agg(f.avg(f.col("Value")).alias("Temperature"))
            )
"""

# Build the daily mean temperature table for the configured station.
# Result: one row per calendar day (index named "Date") with a single
# `Temperature` column holding the mean of METEO_TEMP_PARAMETER.
meteo_raw = pd.read_csv(METEO_PATH, encoding='iso-8859-1', sep=';')
# Filter and add the parsed timestamp in one expression: `.assign` returns a
# new frame, so no column is written into a filtered slice (the original
# `meteoData['Date'] = ...` on the filtered frame raised SettingWithCopyWarning).
station_rows = meteo_raw.loc[meteo_raw['abbr'] == METEO_STATION].assign(
    # NOTE(review): timestamps are parsed as timezone-naive. The consumption
    # table is grouped by UTC day and the legacy Spark code read a
    # `TimestampUtc` column, so `time` is presumably UTC as well; confirm.
    Date=lambda frame: pd.to_datetime(frame['time'], format='%Y%m%d%H%M')
)
meteoData = station_rows.groupby(station_rows['Date'].dt.date).agg(
    Temperature=(METEO_TEMP_PARAMETER, 'mean')
)

In [5]:
# Attach the daily temperature to each consumption day. `on="Date"` matches
# the "Date" index level of `consumption` against the index of `meteoData`;
# `reset_index()` then turns that index back into a regular `Date` column.
# NOTE(review): DataFrame.join defaults to how="left", so consumption days
# without a meteo row are kept with NaN Temperature; confirm that an inner
# join is not what the training data needs.
data = (
    consumption
    .join(meteoData, on="Date")
    .reset_index()
)

In [6]:
# Legacy Spark equivalent, kept for reference:
# data.write.parquet("/ConsumptionModel/Data/final_train.parquet")

# Persist the training table. The target directory is created first because
# `to_parquet` does not create missing parent directories and would otherwise
# fail only after all the loading and aggregation work has been done.
TRAIN_DATA_PATH.parent.mkdir(parents=True, exist_ok=True)
data.to_parquet(TRAIN_DATA_PATH, engine='pyarrow')