In [9]:
import logging
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import (mean_absolute_error, 
                             mean_squared_error, 
                             mean_absolute_percentage_error, 
                             median_absolute_error)
import mlflow
from mlflow.client import MlflowClient
#plt.rcParams.update({'font.size': 22})

import prophet
from prophet import Prophet
import kaggle
from urllib.parse import urlparse


In [5]:
def download_kaggle_dataset(
    kaggle_dataset: str = "pratyushakar/rossmann-store-sales",
    target_path="./",
) -> None:
    """Download a Kaggle dataset and unzip it into ``target_path``.

    Prints the ``username`` value held by the Kaggle API configuration, then
    downloads the dataset files with unzipping and progress output enabled.

    Args:
        kaggle_dataset: Dataset identifier handed to the Kaggle API.
        target_path: Location passed to the API as the download ``path``.
    """
    kaggle_client = kaggle.api
    configured_user = kaggle_client.get_config_value('username')
    print(configured_user)
    kaggle_client.dataset_download_files(
        kaggle_dataset,
        path=target_path,
        unzip=True,
        quiet=False,
    )
    
def prep_store_data(df: pd.DataFrame, store_id: int = 4, store_open: int = 1) -> pd.DataFrame:
    """Select one store's rows and shape them for Prophet.

    Keeps the rows whose ``Store`` equals ``store_id`` and whose ``Open``
    equals ``store_open``, renames ``Date`` -> ``ds`` and ``Sales`` -> ``y``,
    parses ``ds`` as datetimes and returns the rows in chronological order.

    The input frame is not modified, so the same raw frame can be passed in
    repeatedly (for example once per store).

    Args:
        df: Raw data with ``Store``, ``Open``, ``Date`` and ``Sales`` columns.
        store_id: Value of ``Store`` to keep.
        store_open: Value of ``Open`` to keep.

    Returns:
        A new DataFrame sorted by ``ds`` ascending with a fresh 0..n-1 index.
    """
    selected = (df['Store'] == store_id) & (df['Open'] == store_open)
    return (
        df.loc[selected]
        .rename(columns={'Date': 'ds', 'Sales': 'y'})
        # Convert on the filtered copy only; the caller's 'Date' column stays as it was.
        .assign(ds=lambda frame: pd.to_datetime(frame['ds']))
        # Sort first, then reset, so the returned index follows date order.
        .sort_values('ds', ascending=True)
        .reset_index(drop=True)
    )
    

def train_test_split_forecaster(
    df: pd.DataFrame,
    train_fraction: float 
)->tuple[pd.DataFrame, pd.DataFrame]:
    """Split a frame by position into a leading train part and a trailing test part.

    The first ``int(train_fraction * len(df))`` rows become the training set
    and the remaining rows the test set. Row order is kept as given, so the
    caller is responsible for sorting ``df`` beforehand.

    Args:
        df: Frame to split.
        train_fraction: Share of rows assigned to the training set, in [0, 1].

    Returns:
        ``(df_train, df_test)`` as independent copies of the two slices.

    Raises:
        ValueError: If ``train_fraction`` is outside the closed range [0, 1].
    """
    if not 0 <= train_fraction <= 1:
        raise ValueError(
            f"train_fraction must be between 0 and 1, got {train_fraction!r}"
        )
    train_index = int(train_fraction * df.shape[0])
    # Slice first and copy only the slice: avoids duplicating the whole frame
    # twice and hands back real copies rather than slices of a temporary.
    df_train = df.iloc[:train_index].copy()
    df_test = df.iloc[train_index:].copy()
    return df_train, df_test
    
    
def train_forecaster(
    df_train: pd.DataFrame,
    seasonality: dict 
) -> prophet.forecaster.Prophet:
    """Build a Prophet model from the seasonality settings and fit it.

    Args:
        df_train: Training frame passed to ``Prophet.fit``.
        seasonality: Mapping with ``'yearly'``, ``'weekly'`` and ``'daily'``
            entries, forwarded to the matching ``*_seasonality`` arguments.

    Returns:
        The fitted Prophet instance, created with ``interval_width=0.95``.
    """
    # Look the three settings up in the same order as the keyword arguments.
    yearly = seasonality['yearly']
    weekly = seasonality['weekly']
    daily = seasonality['daily']

    model = Prophet(
        interval_width=0.95,
        yearly_seasonality=yearly,
        weekly_seasonality=weekly,
        daily_seasonality=daily,
    )
    model.fit(df_train)
    return model

def test_forecaster(
    df_test: pd.DataFrame
) -> None:
    return None

