In [1]:
import warnings
import datetime
warnings.filterwarnings("ignore")

import os
import gc
import pickle

import numpy as np
import pandas as pd
import polars as pl
import plotly.express as px
from catboost import CatBoostRegressor
from sklearn.ensemble import VotingRegressor
import lightgbm as lgb

In [2]:
class DataStorage:
    root = "/kaggle/input/predict-energy-behavior-of-prosumers"

    data_cols = [
        "target",
        "county",
        "is_business",
        "product_type",
        "is_consumption",
        "datetime",
        "row_id",
    ]
    client_cols = [
        "product_type",
        "county",
        "eic_count",
        "installed_capacity",
        "is_business",
        "date",
    ]
    forecast_weather_cols = [
        "latitude",
        "longitude",
        "hours_ahead",
        "temperature",
        "dewpoint",
        "cloudcover_high",
        "cloudcover_low",
        "cloudcover_mid",
        "cloudcover_total",
        "10_metre_u_wind_component",
        "10_metre_v_wind_component",
        "forecast_datetime",
        "direct_solar_radiation",
        "surface_solar_radiation_downwards",
        "snowfall",
        "total_precipitation",
    ]
    historical_weather_cols = [
        "datetime",
        "temperature",
        "dewpoint",
        "rain",
        "snowfall",
        "surface_pressure",
        "cloudcover_total",
        "cloudcover_low",
        "cloudcover_mid",
        "cloudcover_high",
        "windspeed_10m",
        "winddirection_10m",
        "shortwave_radiation",
        "direct_solar_radiation",
        "diffuse_radiation",
        "latitude",
        "longitude",
    ]
    location_cols = ["longitude", "latitude", "county"]
    target_cols = [
        "target",
        "county",
        "is_business",
        "product_type",
        "is_consumption",
        "datetime",
    ]

    def __init__(self):
        self.df_data = pl.read_csv(
            os.path.join(self.root, "train.csv"),
            columns=self.data_cols,
            try_parse_dates=True,
        )
        self.df_client = pl.read_csv(
            os.path.join(self.root, "client.csv"),
            columns=self.client_cols,
            try_parse_dates=True,
        )
        self.df_forecast_weather = pl.read_csv(
            os.path.join(self.root, "forecast_weather.csv"),
            columns=self.forecast_weather_cols,
            try_parse_dates=True,
        )
        self.df_historical_weather = pl.read_csv(
            os.path.join(self.root, "historical_weather.csv"),
            columns=self.historical_weather_cols,
            try_parse_dates=True,
        )
        self.df_weather_station_to_county_mapping = pl.read_csv(
            os.path.join(self.root, "weather_station_to_county_mapping.csv"),
            columns=self.location_cols,
            try_parse_dates=True,
        )
        
        self.df_target = self.df_data.select(self.target_cols)
        


        self.schema_data = self.df_data.schema
        self.schema_client = self.df_client.schema
        self.schema_forecast_weather = self.df_forecast_weather.schema
        self.schema_historical_weather = self.df_historical_weather.schema
        self.schema_target = self.df_target.schema

        self.df_weather_station_to_county_mapping = (
            self.df_weather_station_to_county_mapping.with_columns(
                pl.col("latitude").cast(pl.datatypes.Float32),
                pl.col("longitude").cast(pl.datatypes.Float32),
            )
        )

    def update_with_new_data(
        self,
        df_new_client,
        df_new_forecast_weather,
        df_new_historical_weather,
        df_new_target
    ):
        df_new_client = pl.from_pandas(
            df_new_client[self.client_cols], schema_overrides=self.schema_client
        )
        df_new_forecast_weather = pl.from_pandas(
            df_new_forecast_weather[self.forecast_weather_cols],
            schema_overrides=self.schema_forecast_weather,
        )
        df_new_historical_weather = pl.from_pandas(
            df_new_historical_weather[self.historical_weather_cols],
            schema_overrides=self.schema_historical_weather,
        )
        df_new_target = pl.from_pandas(
            df_new_target[self.target_cols], schema_overrides=self.schema_target
        )

        self.df_client = pl.concat([self.df_client, df_new_client]).unique(
            ["date", "county", "is_business", "product_type"]
        )
        self.df_forecast_weather = pl.concat(
            [self.df_forecast_weather, df_new_forecast_weather]
        ).unique(["forecast_datetime", "latitude", "longitude", "hours_ahead"])
        self.df_historical_weather = pl.concat(
            [self.df_historical_weather, df_new_historical_weather]
        ).unique(["datetime", "latitude", "longitude"])
        self.df_target = pl.concat([self.df_target, df_new_target]).unique(
            ["datetime", "county", "is_business", "product_type", "is_consumption"]
        )

    def preprocess_test(self, df_test):
        df_test = df_test.rename(columns={"prediction_datetime": "datetime"})
        df_test = pl.from_pandas(
            df_test[self.data_cols[1:]], schema_overrides=self.schema_data
        )
        return df_test

In [3]:
import holidays
class FeaturesGeneratorConsumptionNormalized:
    def __init__(self, data_storage):
        self.data_storage = data_storage
        self.estonian_holidays = list(holidays.country_holidays('EE', years=range(2021, 2026)).keys())
        
    def _add_general_features(self, df_features):
        df_features = (
            df_features.with_columns(
                pl.col("datetime").dt.ordinal_day().alias("dayofyear"),
                pl.col("datetime").dt.hour().alias("hour"),
                pl.col("datetime").dt.day().alias("day"),
                pl.col("datetime").dt.weekday().alias("weekday"),
                pl.col("datetime").dt.month().alias("month"),
                pl.col("datetime").dt.year().alias("year"),
            )
            .with_columns(
                pl.concat_str(
                    "county",
                    "is_business",
                    "product_type",
                    "is_consumption",
                    separator="_",
                ).alias("segment"),
            )
            .with_columns(
                (np.pi * pl.col("dayofyear") / 183).sin().alias("sin(dayofyear)"),
                (np.pi * pl.col("dayofyear") / 183).cos().alias("cos(dayofyear)"),
                (np.pi * pl.col("hour") / 12).sin().alias("sin(hour)"),
                (np.pi * pl.col("hour") / 12).cos().alias("cos(hour)"),
            )
        )
        return df_features
    def _add_client_features(self, df_features):
        df_client = self.data_storage.df_client

        df_features = df_features.join(
            df_client.with_columns(
                (pl.col("date") + pl.duration(days=2)).cast(pl.Date)
            ),
            on=["county", "is_business", "product_type", "date"],
            how="left",
        )
        return df_features
        
    def _add_forecast_weather_features(self, df_features):
        df_forecast_weather = self.data_storage.df_forecast_weather
        df_weather_station_to_county_mapping = (
            self.data_storage.df_weather_station_to_county_mapping
        )
        df_forecast_weather = (
            df_forecast_weather.rename({"forecast_datetime": "datetime"})
            .filter((pl.col("hours_ahead") >= 22) & pl.col("hours_ahead") <= 45)
            .drop("hours_ahead")
            .with_columns(
                pl.col("latitude").cast(pl.datatypes.Float32),
                pl.col("longitude").cast(pl.datatypes.Float32),
            )
            .join(
                df_weather_station_to_county_mapping,
                how="left",
                on=["longitude", "latitude"],
            )
            .drop("longitude", "latitude")
        )
        df_forecast_weather_local = (
            df_forecast_weather.filter(pl.col("county").is_not_null())
            .group_by("county", "datetime")
            .mean()
        )

        for hours_lag in [0, 7 * 24]:
            df_forecast_weather_local = df_forecast_weather_local.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
            )
            df_features = df_features.join( #unique list?
                df_forecast_weather_local,
                on=["county", "datetime"],
                how="left",
                suffix=f"_forecast_local_{hours_lag}h",
            )
            
            
        u_comp = df_features['10_metre_u_wind_component']
        v_comp = df_features['10_metre_v_wind_component']
        for hours_lag in [0, 168]:
            df_features = df_features.with_columns(
                (np.sqrt(u_comp ** 2 + v_comp ** 2)).alias(f"windspeed_10m_forecast_local_{hours_lag}h"),
        
            )
        return df_features
    
    def _add_historical_weather_features(self, df_features):
        df_historical_weather = self.data_storage.df_historical_weather
        df_weather_station_to_county_mapping = (
            self.data_storage.df_weather_station_to_county_mapping
        )

        df_historical_weather = (
            df_historical_weather.with_columns(
                pl.col("latitude").cast(pl.datatypes.Float32),
                pl.col("longitude").cast(pl.datatypes.Float32),
            )
            .join(
                df_weather_station_to_county_mapping,
                how="left",
                on=["longitude", "latitude"],
            )
            .drop("longitude", "latitude")
        )
        
        df_historical_weather_local = (
            df_historical_weather.filter(pl.col("county").is_not_null())
            .group_by("county", "datetime")
            .mean()
        )
        
        df_historical_weather_local = df_historical_weather_local.with_columns(
            (pl.col("rain") / 1000 + pl.col("snowfall") / 100).alias('total_precipitation'))

        for hours_lag in [2 * 24, 7 * 24]:
            df_features = df_features.join(
                df_historical_weather_local.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ),
                on=["county", "datetime"],
                how="left",
                suffix=f"_historical_local_{hours_lag}h",
            )
        return df_features
    
    def _rename_columns(self, df_features):
        for_list = ['temperature', 'dewpoint',
       'cloudcover_high', 'cloudcover_low', 'cloudcover_mid',
       'cloudcover_total', '10_metre_u_wind_component',
       '10_metre_v_wind_component', 'direct_solar_radiation',
       'surface_solar_radiation_downwards', 'snowfall', 'total_precipitation']
        
        hist_list = ['rain', 'surface_pressure', 'windspeed_10m',
                     'winddirection_10m', 'shortwave_radiation', 'diffuse_radiation']
        
        for col in for_list:
            df_features = df_features.rename({col: f"{col}_forecast_local_0h"})
            
        for col in hist_list:
            df_features = df_features.rename({col: f"{col}_historical_local_48h"})
        return df_features
        
    def _add_target_features(self, df_features):
        df_target = self.data_storage.df_target
        df_client = self.data_storage.df_client
        
        df_target_all_type_sum = (
            df_target.group_by(["datetime", "county", "is_business", "is_consumption"])
            .sum()
            .drop("product_type")
        )

        df_target_all_county_type_sum = (
            df_target.group_by(["datetime", "is_business", "is_consumption"])
            .sum()
            .drop("product_type", "county")
        )

        for hours_lag in [
            2 * 24,
            3 * 24,
            4 * 24,
            5 * 24,
            6 * 24,
            7 * 24,
            8 * 24,
            9 * 24,
            10 * 24,
            11 * 24,
            12 * 24,
            13 * 24,
            14 * 24,
            365*24, 367*24, 372*24
        ]:
            df_features = df_features.join(
                df_client.with_columns(
                    (pl.col("date") + pl.duration(days=int(hours_lag/24))).cast(pl.Date)
                ),
                on=["county", "is_business", "product_type", "date"],
                how="left", suffix=f"_{hours_lag}h"
            )
            
            df_features = df_features.join(
                df_target.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ).rename({"target": f"target_{hours_lag}h"}),
                on=[
                    "county",
                    "is_business",
                    "product_type",
                    "is_consumption",
                    "datetime",
                ],
                how="left",
            )
