# Install Packages



In [0]:
%pip install prophet
%pip install mlflow

Note: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.
Note: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.


# Import Libraries


In [0]:
import datetime
import itertools
import json
import logging
from typing import Callable

import matplotlib.pyplot as plt
import mlflow
import numpy as np
import pandas as pd
import prophet
import pyspark
from matplotlib.dates import (
    MonthLocator,
    num2date,
    AutoDateLocator,
    AutoDateFormatter,
)
from prophet import Prophet, serialize
from prophet.diagnostics import cross_validation, performance_metrics
from prophet.plot import plot_cross_validation_metric
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, ceil, when, lit
from pyspark.sql.types import (
    StructType,
    StructField,
    TimestampType,
    DoubleType,
    StringType,
    IntegerType,
)

np.random.seed(42)

# Function: Get Customer Data


In [0]:
def get_customer_volume_data(
    start_date: str, end_date: str, date_column: str
) -> pyspark.sql.DataFrame:
    """Get daily customer count data for deposit accounts within a date window.

    Args:
        start_date: Inclusive lower bound compared against ``date_column``.
        end_date: Inclusive upper bound compared against ``date_column``.
        date_column: Name of the column in the source table holding the date.

    Returns:
        Rows of ``discovery.new_customer_counts_test`` whose ``date_column``
        value lies between ``start_date`` and ``end_date`` (both inclusive).
    """
    # Relies on the notebook-provided global `spark` session.
    customer_counts = spark.sql("select * from discovery.new_customer_counts_test")
    # NOTE(review): assumes `date_column` is a date/timestamp column (or an ISO
    # "YYYY-MM-DD" string) so that comparing it with the string bounds is
    # meaningful -- confirm against the table schema.
    return customer_counts.where(
        customer_counts[date_column].between(start_date, end_date)
    )

# Function: Fit Prophet Model

In [0]:
def fit_new_prophet_model(
    df: pd.DataFrame,
    tuned_params: dict,
) -> Prophet:
    """Build a Prophet model from ``tuned_params`` and fit it on ``df``.

    A custom "monthly" seasonality (period 30.5, Fourier order 5) is registered
    on the model before fitting.

    Args:
        df: Training frame handed to ``Prophet.fit``.
        tuned_params: Keyword arguments forwarded to the ``Prophet`` constructor.

    Returns:
        Whatever ``Prophet.fit`` returns for the configured model.
    """
    prophet_model = Prophet(**tuned_params)
    prophet_model.add_seasonality(name="monthly", period=30.5, fourier_order=5)
    fitted_model = prophet_model.fit(df)
    return fitted_model

# Function: Extract Prophet Model Parameters


In [0]:
def extract_params(pr_model) -> dict:
    """Collect the attributes named in ``serialize.SIMPLE_ATTRIBUTES`` from a model.

    Args:
        pr_model: Object exposing every attribute listed in
            ``serialize.SIMPLE_ATTRIBUTES``.

    Returns:
        Mapping of attribute name to the value read from ``pr_model``.
    """
    collected = {}
    for attribute_name in serialize.SIMPLE_ATTRIBUTES:
        collected[attribute_name] = getattr(pr_model, attribute_name)
    return collected

# Function: Process Data Columns


In [0]:
def get_path_and_colnames(
    df: pd.DataFrame, client_name_colname: str
) -> tuple[pd.DataFrame, str]:
    """Strip whitespace from the client-name column and return the first name.

    The input frame is left untouched; the cleaning is applied to a copy so the
    caller's data is not modified as a side effect.

    Args:
        df: Frame containing a string column named ``client_name_colname``.
        client_name_colname: Name of the column holding the client name.

    Returns:
        A tuple ``(cleaned_df, client_name)`` where ``cleaned_df`` is a copy of
        ``df`` with leading/trailing whitespace removed from the name column and
        ``client_name`` is the value of that column in the first row.

    Raises:
        IndexError: If ``df`` has no rows.
    """
    cleaned = df.copy()
    cleaned[client_name_colname] = cleaned[client_name_colname].str.strip()
    return cleaned, cleaned[client_name_colname].iloc[0]

# Function: Create Prophet Input


