In [1]:
%load_ext autoreload
%autoreload 2

import pickle

import pandas as pd
from rtdip_sdk.pipelines.execute import PipelineJob, PipelineStep, PipelineTask, PipelineJobExecute
from src.rtdip_sources import OpenMeteoHistoricalWeatherSource, MultiFileSource
from src.rtdip_destinations import DiskDestination
from src.rtdip_transformations import FunctionTransformer

from src.feature_extraction import prepare_dataset
from src.training import train
from src.prepare_submission import prepare

from src.settings import DATA_DIR, LAT_COL, LON_COL

import warnings
# Suppress every warning category so library warnings do not clutter the cell outputs.
# NOTE(review): a blanket "ignore" also hides deprecation and data-quality warnings;
# consider narrowing the filter once the noisy source has been identified.
warnings.filterwarnings("ignore")

In [2]:
# Load the metadata table; the weather-download cell reads the site coordinates from it.
# NOTE: the variable is spelled `scroodge_meta` (extra "d"); the name is kept because
# the cell that defines the weather pipeline refers to it.
scroodge_meta = pd.read_parquet(path=DATA_DIR / "scrooge_metadata.parquet")

# Gather external data

The only external data I used was weather data from the Open-Meteo API.  
We can use an RTDIP job to download the data and store it in a file.  
Check out the `OpenMeteoHistoricalWeatherSource` and `DiskDestination` components I developed for this purpose.

Let's now define the pipeline and run it.

In [3]:
# ---------------------------------------------------------------------------
# Request settings for the weather download
# ---------------------------------------------------------------------------
weather_variables = [
    "temperature_2m",
    "relative_humidity_2m",
    "wind_speed_10m",
    "wind_direction_10m",
    "direct_radiation_instant",
    "diffuse_radiation_instant",
    "direct_normal_irradiance_instant",
    "global_tilted_irradiance_instant",
]

# Coordinates are taken from the first row of the metadata table.
latitude = scroodge_meta[LAT_COL].values[0]
longitude = scroodge_meta[LON_COL].values[0]

start_date = "2017-12-31"
end_date = "2019-01-01"
timezone = "America/New_York"

# ---------------------------------------------------------------------------
# Steps: source (download) -> destination (write parquet)
# ---------------------------------------------------------------------------
get_weather_data_step = PipelineStep(
    name="get_weather_data",
    description="Download weather data from Open Meteo",
    component=OpenMeteoHistoricalWeatherSource,
    component_parameters=dict(
        latitude=latitude,
        longitude=longitude,
        start_date=start_date,
        end_date=end_date,
        variables=weather_variables,
        timezone=timezone,
        data_frequency="hourly",
    ),
    # The downloaded data is handed to the step that writes it to disk.
    provide_output_to_step=["dump_weather_data"],
)

dump_weather_data_step = PipelineStep(
    name="dump_weather_data",
    description="Store weather data to disk",
    component=DiskDestination,
    component_parameters=dict(
        path=DATA_DIR / "scrooge_weather.parquet",
        file_type="parquet",
    ),
    depends_on_step=["get_weather_data"],
)

external_data_steps = [get_weather_data_step, dump_weather_data_step]

# ---------------------------------------------------------------------------
# Task and job wrapping the two steps
# ---------------------------------------------------------------------------
external_data_task = PipelineTask(
    name="external_data",
    description="Fetch external data",
    step_list=external_data_steps,
    batch_task=True,
)

pipeline_job = PipelineJob(
    name="test_job",
    description="test_job",
    version="0.0.1",
    task_list=[external_data_task],
)

In [4]:
# Wrap the weather-download job in an executor.
pipeline = PipelineJobExecute(pipeline_job)

# Run the job. The recorded output lists `get_weather_data` then `dump_weather_data`.
# NOTE(review): `pipeline` and `result` are rebound by the cell that runs the modeling job.
result = pipeline.run()

get_weather_data
dump_weather_data


# Modeling

For the modeling part I adopted a two-step model.  
The first step deals with the temperature dependence of the electricity consumption and fits a linear model with a kink.  
Beyond the kink the consumption is zero.  
The second step takes the residuals of the first as input and fits a LightGBM regressor to them.  

The model is implemented in the `ElectricityModel` class in `src/model.py`.  

To run the training step using RTDIP, I developed an additional source and an additional transformer:
- `MultiFileSource`: loads a list of files. It works around the lack of a join capability among the RTDIP components.
- `FunctionTransformer`: wraps a custom Python function.

So, let's define our training pipeline and run it.

In [5]:
# Input files for the modeling job. The weather file is the one written by the
# `dump_weather_data` step of the external-data job.
scrooge_electricity_path = DATA_DIR / "scrooge_bldg.parquet"
scrooge_weather_path = DATA_DIR / "scrooge_weather.parquet"
scrooge_meta_path = DATA_DIR / "scrooge_metadata.parquet"

# Source step: load all three parquet files through a single component.
# NOTE(review): the order (weather, electricity, metadata) presumably matters to
# `prepare_dataset` — confirm before reordering.
load_raw_data_step = PipelineStep(
    name="load_raw_data",
    description="Load raw data",
    component=MultiFileSource,
    component_parameters={
        "file_confs": [
            (str(scrooge_weather_path), "parquet"),
            (str(scrooge_electricity_path), "parquet"),
            (str(scrooge_meta_path), "parquet")
        ],
    },
    provide_output_to_step=["feature_extraction"]
)