#             df_features = df_features.with_columns((
#             pl.col(f"target_{hours_lag}h") / pl.col(f"eic_count_{hours_lag}h")).alias(f"new_target_{hours_lag}h"))
            
        columns_to_calculate_mean = [f'target_{hours}h' for hours in [
            2 * 24, 3 * 24, 4 * 24, 5 * 24, 6 * 24, 7 * 24, 8 * 24
        ]]
        df_features = df_features.with_columns(df_features.select(columns_to_calculate_mean).mean(axis=1).fill_null(strategy='forward').alias('norm_target'))
        
        for hours_lag in [
            2 * 24,
            3 * 24,
            4 * 24,
            5 * 24,
            6 * 24,
            7 * 24,
            8 * 24,
            9 * 24,
            10 * 24,
            11 * 24,
            12 * 24,
            13 * 24,
            14 * 24,
            365*24, 367*24, 372*24
        ]:
            df_features = df_features.with_columns((
            pl.col(f"target_{hours_lag}h") / pl.col(f"norm_target")).alias(f"new_target_{hours_lag}h"))
        
#         df_features = df_features.with_columns((
#             pl.col(f"eic_count") / pl.col(f"norm_target")).alias(f"eic_count_normalized"))
        

        for hours_lag in [2 * 24, 3 * 24, 7 * 24, 14 * 24]:
            df_features = df_features.join(
                df_target_all_type_sum.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ).rename({"target": f"target_all_type_sum_{hours_lag}h"}),
                on=["county", "is_business", "is_consumption", "datetime"],
                how="left",
            )

            df_features = df_features.join(
                df_target_all_county_type_sum.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ).rename({"target": f"target_all_county_type_sum_{hours_lag}h"}),
                on=["is_business", "is_consumption", "datetime"],
                how="left",
                suffix=f"_all_county_type_sum_{hours_lag}h",
            )

        cols_for_stats = [
            f"new_target_{hours_lag}h" for hours_lag in [2 * 24,
            3 * 24,
            4 * 24,
            5 * 24,
           ]
        ]
        df_features = df_features.with_columns(
            df_features.select(cols_for_stats).mean(axis=1).alias(f"new_target_mean"),
            df_features.select(cols_for_stats)
            .transpose()
            .std()
            .transpose()
            .to_series()
            .alias(f"new_target_std"),
        )

        for target_prefix, lag_numerator, lag_denominator in [
            ("new_target", 24 * 7, 24 * 14),
            ("new_target", 24 * 2, 24 * 9),
            ("new_target", 24 * 3, 24 * 10),
            ("new_target", 24 * 2, 24 * 3),
            ("target_all_type_sum", 24 * 2, 24 * 3),
            ("target_all_type_sum", 24 * 7, 24 * 14),
            ("target_all_county_type_sum", 24 * 2, 24 * 3),
            ("target_all_county_type_sum", 24 * 7, 24 * 14),
        ]:
            df_features = df_features.with_columns(
                (
                    pl.col(f"{target_prefix}_{lag_numerator}h")
                    / (pl.col(f"{target_prefix}_{lag_denominator}h") + 1e-3)
                ).alias(f"{target_prefix}_ratio_{lag_numerator}_{lag_denominator}")
            )
            
            df_features = df_features.with_columns(
                (
                    pl.col(f"{target_prefix}_{lag_numerator}h")
                    - (pl.col(f"{target_prefix}_{lag_denominator}h"))
                ).alias(f"{target_prefix}_difference_{lag_numerator}_{lag_denominator}")
            )
        return df_features
    
    def _normalize_target_columns(self, df_features):
        columns_to_calculate_mean = [f'target_{hours}h' for hours in [2 * 24, 3 * 24, 4 * 24, 5 * 24, 6 * 24, 7 * 24, 8 * 24]]
        df_features = df_features.with_columns(df_features.select(columns_to_calculate_mean).mean(axis=1).fill_null(strategy='forward').alias('norm_target'))
        
        
        
        columns_to_norm = [column for column in df_features.columns if ('target' in column and column != 'norm_target') ]
        for column in columns_to_norm:
            df_features = df_features.with_columns((pl.col(column) / pl.col('norm_target')).alias(f'{column}_normalized'))

        df_features = df_features.with_columns((pl.col('eic_count') / pl.col('norm_target')).alias(f'eic_count_normalized'))
        
        return df_features
    
    def _add_holidays_features(self, df_features):
        df_features = (
            df_features.with_columns(
                pl.when(pl.col("date").is_in(self.estonian_holidays)).then(1).otherwise(0).alias("country_holiday"),
                pl.when((pl.col("date") - datetime.timedelta(days=7)).is_in(self.estonian_holidays)).then(1).otherwise(0).alias("country_holiday_7d"),
                pl.when((pl.col("date") - datetime.timedelta(days=2)).is_in(self.estonian_holidays)).then(1).otherwise(0).alias("country_holiday_2d")
            )
        )
        return df_features
    
    def _add_imbalance_features(self, df_features):
        imb_list_forecast_0 = [
                      'dewpoint_forecast_local_0h',
                      'temperature_forecast_local_0h',
                      'direct_solar_radiation_forecast_local_0h',
                      'surface_solar_radiation_downwards_forecast_local_0h',
                      'total_precipitation_forecast_local_0h']
        imb_list_forecast_168 = [
                      'dewpoint_forecast_local_168h',
                      'temperature_forecast_local_168h',
                      'direct_solar_radiation_forecast_local_168h',
                      'surface_solar_radiation_downwards_forecast_local_168h',
                      'total_precipitation_forecast_local_168h']
        for feature2, feature3 in zip(imb_list_forecast_0, imb_list_forecast_168):
            df_features = df_features.with_columns(((pl.col(feature2) - pl.col(feature3)) / (pl.col(feature2) + pl.col(feature3))).cast(pl.Float32).alias(f"{feature2}_{feature3}_imbalance"))
        return df_features

    def _reduce_memory_usage(self, df_features):
        df_features = df_features.with_columns(pl.col(pl.Float64).cast(pl.Float32))
        return df_features
    
    def _drop_columns(self, df_features):
        df_features = df_features.drop(
            "date", "datetime", "hour", "dayofyear", 'block', 'forecast_date', 'eic_count_120h','eic_count_144h','eic_count_168h',
            'eic_count_192h','eic_count_216h','eic_count_240h','eic_count_264h','eic_count_288h','eic_count_312h','eic_count_336h','eic_count_48h',
            'eic_count_72h','eic_count_8760h','eic_count_8808h','eic_count_8928h','eic_count_96h','installed_capacity_120h',
            'installed_capacity_144h','installed_capacity_168h','installed_capacity_192h','installed_capacity_216h','installed_capacity_240h','installed_capacity_264h',
            'installed_capacity_288h','installed_capacity_312h','installed_capacity_336h','installed_capacity_48h','installed_capacity_72h','installed_capacity_8760h',
            'installed_capacity_8808h','installed_capacity_8928h','installed_capacity_96h',
            'target_120h','target_144h','target_168h','target_192h','target_216h','target_240h',
            'target_264h','target_288h','target_312h','target_336h', 'target_360h,','target_48h','target_72h','target_8760h','target_8808h',
            'target_8928h','target_96h','target_all_county_type_sum_168h','target_all_county_type_sum_336h','target_all_county_type_sum_48h','target_all_county_type_sum_72h','target_all_county_type_sum_difference_168_336',
            'target_all_county_type_sum_difference_48_72','target_all_county_type_sum_ratio_168_336','target_all_county_type_sum_ratio_48_72','target_all_type_sum_168h','target_all_type_sum_336h','target_all_type_sum_48h',
            'target_all_type_sum_72h','target_all_type_sum_difference_168_336','target_all_type_sum_difference_48_72','target_all_type_sum_ratio_168_336','target_all_type_sum_ratio_48_72',
            'installed_capacity', 'day', '10_metre_u_wind_component_forecast_local_0h', '10_metre_v_wind_component_forecast_local_0h',
            '10_metre_u_wind_component_forecast_local_168h', '10_metre_v_wind_component_forecast_local_168h'
            'winddirection_10m_historical_local_168h', 'winddirection_10m_historical_local_48h', #???
            'cloudcover_high_forecast_local_0h',
            'cloudcover_high_forecast_local_168h',
            'cloudcover_high_historical_local_168h',
            'cloudcover_high_historical_local_48h',
            'cloudcover_low_forecast_local_0h',
            'cloudcover_low_forecast_local_168h',
            'cloudcover_low_historical_local_168h',
            'cloudcover_low_historical_local_48h',
            'cloudcover_mid_forecast_local_0h',
            'cloudcover_mid_forecast_local_168h',
            'cloudcover_mid_historical_local_168h',
            'cloudcover_mid_historical_local_48h',
            'cloudcover_total_forecast_local_0h',
            'cloudcover_total_forecast_local_168h',
            'cloudcover_total_historical_local_168h',
            'cloudcover_total_historical_local_48h',
            )
        return df_features
    
    
    def _to_pandas(self, df_features, y):
        cat_cols = [
            "county",
            "is_business",
            "product_type",
            "is_consumption",
            "segment",
        ]

        if y is not None:
            df_features = pd.concat([df_features.to_pandas(), y.to_pandas()], axis=1)
        else:
            df_features = df_features.to_pandas()

        df_features = df_features.set_index("row_id")
        df_features[cat_cols] = df_features[cat_cols].astype("category")

        return df_features

    def generate_features(self, df_prediction_items):
        if "target" in df_prediction_items.columns:
            df_prediction_items, y = (
                df_prediction_items.drop("target"),
                df_prediction_items.select("target"),
            )
        else:
            y = None

        df_features = df_prediction_items.with_columns(
            pl.col("datetime").cast(pl.Date).alias("date"),
        )

        for add_features in [
            self._add_general_features,
            self._add_client_features,
            self._add_forecast_weather_features,
            #self._add_electricity_and_gas_features,
            self._add_historical_weather_features,
            self._rename_columns,
            self._add_target_features,
            #self._normalize_target_columns,
            self._add_holidays_features,
            #self._add_historical_weather_advanced_features,
            #self._add_forecast_ratio_features,
            #self._reduce_memory_usage,
            #self._select_features,
            self._add_imbalance_features,
            self._reduce_memory_usage,
            self._drop_columns
        ]:
            df_features = add_features(df_features)

        df_features = self._to_pandas(df_features, y)
        if y is not None:
            df_features['normalized_target'] = df_features['target'] / df_features['norm_target']

        return df_features

In [4]:
import datetime
import holidays
from itertools import combinations

