In [1]:
import warnings
warnings.filterwarnings('ignore')
import re
from db.queries import get_sample_rows, get_user_forecast_data, row_to_config, get_predictive_data
import os

In [2]:
os.environ["ENV"] = "DEV"

In [3]:
ufm_df = get_user_forecast_data(databrick_task_id=67)
ufm_df.head()

Unnamed: 0,StartDate,EndDate,Parameters,Region,Status,ForecastMethodID,UserForecastMethodID,CustomerJSON,varJSON,Method,DatabrickID
0,2025-02-01,2029-01-31,"(5,2,0.3,0.3,0.8)",EC,Failed,6,199,"{""CSA"": ["""",""Kimberley"",""Klerksdorp"",""South Af...","{""VariableID"": [""OffPeakConsumption"",""PeakCons...",XGBoost,67


In [4]:
from etl.etl import *

In [5]:
all_prediction_columns = ["PeakConsumption", "StandardConsumption", "OffPeakConsumption","Block1Consumption", "Block2Consumption","Block3Consumption", "Block4Consumption",     "NonTOUConsumption"]

metadata = extract_metadata(ufm_df)
customer_ids = parse_json_column(ufm_df, "CustomerJSON")
variable_ids = parse_json_column(ufm_df, "varJSON", key="VariableID")
columns_mapping = generate_combinations(all_prediction_columns)

logging.info(f"Customer IDs: {customer_ids}")
logging.info(f"Variable IDs: {variable_ids}")
logging.info(f"‚úÖ Total column combinations: {len(columns_mapping)}")

INFO:root:Generated 255 column combinations.
INFO:root:Customer IDs: ['Klerksdorp', 'KSACS - Mmabatho', 'Kimberley', 'KSACS - Klerksdorp', 'KSACS - Soweto', 'South African Development Comm', 'Witbank', 'KSACS - Pretoria', 'Benoni', 'KSACS - Vereeniging', 'KSACS - Randfontein', 'KSACS - Benoni', 'KSACS - Nigel', 'KSACS - Witbank']
INFO:root:Variable IDs: ['PeakConsumption', 'OffPeakConsumption', 'Block1Consumption', 'StandardConsumption']
INFO:root:‚úÖ Total column combinations: 255


In [6]:
selected_columns = find_matching_combination(columns_mapping, all_prediction_columns)

INFO:root:Exact match found for: frozenset({'NonTOUConsumption', 'Block1Consumption', 'StandardConsumption', 'Block2Consumption', 'PeakConsumption', 'Block3Consumption', 'Block4Consumption', 'OffPeakConsumption'})


In [7]:
from dml.dml import *

In [8]:
df = load_and_prepare_data(path=None, ufmd=199, save=False, method="XGBoost")
df.head()


INFO:root:‚úÖ Raw dataset loaded.
INFO:root:‚úÖ Raw dataset cleaned.


Unnamed: 0_level_0,PodID,CustomerID,TariffID,PeakConsumption,StandardConsumption,OffPeakConsumption,Block1Consumption,Block2Consumption,Block3Consumption,Block4Consumption,NonTOUConsumption
ReportingMonth,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1
2020-04-01,1873419,8460296087,MEGAFLEX,651020.0,1926932.0,1981510.0,0.0,0.0,0.0,0.0,0.0
2020-05-01,1873419,8460296087,MEGAFLEX,859170.0,2337768.0,3602408.0,0.0,0.0,0.0,0.0,0.0
2020-06-01,1873419,8460296087,MEGAFLEX,873638.0,2127978.0,2858956.0,0.0,0.0,0.0,0.0,0.0
2020-07-01,1873419,8460296087,MEGAFLEX,1025896.0,2539960.0,3679766.0,0.0,0.0,0.0,0.0,0.0
2020-08-01,1873419,8460296087,MEGAFLEX,1006638.0,2522428.0,4198484.0,0.0,0.0,0.0,0.0,0.0


In [9]:
# df.to_csv("PredictiveInputDataXGBoost.csv")

In [10]:
row = ufm_df.iloc[0]
config = row_to_config(row)

In [11]:
config

ForecastConfig(forecast_method_id=6, forecast_method_name='XGBoost', model_parameters='(5,2,0.3,0.3,0.8)', region='EC', status='Failed', user_forecast_method_id=199, start_date=datetime.date(2025, 2, 1), end_date=datetime.date(2029, 1, 31), databrick_id=67)

In [12]:
if df.empty:
    logging.error("üö´ DataFrame is empty. Check input filters or data source.")
else:
    customer_ids, pod_ids = get_unique_list_of_customer_and_pod(df)

    # These variables should come from user input / config
    StartDate = config.start_date
    EndDate = config.end_date
    Hyper_Parameters = config.model_parameters

    forecast_dates = get_forecast_range(StartDate, EndDate)
    params = extract_xgboost_params(Hyper_Parameters)
    print("Param: ", params)
    # Extract actuals range
    latest_actual_date = df.index.max()
    logging.info(f"üìç Last actuals month in data: {latest_actual_date.strftime('%Y-%m')}")

INFO:root:üßÆ Forecasting for 2141
INFO:root:üìÖ Forecast period: 2025-02-01 00:00:00 to 2029-01-01 00:00:00
INFO:root:üìç Last actuals month in data: 2025-02


Param:  (5.0, 2.0, 0.3, 0.3, 0.8)


In [13]:
from xgboost import XGBRegressor
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import StandardScaler
from sklearn.multioutput import MultiOutputRegressor
from sklearn.impute import KNNImputer, SimpleImputer
from sklearn.metrics import mean_squared_error,r2_score,make_scorer
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV, train_test_split, cross_val_score

In [14]:
# Scaling the features
scaler = StandardScaler()
sim_imputer = SimpleImputer()
consumption_types = ["PeakConsumption", "StandardConsumption", "OffPeakConsumption","Block1Consumption", "Block2Consumption","Block3Consumption", "Block4Consumption",     "NonTOUConsumption"]
cons_types = [col for col in selected_columns if col in consumption_types]

In [15]:
Forecast_Method_Name = "XGBoost"
UFMID = 199
DatabrickID = 67

In [16]:
forecast_dates = pd.date_range(start=StartDate, end=EndDate, freq='MS')[0:]
n_periods = len(forecast_dates)

In [17]:
from xg.utilities import *  
import numpy as np

In [18]:
# Main forecasting loop
for customer_id in df['CustomerID'].unique():
    #print(f"Processing CustomerID: {customer_id}")
    customer_df = df[df["CustomerID"] == customer_id]
    if customer_df.empty:
        #print(f"No data found for CustomerID: {customer_id}")
        continue
    unique_podel_ids = customer_df["PodID"].unique()
    for podel_id in unique_podel_ids:

        # if podel_id == "6582636837":
            RMSE_sum = 0
            R2_sum = 0
            #print(f"Processing PODEL_ID: {podel_id}")

            podel_df = customer_df[customer_df["PodID"] == podel_id]
            if podel_df.empty:
                #print(f"No data found for PODEL_ID: {podel_id}")
                continue
            future_predictions = []
            forecast_df_all_cons = pd.DataFrame(forecast_dates, columns=["ReportingMonth"])
            forecast_df_all_cons["CustomerID"] = customer_id
            forecast_df_all_cons["PodID"] = podel_id
            forecast_df_all_cons["CustomerID"] = forecast_df_all_cons["CustomerID"].astype(int)
            forecast_df_all_cons["PodID"] = forecast_df_all_cons["PodID"].astype(int)
            forecast_df_all_cons["UserForecastMethodID"] = UFMID    
            performance_data = { 'ModelName': Forecast_Method_Name,
                                'CustomerID': str(customer_id),
                                'PodID': str(podel_id),
                                'DataBrickID': int(DatabrickID),   
                                'UserForecastMethodID': int(UFMID)
                                }
            historical_df = customer_df[customer_df["PodID"] == podel_id].copy()            

            # Create lag features for the historical data
            # lag_columns = ['OffpeakConsumption', 'StandardConsumption', 'PeakConsumption']
            lag_columns = selected_columns[2:]
            #if debug:
                #print(f'The lage columns are {lag_columns}')
            podel_df = create_lag_features(podel_df, lag_columns, lags=3)

            # Fill NaN values with 0 or an appropriate imputation method
            for col in [f"{col}_lag{lag}" for col in lag_columns for lag in range(1, 4)]:
                podel_df[col] = pd.to_numeric(podel_df[col], errors='coerce')
            podel_df = podel_df.fillna(0)

            feature_columns = ["Month", "Year"] + [f"{col}_lag{lag}" for col in lag_columns for lag in range(1, 4)]
            for cons_type in cons_types:

                # Prepare feature and target matrices
                X = podel_df[feature_columns].values
                Y = podel_df[cons_type].values

                if podel_df[cons_type].isnull().all():
                    #print(f"No data found for consumption type: {cons_type}")
                    continue

                #display( podel_df)
                if X.shape[0] <= 5:
                    continue
                else:   
                    try:
                        X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=42)

                        X_imputer = SimpleImputer(strategy='mean')
                        Y_imputer = SimpleImputer(strategy='mean')
                        
                        # Impute missing values for X_train and X_test
                        X_train_imputed = X_imputer.fit_transform(X_train)
                        X_test_imputed = X_imputer.transform(X_test)

                        # Scale the training features
                        X_train_scaled = scaler.fit_transform(X_train_imputed)
                        X_test_scaled = scaler.transform(X_test_imputed)

                        # Impute missing values in Y_train and Y_test
                        Y_train_imputed = Y_imputer.fit_transform(Y_train.reshape(-1,1))
                        Y_test_imputed = Y_imputer.transform(Y_test.reshape(-1,1))


                        # Define the parameter grid for XGBRegressor
                        # param_grid = {
                        #     'estimator__n_estimators': [10, 50, 100],
                        #     'estimator__max_depth': [5, 10, 15],
                        #     'estimator__learning_rate': [0.01, 0.1, 0.3],
                        #     'estimator__subsample': [0.8, 0.9, 1],
                        #     'estimator__colsample_bytree': [0.7, 0.8, 1]
                        # }
                        model = XGBRegressor(
                            objective="reg:squarederror",
                            n_estimators=100,         # Number of boosting rounds
                            max_depth=10,             # Maximum tree depth
                            learning_rate=0.1,        # Learning rate
                            subsample=0.9,            # Subsample ratio of training instances
                            colsample_bytree=0.8,     # Subsample ratio of columns
                            enable_categorical=True
                        )

                        # Fit the model to the training data
                        model.fit(X_train_scaled, Y_train_imputed.ravel())  # Flatten Y_train to 1D

                        # Make predictions on the test set
                        Y_pred = model.predict(X_test_scaled)

                    except IndexError as e:
                        print(f"IndexError occurred,there is a mismatch in the no of elements : {e}")
                    except ValueError as e:
                        print(f"ValueError occurred: {e}")
                    except KeyError as e:
                        print(f"KeyError occurred, certain referenced column not found : {e}")
                    except Exception as e:
                        print(f"An unexpected error occurred: {e}")

                    rmse = np.sqrt(mean_squared_error(Y_test_imputed, Y_pred))
                    r2 = r2_score(Y_test_imputed, Y_pred)

                    # if debug:
                    #print(f"Test RMSE: {rmse}")
                    #print(f"Test R¬≤: {r2}")


                    # model.fit(X_train_scaled, Y_train)

                    # Prepare forecast dataframe
                    forecast_df = pd.DataFrame(forecast_dates, columns=["ReportingMonth"])
                    forecast_df["CustomerID"] = customer_id
                    forecast_df["PodID"] = podel_id
                    forecast_df["Month"] = forecast_df["ReportingMonth"].dt.month
                    forecast_df["Year"] = forecast_df["ReportingMonth"].dt.year
                    forecast_df["CustomerID"] = forecast_df["CustomerID"].astype(int)
                    forecast_df["PodID"] = forecast_df["PodID"].astype(int)
                    forecast_df["UserForecastMethodID"] = UFMID

                    # Initialize lag columns in the forecast DataFrame
                    for col in lag_columns:
                        forecast_df[col] = np.nan  # Initialize the consumption columns
                        for lag in range(1, 4):
                            forecast_df[f"{col}_lag{lag}"] = np.nan

                    # Main loop for predictions across each forecasted period
                    for pred_cur_mth in range(n_periods):
                        #print(f"Prediction step {pred_cur_mth}")

                        # Update lag features for current step
                        if pred_cur_mth == 0:
                            # First step: Use historical data to initialize lags
                            forecast_df = create_forecast_lag_features(forecast_df, podel_df, lag_columns, lags=3, step=pred_cur_mth)
                            
                        else:
                            # Subsequent steps: Use previously predicted data to update lags
                            forecast_df = create_forecast_lag_features(forecast_df, forecast_df, lag_columns, lags=3, step=pred_cur_mth)

                        # Convert forecast_df columns to numeric types
                        for col in [f"{col}_lag{lag}" for col in lag_columns for lag in range(1, 4)]:
                            forecast_df[col] = pd.to_numeric(forecast_df[col], errors='coerce')

                        # Select the current row of features for prediction
                        X_forecast = forecast_df.loc[pred_cur_mth, feature_columns].values.reshape(1, -1)

                        # Impute missing values in X_forecast
                        X_forecast_imputed = X_imputer.transform(X_forecast)

                        # Scale the features before making predictions
                        X_forecast_scaled = scaler.transform(X_forecast_imputed)
        
                        # Make prediction
                        prediction = model.predict(X_forecast_scaled)

                        # Store the prediction
                        future_predictions.append(prediction)


                        # Update the forecast DataFrame with the predicted values
                        if len(prediction.shape) == 1:
                            # for idx, col in enumerate(selected_columns[2:]):
                            #     if idx < len(prediction):
                                    forecast_df.loc[pred_cur_mth, cons_type] = prediction
                        else:
                            # for idx, col in enumerate(selected_columns[2:]):
                            #     if idx < prediction.shape[1]:
                                    forecast_df.loc[pred_cur_mth, cons_type] = prediction[0]

                        # Update lag features with the newly predicted values for future steps
                        for lag in range(1, 4):
                            next_step = pred_cur_mth + lag
                            if next_step < len(forecast_df):
                                # for col in selected_columns[2:]:
                                    forecast_df.loc[next_step, f"{cons_type}_lag{lag}"] = forecast_df.loc[pred_cur_mth, cons_type]

                        #print( forecast_df.loc[pred_cur_mth, cons_type])



                    # if cons_type == "PeakConsumption":
                    #     display(forecast_df)

                    # #print(rename_dict)

                    # #print(forecast_df.info())


                lag_columns_to_drop = [f"{col}_lag{lag}" for col in lag_columns for lag in range(1, 4)]

            
                # Combine the specific columns to drop with the dynamic lag columns
                columns_to_drop = ["Month", "Year"] + lag_columns_to_drop

                # Drop the columns from the DataFrame
                # forecast_df = forecast_df.drop(columns=columns_to_drop, axis=1)

                    # display(forecast_df)
                forecast_df_all_cons[cons_type]=forecast_df[cons_type]
                performance_data[f"RMSE_{cons_type}"] = rmse
                performance_data[f"R2_{cons_type}"] = r2


                    # # Now plot historical vs forecasted values for the features of interest
            
                plot_forecast_vs_historical(historical_df, forecast_df,  [cons_type])  

                        
            for cons_type in cons_types:
                if f"RMSE_{cons_type}" in performance_data and f"R2_{cons_type}" in performance_data:
                    RMSE_sum +=  performance_data[f"RMSE_{cons_type}"]
                    R2_sum   +=  performance_data[f"R2_{cons_type}"]

            if len(cons_types) > 0:
                rmse_avg = RMSE_sum/len(cons_types)
                r2_avg = R2_sum/len(cons_types)


            performance_data['RMSE_Avg'] = rmse_avg
            performance_data['R2_Avg'] = r2_avg
            performance_df = pd.DataFrame([performance_data])
 
            forecast_combined_spark_df = forecast_combined_spark_df.withColumn("CustomerID", forecast_combined_spark_df["CustomerID"].cast("bigint"))
            forecast_combined_spark_df = forecast_combined_spark_df.withColumn("PodID", forecast_combined_spark_df["PodID"].cast("bigint"))
            forecast_combined_spark_df = forecast_combined_spark_df.withColumn("UserForecastMethodID",forecast_combined_spark_df["UserForecastMethodID"].cast("bigint"))

                                        # Write the DataFrame to the SQL table
                # #print(forecast_df.columns)

OverflowError: Python int too large to convert to C long