In [0]:
# machine learning
import mlflow
from prophet import Prophet
from sklearn.metrics import mean_absolute_error, mean_squared_error

# spark
from pyspark.sql.functions import *
from pyspark.sql.types import *

# general
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

In [0]:
# Job parameters (Databricks widgets). The defaults apply to interactive runs.
dbutils.widgets.text('schema_name', 'artemis')
dbutils.widgets.text('max_datetime', '2025-03-31 00:00:00')
# dbutils.widgets.text('trailing_n', '3')

schema_name = dbutils.widgets.get('schema_name')
max_datetime = dbutils.widgets.get('max_datetime')
# trailing_n = int(dbutils.widgets.get('trailing_n'))

# Validate the training cutoff on the driver. Every forecasting task parses it
# with pd.to_datetime, so a malformed value would otherwise only surface as an
# error inside each Spark task; an empty value would parse to NaT and silently
# yield empty training sets.
try:
    _parsed_max_datetime = pd.to_datetime(max_datetime)
except (ValueError, TypeError) as exc:
    raise ValueError(
        f"max_datetime widget value {max_datetime!r} is not a parseable timestamp"
    ) from exc
if pd.isna(_parsed_max_datetime):
    raise ValueError(
        f"max_datetime widget value {max_datetime!r} is not a parseable timestamp"
    )


# forecast variables
interval_width = 0.8      # width of Prophet's uncertainty interval (yhat_lower..yhat_upper)
forecast_frequency = 'H'  # pandas offset alias for the forecast steps (hourly)
forecast_periods = 8      # number of steps forecast past the end of the training data
include_history = True    # also return in-sample fitted values for the training period
freq = 'H'                # NOTE(review): duplicates forecast_frequency; not read in the visible cells
group_cols = ['miner_id','seasonality_mode','changepoint_grid']  # one Prophet fit per group


# dataset variables (trailing training window, currently disabled)
# dt = datetime.fromisoformat(max_datetime)
# min_date = dt - timedelta(days=trailing_n)

In [0]:
# MLflow experiment that collects one run per (miner, hyperparameter) fit.
experiment_name = '/Users/rchynoweth@invisocorp.com/exps/Forecast_Power_Usage'

# Look the experiment up explicitly instead of wrapping create_experiment in a
# bare `except:`, which also hid genuine failures (invalid path, missing
# permissions, tracking-server errors). set_experiment creates the experiment
# by name when it does not exist yet.
experiment_existed = mlflow.get_experiment_by_name(experiment_name) is not None
mlflow.set_experiment(experiment_name)
print('Experiment set' if experiment_existed else 'Experiment created')

In [0]:
# Switch the session's current schema to the one selected by the job parameter.
spark.sql("use {}".format(schema_name))

In [0]:
# Raw readings renamed to Prophet's column convention: ds = timestamp, y = target (kwh).
df = (
    spark.table(f"{schema_name}.miner_data")
    .select(
        col("time").alias("ds"),
        col("miner_id"),
        col("kwh").alias("y"),
    )
)
display(df)

In [0]:
# Hyperparameter grid: 12 evenly spaced changepoint_prior_scale candidates in
# [0.5, 0.95], each paired with both seasonality modes (24 combinations).
changepoint_grid = np.linspace(0.5, 0.95, num=12).tolist()
seasonality_modes = ['additive', 'multiplicative']

# Create the cross product manually
cross_product = [(c, s) for c in changepoint_grid for s in seasonality_modes]

# Create a DataFrame from the cross product
schema = StructType([
    StructField("changepoint_grid", DoubleType(), False),
    StructField("seasonality_mode", StringType(), False)
])

cross_df = spark.createDataFrame(cross_product, schema=schema)

# Re-running this cell would cross-join the already-expanded `df` a second
# time, duplicating the grid columns (ambiguous references later in groupBy)
# and multiplying the row count again. Fail loudly instead.
if 'changepoint_grid' in df.columns:
    raise RuntimeError(
        "`df` already contains the hyperparameter grid; re-run the data-loading "
        "cell before re-running this one."
    )

# Replicate every reading once per combination so each (miner, combination)
# group can be fitted independently.
df = df.crossJoin(cross_df)
display(df)

# Detailed Forecasts
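Using the job parameters above, we tune hyperparameters for every miner. For each miner, one Prophet model is fitted per (`changepoint_prior_scale`, `seasonality_mode`) combination in the grid. Each fit's error metrics are logged to MLflow, so the best combination can be selected.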

In [0]:
# Spark schema of the frame returned by miner_forecast: the timestamp, Prophet's
# prediction/component columns (all nullable doubles), then the group keys and
# the merged actual value. is_afternoon_x / is_afternoon_y are the suffixes pandas
# adds when both merged frames carry an 'is_afternoon' column.
# NOTE(review): with seasonality_mode='multiplicative' Prophet presumably emits
# extra_regressors_multiplicative* rather than extra_regressors_additive* —
# confirm this schema covers both modes.
forecast_output_schema = StructType(
    [StructField("ds", TimestampType(), True)]
    + [
        StructField(component, DoubleType(), True)
        for component in (
            "trend",
            "yhat_lower",
            "yhat_upper",
            "trend_lower",
            "trend_upper",
            "additive_terms",
            "additive_terms_lower",
            "additive_terms_upper",
            "daily",
            "daily_lower",
            "daily_upper",
            "weekly",
            "weekly_lower",
            "weekly_upper",
            "extra_regressors_additive",
            "extra_regressors_additive_lower",
            "extra_regressors_additive_upper",
            "is_afternoon_lower",
            "is_afternoon_upper",
            "is_afternoon_x",
            "is_afternoon_y",
            "multiplicative_terms",
            "multiplicative_terms_lower",
            "multiplicative_terms_upper",
            "yhat",
        )
    ]
    + [
        StructField("miner_id", IntegerType(), True),
        StructField("changepoint_grid", DoubleType(), True),
        StructField("seasonality_mode", StringType(), True),
        StructField("y", DoubleType(), True),
    ]
)

In [0]:
def is_afternoon(ds):
    """Flag timestamps whose hour is 17 through 23 (inclusive).

    Args:
        ds: any single value ``pd.to_datetime`` accepts (string, datetime, Timestamp).

    Returns:
        int: 1 when the hour lies in 17..23, otherwise 0 (also 0 for NaT).
    """
    hour = pd.to_datetime(ds).hour
    return int(17 <= hour <= 23)

In [0]:
def _forecast_metrics(y_true, y_pred, threshold=4.6):
    """Score forecasted points against the observed values.

    Args:
        y_true: pandas Series of observed ``y`` values (no NaNs), in forecast order.
        y_pred: pandas Series of predicted ``yhat`` values with the same index.
        threshold: level that splits points into "above" / "not above" for the
            classification-style metrics.

    Returns:
        dict of floats:
            rmse            - root mean squared error over all points;
            rmse_first_four - RMSE over the first four points only;
            accuracy        - share of points where actual and prediction fall
                              on the same side of ``threshold``;
            accuracy46      - among points whose actual exceeds ``threshold``,
                              the share also predicted above it (0 if none).
    """
    # sqrt(MSE) instead of mean_squared_error(..., squared=False): the
    # `squared` argument was deprecated in scikit-learn 1.4 and removed in 1.6.
    rmse = float(np.sqrt(mean_squared_error(y_true, y_pred)))
    rmse_first_four = float(np.sqrt(mean_squared_error(y_true.iloc[:4], y_pred.iloc[:4])))

    true_above = y_true > threshold
    pred_above = y_pred > threshold
    accuracy = float((true_above == pred_above).mean())

    n_true_above = int(true_above.sum())
    accuracy46 = float(pred_above[true_above].sum()) / n_true_above if n_true_above > 0 else 0.0

    return {
        'rmse': rmse,
        'rmse_first_four': rmse_first_four,
        'accuracy': accuracy,
        'accuracy46': accuracy46,
    }


def miner_forecast(pdf):
    """Fit, forecast and score one Prophet model for a single group.

    Used with ``applyInPandas`` over ``group_cols``, so ``pdf`` holds one miner's
    rows for one ``changepoint_grid`` / ``seasonality_mode`` combination
    (columns ``ds``, ``miner_id``, ``y``, ``changepoint_grid``, ``seasonality_mode``).
    Keep the single-argument signature: applyInPandas decides from the function's
    arity whether to also pass the grouping key.

    The model trains on complete rows up to ``max_datetime`` and forecasts
    ``forecast_periods`` steps past the training data. Forecast steps after
    ``max_datetime`` that have an observed ``y`` are scored. Parameters and
    metrics are logged to an MLflow run in ``experiment_name``.

    Reads notebook globals: experiment_name, max_datetime, interval_width,
    forecast_periods, forecast_frequency, include_history, forecast_output_schema.

    Returns:
        pandas.DataFrame whose columns are exactly ``forecast_output_schema``'s
        fields: Prophet output, group keys, merged actual ``y`` and the
        ``is_afternoon`` columns.
    """
    miner_id = pdf['miner_id'].iloc[0]
    cutoff = pd.to_datetime(max_datetime)
    merge_keys = ['ds', 'miner_id', 'changepoint_grid', 'seasonality_mode']

    # NOTE(review): this runs on a Spark worker, where no parent run is active,
    # so nested=True presumably starts a top-level run — confirm in the MLflow UI.
    mlflow.set_experiment(experiment_name)
    with mlflow.start_run(nested=True, run_name=f"Miner_{miner_id}"):
        # ensure proper datetime for filtering, and derive the regressor flag
        pdf['ds'] = pd.to_datetime(pdf['ds'])
        pdf['is_afternoon'] = pdf['ds'].apply(is_afternoon)

        # training data: complete rows up to and including the cutoff
        history_pd = pdf.dropna().copy()
        history_pd = history_pd[history_pd['ds'] <= cutoff]

        # Per-group constant columns to stamp onto the forecast (the group keys).
        # 'is_afternoon' is excluded on purpose: predict() already returns an
        # 'is_afternoon' column holding the regressor's contribution, which the
        # stamping loop would otherwise overwrite with one row's 0/1 flag.
        extra_cols = [c for c in history_pd.columns if c not in ('ds', 'y', 'is_afternoon')]

        # hyperparameters are constant within the group
        cp = pdf['changepoint_grid'].iloc[0]
        mode = pdf['seasonality_mode'].iloc[0]

        model = Prophet(
            interval_width=interval_width,
            changepoint_prior_scale=cp,
            seasonality_mode=mode,
        )
        model.add_regressor('is_afternoon')
        model.fit(history_pd)

        future_pd = model.make_future_dataframe(
            periods=forecast_periods,
            freq=forecast_frequency,
            include_history=include_history,
        )
        future_pd['is_afternoon'] = future_pd['ds'].apply(is_afternoon)

        forecast_pd = model.predict(future_pd)
        for c in extra_cols:
            forecast_pd[c] = history_pd[c].iloc[0]

        # Attach the actual y (and the input flag). Both frames carry
        # 'is_afternoon', so pandas suffixes them is_afternoon_x (forecast side)
        # and is_afternoon_y (input side).
        forecast_pd = pd.merge(forecast_pd, pdf, on=merge_keys, how='left')

        mlflow.log_param('changepoint_prior_scale', cp)
        mlflow.log_param('seasonality_mode', mode)
        mlflow.log_param('miner_id', miner_id)

        # Score only forecasted points with an observed value: NaN actuals make
        # scikit-learn raise and would count as threshold "matches".
        evaluation = forecast_pd[(forecast_pd['ds'] > cutoff) & forecast_pd['y'].notna()]
        if evaluation.empty:
            mlflow.set_tag('evaluation', 'skipped: no observed y after max_datetime')
        else:
            mlflow.log_metrics(_forecast_metrics(evaluation['y'], evaluation['yhat']))

    # Return exactly the declared schema's columns: Spark matches grouped-map
    # output by column name and rejects missing/unexpected columns.
    # NOTE(review): for seasonality_mode='multiplicative', Prophet presumably
    # emits extra_regressors_multiplicative* instead of extra_regressors_additive*;
    # missing schema columns become NaN and non-schema columns are dropped here.
    return forecast_pd.reindex(columns=forecast_output_schema.fieldNames())

In [0]:
# Fit every (miner, hyperparameter) group with applyInPandas under a parent MLflow run.
# NOTE(review): applyInPandas is lazy — miner_forecast only executes when an action
# runs on out_df (e.g. the saveAsTable write in a later cell), which is after this
# `with` block has already ended the "ForecastAllMiners" run. Confirm whether the
# write (or another action) should move inside this block.
with mlflow.start_run(run_name="ForecastAllMiners") as parent_run:
    forecasts = df.groupBy(*group_cols).applyInPandas(
        miner_forecast, schema=forecast_output_schema
    )
    # Clip both interval bounds at zero.
    for bound_col in ("yhat_lower", "yhat_upper"):
        forecasts = forecasts.withColumn(bound_col, greatest(col(bound_col), lit(0)))
    out_df = forecasts.withColumn('training_time', current_timestamp())


In [0]:
# Append the forecasts to the table in the schema chosen by the `schema_name`
# job parameter (previously hard-coded to 'artemis', ignoring the parameter).
# mergeSchema lets new output columns be added to the existing table.
out_df.write.option("mergeSchema", "true").mode('append').saveAsTable(f'{schema_name}.forecasts_output_v5')

In [0]:

# Collect every FINISHED run of the experiment (search_runs returns a pandas
# DataFrame by default).
finished_runs_filter = "attributes.status = 'FINISHED'"

experiment = mlflow.get_experiment_by_name(experiment_name)
experiment_id = experiment.experiment_id

runs_df = mlflow.search_runs(
    filter_string=finished_runs_filter,
    experiment_ids=[experiment_id],
)

In [0]:
# Per hyperparameter combination, summarise each logged metric across the runs
# started after the given date; best (lowest mean RMSE) combinations first.
recent_runs = runs_df[runs_df['start_time'] > '2025-05-04']

# Named-aggregation spec: <stat>_<label> -> (metric column, pandas reducer).
# The run count sits right after the RMSE statistics.
summary_aggs = {}
for metric_name, label in [
    ('rmse', 'rmse'),
    ('rmse_first_four', '4_rmse'),
    ('accuracy', 'accuracy'),
    ('accuracy46', 'accuracy46'),
]:
    for stat, reducer in [('avg', 'mean'), ('std', 'std'), ('min', 'min'), ('max', 'max')]:
        summary_aggs[f'{stat}_{label}'] = (f'metrics.{metric_name}', reducer)
    if metric_name == 'rmse':
        summary_aggs['count'] = ('metrics.rmse', 'count')

summary = (
    recent_runs
    .groupby(['params.changepoint_prior_scale', 'params.seasonality_mode'])
    .agg(**summary_aggs)
    .reset_index()
    .sort_values('avg_rmse')
)

display(summary)