class FeaturesGeneratorProductionNormalized:
    def __init__(self, data_storage):
        self.data_storage = data_storage
        self.estonian_holidays = list(holidays.country_holidays('EE', years=range(2021, 2026)).keys())

    def _add_general_features(self, df_features):
        df_features = (
            df_features.with_columns(
                pl.col("datetime").dt.ordinal_day().alias("dayofyear"),
                pl.col("datetime").dt.hour().alias("hour"),
                pl.col("datetime").dt.day().alias("day"),
                pl.col("datetime").dt.weekday().alias("weekday"),
                pl.col("datetime").dt.month().alias("month"),
                pl.col("datetime").dt.year().alias("year"),
            )
            .with_columns(
                pl.concat_str(
                    "county",
                    "is_business",
                    "product_type",
                    "is_consumption",
                    separator="_",
                ).alias("segment"),
            )
            .with_columns(
                (np.pi * pl.col("dayofyear") / 183).sin().alias("sin(dayofyear)"),
                (np.pi * pl.col("dayofyear") / 183).cos().alias("cos(dayofyear)"),
                (np.pi * pl.col("hour") / 12).sin().alias("sin(hour)"),
                (np.pi * pl.col("hour") / 12).cos().alias("cos(hour)"),
            )
        )
        return df_features

    def _add_client_features(self, df_features):
        df_client = self.data_storage.df_client

        df_features = df_features.join(
            df_client.with_columns(
                (pl.col("date") + pl.duration(days=2)).cast(pl.Date)
            ),
            on=["county", "is_business", "product_type", "date"],
            how="left",
        )
        return df_features

    def _add_forecast_weather_features(self, df_features):
        df_forecast_weather = self.data_storage.df_forecast_weather
        df_weather_station_to_county_mapping = (
            self.data_storage.df_weather_station_to_county_mapping
        )

        df_forecast_weather = (
            df_forecast_weather.rename({"forecast_datetime": "datetime"})
            .filter((pl.col("hours_ahead") >= 22) & pl.col("hours_ahead") <= 45)
            .drop("hours_ahead")
            .with_columns(
                pl.col("latitude").cast(pl.datatypes.Float32),
                pl.col("longitude").cast(pl.datatypes.Float32),
            )
            .join(
                df_weather_station_to_county_mapping,
                how="left",
                on=["longitude", "latitude"],
            )
            .drop("longitude", "latitude")
        )
        
        df_forecast_weather_date = (
            df_forecast_weather.group_by("datetime").mean().drop("county")
        )

        df_forecast_weather_local = (
            df_forecast_weather.filter(pl.col("county").is_not_null())
            .group_by("county", "datetime")
            .mean()
        )

        for hours_lag in [0, 7 * 24]:
            df_features = df_features.join(
                df_forecast_weather_date.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ),
                on="datetime",
                how="left",
                suffix=f"_forecast_{hours_lag}h",
            )
            df_features = df_features.join(
                df_forecast_weather_local.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ),
                on=["county", "datetime"],
                how="left",
                suffix=f"_forecast_local_{hours_lag}h",
            )

        return df_features
        
    
    def _add_historical_weather_features(self, df_features):
        df_historical_weather = self.data_storage.df_historical_weather
        df_weather_station_to_county_mapping = (
            self.data_storage.df_weather_station_to_county_mapping
        )

        df_historical_weather = (
            df_historical_weather.with_columns(
                pl.col("latitude").cast(pl.datatypes.Float32),
                pl.col("longitude").cast(pl.datatypes.Float32),
            )
            .join(
                df_weather_station_to_county_mapping,
                how="left",
                on=["longitude", "latitude"],
            )
            .drop("longitude", "latitude")
        )

        df_historical_weather_date = (
            df_historical_weather.group_by("datetime").mean().drop("county")
        )

        df_historical_weather_local = (
            df_historical_weather.filter(pl.col("county").is_not_null())
            .group_by("county", "datetime")
            .mean()
        )

        for hours_lag in [2 * 24, 7 * 24]:
            df_features = df_features.join(
                df_historical_weather_date.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ),
                on="datetime",
                how="left",
                suffix=f"_historical_{hours_lag}h",
            )
            df_features = df_features.join(
                df_historical_weather_local.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ),
                on=["county", "datetime"],
                how="left",
                suffix=f"_historical_local_{hours_lag}h",
            )

        for hours_lag in [1 * 24]:
            df_features = df_features.join(
                df_historical_weather_date.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag),
                    pl.col("datetime").dt.hour().alias("hour"),
                )
                .filter(pl.col("hour") <= 10)
                .drop("hour"),
                on="datetime",
                how="left",
                suffix=f"_historical_{hours_lag}h",
            )

        return df_features
    
        
    def _add_forecast_ratio_features(self, df_features):
        u_comp = df_features['10_metre_u_wind_component']
        v_comp = df_features['10_metre_v_wind_component']
        hours_lag = 0
        df_features = df_features.with_columns(
            (np.sqrt(u_comp ** 2 + v_comp ** 2)).alias(f"windspeed_10m_forecast_local_{hours_lag}h"),
        
        )
        angles = np.angle(v_comp.to_numpy()*1j + u_comp.to_numpy(), deg=True)
        df_features = df_features.with_columns(
            pl.Series(angles).alias("winddirection_10m_forecast_local_0h")
        )
    
        for (feature, lag_numerator, lag_denominator) in [('windspeed_10m', 0, 24 * 2),
                                                          ('winddirection_10m', 0, 24 * 2),
                                                         ]:
            df_features = df_features.with_columns(
                (
                    pl.col(f"{feature}_forecast_local_{lag_numerator}h")
                    / (pl.col(f"{feature}_historical_local_{lag_denominator}h") + 1e-3)
                ).alias(f"{feature}_forecast_to_hist_ratio_{lag_numerator}_{lag_denominator}")
            )
            '''
            df_features = df_features.with_columns(
                (
                    pl.col(f"{feature}_forecast_local_{lag_numerator}h")
                    - (pl.col(f"{feature}_historical_local_{lag_denominator}h"))
                ).alias(f"{feature}_forecast_to_hist_difference_{lag_numerator}_{lag_denominator}")
            )'''
        print('ok')
        return df_features

        

    def _add_target_features(self, df_features):
        df_target = self.data_storage.df_target
        '''
        df_target_all_type_sum = (
            df_target.group_by(["datetime", "county", "is_business", "is_consumption"])
            .sum()
            .drop("product_type")
        )

        df_target_all_county_type_sum = (
            df_target.group_by(["datetime", "is_business", "is_consumption"])
            .sum()
            .drop("product_type", "county")
        )'''
        
        df_client = self.data_storage.df_client

        for hours_lag in [
            2 * 24,
            3 * 24,
            4 * 24,
            5 * 24,
            6 * 24,
            7 * 24,
            8 * 24,
            9 * 24,
            10 * 24,
            11 * 24,
            12 * 24,
            13 * 24,
            14 * 24,
        ]:
            
            df_features = df_features.join(
            df_client.with_columns(
                (pl.col("date") + pl.duration(days=int(hours_lag/24))).cast(pl.Date)
            ),
            on=["county", "is_business", "product_type", "date"],
            how="left", suffix=f"_{hours_lag}h"
        )
            
            df_features = df_features.join(
                df_target.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ).rename({"target": f"target_{hours_lag}h"}),
                on=[
                    "county",
                    "is_business",
                    "product_type",
                    "is_consumption",
                    "datetime",
                ],
                how="left",
            )
            df_features = df_features.with_columns((
            pl.col(f"target_{hours_lag}h") / pl.col(f"installed_capacity_{hours_lag}h")).alias(f"new_target_{hours_lag}h"))
        '''
        for hours_lag in [2 * 24, 3 * 24, 7 * 24, 14 * 24]:
            df_features = df_features.join(
                df_target_all_type_sum.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ).rename({"target": f"target_all_type_sum_{hours_lag}h"}),
                on=["county", "is_business", "is_consumption", "datetime"],
                how="left",
            )

            df_features = df_features.join(
                df_target_all_county_type_sum.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ).rename({"target": f"target_all_county_type_sum_{hours_lag}h"}),
                on=["is_business", "is_consumption", "datetime"],
                how="left",
                suffix=f"_all_county_type_sum_{hours_lag}h",
            )'''

        cols_for_stats = [
            f"new_target_{hours_lag}h" for hours_lag in [2 * 24, 3 * 24, 4 * 24, 5 * 24]
        ]
        df_features = df_features.with_columns(
            #df_features.select(cols_for_stats).mean(axis=1).alias(f"target_mean"),
            df_features.select(cols_for_stats).mean_horizontal().alias(f"new_target_mean"),
            df_features.select(cols_for_stats)
            .transpose()
            .std()
            .transpose()
            .to_series()
            .alias(f"new_target_std"),
        )

        for target_prefix, lag_numerator, lag_denominator in [
            ("new_target", 24 * 7, 24 * 14),
            ("new_target", 24 * 2, 24 * 9),
            ("new_target", 24 * 3, 24 * 10),
            ("new_target", 24 * 2, 24 * 3),
            #("target_all_type_sum", 24 * 2, 24 * 3),
            #("target_all_type_sum", 24 * 7, 24 * 14),
            #("target_all_county_type_sum", 24 * 2, 24 * 3),
            #("target_all_county_type_sum", 24 * 7, 24 * 14),
        ]:
            df_features = df_features.with_columns(
                (
                    pl.col(f"{target_prefix}_{lag_numerator}h")
                    / (pl.col(f"{target_prefix}_{lag_denominator}h") + 1e-3)
                ).alias(f"{target_prefix}_ratio_{lag_numerator}_{lag_denominator}")
            )
            
            df_features = df_features.with_columns(
                (
                    pl.col(f"{target_prefix}_{lag_numerator}h")
                    - (pl.col(f"{target_prefix}_{lag_denominator}h"))
                ).alias(f"{target_prefix}_difference_{lag_numerator}_{lag_denominator}")
            )

        return df_features
    
    def _add_imbalance_features(self, df_features):
        imb_list = [#'installed_capacity',
                      'cloudcover_high',
                      'cloudcover_low',
                      'direct_solar_radiation',
                      'surface_solar_radiation_downwards',
                      'total_precipitation',
                      'direct_solar_radiation_forecast_local_0h',
                      'surface_solar_radiation_downwards_forecast_local_0h',
                      'total_precipitation_forecast_local_0h',
                      'cloudcover_low_forecast_168h',
                      'total_precipitation_forecast_local_168h',
                      'cloudcover_mid_historical_local_48h',
                      'diffuse_radiation_historical_168h',
                      'cloudcover_mid_historical_local_168h',
                      'diffuse_radiation_historical_local_168h',
                      'new_target_48h', 'new_target_168h']
        for feature1 in imb_list:
            for feature2 in imb_list:
                if feature1 != feature2:
                    df_features = df_features.with_columns((pl.col(feature1) / (pl.col(feature2) + 1e-5)).cast(pl.Float32).alias(f"{feature1}_{feature2}_imbalance"))
        return df_features
    
    def _add_holidays_features(self, df_features):
        df_features = (
            df_features.with_columns(
                pl.when(pl.col("date").is_in(self.estonian_holidays)).then(1).otherwise(0).alias("country_holiday"),
                pl.when((pl.col("date") - datetime.timedelta(days=7)).is_in(self.estonian_holidays)).then(1).otherwise(0).alias("country_holiday_7d"),
                pl.when((pl.col("date") - datetime.timedelta(days=2)).is_in(self.estonian_holidays)).then(1).otherwise(0).alias("country_holiday_2d")
            )
        )
        return df_features
    
    def _select_features(self, df):
        df = df.with_columns(
            (pl.col('installed_capacity') * pl.col('surface_solar_radiation_downwards') / (pl.col('temperature') + 273.15)).alias('hz'))
        selected_features = ['county', 'year', 'month', 'day', 'is_business', 'product_type',
       'weekday', 'segment', 'sin(hour)', 'cos(hour)', 'eic_count','installed_capacity', 'direct_solar_radiation','direct_solar_radiation_forecast_local_0h',
       'surface_solar_radiation_downwards_forecast_local_0h','cloudcover_low_forecast_168h','cloudcover_mid_historical_local_168h','diffuse_radiation_historical_local_168h', 'new_target_72h',
       'new_target_96h', 'new_target_120h', 'new_target_144h', 'new_target_168h','new_target_192h', 'new_target_216h', 'new_target_288h', 'new_target_336h',
       'new_target_mean', 'new_target_std','new_target_difference_168_336', 'new_target_difference_48_216','cloudcover_high_cloudcover_mid_historical_local_48h_imbalance',
       'cloudcover_low_total_precipitation_forecast_local_0h_imbalance','cloudcover_low_new_target_48h_imbalance','direct_solar_radiation_cloudcover_low_imbalance',
       'direct_solar_radiation_total_precipitation_imbalance','direct_solar_radiation_total_precipitation_forecast_local_0h_imbalance','direct_solar_radiation_cloudcover_low_forecast_168h_imbalance',
       'surface_solar_radiation_downwards_cloudcover_high_imbalance','surface_solar_radiation_downwards_cloudcover_low_imbalance',
       'surface_solar_radiation_downwards_total_precipitation_imbalance','surface_solar_radiation_downwards_direct_solar_radiation_forecast_local_0h_imbalance','surface_solar_radiation_downwards_total_precipitation_forecast_local_0h_imbalance',
       'surface_solar_radiation_downwards_new_target_168h_imbalance','total_precipitation_cloudcover_low_imbalance','total_precipitation_cloudcover_low_forecast_168h_imbalance',
       'total_precipitation_new_target_48h_imbalance', #'target_all_county_type_sum_336h', 'target_all_county_type_sum_48h', 'target_all_type_sum_168h',
       #'direct_solar_radiation_forecast_local_0h_installed_capacity_imbalance', 'surface_solar_radiation_downwards_installed_capacity_imbalance', 'direct_solar_radiation_installed_capacity_imbalance','cloudcover_low_installed_capacity_imbalance'
       'direct_solar_radiation_forecast_local_0h_cloudcover_low_imbalance',
       'direct_solar_radiation_forecast_local_0h_total_precipitation_imbalance',
       'direct_solar_radiation_forecast_local_0h_total_precipitation_forecast_local_0h_imbalance',
       'direct_solar_radiation_forecast_local_0h_cloudcover_low_forecast_168h_imbalance',
       'direct_solar_radiation_forecast_local_0h_total_precipitation_forecast_local_168h_imbalance',
       'direct_solar_radiation_forecast_local_0h_new_target_48h_imbalance',
       'surface_solar_radiation_downwards_forecast_local_0h_total_precipitation_imbalance',
       'surface_solar_radiation_downwards_forecast_local_0h_total_precipitation_forecast_local_0h_imbalance',
       'surface_solar_radiation_downwards_forecast_local_0h_total_precipitation_forecast_local_168h_imbalance',
       #'total_precipitation_forecast_local_0h_installed_capacity_imbalance',
       'total_precipitation_forecast_local_0h_surface_solar_radiation_downwards_forecast_local_0h_imbalance',
       'total_precipitation_forecast_local_0h_cloudcover_low_forecast_168h_imbalance',
       'total_precipitation_forecast_local_0h_cloudcover_mid_historical_local_48h_imbalance',
       'total_precipitation_forecast_local_0h_diffuse_radiation_historical_local_168h_imbalance',
       'cloudcover_low_forecast_168h_cloudcover_high_imbalance',
       'cloudcover_low_forecast_168h_total_precipitation_imbalance',
       'cloudcover_low_forecast_168h_total_precipitation_forecast_local_0h_imbalance',
       'cloudcover_low_forecast_168h_total_precipitation_forecast_local_168h_imbalance',
       'total_precipitation_forecast_local_168h_cloudcover_low_forecast_168h_imbalance',
       'total_precipitation_forecast_local_168h_cloudcover_mid_historical_local_168h_imbalance',
       'diffuse_radiation_historical_168h_total_precipitation_imbalance',
       'diffuse_radiation_historical_168h_surface_solar_radiation_downwards_forecast_local_0h_imbalance',
       'diffuse_radiation_historical_168h_new_target_168h_imbalance',
       'diffuse_radiation_historical_local_168h_new_target_48h_imbalance',
       'new_target_48h_cloudcover_low_imbalance',
       'new_target_48h_direct_solar_radiation_imbalance',
       'new_target_48h_surface_solar_radiation_downwards_imbalance',
       'new_target_48h_total_precipitation_imbalance',
       'new_target_48h_total_precipitation_forecast_local_0h_imbalance',
       'new_target_48h_total_precipitation_forecast_local_168h_imbalance',
       'new_target_48h_cloudcover_mid_historical_local_48h_imbalance',
       'new_target_48h_diffuse_radiation_historical_168h_imbalance',
       'new_target_48h_diffuse_radiation_historical_local_168h_imbalance',
       'new_target_48h_new_target_168h_imbalance',
       'new_target_168h_cloudcover_low_imbalance',
       'new_target_168h_surface_solar_radiation_downwards_imbalance',
       'new_target_168h_total_precipitation_imbalance',
       'new_target_168h_total_precipitation_forecast_local_0h_imbalance',
       'new_target_168h_total_precipitation_forecast_local_168h_imbalance',
       'new_target_168h_diffuse_radiation_historical_168h_imbalance', #'installed_capacity_cloudcover_low_imbalance','installed_capacity_direct_solar_radiation_imbalance',
       #'installed_capacity_total_precipitation_imbalance','installed_capacity_direct_solar_radiation_forecast_local_0h_imbalance','installed_capacity_total_precipitation_forecast_local_0h_imbalance',
       #'installed_capacity_total_precipitation_forecast_local_168h_imbalance'
       'new_target_168h_diffuse_radiation_historical_local_168h_imbalance', 'is_consumption', "date",'row_id', 'hz'] #"datetime", "hour", "dayofyear", 'block', 'forecast_date'
        intersection = set(selected_features).intersection(df.columns)
        
        new_df = df[selected_features]
        #new_df = df[set(selected_featu]
        del df
        gc.collect()
        return new_df
        
    def _reduce_memory_usage(self, df_features):
        df_features = df_features.with_columns(pl.col(pl.Float64).cast(pl.Float32))
        return df_features

    def _drop_columns(self, df_features):
        df_features = df_features.drop(
            "date", "datetime", "hour", "dayofyear", 'block', 'forecast_date'
        )
        return df_features

    def _to_pandas(self, df_features, y):
        cat_cols = [
            "county",
            "is_business",
            "product_type",
            "is_consumption",
            "segment",
        ]

        if y is not None:
            df_features = pd.concat([df_features.to_pandas(), y.to_pandas()], axis=1)
        else:
            df_features = df_features.to_pandas()

        df_features = df_features.set_index("row_id")
        df_features[cat_cols] = df_features[cat_cols].astype("category")

        return df_features

    def generate_features(self, df_prediction_items):
        if "target" in df_prediction_items.columns:
            df_prediction_items, y = (
                df_prediction_items.drop("target"),
                df_prediction_items.select("target"),
            )
        else:
            y = None

        df_features = df_prediction_items.with_columns(
            pl.col("datetime").cast(pl.Date).alias("date"),
        )

        for add_features in [
            self._add_general_features,
            self._add_client_features,
            self._add_forecast_weather_features,
            self._add_historical_weather_features,
            self._add_target_features,
            self._add_holidays_features,
            self._add_forecast_ratio_features,
            #self._reduce_memory_usage,
            #self._add_imbalance_features,
            #self._select_features,
            self._reduce_memory_usage,
            self._drop_columns,
        ]:
            df_features = add_features(df_features)

        df_features = self._to_pandas(df_features, y)
        if y is not None:
            df_features['normalized_target'] = df_features['target'] / df_features['installed_capacity']
            df_features= df_features.drop(columns=['target'])
        return df_features