In [0]:
def get_prophet_input(
    df: pd.DataFrame, date_column: str, feature_value_column: str
) -> pd.DataFrame:
    """Transform raw input data into the two-column ``ds``/``y`` Prophet layout.

    Rows sharing the same date are collapsed into one row whose ``y`` is the sum
    of their values.

    Args:
        df: Input frame containing ``date_column`` and ``feature_value_column``.
        date_column: Column to expose as ``ds``.
        feature_value_column: Column to expose as ``y``.

    Returns:
        Frame with exactly the columns ``ds`` and ``y``, one row per distinct
        ``ds`` (sorted by ``ds``), and a default integer index.
    """
    prophet_columns = df.rename(
        columns={date_column: "ds", feature_value_column: "y"}
    )[["ds", "y"]]
    # as_index=False keeps "ds" as a plain column only; leaving it as both the
    # index name and a column makes later label lookups on "ds" ambiguous.
    return prophet_columns.groupby("ds", as_index=False)["y"].sum()

# Function: Get Training Data Metrics


In [0]:
def get_training_data_metrics(df, training_data_column: str = "y"):
    """Summarise one column of the training data.

    Args:
        df: Frame containing ``training_data_column``.
        training_data_column: Name of the column to summarise.

    Returns:
        A 6-tuple, in this order: number of unique values, minimum, maximum,
        mean, standard deviation, and range (maximum minus minimum). Mean and
        standard deviation are computed on the column after ``pd.to_numeric``.
    """
    values = df[training_data_column]
    lowest = values.min()
    highest = values.max()
    distinct_count = values.nunique()
    numeric_values = pd.to_numeric(values)
    average = numeric_values.mean()
    spread = numeric_values.std()
    value_range = highest - lowest
    return (distinct_count, lowest, highest, average, spread, value_range)

# Functions: Hyperparameter Grid Search


In [0]:
def get_prophet_hyperparam_grid(hyperparamter_grid: dict = None) -> dict:
    """Return the hyperparameter grid to search over.

    Args:
        hyperparamter_grid: Optional mapping of Prophet constructor argument
            name to the list of candidate values. When omitted, a default grid
            over ``changepoint_prior_scale``, ``seasonality_prior_scale`` and
            ``seasonality_mode`` is used.

    Returns:
        The grid passed in, or a freshly built default grid.
    """
    if hyperparamter_grid is None:
        # Built on every call so that a caller mutating the returned grid
        # cannot affect the defaults seen by later calls.
        return {
            "changepoint_prior_scale": [0.05, 0.1, 0.5],
            "seasonality_prior_scale": [0.01, 0.1, 1.0, 10.0],
            "seasonality_mode": ["additive", "multiplicative"],
        }
    return hyperparamter_grid

In [0]:
def get_prophet_hyperparam_powerset(hyperparamter_grid: dict):
    """Expand a hyperparameter grid into every combination of its values.

    Args:
        hyperparamter_grid: Mapping of parameter name to an iterable of
            candidate values.

    Returns:
        A list of dicts, one per element of the Cartesian product of the
        candidate values, each mapping parameter name to one chosen value.
    """
    parameter_names = list(hyperparamter_grid.keys())
    combinations = []
    for chosen_values in itertools.product(*hyperparamter_grid.values()):
        combinations.append(dict(zip(parameter_names, chosen_values)))
    return combinations

In [0]:
def run_hyperparams_grid_search(
    df: pd.DataFrame,
    all_params: list,
    cv_horizon: str = "21 days",
    cv_period: str = None,
    cv_initial: str = None,
) -> dict:
    """Pick the hyperparameter combination with the lowest cross-validated RMSE.

    For every combination a Prophet model (with the custom "monthly"
    seasonality) is fitted on ``df`` and cross-validated; the combination whose
    RMSE is smallest is returned.

    Args:
        df: Training frame handed to ``Prophet.fit``.
        all_params: List of dicts, each holding keyword arguments for the
            ``Prophet`` constructor.
        cv_horizon: Forwarded to ``cross_validation`` as ``horizon``.
        cv_period: Forwarded to ``cross_validation`` as ``period``.
        cv_initial: Forwarded to ``cross_validation`` as ``initial``.

    Returns:
        A copy of the best-performing dict from ``all_params``.

    Raises:
        ValueError: If ``all_params`` is empty.
    """
    if len(all_params) == 0:
        raise ValueError(
            "all_params must contain at least one hyperparameter combination"
        )
    df = df.copy()
    rmses = []
    for params in all_params:
        m = Prophet(**params)
        m.add_seasonality(name="monthly", period=30.5, fourier_order=5)
        m.fit(df)
        df_cv = cross_validation(
            m,
            horizon=cv_horizon,
            period=cv_period,
            initial=cv_initial,
            parallel="processes",
        )
        # rolling_window=1 aggregates over the whole horizon, so the metrics
        # table has a single row and its first RMSE is the overall score.
        performance_df = performance_metrics(df_cv, rolling_window=1)
        rmses.append(performance_df["rmse"].values[0])
    # Rank by RMSE (NaN scores sort last) and return the original dict rather
    # than a DataFrame row, so parameter values keep their exact types
    # (a DataFrame round-trip can turn None into NaN or ints into floats).
    best_position = int(pd.Series(rmses).sort_values(ascending=True).index[0])
    return dict(all_params[best_position])