# Transformation step: run `prepare_dataset` on the loaded data. Its output feeds
# both the feature dump and the training step.
feature_extraction_step = PipelineStep(
    name="feature_extraction",
    description="Extract features",
    component=FunctionTransformer,
    component_parameters={
        "function": prepare_dataset,
    },
    provide_output_to_step=["dump_features", "training"],
    depends_on_step=["load_raw_data"]
)

# Destination step: persist the extracted features.
# The description previously read "Store weather data to disk", a copy-paste
# leftover from the weather job; this step writes features.parquet.
dump_features_step = PipelineStep(
    name="dump_features",
    description="Store features to disk",
    component=DiskDestination,
    component_parameters={
        "path": DATA_DIR / "features.parquet",
        "file_type": "parquet"
    },
    depends_on_step=["feature_extraction"]
)

# Transformation step: run `train` on the extracted features.
training_step = PipelineStep(
    name="training",
    description="Train model",
    component=FunctionTransformer,
    component_parameters={
        "function": train,
    },
    provide_output_to_step=["dump_model"],
    depends_on_step=["feature_extraction"]
)

# Destination step: pickle the output of the training step.
dump_model_step = PipelineStep(
    name="dump_model",
    description="Store model to disk",
    component=DiskDestination,
    component_parameters={
        "path": DATA_DIR / "models.pickle",
        "file_type": "pickle"
    },
    depends_on_step=["training"]
)

modeling_steps = [load_raw_data_step, feature_extraction_step, dump_features_step, training_step, dump_model_step]

modeling_task = PipelineTask(
    name="modeling",
    description="Modeling",
    step_list=modeling_steps,
    batch_task=True
)

# NOTE(review): name and description are placeholders and are identical to those
# of the external-data job defined earlier in the notebook.
modeling_job = PipelineJob(
    name="test_job",
    description="test_job",
    version="0.0.1",
    task_list=[modeling_task]
)

In [8]:
# Execute the modeling job. The recorded output lists the steps in the order
# load_raw_data, feature_extraction, dump_features, training, dump_model.
# NOTE(review): this rebinds `pipeline` and `result` from the weather-download run.

pipeline = PipelineJobExecute(modeling_job)
result = pipeline.run()


load_raw_data
feature_extraction
Finished feature extraction
Index(['timestamp', 'out.electricity.heating_hp_bkup.energy_consumption',
       'out.electricity.heating.energy_consumption',
       'out.electricity.plug_loads.energy_consumption', 'temperature_2m',
       'relative_humidity_2m', 'wind_speed_10m', 'wind_direction_10m',
       'direct_radiation_instant', 'diffuse_radiation_instant',
       ...
       'temperature_setpoint_ratio_diff_ratio_over_previous_lag3_offset2',
       'decimal_hour_cat', 'month_cat', 'dayofweek_cat', 'temperature_cat',
       'holiday', 'days_from_holiday', 'days_to_holiday', 'heating_total',
       'heating_and_plugs'],
      dtype='object', length=428)
['bldg_id']
dump_features
training
Starting training
Number of features: 28
Fitting model for variable: heating_total
Fitting model for variable: out.electricity.plug_loads.energy_consumption
dump_model


In [14]:
# `pickle` and `prepare` are imported in the notebook's top import cell rather than
# here, so that all dependencies are visible in one place.

# Load the trained models from the file the `dump_model` step writes.
# NOTE(review): pickle.load can execute arbitrary code contained in the file;
# only load model files produced by this pipeline.
with open(DATA_DIR / "models.pickle", "rb") as fh:
    models = pickle.load(fh)

# NOTE(review): `dataset` is not used in this cell — `prepare` only receives the
# models. Kept in case a later cell relies on it; drop the read if none does.
dataset = pd.read_parquet(DATA_DIR/"features.parquet")

# Build the submission from the trained models.
# Presumably this writes submission.csv to the working directory — confirm in
# src/prepare_submission.py.
prepare(models)

# Display the submission file.
pd.read_csv("submission.csv")

Finished feature extraction
Index(['timestamp', 'out.electricity.heating_hp_bkup.energy_consumption',
       'out.electricity.heating.energy_consumption',
       'out.electricity.plug_loads.energy_consumption', 'temperature_2m',
       'relative_humidity_2m', 'wind_speed_10m', 'wind_direction_10m',
       'direct_radiation_instant', 'diffuse_radiation_instant',
       ...
       'temperature_setpoint_ratio_diff_ratio_over_previous_lag3_offset2',
       'decimal_hour_cat', 'month_cat', 'dayofweek_cat', 'temperature_cat',
       'holiday', 'days_from_holiday', 'days_to_holiday', 'heating_total',
       'heating_and_plugs'],
      dtype='object', length=428)
['bldg_id']
['bldg_id']
heating_total                                                              float64
out.electricity.plug_loads.energy_consumption                              float64
temperature_2m_lag12                                                       float32
wind_speed_10m_rolling_mean_lag1_window_size4                  

Unnamed: 0,timestamp,party_cost
0,2018-12-28 20:15:00,0.221412
1,2018-12-28 20:30:00,0.214707
2,2018-12-28 20:45:00,0.207453
3,2018-12-28 21:00:00,0.206058
4,2018-12-28 21:15:00,0.195291
5,2018-12-28 21:30:00,0.193309
6,2018-12-28 21:45:00,0.195087
7,2018-12-28 22:00:00,0.197503
8,2018-12-28 22:15:00,0.111531
9,2018-12-28 22:30:00,0.113221