In [5]:
import datetime
import holidays
from itertools import combinations

class FeaturesGeneratorProduction:
    def __init__(self, data_storage):
        self.data_storage = data_storage
        self.estonian_holidays = list(holidays.country_holidays('EE', years=range(2021, 2026)).keys())

    def _add_general_features(self, df_features):
        df_features = (
            df_features.with_columns(
                pl.col("datetime").dt.ordinal_day().alias("dayofyear"),
                pl.col("datetime").dt.hour().alias("hour"),
                pl.col("datetime").dt.day().alias("day"),
                pl.col("datetime").dt.weekday().alias("weekday"),
                pl.col("datetime").dt.month().alias("month"),
                pl.col("datetime").dt.year().alias("year"),
            )
            .with_columns(
                pl.concat_str(
                    "county",
                    "is_business",
                    "product_type",
                    "is_consumption",
                    separator="_",
                ).alias("segment"),
            )
            .with_columns(
                (np.pi * pl.col("dayofyear") / 183).sin().alias("sin(dayofyear)"),
                (np.pi * pl.col("dayofyear") / 183).cos().alias("cos(dayofyear)"),
                (np.pi * pl.col("hour") / 12).sin().alias("sin(hour)"),
                (np.pi * pl.col("hour") / 12).cos().alias("cos(hour)"),
            )
        )
        return df_features

    def _add_client_features(self, df_features):
        df_client = self.data_storage.df_client

        df_features = df_features.join(
            df_client.with_columns(
                (pl.col("date") + pl.duration(days=2)).cast(pl.Date)
            ),
            on=["county", "is_business", "product_type", "date"],
            how="left",
        )
        return df_features

    def _add_forecast_weather_features(self, df_features):
        df_forecast_weather = self.data_storage.df_forecast_weather
        df_weather_station_to_county_mapping = (
            self.data_storage.df_weather_station_to_county_mapping
        )

        df_forecast_weather = (
            df_forecast_weather.rename({"forecast_datetime": "datetime"})
            .filter((pl.col("hours_ahead") >= 22) & pl.col("hours_ahead") <= 45)
            .drop("hours_ahead")
            .with_columns(
                pl.col("latitude").cast(pl.datatypes.Float32),
                pl.col("longitude").cast(pl.datatypes.Float32),
            )
            .join(
                df_weather_station_to_county_mapping,
                how="left",
                on=["longitude", "latitude"],
            )
            .drop("longitude", "latitude")
        )
        
        df_forecast_weather_date = (
            df_forecast_weather.group_by("datetime").mean().drop("county")
        )

        df_forecast_weather_local = (
            df_forecast_weather.filter(pl.col("county").is_not_null())
            .group_by("county", "datetime")
            .mean()
        )

        for hours_lag in [0, 7 * 24]:
            df_features = df_features.join(
                df_forecast_weather_date.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ),
                on="datetime",
                how="left",
                suffix=f"_forecast_{hours_lag}h",
            )
            df_features = df_features.join(
                df_forecast_weather_local.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ),
                on=["county", "datetime"],
                how="left",
                suffix=f"_forecast_local_{hours_lag}h",
            )

        return df_features
        
    
    def _add_historical_weather_features(self, df_features):
        df_historical_weather = self.data_storage.df_historical_weather
        df_weather_station_to_county_mapping = (
            self.data_storage.df_weather_station_to_county_mapping
        )

        df_historical_weather = (
            df_historical_weather.with_columns(
                pl.col("latitude").cast(pl.datatypes.Float32),
                pl.col("longitude").cast(pl.datatypes.Float32),
            )
            .join(
                df_weather_station_to_county_mapping,
                how="left",
                on=["longitude", "latitude"],
            )
            .drop("longitude", "latitude")
        )

        df_historical_weather_date = (
            df_historical_weather.group_by("datetime").mean().drop("county")
        )

        df_historical_weather_local = (
            df_historical_weather.filter(pl.col("county").is_not_null())
            .group_by("county", "datetime")
            .mean()
        )

        for hours_lag in [2 * 24, 7 * 24]:
            df_features = df_features.join(
                df_historical_weather_date.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ),
                on="datetime",
                how="left",
                suffix=f"_historical_{hours_lag}h",
            )
            df_features = df_features.join(
                df_historical_weather_local.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ),
                on=["county", "datetime"],
                how="left",
                suffix=f"_historical_local_{hours_lag}h",
            )

        for hours_lag in [1 * 24]:
            df_features = df_features.join(
                df_historical_weather_date.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag),
                    pl.col("datetime").dt.hour().alias("hour"),
                )
                .filter(pl.col("hour") <= 10)
                .drop("hour"),
                on="datetime",
                how="left",
                suffix=f"_historical_{hours_lag}h",
            )

        return df_features
    
        
    def _add_forecast_ratio_features(self, df_features):
        u_comp = df_features['10_metre_u_wind_component']
        v_comp = df_features['10_metre_v_wind_component']
        hours_lag = 0
        df_features = df_features.with_columns(
            (np.sqrt(u_comp ** 2 + v_comp ** 2)).alias(f"windspeed_10m_forecast_local_{hours_lag}h"),
        
        )
        angles = np.angle(v_comp.to_numpy()*1j + u_comp.to_numpy(), deg=True)
        df_features = df_features.with_columns(
            pl.Series(angles).alias("winddirection_10m_forecast_local_0h")
        )
    
        for (feature, lag_numerator, lag_denominator) in [('windspeed_10m', 0, 24 * 2),
                                                          ('winddirection_10m', 0, 24 * 2),
                                                         ]:
            df_features = df_features.with_columns(
                (
                    pl.col(f"{feature}_forecast_local_{lag_numerator}h")
                    / (pl.col(f"{feature}_historical_local_{lag_denominator}h") + 1e-3)
                ).alias(f"{feature}_forecast_to_hist_ratio_{lag_numerator}_{lag_denominator}")
            )
            '''
            df_features = df_features.with_columns(
                (
                    pl.col(f"{feature}_forecast_local_{lag_numerator}h")
                    - (pl.col(f"{feature}_historical_local_{lag_denominator}h"))
                ).alias(f"{feature}_forecast_to_hist_difference_{lag_numerator}_{lag_denominator}")
            )'''
        print('ok')
        return df_features

        

    def _add_target_features(self, df_features):
        df_target = self.data_storage.df_target

        df_target_all_type_sum = (
            df_target.group_by(["datetime", "county", "is_business", "is_consumption"])
            .sum()
            .drop("product_type")
        )

        df_target_all_county_type_sum = (
            df_target.group_by(["datetime", "is_business", "is_consumption"])
            .sum()
            .drop("product_type", "county")
        )

        for hours_lag in [
            2 * 24,
            3 * 24,
            4 * 24,
            5 * 24,
            6 * 24,
            7 * 24,
            8 * 24,
            9 * 24,
            10 * 24,
            11 * 24,
            12 * 24,
            13 * 24,
            14 * 24,
        ]:
            df_features = df_features.join(
                df_target.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ).rename({"target": f"target_{hours_lag}h"}),
                on=[
                    "county",
                    "is_business",
                    "product_type",
                    "is_consumption",
                    "datetime",
                ],
                how="left",
            )

        for hours_lag in [2 * 24, 3 * 24, 7 * 24, 14 * 24]:
            df_features = df_features.join(
                df_target_all_type_sum.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ).rename({"target": f"target_all_type_sum_{hours_lag}h"}),
                on=["county", "is_business", "is_consumption", "datetime"],
                how="left",
            )

            df_features = df_features.join(
                df_target_all_county_type_sum.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ).rename({"target": f"target_all_county_type_sum_{hours_lag}h"}),
                on=["is_business", "is_consumption", "datetime"],
                how="left",
                suffix=f"_all_county_type_sum_{hours_lag}h",
            )

        cols_for_stats = [
            f"target_{hours_lag}h" for hours_lag in [2 * 24, 3 * 24, 4 * 24, 5 * 24]
        ]
        df_features = df_features.with_columns(
            #df_features.select(cols_for_stats).mean(axis=1).alias(f"target_mean"),
            df_features.select(cols_for_stats).mean_horizontal().alias(f"target_mean"),
            df_features.select(cols_for_stats)
            .transpose()
            .std()
            .transpose()
            .to_series()
            .alias(f"target_std"),
        )

        for target_prefix, lag_numerator, lag_denominator in [
            ("target", 24 * 7, 24 * 14),
            ("target", 24 * 2, 24 * 9),
            ("target", 24 * 3, 24 * 10),
            ("target", 24 * 2, 24 * 3),
            ("target_all_type_sum", 24 * 2, 24 * 3),
            ("target_all_type_sum", 24 * 7, 24 * 14),
            ("target_all_county_type_sum", 24 * 2, 24 * 3),
            ("target_all_county_type_sum", 24 * 7, 24 * 14),
        ]:
            df_features = df_features.with_columns(
                (
                    pl.col(f"{target_prefix}_{lag_numerator}h")
                    / (pl.col(f"{target_prefix}_{lag_denominator}h") + 1e-3)
                ).alias(f"{target_prefix}_ratio_{lag_numerator}_{lag_denominator}")
            )
            
            df_features = df_features.with_columns(
                (
                    pl.col(f"{target_prefix}_{lag_numerator}h")
                    - (pl.col(f"{target_prefix}_{lag_denominator}h"))
                ).alias(f"{target_prefix}_difference_{lag_numerator}_{lag_denominator}")
            )

        return df_features
    
    def _add_imbalance_features(self, df_features):
        imb_list = ['installed_capacity',
                      'cloudcover_high',
                      'cloudcover_low',
                      'direct_solar_radiation',
                      'surface_solar_radiation_downwards',
                      'total_precipitation',
                      'direct_solar_radiation_forecast_local_0h',
                      'surface_solar_radiation_downwards_forecast_local_0h',
                      'total_precipitation_forecast_local_0h',
                      'cloudcover_low_forecast_168h',
                      'total_precipitation_forecast_local_168h',
                      'cloudcover_mid_historical_local_48h',
                      'diffuse_radiation_historical_168h',
                      'cloudcover_mid_historical_local_168h',
                      'diffuse_radiation_historical_local_168h',
                      'target_48h', 'target_168h']
        for feature1 in imb_list:
            for feature2 in imb_list:
                if feature1 != feature2:
                    df_features = df_features.with_columns((pl.col(feature1) / (pl.col(feature2) + 1e-5)).cast(pl.Float32).alias(f"{feature1}_{feature2}_imbalance"))
        return df_features
    
    def _add_holidays_features(self, df_features):
        df_features = (
            df_features.with_columns(
                pl.when(pl.col("date").is_in(self.estonian_holidays)).then(1).otherwise(0).alias("country_holiday"),
                pl.when((pl.col("date") - datetime.timedelta(days=7)).is_in(self.estonian_holidays)).then(1).otherwise(0).alias("country_holiday_7d"),
                pl.when((pl.col("date") - datetime.timedelta(days=2)).is_in(self.estonian_holidays)).then(1).otherwise(0).alias("country_holiday_2d")
            )
        )
        return df_features
    
    def _select_features(self, df):
        selected_features = ['county', 'year', 'month', 'day', 'is_business', 'product_type',
       'weekday', 'segment', 'sin(hour)', 'cos(hour)', 'eic_count','installed_capacity', 'direct_solar_radiation','direct_solar_radiation_forecast_local_0h',
       'surface_solar_radiation_downwards_forecast_local_0h','cloudcover_low_forecast_168h','cloudcover_mid_historical_local_168h','diffuse_radiation_historical_local_168h', 'target_72h',
       'target_96h', 'target_120h', 'target_144h', 'target_168h','target_192h', 'target_216h', 'target_288h', 'target_336h','target_all_county_type_sum_48h', 'target_all_type_sum_168h',
       'target_all_county_type_sum_336h', 'target_mean', 'target_std','target_difference_168_336', 'target_difference_48_216','installed_capacity_cloudcover_low_imbalance','installed_capacity_direct_solar_radiation_imbalance',
       'installed_capacity_total_precipitation_imbalance','installed_capacity_direct_solar_radiation_forecast_local_0h_imbalance','installed_capacity_total_precipitation_forecast_local_0h_imbalance',
       'installed_capacity_total_precipitation_forecast_local_168h_imbalance','cloudcover_high_cloudcover_mid_historical_local_48h_imbalance','cloudcover_low_installed_capacity_imbalance',
       'cloudcover_low_total_precipitation_forecast_local_0h_imbalance','cloudcover_low_target_48h_imbalance','direct_solar_radiation_installed_capacity_imbalance','direct_solar_radiation_cloudcover_low_imbalance',
       'direct_solar_radiation_total_precipitation_imbalance','direct_solar_radiation_total_precipitation_forecast_local_0h_imbalance','direct_solar_radiation_cloudcover_low_forecast_168h_imbalance',
       'surface_solar_radiation_downwards_installed_capacity_imbalance','surface_solar_radiation_downwards_cloudcover_high_imbalance','surface_solar_radiation_downwards_cloudcover_low_imbalance',
       'surface_solar_radiation_downwards_total_precipitation_imbalance','surface_solar_radiation_downwards_direct_solar_radiation_forecast_local_0h_imbalance','surface_solar_radiation_downwards_total_precipitation_forecast_local_0h_imbalance',
       'surface_solar_radiation_downwards_target_168h_imbalance','total_precipitation_cloudcover_low_imbalance','total_precipitation_cloudcover_low_forecast_168h_imbalance',
       'total_precipitation_target_48h_imbalance',
       'direct_solar_radiation_forecast_local_0h_installed_capacity_imbalance',
       'direct_solar_radiation_forecast_local_0h_cloudcover_low_imbalance',
       'direct_solar_radiation_forecast_local_0h_total_precipitation_imbalance',
       'direct_solar_radiation_forecast_local_0h_total_precipitation_forecast_local_0h_imbalance',
       'direct_solar_radiation_forecast_local_0h_cloudcover_low_forecast_168h_imbalance',
       'direct_solar_radiation_forecast_local_0h_total_precipitation_forecast_local_168h_imbalance',
       'direct_solar_radiation_forecast_local_0h_target_48h_imbalance',
       'surface_solar_radiation_downwards_forecast_local_0h_total_precipitation_imbalance',
       'surface_solar_radiation_downwards_forecast_local_0h_total_precipitation_forecast_local_0h_imbalance',
       'surface_solar_radiation_downwards_forecast_local_0h_total_precipitation_forecast_local_168h_imbalance',
       'total_precipitation_forecast_local_0h_installed_capacity_imbalance',
       'total_precipitation_forecast_local_0h_surface_solar_radiation_downwards_forecast_local_0h_imbalance',
       'total_precipitation_forecast_local_0h_cloudcover_low_forecast_168h_imbalance',
       'total_precipitation_forecast_local_0h_cloudcover_mid_historical_local_48h_imbalance',
       'total_precipitation_forecast_local_0h_diffuse_radiation_historical_local_168h_imbalance',
       'cloudcover_low_forecast_168h_cloudcover_high_imbalance',
       'cloudcover_low_forecast_168h_total_precipitation_imbalance',
       'cloudcover_low_forecast_168h_total_precipitation_forecast_local_0h_imbalance',
       'cloudcover_low_forecast_168h_total_precipitation_forecast_local_168h_imbalance',
       'total_precipitation_forecast_local_168h_cloudcover_low_forecast_168h_imbalance',
       'total_precipitation_forecast_local_168h_cloudcover_mid_historical_local_168h_imbalance',
       'diffuse_radiation_historical_168h_total_precipitation_imbalance',
       'diffuse_radiation_historical_168h_surface_solar_radiation_downwards_forecast_local_0h_imbalance',
       'diffuse_radiation_historical_168h_target_168h_imbalance',
       'diffuse_radiation_historical_local_168h_target_48h_imbalance',
       'target_48h_cloudcover_low_imbalance',
       'target_48h_direct_solar_radiation_imbalance',
       'target_48h_surface_solar_radiation_downwards_imbalance',
       'target_48h_total_precipitation_imbalance',
       'target_48h_total_precipitation_forecast_local_0h_imbalance',
       'target_48h_total_precipitation_forecast_local_168h_imbalance',
       'target_48h_cloudcover_mid_historical_local_48h_imbalance',
       'target_48h_diffuse_radiation_historical_168h_imbalance',
       'target_48h_diffuse_radiation_historical_local_168h_imbalance',
       'target_48h_target_168h_imbalance',
       'target_168h_cloudcover_low_imbalance',
       'target_168h_surface_solar_radiation_downwards_imbalance',
       'target_168h_total_precipitation_imbalance',
       'target_168h_total_precipitation_forecast_local_0h_imbalance',
       'target_168h_total_precipitation_forecast_local_168h_imbalance',
       'target_168h_diffuse_radiation_historical_168h_imbalance',
       'target_168h_diffuse_radiation_historical_local_168h_imbalance', 'is_consumption', "date",'row_id'] #"datetime", "hour", "dayofyear", 'block', 'forecast_date'
        intersection = set(selected_features).intersection(df.columns)
        
        new_df = df[selected_features]
        #new_df = df[set(selected_featu]
        del df
        gc.collect()
        return new_df
        
    def _reduce_memory_usage(self, df_features):
        df_features = df_features.with_columns(pl.col(pl.Float64).cast(pl.Float32))
        return df_features

    def _drop_columns(self, df_features):
        df_features = df_features.drop(
            "date", "datetime", "hour", "dayofyear", 'block', 'forecast_date'
        )
        return df_features

    def _to_pandas(self, df_features, y):
        cat_cols = [
            "county",
            "is_business",
            "product_type",
            "is_consumption",
            "segment",
        ]

        if y is not None:
            df_features = pd.concat([df_features.to_pandas(), y.to_pandas()], axis=1)
        else:
            df_features = df_features.to_pandas()

        df_features = df_features.set_index("row_id")
        df_features[cat_cols] = df_features[cat_cols].astype("category")

        return df_features

    def generate_features(self, df_prediction_items):
        if "target" in df_prediction_items.columns:
            df_prediction_items, y = (
                df_prediction_items.drop("target"),
                df_prediction_items.select("target"),
            )
        else:
            y = None

        df_features = df_prediction_items.with_columns(
            pl.col("datetime").cast(pl.Date).alias("date"),
        )

        for add_features in [
            self._add_general_features,
            self._add_client_features,
            self._add_forecast_weather_features,
            self._add_historical_weather_features,
            self._add_target_features,
            self._add_holidays_features,
            self._add_forecast_ratio_features,
            self._reduce_memory_usage,
            #self._add_imbalance_features,
            #self._select_features,
            #self._reduce_memory_usage,
            self._drop_columns,
        ]:
            df_features = add_features(df_features)

        df_features = self._to_pandas(df_features, y)

        return df_features