# Function: Future Forecast Dataframe


In [0]:
def create_future_forecast_dataframe(
    m: Prophet,
    horizon: int,
    date_column: str,
    client_name: str,
    client_name_column: str = "TradingName",
    prophet_date_column: str = "ds",
    date_format: str = "%Y-%m-%d",
):
    """Predict over the model's future dataframe and label the result.

    Args:
        m: Model exposing ``make_future_dataframe`` and ``predict``.
        horizon: Passed to ``make_future_dataframe`` as ``periods``.
        date_column: Name of the column that receives the formatted date string.
        client_name: Value written to every row of ``client_name_column``.
        client_name_column: Name of the column that receives ``client_name``.
        prophet_date_column: Datetime column of the prediction to format.
        date_format: ``strftime`` pattern used for ``date_column``.

    Returns:
        The prediction frame with ``date_column`` and ``client_name_column``
        added.
    """
    future_dates = m.make_future_dataframe(periods=horizon)
    forecast = m.predict(future_dates)
    formatted_dates = forecast[prophet_date_column].dt.strftime(date_format)
    forecast[date_column] = formatted_dates
    forecast[client_name_column] = client_name
    return forecast

# Function: Validate and Get Forecast Metrics


In [0]:
def validate_and_get_forecast_metrics(
    m: Prophet,
    training_data_metrics: tuple,
    cv_horizon: str = "21 days",
    cv_period: str = None,
    cv_initial: str = None,
    metric_keys: list = ["mse", "rmse", "mae", "mape", "mdape", "coverage"],
    training_metrics_keys: list = [
        "unique_training_values",
        "min_training_value",
        "max_training_value",
        "mean_training_value",
        "stddev_training_value",
        "range_training_values",
    ],
) -> tuple[pd.DataFrame, dict]:
    """Cross-validate a fitted model and combine CV and training-data metrics.

    Args:
        m: Fitted model passed to ``cross_validation``.
        training_data_metrics: Values paired positionally with
            ``training_metrics_keys``.
        cv_horizon: Forwarded to ``cross_validation`` as ``horizon``.
        cv_period: Forwarded to ``cross_validation`` as ``period``.
        cv_initial: Forwarded to ``cross_validation`` as ``initial``.
        metric_keys: Columns of the performance table to average.
        training_metrics_keys: Names under which the entries of
            ``training_data_metrics`` are stored.

    Returns:
        A tuple ``(metrics_raw, metrics)``: the raw cross-validation frame and a
        dict holding the mean of each available CV metric plus the named
        training-data metrics.
    """
    metrics_raw = cross_validation(
        model=m,
        horizon=cv_horizon,
        period=cv_period,
        initial=cv_initial,
    )
    cv_metrics = performance_metrics(metrics_raw)
    # Only average metrics that are present in the performance table, so one
    # missing column does not fail the whole run with a KeyError.
    # NOTE(review): Prophet appears to omit "mape" when observed values are
    # close to zero -- confirm against the Prophet diagnostics documentation.
    metrics = {
        k: cv_metrics[k].mean() for k in metric_keys if k in cv_metrics.columns
    }
    for i, training_metrics_key in enumerate(training_metrics_keys):
        metrics[training_metrics_key] = training_data_metrics[i]
    return metrics_raw, metrics

# Function: Log outputs & Graphs