def forecast(
    forecaster: prophet.forecaster.Prophet,
    forecast_index: pd.DataFrame
) -> pd.DataFrame:
    """Return the forecaster's predictions for ``forecast_index``.

    Args:
        forecaster: Model whose ``predict`` method is called.
        forecast_index: Frame handed to ``predict`` unchanged.

    Returns:
        The value returned by ``forecaster.predict``.
    """
    predictions = forecaster.predict(forecast_index)
    return predictions
    
    
def train_predict(
    df: pd.DataFrame, 
    train_fraction: float, 
    seasonality: dict
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, int]:
    """Split the data, fit a Prophet model on the train part and predict the test part.

    This is a convenience wrapper around ``train_test_split_forecaster``,
    ``train_forecaster`` and ``forecast`` so the split and model settings are
    defined in one place only.

    Args:
        df: Prepared frame to split by position.
        train_fraction: Share of rows used for training.
        seasonality: Mapping with ``'yearly'``, ``'weekly'`` and ``'daily'``
            entries for the Prophet model.

    Returns:
        ``(predicted, df_train, df_test, train_index)`` where ``predicted`` is
        the model's prediction for ``df_test`` and ``train_index`` is the
        positional split point ``int(train_fraction * len(df))``.
    """
    # grab split data
    df_train, df_test = train_test_split_forecaster(df=df, train_fraction=train_fraction)
    train_index = int(train_fraction * df.shape[0])

    # train and predict
    model = train_forecaster(df_train=df_train, seasonality=seasonality)
    predicted = forecast(forecaster=model, forecast_index=df_test)
    return predicted, df_train, df_test, train_index


# def plot_forecast(df_train: pd.DataFrame, df_test: pd.DataFrame, predicted: pd.DataFrame) -> None:
#     fig, ax = plt.subplots(figsize=(20,10))
#     df_test.plot(
#         x='ds', 
#         y='y', 
#         ax=ax, 
#         label='Truth', 
#         linewidth=1, 
#         markersize=5, 
#         color='tab:blue',
#         alpha=0.9, 
#         marker='o'
#     )
#     predicted.plot(
#         x='ds', 
#         y='yhat', 
#         ax=ax, 
#         label='Prediction + 95% CI', 
#         linewidth=2, 
#         markersize=5, 
#         color='red'
#     )
#     ax.fill_between(
#         x=predicted['ds'], 
#         y1=predicted['yhat_upper'], 
#         y2=predicted['yhat_lower'], 
#         alpha=0.15, 
#         color='red',
#     )
#     df_train.iloc[train_index-100:].plot(
#         x='ds', 
#         y='y', 
#         ax=ax, 
#         color='tab:blue', 
#         label='_nolegend_', 
#         alpha=0.5, 
#         marker='o'
#     )
#     current_ytick_values = plt.gca().get_yticks()
#     plt.gca().set_yticklabels(['{:,.0f}'.format(x) for x in current_ytick_values])
#     ax.set_xlabel('Date')
#     ax.set_ylabel('Sales')
#     plt.tight_layout()
#     plt.savefig('store_data_forecast.png')



In [6]:
import os
from dotenv import load_dotenv
# load_dotenv() runs before the lookups below so values it provides are visible to them.
load_dotenv()

# MLflow connection settings read from the environment; each is None when unset.
MLFLOW_TRACKING_URI = os.environ.get('MLFLOW_TRACKING_URI')
MLFLOW_TRACKING_USERNAME = os.environ.get('MLFLOW_TRACKING_USERNAME')
MLFLOW_TRACKING_PASSWORD = os.environ.get('MLFLOW_TRACKING_PASSWORD')

In [7]:
import logging
# Configure logging at INFO level: timestamp - logger name - level - message.
log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(
    level=logging.INFO,
    format=log_format,
)

# Use the same tracking URI for the module-level MLflow API and the explicit client.
mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
client = MlflowClient(tracking_uri=MLFLOW_TRACKING_URI)
logging.info("Defined MLFlowClient and set tracking URI.")

#mlflow.set_experiment("prophet_models_05042023")
#mlflow.autolog()
import os
import mlflow


2024-08-07 11:25:26,563 - root - INFO - Defined MLFlowClient and set tracking URI.


In [17]:
def get_experiment_id(name):
    """Return the ID of the MLflow experiment called ``name``, creating it if absent.

    Args:
        name: Experiment name to look up.

    Returns:
        ``experiment_id`` of the existing experiment, or the value returned by
        ``mlflow.create_experiment`` when no experiment with that name exists.
    """
    existing = mlflow.get_experiment_by_name(name)
    if existing is not None:
        return existing.experiment_id
    return mlflow.create_experiment(name)