In [6]:
import holidays
class FeaturesGeneratorConsumption:
    def __init__(self, data_storage):
        self.data_storage = data_storage
        self.estonian_holidays = list(holidays.country_holidays('EE', years=range(2021, 2026)).keys())
        
    def _add_general_features(self, df_features):
        df_features = (
            df_features.with_columns(
                pl.col("datetime").dt.ordinal_day().alias("dayofyear"),
                pl.col("datetime").dt.hour().alias("hour"),
                pl.col("datetime").dt.day().alias("day"),
                pl.col("datetime").dt.weekday().alias("weekday"),
                pl.col("datetime").dt.month().alias("month"),
                pl.col("datetime").dt.year().alias("year"),
            )
            .with_columns(
                pl.concat_str(
                    "county",
                    "is_business",
                    "product_type",
                    "is_consumption",
                    separator="_",
                ).alias("segment"),
            )
            .with_columns(
                (np.pi * pl.col("dayofyear") / 183).sin().alias("sin(dayofyear)"),
                (np.pi * pl.col("dayofyear") / 183).cos().alias("cos(dayofyear)"),
                (np.pi * pl.col("hour") / 12).sin().alias("sin(hour)"),
                (np.pi * pl.col("hour") / 12).cos().alias("cos(hour)"),
            )
        )
        return df_features
    def _add_client_features(self, df_features):
        df_client = self.data_storage.df_client

        df_features = df_features.join(
            df_client.with_columns(
                (pl.col("date") + pl.duration(days=2)).cast(pl.Date)
            ),
            on=["county", "is_business", "product_type", "date"],
            how="left",
        )
        return df_features
        
    def _add_forecast_weather_features(self, df_features):
        df_forecast_weather = self.data_storage.df_forecast_weather
        df_weather_station_to_county_mapping = (
            self.data_storage.df_weather_station_to_county_mapping
        )
        df_forecast_weather = (
            df_forecast_weather.rename({"forecast_datetime": "datetime"})
            .filter((pl.col("hours_ahead") >= 22) & pl.col("hours_ahead") <= 45)
            .drop("hours_ahead")
            .with_columns(
                pl.col("latitude").cast(pl.datatypes.Float32),
                pl.col("longitude").cast(pl.datatypes.Float32),
            )
            .join(
                df_weather_station_to_county_mapping,
                how="left",
                on=["longitude", "latitude"],
            )
            .drop("longitude", "latitude")
        )
        df_forecast_weather_local = (
            df_forecast_weather.filter(pl.col("county").is_not_null())
            .group_by("county", "datetime")
            .mean()
        )

        for hours_lag in [0, 7 * 24]:
            df_forecast_weather_local = df_forecast_weather_local.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
            )
            df_features = df_features.join( #unique list?
                df_forecast_weather_local,
                on=["county", "datetime"],
                how="left",
                suffix=f"_forecast_local_{hours_lag}h",
            )

        return df_features
    
    def _add_historical_weather_features(self, df_features):
        df_historical_weather = self.data_storage.df_historical_weather
        df_weather_station_to_county_mapping = (
            self.data_storage.df_weather_station_to_county_mapping
        )

        df_historical_weather = (
            df_historical_weather.with_columns(
                pl.col("latitude").cast(pl.datatypes.Float32),
                pl.col("longitude").cast(pl.datatypes.Float32),
            )
            .join(
                df_weather_station_to_county_mapping,
                how="left",
                on=["longitude", "latitude"],
            )
            .drop("longitude", "latitude")
        )
        
        df_historical_weather_local = (
            df_historical_weather.filter(pl.col("county").is_not_null())
            .group_by("county", "datetime")
            .mean()
        )
        
        df_historical_weather_local = df_historical_weather_local.with_columns(
            (pl.col("rain") / 1000 + pl.col("snowfall") / 100).alias('total_precipitation'))

        for hours_lag in [2 * 24, 7 * 24]:
            df_features = df_features.join(
                df_historical_weather_local.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ),
                on=["county", "datetime"],
                how="left",
                suffix=f"_historical_local_{hours_lag}h",
            )
        return df_features
    
    def _add_forecast_ratio_features(self, df_features):
        u_comp = df_features['10_metre_u_wind_component_forecast_local_0h']
        v_comp = df_features['10_metre_v_wind_component_forecast_local_0h']
        hours_lag = 0
        df_features = df_features.with_columns(
            (np.sqrt(u_comp ** 2 + v_comp ** 2)).alias(f"windspeed_10m_forecast_local_{hours_lag}h"),
        
        )
        angles = np.angle(v_comp.to_numpy()*1j + u_comp.to_numpy(), deg=True)
        df_features = df_features.with_columns(
            pl.Series(angles).alias("winddirection_10m_forecast_local_0h")
        )
    
        for (feature, lag_numerator, lag_denominator) in [('windspeed_10m', 0, 24 * 2),
                                                          ('winddirection_10m', 0, 24 * 2),
                                                         ]:
            df_features = df_features.with_columns(
                (
                    pl.col(f"{feature}_forecast_local_{lag_numerator}h")
                    / (pl.col(f"{feature}_historical_local_{lag_denominator}h") + 1e-3)
                ).alias(f"{feature}_forecast_to_hist_ratio_{lag_numerator}_{lag_denominator}")
            )
            '''
            df_features = df_features.with_columns(
                (
                    pl.col(f"{feature}_forecast_local_{lag_numerator}h")
                    - (pl.col(f"{feature}_historical_local_{lag_denominator}h"))
                ).alias(f"{feature}_forecast_to_hist_difference_{lag_numerator}_{lag_denominator}")
            )'''
        return df_features
    
    def _rename_columns(self, df_features):
        for_list = ['temperature', 'dewpoint',
       'cloudcover_high', 'cloudcover_low', 'cloudcover_mid',
       'cloudcover_total', '10_metre_u_wind_component',
       '10_metre_v_wind_component', 'direct_solar_radiation',
       'surface_solar_radiation_downwards', 'snowfall', 'total_precipitation']
        
        hist_list = ['rain', 'surface_pressure', 'windspeed_10m',
                     'winddirection_10m', 'shortwave_radiation', 'diffuse_radiation']
        
        for col in for_list:
            df_features = df_features.rename({col: f"{col}_forecast_local_0h"})
            
        for col in hist_list:
            df_features = df_features.rename({col: f"{col}_historical_local_48h"})
        return df_features

        
    def _add_target_features(self, df_features):
        df_target = self.data_storage.df_target

        df_target_all_type_sum = (
            df_target.group_by(["datetime", "county", "is_business", "is_consumption"])
            .sum()
            .drop("product_type")
        )

        df_target_all_county_type_sum = (
            df_target.group_by(["datetime", "is_business", "is_consumption"])
            .sum()
            .drop("product_type", "county")
        )

        for hours_lag in [
            2 * 24,
            3 * 24,
            4 * 24,
            5 * 24,
            6 * 24,
            7 * 24,
            8 * 24,
            9 * 24,
            10 * 24,
            11 * 24,
            12 * 24,
            13 * 24,
            14 * 24,
            365*24, 367*24, 372*24
        ]:
            df_features = df_features.join(
                df_target.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ).rename({"target": f"target_{hours_lag}h"}),
                on=[
                    "county",
                    "is_business",
                    "product_type",
                    "is_consumption",
                    "datetime",
                ],
                how="left",
            )

        for hours_lag in [2 * 24, 3 * 24, 7 * 24, 14 * 24]:
            df_features = df_features.join(
                df_target_all_type_sum.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ).rename({"target": f"target_all_type_sum_{hours_lag}h"}),
                on=["county", "is_business", "is_consumption", "datetime"],
                how="left",
            )

            df_features = df_features.join(
                df_target_all_county_type_sum.with_columns(
                    pl.col("datetime") + pl.duration(hours=hours_lag)
                ).rename({"target": f"target_all_county_type_sum_{hours_lag}h"}),
                on=["is_business", "is_consumption", "datetime"],
                how="left",
                suffix=f"_all_county_type_sum_{hours_lag}h",
            )

        cols_for_stats = [
            f"target_{hours_lag}h" for hours_lag in [2 * 24, 3 * 24, 4 * 24, 5 * 24]
        ]
        df_features = df_features.with_columns(
            df_features.select(cols_for_stats).mean(axis=1).alias(f"target_mean"),
            df_features.select(cols_for_stats)
            .transpose()
            .std()
            .transpose()
            .to_series()
            .alias(f"target_std"),
        )

        for target_prefix, lag_numerator, lag_denominator in [
            ("target", 24 * 7, 24 * 14),
            ("target", 24 * 2, 24 * 9),
            ("target", 24 * 3, 24 * 10),
            ("target", 24 * 2, 24 * 3),
            ("target_all_type_sum", 24 * 2, 24 * 3),
            ("target_all_type_sum", 24 * 7, 24 * 14),
            ("target_all_county_type_sum", 24 * 2, 24 * 3),
            ("target_all_county_type_sum", 24 * 7, 24 * 14),
        ]:
            df_features = df_features.with_columns(
                (
                    pl.col(f"{target_prefix}_{lag_numerator}h")
                    / (pl.col(f"{target_prefix}_{lag_denominator}h") + 1e-3)
                ).alias(f"{target_prefix}_ratio_{lag_numerator}_{lag_denominator}")
            )
            
            df_features = df_features.with_columns(
                (
                    pl.col(f"{target_prefix}_{lag_numerator}h")
                    - (pl.col(f"{target_prefix}_{lag_denominator}h"))
                ).alias(f"{target_prefix}_difference_{lag_numerator}_{lag_denominator}")
            )
            
        df_features = df_features.with_columns(
            (pl.col("target_48h") / pl.col("eic_count")).alias("target_eic_ratio_48"),
            (pl.col("target_168h") / pl.col("eic_count")).alias("target_eic_ratio_168")
        )

        return df_features
    
    def _add_holidays_features(self, df_features):
        df_features = (
            df_features.with_columns(
                pl.when(pl.col("date").is_in(self.estonian_holidays)).then(1).otherwise(0).alias("country_holiday"),
                pl.when((pl.col("date") - datetime.timedelta(days=7)).is_in(self.estonian_holidays)).then(1).otherwise(0).alias("country_holiday_7d"),
                pl.when((pl.col("date") - datetime.timedelta(days=2)).is_in(self.estonian_holidays)).then(1).otherwise(0).alias("country_holiday_2d")
            )
        )
        return df_features

    def _reduce_memory_usage(self, df_features):
        df_features = df_features.with_columns(pl.col(pl.Float64).cast(pl.Float32))
        return df_features
    
    def _drop_columns(self, df_features):
        df_features = df_features.drop(
            "date", "datetime", "hour", "dayofyear", 'block', 'forecast_date'
        )
        return df_features
    
    
    def _to_pandas(self, df_features, y):
        cat_cols = [
            "county",
            "is_business",
            "product_type",
            "is_consumption",
            "segment",
        ]

        if y is not None:
            df_features = pd.concat([df_features.to_pandas(), y.to_pandas()], axis=1)
        else:
            df_features = df_features.to_pandas()

        df_features = df_features.set_index("row_id")
        df_features[cat_cols] = df_features[cat_cols].astype("category")

        return df_features

    def generate_features(self, df_prediction_items):
        if "target" in df_prediction_items.columns:
            df_prediction_items, y = (
                df_prediction_items.drop("target"),
                df_prediction_items.select("target"),
            )
        else:
            y = None

        df_features = df_prediction_items.with_columns(
            pl.col("datetime").cast(pl.Date).alias("date"),
        )

        for add_features in [
            self._add_general_features,
            self._add_client_features,
            self._add_forecast_weather_features,
            self._add_historical_weather_features,
            self._rename_columns,
            self._add_forecast_ratio_features,
            self._add_target_features,
            self._add_holidays_features,
            self._reduce_memory_usage,
            self._drop_columns
        ]:
            df_features = add_features(df_features)

        df_features = self._to_pandas(df_features, y)
        #df_features = self._add_holidays_as_binary_features(df_features)

        return df_features

