In [20]:
import os
import pandas as pd
import numpy as np
import matplotlib.pylab as plt
import pyarrow.parquet as pq 
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.impute import KNNImputer
from sklearn.preprocessing import LabelEncoder

In [22]:
def get_file_path(filename, starting_directory='.'):
    paths = []
    for root, directories, files in os.walk(starting_directory):
        for file in files:
            if file == filename:
                paths.append(os.path.join(root, file))
                return paths
    raise FileNotFoundError(f"Could not find file {filename} in directory {starting_directory}")
get_file_path('X_train_observed.parquet')

['./A/X_train_observed.parquet']

# Data Exploration

In [10]:
# Load the metadata of the parquet files
metadata_a = pq.read_metadata('A/X_train_observed.parquet')
metadata_b = pq.read_metadata('B/X_train_observed.parquet')
metadata_c = pq.read_metadata('C/X_train_observed.parquet')

get_file_path

# Get the schema of the parquet files
schema_a = metadata_a.schema
schema_b = metadata_b.schema
schema_c = metadata_c.schema

print("Schema for file A:")
print(schema_a)
print("\nSchema for file B:")
print(schema_b)
print("\nSchema for file C:")
print(schema_c)

Schema for file A:
<pyarrow._parquet.ParquetSchema object at 0x1460cdb40>
required group field_id=-1 schema {
  optional int64 field_id=-1 date_forecast (Timestamp(isAdjustedToUTC=false, timeUnit=microseconds, is_from_converted_type=false, force_set_converted_type=false));
  optional float field_id=-1 absolute_humidity_2m:gm3;
  optional float field_id=-1 air_density_2m:kgm3;
  optional float field_id=-1 ceiling_height_agl:m;
  optional float field_id=-1 clear_sky_energy_1h:J;
  optional float field_id=-1 clear_sky_rad:W;
  optional float field_id=-1 cloud_base_agl:m;
  optional float field_id=-1 dew_or_rime:idx;
  optional float field_id=-1 dew_point_2m:K;
  optional float field_id=-1 diffuse_rad:W;
  optional float field_id=-1 diffuse_rad_1h:J;
  optional float field_id=-1 direct_rad:W;
  optional float field_id=-1 direct_rad_1h:J;
  optional float field_id=-1 effective_cloud_cover:p;
  optional float field_id=-1 elevation:m;
  optional float field_id=-1 fresh_snow_12h:cm;
  optional

In [36]:
# Load the data
df_A = pq.read_table('A/X_train_observed.parquet').to_pandas()
df_B = pq.read_table('B/X_train_observed.parquet').to_pandas()
df_C = pq.read_table('C/X_train_observed.parquet').to_pandas()

# Inspect the data
#print(df_A.info())
#print(df_B.info())
#print(df_C.info())

1749    1.0
1750    1.0
1751    1.0
1752    1.0
1753    1.0
1754    1.0
1755    1.0
Name: snow_drift:idx, dtype: float32

# Data Preprocessing

In [None]:
# Class for general feature processing
class FeatureProcessingClass():
    def __init__(self):
        self.categorical_features = ['dew_or_rime:idx',
                                    'is_day:idx', 
                                    'is_in_shadow:idx', 
                                    'precip_type_5min:idx', 
                                    'snow_drift:idx']


    ###--- METHODS FOR JOINING DATASETS ---###


    def create_new_column_names(self, df: pd.DataFrame, suffix: str, columns_no_change: list[str]):
        '''
        Change column names by given suffix, keep columns_no_change, and return back the data

        PARAMS:
        - df: data
        - suffix: suffixes to add to column names
        - columns_no_change: list of column names who should not be changed
        '''
        df.columns = [col + suffix 
                      if col not in columns_no_change
                      else col
                      for col in df.columns
                      ]
        return df
    

    ###--- METHODS FOR CREATING/MODIFYING FEATURES ---###


    def impute_missing_values(self, df: pd.DataFrame):
        '''
        Impute missing values with mean of the two closest non-NaN values

        PARAMS:
        - df: pandas data
        '''
        # Create a KNNImputer
        imputer = KNNImputer(n_neighbors=2)

        # Fit and transform the DataFrame
        df_imputed = imputer.fit_transform(df)
        df_imputed = pd.DataFrame(df_imputed, columns=df.columns)

        return df_imputed

    
    def create_time_features(self, df: pd.DataFrame):
        '''
        Create data features based on datetime column

        PARAMS:
        - df: data
        '''

        # Check if 'date_forecast' column exists
        if 'date_forecast' not in df.columns:
            raise ValueError("DataFrame does not have 'date_forecast' column")

        # Try to convert 'date_forecast' to datetime
        try:
            df['date_forecast'] = pd.to_datetime(df['date_forecast'])
        except Exception as e:
            raise ValueError("Cannot convert 'date_forecast' to datetime: " + str(e))

        # time period features
        df['date'] = df['date_forecast'].dt.normalize()
        df['year'] = df['date_forecast'].dt.year
        df['quarter'] = df['date_forecast'].dt.quarter
        df['month'] = df['date_forecast'].dt.month
        df['week'] = df['date_forecast'].dt.isocalendar().week
        df['hour'] = df['date_forecast'].dt.hour

        # day features
        df['day_of_year'] = df['date_forecast'].dt.day_of_year
        df['day'] = df['date_forecast'].dt.day
        df['weekday'] = df['date_forecast'].dt.weekday

        # boolean features
        df['is_weekend'] = df['weekday'] > 5

        return df
    

    def convert_to_categorical(self, df: pd.DataFrame, categorical_features: list[str]):
        '''
        Convert columns to categorical dtype

        PARAMS:
        - df: data
        - catagorical_features: list of columns to convert to categorical
        '''
        return  df[categorical_features].apply(lambda x: x.astype('category'))
    

    def convert_to_numerical(self, df: pd.DataFrame, numerical_features: list[str]):
        '''
        Convert columns to numerical dtype
        
        PARAMS:
        - df: data
        - numerical_features: list of columns to convert to numerical
        '''
        return  df[numerical_features].apply(lambda x: x.astype('float'))
    

    def create_lag_features(self, df: pd.DataFrame, lag_features: list[str], lag_values: list[int]):
        '''
        Create lag features for given columns

        PARAMS:
        - df: data
        - lag_features: list of columns to create lag features for
        - lag_values: list of lag values
        '''
        # checking that not any lag features is categorical
        if any(feature in lag_features for feature in self.categorical_features):
            catagorical_feature_list = [feature for feature in lag_features if feature in self.categorical_features]
            raise ValueError(f"Cannot create lag features for categorical features {catagorical_feature_list}")

        for feature in lag_features:
            for lag in lag_values:
                df[feature + '_lag_' + str(lag)] = df[feature].shift(lag)
        return df
    

    ###--- CALL METHOD FOR INITIALISING CLASS AS FUNCTION ---###


    def __call__(self, data, client, historical_weather, forecast_weather, electricity, gas):
        '''Processing of features from all datasets, merge together and return features for dataframe df '''
        # Create features for relevant dataset
        data = self.create_data_features(data)
        client = self.create_client_features(client)
        historical_weather = self.create_historical_weather_features(historical_weather)
        forecast_weather = self.create_forecast_weather_features(forecast_weather)
        electricity = self.create_electricity_features(electricity)
        gas = self.create_gas_features(gas)
        
        # 🔗 Merge all datasets into one df 🔗
        df = data.merge(client, how='left', on = self.client_join)
        df = df.merge(historical_weather, how='left', on = self.weather_join)
        df = df.merge(forecast_weather, how='left', on = self.weather_join)
        df = df.merge(electricity, how='left', on = self.electricity_join)
        df = df.merge(gas, how='left', on = self.gas_join)
        
        # Change columns to categorical for XGBoost
        df[self.category_columns] = df[self.category_columns].astype('category')
        return df
    