In [0]:
def get_forecast_and_log_outputs(
    df: pd.DataFrame,
    m: Prophet,
    metrics_raw: pd.DataFrame,
    artifact_path: str,
    metrics: dict,
    params: dict,
    forecast_plot_filename: str = "forecast.png",
    components_plot_filename: str = "components.png",
    cross_validation_plot_filename: str = "cross_validation.png",
) -> pd.DataFrame:
    """Predict on ``df`` and log figures, model, params and metrics to MLflow.

    Args:
        df: Frame passed to ``m.predict`` (a copy is used).
        m: Fitted model used for prediction, plotting and model logging.
        metrics_raw: Cross-validation frame used for the MAPE plot.
        artifact_path: Artifact path under which the model is logged.
        metrics: Metrics logged with ``mlflow.log_metrics``.
        params: Parameters logged with ``mlflow.log_params``.
        forecast_plot_filename: Artifact file name of the forecast plot.
        components_plot_filename: Artifact file name of the components plot.
        cross_validation_plot_filename: Artifact file name of the CV plot.

    Returns:
        The frame returned by ``m.predict``.
    """

    def _log_and_close(figure, filename: str) -> None:
        """Log one figure to MLflow, then release it."""
        try:
            mlflow.log_figure(figure, filename)
        finally:
            # Figures stay registered with pyplot until closed; close each one
            # so they do not pile up when this runs once per group.
            plt.close(figure)

    df = df.copy()
    forecast_df = m.predict(df)
    _log_and_close(m.plot(forecast_df), forecast_plot_filename)
    _log_and_close(m.plot_components(forecast_df), components_plot_filename)
    _log_and_close(
        plot_cross_validation_metric(metrics_raw, metric="mape"),
        cross_validation_plot_filename,
    )
    mlflow.prophet.log_model(m, artifact_path=artifact_path)
    mlflow.log_params(params)
    mlflow.log_metrics(metrics)
    return forecast_df

In [0]:
def generate_experiment_output(
    df: pd.DataFrame,
    m: Prophet,
    run_id: str,
    forecast_label: str,
    client_name: str,
    unique_training_values: int,
    forecast_label_header: str = "forecast_label",
    client_name_header: str = "TradingName",
    unique_training_header: str = "UniqueTrainingValues",
    run_id_header: str = "RunId",
) -> pd.DataFrame:
    """Return a copy of the forecast frame with the output-table columns added.

    Adds a ``y`` column taken from ``m.history["y"]`` plus four constant
    columns (forecast label, client name, unique-training-value count and run
    id) under the given header names. The input frame is not modified.
    """
    output = df.copy()
    # Index-aligned assignment of the model's training values.
    output["y"] = m.history["y"]
    constant_columns = (
        (forecast_label_header, forecast_label),
        (client_name_header, client_name),
        (unique_training_header, unique_training_values),
        (run_id_header, run_id),
    )
    for header, value in constant_columns:
        output[header] = value
    return output

# Function: Train and Optimise Prophet Models


In [0]:
def create_train_and_evaluate_prophet(
    train_start: str,
    train_end: str,
    horizon: int,
    forecast_label: str,
    output_columns: list = [
        "forecast_label",
        "TradingName",
        "ds",
        "y",
        "yhat",
        "yhat_lower",
        "yhat_upper",
        "UniqueTrainingValues",
        "RunId",
    ],
    client_name_colname: str = "TradingName",
    date_column: str = "Date",
    min_unique_values: int = 10,
    min_training_rows: int = 100,
) -> Callable[[pd.DataFrame], pd.DataFrame]:
    """Build the per-group training function used by the forecast pipeline.

    Args:
        train_start: Start of the training window; used in the run name.
        train_end: End of the training window; used in the run name.
        horizon: Number of future periods to forecast.
        forecast_label: Name of the value column to forecast; also used in the
            run name and written to the output.
        output_columns: Columns, in order, of the frame returned per group.
        client_name_colname: Column holding the client name in the input.
        date_column: Column holding the date in the input.
        min_unique_values: Hyperparameters are tuned only when the training
            values have more than this many distinct values.
        min_training_rows: Hyperparameters are tuned only when the training
            frame has more than this many rows.

    Returns:
        A function taking one group's pandas DataFrame and returning its
        forecast restricted to ``output_columns``. On failure that function
        tags the MLflow run and returns an empty frame with the same columns.
    """

    def train_and_evaluate_prophet(df):
        """Train prophet model based on date-value pair dataframe"""
        df, client_name = get_path_and_colnames(
            df, client_name_colname,
        )
        artifact_path = f"{forecast_label}_{client_name}_{train_start}_{train_end}_{horizon}"
        df = get_prophet_input(df, date_column, forecast_label)
        with mlflow.start_run(
            run_name=artifact_path,
            description=f"A Prophet forecasting run for {artifact_path}\n",
        ) as run:
            try:
                training_data_metrics = get_training_data_metrics(df)
                # Grid search only when there is enough varied data; otherwise
                # fall back to a single fixed changepoint prior.
                has_enough_data = (
                    training_data_metrics[0] > min_unique_values
                    and df.shape[0] > min_training_rows
                )
                if has_enough_data:
                    all_hyperparams = get_prophet_hyperparam_powerset(
                        get_prophet_hyperparam_grid()
                    )
                    tuned_hyperparams = run_hyperparams_grid_search(df, all_hyperparams)
                else:
                    tuned_hyperparams = {"changepoint_prior_scale": 0.05}

                m = fit_new_prophet_model(
                    df,
                    tuned_hyperparams,
                )
                params = extract_params(m)
                future_df = create_future_forecast_dataframe(
                    m, horizon, date_column, client_name
                )
                metrics_raw, metrics = validate_and_get_forecast_metrics(
                    m, training_data_metrics
                )
                forecast_df = get_forecast_and_log_outputs(
                    future_df, m, metrics_raw, artifact_path, metrics, params
                )
                forecast_df = generate_experiment_output(
                    forecast_df,
                    m,
                    run.info.run_id,
                    forecast_label,
                    client_name,
                    training_data_metrics[0],
                )
                return forecast_df[output_columns]
            except Exception as e:
                # Best effort by design: one failing group must not abort the
                # whole job, but the failure is logged with its traceback and
                # recorded on the run instead of disappearing silently.
                logging.getLogger(__name__).exception(
                    "Prophet forecasting run failed for %s", artifact_path
                )
                mlflow.set_tag("exception", "True")
                mlflow.set_tag("error_message", str(e))
                return pd.DataFrame(columns=output_columns)

    return train_and_evaluate_prophet