In [7]:
cons_storage = DataStorage()
consumption_mask = cons_storage.df_data.select(pl.col('is_consumption')).cast(pl.Boolean).to_numpy().flatten()
cons_storage.df_data = cons_storage.df_data.filter(consumption_mask)
prod_storage = DataStorage()
prod_storage.df_data = prod_storage.df_data.filter(~consumption_mask)

features_generator_prod = FeaturesGeneratorProduction(data_storage=prod_storage)
features_generator_prod_normalized = FeaturesGeneratorProductionNormalized(data_storage=prod_storage)
features_generator_cons = FeaturesGeneratorConsumption(data_storage=cons_storage)
features_generator_cons_normalized = FeaturesGeneratorConsumptionNormalized(data_storage=cons_storage)
df_train_features_prod = features_generator_prod.generate_features(prod_storage.df_data)
df_train_features_cons = features_generator_cons.generate_features(cons_storage.df_data)
df_train_features_prod_normalized = features_generator_prod_normalized.generate_features(prod_storage.df_data)
df_train_features_cons_normalized = features_generator_cons_normalized.generate_features(cons_storage.df_data)

df_train_features_prod = df_train_features_prod[df_train_features_prod['target'].notnull()]
df_train_features_cons = df_train_features_cons[df_train_features_cons['target'].notnull()]
df_train_features_prod_normalized = df_train_features_prod_normalized[df_train_features_prod_normalized['normalized_target'].notnull()]
df_train_features_cons_normalized = df_train_features_cons_normalized[df_train_features_cons_normalized['normalized_target'].notnull()]