# Resolve (or create) the experiment that the runs below are logged under.
experiment_name = "prophet-retail-forecaster"
exp_id = get_experiment_id(experiment_name)
print(exp_id)


1


In [18]:
# If train.csv is present, read it in; otherwise download the dataset first.
#file_path = 'rossman_store_data/train.csv'
file_path = "./data/"
# Check for (and read) the CSV itself: file_path is only the containing
# directory, which can exist without the file and cannot be read by read_csv.
train_csv_path = os.path.join(file_path, "train.csv")
if os.path.exists(train_csv_path):
    logging.info('Dataset found, reading into pandas dataframe.')
else:
    logging.info('Dataset not found, downloading ...')
    # Assumes the unzipped dataset places train.csv directly under file_path.
    download_kaggle_dataset(target_path=file_path)
    logging.info('Reading dataset into pandas dataframe.')
df = pd.read_csv(train_csv_path)

logging.info("Training data retrieved.")

2024-08-07 13:02:43,105 - root - INFO - Dataset found, reading into pandas dataframe.
  df = pd.read_csv(file_path + "train.csv")
2024-08-07 13:02:49,338 - root - INFO - Training data retrieved.


In [19]:
# Train, evaluate, log and register one Prophet model per store.

# Autologging is a process-wide switch, so enable it once rather than on
# every loop iteration.
mlflow.autolog()

# Define main parameters for modelling (identical for every store).
seasonality = {
    'yearly': True,
    'weekly': True,
    'daily': False
}

# Artifact path the model is logged under explicitly below; the same value is
# used to build the runs:/ URI for registration.
artifact_path = "model"

for store_id in ['3', '4', '10']:
    model_name = f"prophet-retail-forecaster-store-{store_id}"

    with mlflow.start_run(experiment_id=exp_id) as run:
        run_id = run.info.run_id
        logging.info("Started MLFlow run")

        # Transform dataset in preparation for feeding to Prophet
        df_transformed = prep_store_data(df, store_id=int(store_id))
        logging.info("Transformed data")

        # Split the data
        logging.info("Splitting data")
        df_train, df_test = train_test_split_forecaster(df=df_transformed, train_fraction=0.75)
        logging.info("Data split")

        # Train the model
        logging.info("Training model")
        forecaster = train_forecaster(df_train=df_train, seasonality=seasonality)
        logging.info("Model trained")

        # The model is logged the same way for every tracking backend;
        # registration happens separately after the run has ended.
        mlflow.prophet.log_model(forecaster, artifact_path=artifact_path)
        logging.info("Logged model")

        mlflow.log_params(seasonality)

        # Predict once and reuse the result for every metric instead of
        # re-running the forecast per metric.
        y_true = df_test['y']
        y_pred = forecaster.predict(df_test)['yhat']
        mlflow.log_metrics(
            {
                # RMSE as sqrt(MSE): the `squared=False` argument of
                # mean_squared_error is deprecated in recent scikit-learn.
                'rmse': float(np.sqrt(mean_squared_error(y_true=y_true, y_pred=y_pred))),
                'mean_abs_perc_error': mean_absolute_percentage_error(y_true=y_true, y_pred=y_pred),
                'mean_abs_error': mean_absolute_error(y_true=y_true, y_pred=y_pred),
                'median_abs_error': median_absolute_error(y_true=y_true, y_pred=y_pred)
            }
        )

    # Register the model logged by the run that has just finished
    model_uri = f"runs:/{run_id}/{artifact_path}"
    model_details = mlflow.register_model(model_uri=model_uri, name=model_name)
    logging.info("Model registered")

    # Transition model to production
    client.transition_model_version_stage(
        name=model_details.name,
        version=model_details.version,
        stage='production',
    )
    logging.info("Model transitioned to prod stage")