# Run Prophet Pipeline In PySpark


In [0]:
def run_forecast_pipeline(
    df: pyspark.sql.DataFrame,
    aggregation_columns: list,
    train_start: str,
    train_end: str,
    horizon_days: int,
    forecast_label: str,
    data_layer: str = "discovery",
    client_name_column: str = "TradingName",
    output_table: str = "customer_forecast_demo",
) -> pyspark.sql.DataFrame:
    """Train one Prophet model per group and persist the forecasts as a table.

    Args:
        df: Input Spark DataFrame.
        aggregation_columns: Columns to group by; one model is trained per group.
        train_start: Forwarded to the per-group training function.
        train_end: Forwarded to the per-group training function.
        horizon_days: Number of future periods to forecast.
        forecast_label: Name of the value column to forecast.
        data_layer: Schema/database the output table is written to.
        client_name_column: Name of the client-name field in the output schema.
        output_table: Name of the output table (overwritten on every run).

    Returns:
        A DataFrame read back from ``{data_layer}.{output_table}``.

    NOTE(review): ``client_name_column`` only shapes the output schema here; the
    per-group function is built with its own default column names, so a
    non-default value presumably will not match the produced columns -- verify
    before changing it.
    """
    spark = SparkSession.builder.appName(f"{forecast_label}_run").getOrCreate()
    forecast_schema = StructType(
        [
            StructField("forecast_label", StringType(), True),
            StructField(client_name_column, StringType(), True),
            StructField("ds", TimestampType(), True),
            StructField("y", DoubleType(), True),
            StructField("yhat", DoubleType(), True),
            StructField("yhat_lower", DoubleType(), True),
            StructField("yhat_upper", DoubleType(), True),
            StructField("UniqueTrainingValues", IntegerType(), True),
            StructField("RunId", StringType(), True),
        ]
    )
    train_per_group = create_train_and_evaluate_prophet(
        train_start,
        train_end,
        horizon_days,
        forecast_label,
    )
    forecasts_df = df.groupBy(aggregation_columns).applyInPandas(
        train_per_group,
        schema=forecast_schema,
    )
    full_table_name = f"{data_layer}.{output_table}"
    forecasts_df.write.mode("overwrite").saveAsTable(full_table_name)
    # Return the persisted table rather than the lazy plan: an action on the
    # lazy DataFrame would re-run training for every group.
    return spark.table(full_table_name)

# Main Code


In [0]:
# Run configuration: the training window is anchored on `current_date`.
current_date = "2024-12-03"
horizon_days = 90
train_days = 365
date_format = "%Y-%m-%d"

# Derive both ends of the training window from the same anchor date so the run
# is reproducible and the window is always `train_days` long, rather than
# mixing the pinned date with today's date.
anchor_date = datetime.datetime.strptime(current_date, date_format)
train_end = anchor_date.strftime(date_format)
train_start = (anchor_date - datetime.timedelta(days=train_days)).strftime(
    date_format
)

customer_volume_df = get_customer_volume_data(train_start, train_end, "Date")

run_forecast_pipeline(
    customer_volume_df,
    ["TradingName"],
    train_start,
    train_end,
    horizon_days,
    "NewCustomers",
)

DataFrame[forecast_label: string, TradingName: string, ds: timestamp, y: double, yhat: double, yhat_lower: double, yhat_upper: double, UniqueTrainingValues: int, RunId: string]