ok
ok


In [8]:
from sklearn.ensemble import VotingRegressor
from lightgbm import LGBMRegressor
class ProdNormalizedModel:
    def __init__(self, random_state=0):
        self.model_parameters = {
            "n_estimators": 3000,
            "learning_rate": 0.01,
            "colsample_bytree": 0.9,
            "colsample_bynode": 0.6,
            #'use_best_model': True,
            #"lambda": 3,
            "max_depth": 9,
            "num_leaves": 143,
            "min_data_in_leaf": 55,
            "objective": "mape",
            "device": "gpu",
            "metric": "mape",
            #"early_stopping_round": 200,
            #"random_state" : random_state
        }
        #self.model_production = lgb.LGBMRegressor(**self.model_parameters, random_state=0)
        
        self.model_production = VotingRegressor(
            [
                (
                    f"prod_lgb_{i}",
                    lgb.LGBMRegressor(**self.model_parameters, random_state=i),
                )
                for i in range(10)
            ])

    def fit(self, df_train, df_val=None):
        if df_val is not None:
            self.model_production.fit(
                X=df_train.drop(columns=['normalized_target', "is_consumption", "segment"]),
                y=df_train["normalized_target"], eval_set = (df_val.drop(columns=['normalized_target', "is_consumption", "segment"]), df_val["normalized_target"]))
            
        else:
            self.model_production.fit(
                X=df_train.drop(columns=['normalized_target', "is_consumption", "segment"]),
                y=df_train["normalized_target"]
            )
            

    def predict(self, df_features):
        predictions = self.model_production.predict(df_features.drop(columns=['is_consumption', 'segment'])).clip(0)
        return predictions

In [9]:
from lightgbm import LGBMRegressor
class ProdModel:
    def __init__(self, random_state=0):
        self.model_parameters = {
            "n_estimators": 5000,
            "learning_rate": 0.03,
            "colsample_bytree": 0.9,
            "colsample_bynode": 0.6,
            #'use_best_model': True,
            #"lambda": 3,
            "max_depth": 9,
            "num_leaves": 143,
            "min_data_in_leaf": 55,
            "objective": "mae",
            "device": "gpu",
            "metric": "mae",
            #"early_stopping_round": 200,
            #"random_state" : random_state
        }

        self.model_production = VotingRegressor(
            [
                (
                    f"prod_lgb_{i}",
                    lgb.LGBMRegressor(**self.model_parameters, random_state=i),
                )
                for i in range(3) #10
            ])

    def fit(self, df_train, df_val=None):
        if df_val is not None:
            self.model_production.fit(
                X=df_train.drop(columns=['target', "is_consumption", "segment"]),
                y=df_train["target"])#, eval_set = (df_val.drop(columns=['target', "is_consumption", "segment"]), df_val["target"])
            
        else:
            self.model_production.fit(
                X=df_train.drop(columns=['target', "is_consumption", "segment"]),
                y=df_train["target"]
            )
            

    def predict(self, df_features):
        predictions = self.model_production.predict(df_features.drop(columns=['is_consumption', 'segment'])).clip(0)
        return predictions

In [10]:
class ConsBusiModel:
    def __init__(self, random_state=0):
        self.model_parameters = {
            "n_estimators": 3000,
            "learning_rate": 0.02,
            "colsample_bytree": 0.9,
            "colsample_bynode": 0.6,
            #'use_best_model': True,
            #"lambda": 3,
            "max_depth": 8,
            "num_leaves": 291,
            "min_data_in_leaf": 131,
            "objective": "mae",
            "device": "gpu",
            "metric": "mae",
            #'early_stopping_rounds': 200, 
            #"random_state" : random_state
        }
        self.feats = ['county', 'day', 'weekday', 'month', 'sin(dayofyear)',
       'cos(dayofyear)', 'sin(hour)', 'cos(hour)', 'eic_count',
       'installed_capacity', 'temperature_forecast_local_0h',
       'dewpoint_forecast_local_0h', 'cloudcover_low_forecast_local_0h',
       '10_metre_u_wind_component_forecast_local_0h',
       '10_metre_v_wind_component_forecast_local_0h',
       'direct_solar_radiation_forecast_local_0h',
       'surface_solar_radiation_downwards_forecast_local_0h',
       'snowfall_forecast_local_0h',
       'total_precipitation_forecast_local_0h',
       'temperature_forecast_local_168h', 'dewpoint_forecast_local_168h',
       'direct_solar_radiation_forecast_local_168h',
       'surface_solar_radiation_downwards_forecast_local_168h',
       'temperature_historical_local_48h',
       'dewpoint_historical_local_48h',
       'surface_pressure_historical_local_48h',
       'winddirection_10m_historical_local_48h',
       'shortwave_radiation_historical_local_48h',
       'direct_solar_radiation_historical_local_48h',
       'temperature_historical_local_168h',
       'dewpoint_historical_local_168h',
       'surface_pressure_historical_local_168h',
       'cloudcover_low_historical_local_168h',
       'winddirection_10m_historical_local_168h',
       'shortwave_radiation_historical_local_168h',
       'direct_solar_radiation_historical_local_168h',
       'diffuse_radiation_historical_local_168h', 'target_48h',
       'target_72h', 'target_96h', 'target_120h', 'target_144h',
       'target_168h', 'target_192h', 'target_216h', 'target_240h',
       'target_264h', 'target_288h', 'target_312h', 'target_336h',
       'target_8760h', 'target_8928h', 'target_all_county_type_sum_48h',
       'target_all_type_sum_72h', 'target_all_county_type_sum_72h',
       'target_all_county_type_sum_168h', 'target_all_type_sum_336h',
       'target_all_county_type_sum_336h', 'target_mean', 'target_std',
       'target_ratio_168_336', 'target_difference_168_336',
       'target_ratio_48_216', 'target_difference_48_216',
       'target_ratio_72_240', 'target_difference_72_240',
       'target_difference_48_72',
       'target_all_county_type_sum_ratio_168_336',
       'target_all_county_type_sum_difference_168_336',
       'target_eic_ratio_48', 'target_eic_ratio_168',
       'country_holiday', 'country_holiday_2d', 'country_holiday_7d']
        self.model_cons_busi = VotingRegressor(
            [
                (
                    f"cons_busi_lgb_{i}",
                    lgb.LGBMRegressor(**self.model_parameters, random_state=i),
                )
                for i in range(3)
            ])

    def fit(self, df_train, df_val=None):
        if df_val is not None:
            self.model_cons_busi.fit(
                X=df_train[self.feats],
                y=df_train["target"])#, eval_set = (df_val[self.feats], df_val["target"])
            
        else:
            self.model_cons_busi.fit(
                X=df_train[self.feats],
                y=df_train["target"]
            )

    def predict(self, df_features):
        predictions = self.model_cons_busi.predict(df_features[self.feats]).clip(0)
        return predictions