2024-08-07 13:02:49,887 - root - INFO - Started MLFlow run
2024-08-07 13:02:50,088 - root - INFO - Transformed data
2024/08/07 13:02:50 INFO mlflow.tracking.fluent: Autologging successfully enabled for sklearn.
2024/08/07 13:02:50 INFO mlflow.tracking.fluent: Autologging successfully enabled for pyspark.
2024-08-07 13:02:50,287 - root - INFO - Splitting data
2024-08-07 13:02:50,291 - root - INFO - Data split
2024-08-07 13:02:50,293 - root - INFO - Training model
2024-08-07 13:02:50,370 - cmdstanpy - DEBUG - input tempfile: /var/folders/rj/64nmqpkj7j1dz_vk_9vdrqfw0000gn/T/tmpo4kt847b/clpmcykp.json
2024-08-07 13:02:50,397 - cmdstanpy - DEBUG - input tempfile: /var/folders/rj/64nmqpkj7j1dz_vk_9vdrqfw0000gn/T/tmpo4kt847b/zouv1_yo.json
2024-08-07 13:02:50,405 - cmdstanpy - DEBUG - idx 0
2024-08-07 13:02:50,406 - cmdstanpy - DEBUG - running CmdStan, num_threads: None
2024-08-07 13:02:50,412 - cmdstanpy - DEBUG - CmdStan args: ['/Users/mac-zhou/miniconda3/envs/mlflow-env/lib/python3.10/site-p

In [20]:
from pprint import pprint

# Show every experiment returned by the client.
all_experiments = client.search_experiments()
pprint(all_experiments)

# Summarise the experiment named "Default"; taking the first match raises
# IndexError when no experiment carries that name.
default_matches = [
    {"name": found.name, "lifecycle_stage": found.lifecycle_stage}
    for found in all_experiments
    if found.name == "Default"
]
default_experiment = default_matches[0]

pprint(default_experiment)

[<Experiment: artifact_location='mlflow-artifacts:/bc51a6fea6ff45bf8805494fddd9f863', creation_time=1723006903238, experiment_id='1', last_update_time=1723006903238, lifecycle_stage='active', name='prophet-retail-forecaster', tags={}>,
 <Experiment: artifact_location='mlflow-artifacts:/c4405e794d6c46929b8dbab6e1f66f13', creation_time=1723002134464, experiment_id='0', last_update_time=1723002134464, lifecycle_stage='active', name='Default', tags={}>]
{'lifecycle_stage': 'active', 'name': 'Default'}


In [22]:
# Client used to query registered models; created without an explicit tracking URI.
client = mlflow.MlflowClient()

store_id = 4
model_name = "prophet-retail-forecaster-store-"

# NOTE: `filter_string` should be optional, but leaving it as `None` failed to work.
# A `"name LIKE '%'"` filter would match all model names; the filter used here
# only matches names that start with the per-store model prefix.
name_filter = "name LIKE 'prophet-retail-forecaster-store-%'"
registered_models = client.search_registered_models(filter_string=name_filter)

# For each registered model, print the latest version of each stage.
for model in registered_models:
    for model_version in model.latest_versions:
        print(f"name={model_version.name}; run_id={model_version.run_id}; version={model_version.version}, stage={model_version.current_stage}")

name=prophet-retail-forecaster-store-10; run_id=f8e289b11cac44d9b1650ab4c14368b9; version=2, stage=Production
name=prophet-retail-forecaster-store-3; run_id=8baecf4b244b48e48d6ca15477818ece; version=2, stage=Production
name=prophet-retail-forecaster-store-4; run_id=36177d20c70d4a51833d4bacf03e5d78; version=2, stage=Production


In [26]:
# Take the first entry returned for the 'Production' stage of the store-4 model
# and ask the client for that version's download URI.
model_name = "prophet-retail-forecaster-store-4"
production_versions = client.get_latest_versions(model_name, stages=['Production'])
latest_mv = production_versions[0]
model_production_uri = client.get_model_version_download_uri(
    name=model_name,
    version=latest_mv.version,
)

'2'

In [32]:
# Point the module-level MLflow API and a new client at the configured tracking URI.
mlflow.set_tracking_uri(uri=MLFLOW_TRACKING_URI)
client = MlflowClient(
    tracking_uri=MLFLOW_TRACKING_URI,
)
def get_production_model(store_id:int):
    """Load the model registered for ``store_id`` from the ``production`` stage.

    Args:
        store_id: Store number used to build the registered model name.

    Returns:
        The object returned by ``mlflow.pyfunc.load_model`` for the URI
        ``models:/prophet-retail-forecaster-store-<store_id>/production``.
    """
    registered_name = f"prophet-retail-forecaster-store-{store_id}"
    production_uri = f"models:/{registered_name}/production"
    return mlflow.pyfunc.load_model(model_uri=production_uri)



  latest = client.get_latest_versions(name, None if stage is None else [stage])


Downloading artifacts:   0%|          | 0/9 [00:00<?, ?it/s]

mlflow.pyfunc.loaded_model:
  artifact_path: model
  flavor: mlflow.prophet
  run_id: 36177d20c70d4a51833d4bacf03e5d78