In [11]:
class ConsNoBusiModel:
    def __init__(self,random_state=0):
        self.model_parameters = {
            "n_estimators": 3000,
            "learning_rate": 0.06,
            "colsample_bytree": 0.9,
            "colsample_bynode": 0.6,
            #'use_best_model': True,
            #"lambda": 3,
            "max_depth": 10,
            "num_leaves": 71,
            "min_data_in_leaf": 200,
            "objective": "mae",
            "device": "gpu",
            "metric": "mae",
            #'early_stopping_rounds': 200, 
            #"random_state" : random_state
        }
        self.model_cons_no_busi = VotingRegressor(
            [
                (
                    f"cons_no_busi_{i}",
                    lgb.LGBMRegressor(**self.model_parameters, random_state=i),
                )
                for i in range(3)
            ])

    def fit(self, df_train, df_val=None):
        if df_val is not None:
            self.model_cons_no_busi.fit(
                X=df_train.drop(columns=['target', 'is_consumption', 'is_business']),
                y=df_train["target"])#, eval_set = (df_val.drop(columns=['target', 'is_consumption', 'is_business']), df_val["target"])
            
        else:
            self.model_cons_no_busi.fit(
                X=df_train.drop(columns=['target', 'is_consumption', 'is_business']),
                y=df_train["target"]
            )

    def predict(self, df_features):
        predictions = self.model_cons_no_busi.predict(df_features.drop(columns=['is_consumption', 'is_business'])).clip(0)
        return predictions

In [12]:
from catboost import CatBoostRegressor
class ConsBusiNormalizedModel:
    def __init__(self):
        cat_cols = ['county', 'is_business', 'product_type', 'is_consumption']
            
        self.model_parameters = {
            "n_estimators": 5000,
            "learning_rate": 0.01,#4,
            #"colsample_bytree": 0.9,
            #"colsample_bynode": 0.6,
            "max_depth": 7,
            "random_strength": 1.2,
            "cat_features": cat_cols,
            #"num_leaves": 71,
            #"min_data_in_leaf": 200,
            "loss_function": "MAE",
            "task_type": "GPU",
            "eval_metric": "MAE",
            #"random_state" : 7
        }


        #self.model_consumption = LGBMRegressor(**self.model_parameters)
        self.model_consumption = VotingRegressor(
            [
                (
                    f"cons_busi_lgb_{i}",
                    CatBoostRegressor(**self.model_parameters, random_state=i),
                )
                for i in range(10)
            ])
    def fit(self, df_train, df_val=None):
        if df_val is not None:
            self.model_consumption.fit(
                X=df_train.drop(columns=['normalized_target', 'target', 'segment', 'eic_count', 'year', 'norm_target']),
                y=df_train["normalized_target"], eval_set = (df_val.drop(columns=['normalized_target', 'target', 'segment', 'eic_count', 'year', 'norm_target']), df_val["normalized_target"])
            )
        else:
            self.model_consumption.fit(
                X=df_train.drop(columns=['normalized_target', 'target', 'segment', 'eic_count', 'year', 'norm_target']),
                y=df_train["normalized_target"]
            )

    def predict(self, df_features):
        predictions = np.zeros(len(df_features))

        predictions = self.model_consumption.predict(
            df_features.drop(columns=['segment', 'eic_count', 'year', 'norm_target'])
        ).clip(0)

        return predictions

In [13]:
from catboost import CatBoostRegressor
class ConsNoBusiNormalizedModel:
    def __init__(self):
        cat_cols = ['county', 'is_business', 'product_type', 'is_consumption']
            
        self.model_parameters = {
            "n_estimators": 5000,
            "learning_rate": 0.01,#4,
            #"colsample_bytree": 0.9,
            #"colsample_bynode": 0.6,
            "max_depth": 3,
            "random_strength":3,
            "cat_features": cat_cols,
            #"num_leaves": 71,
            #"min_data_in_leaf": 200,
            "loss_function": "MAE",
            "task_type": "GPU",
            "eval_metric": "MAE",
            #"random_state" : 7
        }


        #self.model_consumption = LGBMRegressor(**self.model_parameters)
        self.model_consumption = VotingRegressor(
            [
                (
                    f"cons_busi_lgb_{i}",
                    CatBoostRegressor(**self.model_parameters, random_state=i),
                )
                for i in range(10)
            ])
    def fit(self, df_train, df_val=None):
        if df_val is not None:
            self.model_consumption.fit(
                X=df_train.drop(columns=['normalized_target', 'target', 'segment', 'eic_count', 'year', 'norm_target']),
                y=df_train["normalized_target"], eval_set = (df_val.drop(columns=['normalized_target', 'target', 'segment', 'eic_count', 'year', 'norm_target']), df_val["normalized_target"])
            )
        else:
            self.model_consumption.fit(
                X=df_train.drop(columns=['normalized_target', 'target', 'segment', 'eic_count', 'year', 'norm_target']),
                y=df_train["normalized_target"]
            )

    def predict(self, df_features):
        predictions = np.zeros(len(df_features))

        predictions = self.model_consumption.predict(
            df_features.drop(columns=['segment', 'eic_count', 'year', 'norm_target'])
        ).clip(0)

        return predictions

In [14]:
def train_online(df, mode):
    if mode == 'prod':
        prod_mask = df['is_consumption'] == 0
        train_df = df[prod_mask]
    elif mode == 'no_busi':
        no_busi_mask = (df['is_business'] == 0) & (df['is_consumption'] == 1)
        train_df = df[no_busi_mask]
    elif mode == 'busi':
        busi_mask = (df['is_business'] == 1) & (df['is_consumption'] == 1)
        train_df = df[busi_mask]
    return train_df

In [15]:
import enefit

env = enefit.make_env()
iter_test = env.iter_test()
'''
year = df.year
month = df.month
month = df.day
VAL_MASK = ((year == 2023)& (month == 5) & (day > 27))

check_prod = df_train_features_prod[VAL_MASK]
check_cons = df_train_features_cons[VAL_MASK]
all_df = np.concat(check_prod, check_cons)
'''
targets = pd.read_csv('/kaggle/input/predict-energy-behavior-of-prosumers/train.csv')
targets = targets[pd.to_datetime(targets['datetime']) >= pd.to_datetime("2023-05-28")]
#targets=targets.dropna()
len(targets)

12480

In [16]:
counter = 1147
preds_all = np.array([])
import time
start = time.time()
for (
    df_test, 
    df_new_target, 
    df_new_client, 
    df_new_historical_weather,
    df_new_forecast_weather, 
    df_new_electricity_prices, 
    df_new_gas_prices,
    df_sample_prediction
) in iter_test:
    start = time.time()
    cons_storage.update_with_new_data(
            df_new_client=df_new_client,
            df_new_forecast_weather=df_new_forecast_weather,
            df_new_historical_weather=df_new_historical_weather,
            df_new_target=df_new_target[df_new_target['is_consumption'] == 1]
    )
    
    prod_storage.update_with_new_data(
            df_new_client=df_new_client,
            df_new_forecast_weather=df_new_forecast_weather,
            df_new_historical_weather=df_new_historical_weather,
            df_new_target=df_new_target[df_new_target['is_consumption'] == 0]
    )
    
    print(time.time() - start)
    if df_test['currently_scored'].all() == 0:
        df_sample_prediction["target"] = 0
        env.predict(df_sample_prediction)
        continue
        
    consumption_mask = (df_test['is_consumption'] == 1)
    global_busi_mask = (df_test['is_consumption'] == 1) & (df_test['is_business'] == 1)
    global_no_busi_mask = (df_test['is_consumption'] == 1) & (df_test['is_business'] == 0)
    
    if counter == 1147:
        counter = 0
        
        df_train_features_prod = features_generator_prod.generate_features(prod_storage.df_data)
        df_train_features_cons = features_generator_cons.generate_features(cons_storage.df_data)
        df_train_features_prod = df_train_features_prod[df_train_features_prod['target'].notnull()]
        df_train_features_cons = df_train_features_cons[df_train_features_cons['target'].notnull()]
        df_train_features_prod_normalized = features_generator_prod_normalized.generate_features(prod_storage.df_data)
        df_train_features_prod_normalized = df_train_features_prod_normalized[df_train_features_prod_normalized['normalized_target'].notnull()]
        df_train_features_cons_normalized = features_generator_cons_normalized.generate_features(cons_storage.df_data)
        df_train_features_cons_normalized = df_train_features_cons_normalized[df_train_features_cons_normalized['normalized_target'].notnull()]
        
        train_df = train_online(df_train_features_prod, 'prod')
        prod_model = ProdModel()
        prod_model.fit(train_df)
        
        train_df = train_online(df_train_features_prod_normalized, 'prod')
        prod_normalized_model = ProdNormalizedModel()
        prod_normalized_model.fit(train_df)

        train_df = train_online(df_train_features_cons, 'busi')
        cons_busi_model = ConsBusiModel()
        cons_busi_model.fit(train_df)
        
        train_df = train_online(df_train_features_cons_normalized, 'busi')
        cons_busi_normalized_model = ConsBusiNormalizedModel()
        cons_busi_normalized_model.fit(train_df)

        train_df = train_online(df_train_features_cons, 'no_busi')
        cons_no_busi_model = ConsNoBusiModel()
        cons_no_busi_model.fit(train_df)
        
        train_df = train_online(df_train_features_cons_normalized, 'no_busi')
        cons_no_busi_normalized_model = ConsNoBusiNormalizedModel()
        cons_no_busi_normalized_model.fit(train_df)

    df_test_prod = prod_storage.preprocess_test(df_test[~consumption_mask])
    df_test_prod_normalized = prod_storage.preprocess_test(df_test[~consumption_mask])
    df_test_cons = cons_storage.preprocess_test(df_test[consumption_mask])
    df_test_cons_normalized = cons_storage.preprocess_test(df_test[consumption_mask])

    df_test_features_cons = features_generator_cons.generate_features(df_test_cons)
    df_test_features_prod = features_generator_prod.generate_features(df_test_prod)
    df_test_features_prod_normalized = features_generator_prod_normalized.generate_features(df_test_prod_normalized)
    df_test_features_cons_normalized = features_generator_cons_normalized.generate_features(df_test_cons_normalized)
    
    print(time.time() - start)
    busi_mask = (df_test_features_cons_normalized['is_business'] == 1)
    prod_normalized_preds = prod_normalized_model.predict(df_test_features_prod_normalized)
    prod_normalized_preds = prod_normalized_preds * df_test_features_prod_normalized['installed_capacity']
    nans = prod_normalized_preds.isna()
    if nans.sum() > 0:
        prod_normalized_preds[nans] = prod_model.predict(df_test_features_prod[nans])
    cons_busi_normalized_preds = cons_busi_normalized_model.predict(df_test_features_cons_normalized[busi_mask])
    cons_busi_normalized_preds = cons_busi_normalized_preds * df_test_features_cons_normalized[busi_mask]['norm_target']
    nans = cons_busi_normalized_preds.isna()
    if nans.sum() > 0:
        cons_busi_normalized_preds[nans] = cons_busi_model.predict(df_test_features_cons[busi_mask & nans])
    
    cons_no_busi_normalized_preds = cons_no_busi_normalized_model.predict(df_test_features_cons_normalized[~busi_mask])
    cons_no_busi_normalized_preds = cons_no_busi_normalized_preds * df_test_features_cons_normalized[~busi_mask]['norm_target']
    nans = cons_no_busi_normalized_preds.isna()
    if nans.sum() > 0:
        cons_no_busi_normalized_preds[nans] = cons_no_busi_model.predict(df_test_features_cons[~busi_mask & nans])
    preds = np.zeros(len(df_test))
    
    preds[~consumption_mask] = prod_normalized_preds
    preds[global_no_busi_mask] = cons_no_busi_normalized_preds
    preds[global_busi_mask] = cons_busi_normalized_preds
    #print(preds)
    #preds_all = np.concatenate((preds_all, preds))
    #print(time.time() - start)
    df_sample_prediction["target"] = preds #preds #model.predict(df_test_features)
    counter += 1
    env.predict(df_sample_prediction)
print(time.time() - start)

This version of the API is not optimized and should not be used to estimate the runtime of your code on the hidden test set.
3.808190107345581
4.149472951889038
3.8647372722625732
3.8691442012786865
3.902517557144165
