In [1]:
# Core modules
import os
from io import StringIO
import json
from typing import Callable, List, Dict, Any, Tuple, Type
from dotenv import load_dotenv, find_dotenv
import pandas as pd
import numpy as np
import inspect
from datetime import datetime

# Data retrieval
import yfinance as yf

# Plotting modules
import matplotlib.pyplot as plt
import plotly.express as px
import hyperopt.plotting as hplt

# Sklearn ML
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, mean_squared_log_error, median_absolute_error
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import TimeSeriesSplit, train_test_split
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.base import RegressorMixin

# Keras DL
from keras.models import Sequential
from keras.layers import LSTM, Dense

# Hyperparameter optimisation
from hyperopt import STATUS_OK, fmin, tpe, Trials, hp
from functools import partial

# Cloud services
from azure.storage.blob import BlobServiceClient
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

# Tracking
import mlflow
import mlflow.keras
import mlflow.sklearn
import mlflow.pyfunc
from mlflow.deployments import get_deploy_client

# data drift
from evidently.pipeline.column_mapping import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

In [49]:
# Download daily ^FTSE history from Yahoo Finance for the fixed window 1999-12-01 .. 2021-12-01
data = yf.download("^FTSE", start="1999-12-01", end="2021-12-01")

[*********************100%%**********************]  1 of 1 completed


In [50]:
# Preview the 10 most recent rows of the downloaded frame
data.tail(10)

Unnamed: 0_level_0,Open,High,Low,Close,Adj Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2021-11-17,7327.0,7327.0,7290.0,7291.200195,7291.200195,980280900
2021-11-18,7291.200195,7291.299805,7242.0,7256.0,7256.0,590225200
2021-11-19,7256.0,7289.5,7198.799805,7223.600098,7223.600098,1174484200
2021-11-22,7223.600098,7272.600098,7207.799805,7255.5,7255.5,638191500
2021-11-23,7255.5,7293.100098,7204.399902,7266.700195,7266.700195,624614500
2021-11-24,7266.700195,7307.899902,7246.0,7286.299805,7286.299805,661514300
2021-11-25,7286.299805,7311.899902,7286.299805,7310.399902,7310.399902,487924100
2021-11-26,7310.399902,7310.399902,7042.100098,7044.0,7044.0,1714871800
2021-11-29,7044.0,7161.899902,7044.0,7110.0,7110.0,945061900
2021-11-30,7110.0,7110.0,6989.700195,7059.5,7059.5,1459578100


In [51]:
# Drop 'Close' because it is essentially equal to the target ('Adj Close').
# Reassigning with errors='ignore' (instead of inplace=True) keeps this cell idempotent:
# re-running it once 'Close' is already gone no longer raises KeyError.
data = data.drop(columns='Close', errors='ignore').astype(float)

# Quality checks: missing values, exact zeros, and the gaps between consecutive dates
print(data.isnull().sum())
print(data.eq(0).sum())
print(data.index.diff().value_counts().sort_index())

Open         0
High         0
Low          0
Adj Close    0
Volume       0
dtype: int64
Open         0
High         0
Low          0
Adj Close    0
Volume       0
dtype: int64
Date
1 days    4391
2 days      11
3 days    1039
4 days      75
5 days      41
Name: count, dtype: int64


In [None]:
# Preview the first rows after 'Close' has been dropped
data.head()

In [52]:
# Interactive line chart of the adjusted close against the date index, with a centred title
fig = px.line(data, x=data.index, y='Adj Close')
fig.update_layout(title_text='Closing price over time', title_x=0.5)
fig.show()

Step 1: Model training and experimentation

In [2]:
# Connect to the team ML workspace: from_config picks up the workspace details from a
# local config file and DefaultAzureCredential supplies the credential
ml_client = MLClient.from_config(credential=DefaultAzureCredential())

Found the config file in: .\config.json


In [3]:
# Look up the workspace's MLflow tracking URI and point the local MLflow client at it
workspace = ml_client.workspaces.get(ml_client.workspace_name)
mlflow_tracking_uri = workspace.mlflow_tracking_uri
mlflow.set_tracking_uri(mlflow_tracking_uri)
mlflow_tracking_uri

Request time out. Ingestion may be backed up. Retrying.


In [57]:
runname = "demo_lstm"

# NOTE: MlflowLogger has to be defined in the kernel before this cell is executed,
# because the decorator is applied at function-definition time.
@MlflowLogger(run_name=runname, model_flavour='keras')
def train_lstm_model(df: pd.DataFrame,
                     test_size: int = 10,
                     loss: str = 'mean_squared_error',
                     activation: str = 'relu',
                     optimiser: str = 'adam',
                     n_epoch: int = 100,
                     batch_size: int = 8,
                     verbose: int = 1) -> Tuple[Sequential, np.ndarray, np.ndarray, List[float]]:

    """
    Train an LSTM model on the provided DataFrame and run inference on the last test fold.

    The model is fitted repeatedly on the expanding training windows produced by
    TimeSeriesSplit; predictions are made on the test window of the final split.

    Parameters:
    - df (pd.DataFrame): Input DataFrame with features and target ('Adj Close').
    - test_size (int; percentage): Size of each test fold as a percentage of the data.
      The number of splits is 100 // test_size - 1 and must be at least 2.
    - loss (str): Loss function for model training.
    - activation (str): Activation function for LSTM layer.
    - optimiser (str): Optimizer for model training.
    - n_epoch (int): Number of epochs for training (per fold).
    - batch_size (int): Batch size for training.
    - verbose (int): Verbosity mode for training.

    Returns:
    - lstm (Sequential): Trained LSTM model.
    - y_test (np.ndarray): True values of the target variable for the test set.
    - y_pred (np.ndarray): Predicted values of the target variable for the test set.
    - losses (List[float]): List of training losses, concatenated over all folds.

    Raises:
    - ValueError: if test_size is not positive or yields fewer than 2 splits.
    """

    # validate up front so a bad test_size fails before any model is built
    if test_size <= 0:
        raise ValueError(f"test_size must be a positive percentage, got {test_size}")
    n_split = 100 // test_size - 1
    if n_split < 2:
        raise ValueError(f"test_size={test_size} gives {n_split} split(s); TimeSeriesSplit needs at least 2")

    # define features and target -- both are taken from the `df` argument
    # (the target used to be read from the notebook-global `data` by mistake)
    X, y = df.drop('Adj Close', axis=1), df[['Adj Close']]

    # scale features
    # NOTE(review): the scaler is fitted on all rows, including those later used as the test fold
    X_scaled = MinMaxScaler().fit_transform(X)
    X = pd.DataFrame(data=X_scaled, columns=X.columns, index=X.index)

    # Building the LSTM model: each sample is a single time step with X.shape[1] features
    lstm = Sequential()
    lstm.add(LSTM(32, input_shape=(1, X.shape[1]), activation=activation, return_sequences=False))
    lstm.add(Dense(1))
    lstm.compile(loss=loss, optimizer=optimiser)

    # generate training folds
    timesplit = TimeSeriesSplit(n_splits=n_split) # if n_split=3 (test_size=25): [0, 1]; [(0, 1), 2]; [(0, 1, 2), 3]

    losses = []
    for i, (train_idx, test_idx) in enumerate(timesplit.split(X)):

        print(f"\n\n---Training model batch {i+1} out of {n_split}---")
        # getting train and test data for this fold
        X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
        y_train = y.iloc[train_idx].values.ravel()
        y_test = y.iloc[test_idx].values.ravel()

        # reshape to (samples, time steps, features) as declared in input_shape above
        X_train = np.array(X_train).reshape(X_train.shape[0], 1, X_train.shape[1])

        # fitting, collecting the per-epoch losses
        hist = lstm.fit(X_train, y_train, epochs=n_epoch, batch_size=batch_size, verbose=verbose)
        losses.extend(hist.history['loss'])

    # predict on the test window of the final split only
    X_test = np.array(X_test).reshape(X_test.shape[0], 1, X_test.shape[1])
    y_pred = lstm.predict(X_test)
    return lstm, y_test, y_pred, losses

In [56]:
class MlflowLogger:

    """
    A decorator class for logging model metrics and other artifacts using MLflow.
    The wrapper function should be modified to fit your training function, as well
    as the associated static methods.
    """

    # Supported flavour name -> name of the mlflow submodule that provides `autolog`.
    # Kept as names so that only the requested flavour's module is ever resolved.
    _AUTOLOG_MODULES = {
        'fastai': 'fastai',
        'gluon': 'gluon',
        'keras': 'keras',
        'lightgbm': 'lightgbm',
        'pytorch': 'pytorch',
        'scikit-learn': 'sklearn',
        'spark': 'spark',
        'statsmodels': 'statsmodels',
        'xgboost': 'xgboost',
    }

    def __init__(self, run_name: str, model_flavour: str):
        """
        Initializes the MlflowLogger instance.

        Parameters:
        - run_name (str): The base name for the MLflow run. The experiment name/id and tracking uri should be configured beforehand
        - model_flavour: the name of the autolog library to use when logging.

        Raises:
        - ValueError: if model_flavour is not one of the supported flavours.
        """

        # the timestamp is taken once, when the decorator object is created
        current_dt = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        self.run_name = f"{run_name}_{current_dt}"
        self.autolog_func = self._set_autologging(model_flavour)


    def _set_autologging(self, model_flavour: str) -> Callable:
        """
        Returns the autolog function of the requested MLflow flavour.

        Parameters:
        - model_flavour: the name of the autolog library to use when logging (case-insensitive).

        Returns:
        - the `autolog` callable of the matching mlflow submodule.

        Raises:
        - ValueError: if model_flavour is not a supported flavour.
        """
        import importlib

        module_name = self._AUTOLOG_MODULES.get(model_flavour.lower())
        if module_name is None:
            raise ValueError(f"Unsupported autologging library: {model_flavour}")

        # resolve only the requested flavour, so an unavailable module for some
        # other flavour cannot break this one
        return importlib.import_module(f"mlflow.{module_name}").autolog

    def __call__(self, func: Callable) -> Callable:
        """
        Decorator for a model training function
        Parameters:
        - func (Callable): The function to be decorated. It must return the tuple
          (model, y_test, y_pred, losses).
        Returns:
        - wrapper (Callable): The decorated function.

        The decorated function returns the same tuple as `func`:
        - model: The trained model.
        - y_test: The true output values for testing.
        - y_pred: The predicted output values for testing.
        - losses: The list of loss values over epochs during model training.
        This should be modified for your use case.
        """
        from functools import wraps

        @wraps(func)  # keep the training function's name and docstring on the wrapper
        def wrapper(*args, **kwargs) -> Tuple[Any, np.ndarray, np.ndarray, List[float]]:

            with mlflow.start_run(run_name=self.run_name):

                # Automatically log model, parameters, etc...
                self.autolog_func()

                # Train model, get losses and predictions
                model, y_test, y_pred, losses = func(*args, **kwargs)

                # Log metrics
                metrics = self.get_metrics(y_pred, y_test)
                mlflow.log_metrics(metrics)

                # Log figures
                lossplt = self.plot_true_value_vs_prediction(y_pred, y_test)
                mlflow.log_figure(lossplt, "true_value_vs_prediction.png")

                epochplt = self.plot_loss_over_epoch(losses)
                mlflow.log_figure(epochplt, "loss_over_epochs.png")

                return model, y_test, y_pred, losses

        return wrapper


    @staticmethod
    def plot_true_value_vs_prediction(pred: np.ndarray, test: np.ndarray) -> "plt.Figure":
        """Plot the true test values and the predictions on one axis and return the figure."""
        fig, ax = plt.subplots()
        ax.plot(test, label='True Value')
        ax.plot(pred, label='LSTM Value')
        ax.set_title('Prediction by LSTM')
        ax.set_xlabel('Time Scale')
        ax.set_ylabel('Scaled USD')
        ax.legend()
        return fig

    @staticmethod
    def plot_loss_over_epoch(losses: List[float]) -> "plt.Figure":
        """Plot the recorded training losses in order and return the figure."""
        fig, ax = plt.subplots()
        ax.plot(losses)
        ax.set_title('Model Loss Over Epochs')
        ax.set_xlabel('Epoch')
        ax.set_ylabel('Loss')
        return fig

    @staticmethod
    def get_metrics(pred: np.ndarray, test: np.ndarray) -> Dict[str, float]:
        """
        Compute regression metrics for predictions against true values.

        Returns a dict with mse, rmse, mae, r2 and medae. msle is included only when
        neither array contains negative values, because the squared log error is
        undefined there and would otherwise abort the whole logging step.
        """
        mse = mean_squared_error(test, pred)
        metrics = {
            "mse": mse,
            "rmse": np.sqrt(mse),
            "mae": mean_absolute_error(test, pred),
            "r2": r2_score(test, pred),
            "medae": median_absolute_error(test, pred),
        }
        if np.min(test) >= 0 and np.min(pred) >= 0:
            metrics["msle"] = mean_squared_log_error(test, pred)
        return metrics

In [58]:
# Select the experiment that subsequent runs are logged under (MLflow creates it if it does not exist)
mlflow.set_experiment(experiment_name='ftse-demo-17/01/24')

2024/01/17 15:22:05 INFO mlflow.tracking.fluent: Experiment with name 'ftse-demo-17/01/24' does not exist. Creating a new experiment.


<Experiment: artifact_location='', creation_time=1705500286572, experiment_id='1486df82-7fd9-46a6-9b4d-645cfe2ade86', last_update_time=None, lifecycle_stage='active', name='ftse-demo-17/01/24', tags={}>

In [None]:
# Train the LSTM inside an MLflow run and keep the results, instead of discarding them
# and letting the notebook echo the whole (model, arrays, losses) tuple as cell output
lstm_model, lstm_y_test, lstm_y_pred, lstm_losses = train_lstm_model(data)

In [None]:
# Random forest search space
rfspace = {
    "n_estimators": hp.uniform("n_estimators", 200, 1000),
    "max_depth": hp.quniform("max_depth", 10, 1200, 10),
    "min_samples_split": hp.uniform("min_samples_split", 0.1, 1.0),
    "min_samples_leaf": hp.uniform("min_samples_leaf", 0.1, 0.5),
    "max_features": hp.choice("max_features", options=[None, 'sqrt', 'log2']),
    "criterion": hp.choice("criterion", ['squared_error', 'poisson', 'absolute_error', 'friedman_mse'])
}

# Gradient boosting search space
gbmspace = {
    'n_estimators': hp.quniform('n_estimators', 50, 200, 1),
    'learning_rate': hp.loguniform('learning_rate', -5, 0),
    'max_depth': hp.choice('max_depth', [None, hp.quniform('max_depth_val', 3, 10, 1)]),
    'min_samples_split': hp.uniform('min_samples_split', 0.1, 1.0),
    'min_samples_leaf': hp.uniform('min_samples_leaf', 0.1, 0.5),
    'subsample': hp.uniform('subsample', 0.5, 1.0),
    'max_features': hp.choice('max_features', ['sqrt', 'log2', None]),
}

# Casts applied to sampled values before they reach the estimator:
# the uniform/quniform samplers yield floats, while these parameters are passed as ints
rfdtypes = {"max_depth": int, "n_estimators": int}
gbmdtypes = {"max_depth": lambda x: int(x) if x is not None else x, "n_estimators": int}

data = data.astype(float)


models = [RandomForestRegressor, GradientBoostingRegressor]
# list(), not a bare zip(): zip is a one-shot iterator, so a second run of the
# search loop would otherwise iterate over nothing and silently do no work
func_obj = list(zip(models, [rfspace, gbmspace], [rfdtypes, gbmdtypes]))



In [None]:
def train_sklearn_regressor(hyperparams: Dict[str, Any],
                            X_train: pd.DataFrame,
                            X_test: pd.DataFrame,
                            y_train: pd.Series,
                            y_test: pd.Series,
                            model: Type[RegressorMixin],
                            dtypes: Dict[str, Callable]) -> Dict[str, Any]:

    """
    Train a scikit-learn regressor model, log hyperparameters, and evaluate metrics using MLflow.

    Intended as the objective function for hyperopt: every call opens a nested
    (child) MLflow run, fits one estimator and reports its test MSE as the loss.

    Parameters:
    - hyperparams (Dict[str, Any]): Dictionary of hyperparameters for the regressor model. Not modified.
    - X_train (pd.DataFrame): Training features.
    - X_test (pd.DataFrame): Testing features.
    - y_train (pd.Series): Training target values.
    - y_test (pd.Series): Testing target values.
    - model (Type[RegressorMixin]): The scikit-learn regressor model class (e.g., RandomForestRegressor).
    - dtypes (Dict[str, Callable]): Dictionary mapping hyperparameter names to functions for datatype conversion.

    Returns:
    - result (Dict[str, Any]): A dictionary containing the status, the loss (mean squared error,
      used by hyperopt to inform its search) and, under "attachments", the child run_id.
    """

    # map datatypes into a fresh dict so the caller's (hyperopt's) dict is left untouched
    params = {name: dtypes[name](value) if name in dtypes else value
              for name, value in hyperparams.items()}

    # create model instance (kept separate from the `model` class argument)
    estimator = model(**params)

    # define run name
    hypervals = '_'.join([f'{key}: {value}' for key, value in params.items()])
    run_name = f"{type(estimator).__name__}_{hypervals}"
    with mlflow.start_run(nested=True, run_name=run_name) as child_run:

        # enable autologging
        mlflow.sklearn.autolog()

        # fit, predict
        estimator.fit(X_train, y_train)
        y_pred = estimator.predict(X_test)

        # calculate and log metrics
        mse = mean_squared_error(y_test, y_pred)
        metrics = {
            "mse": mse,
            "rmse": np.sqrt(mse),
            "mae": mean_absolute_error(y_test, y_pred),
            "r2": r2_score(y_test, y_pred),
            "medae": median_absolute_error(y_test, y_pred),
        }
        # the squared log error is undefined for negative values; skip it rather
        # than let one such prediction abort the trial (and with it the whole search)
        if np.min(y_test) >= 0 and np.min(y_pred) >= 0:
            metrics["msle"] = mean_squared_log_error(y_test, y_pred)

        mlflow.log_metrics(metrics)

        return {'status': STATUS_OK, 'loss': mse, "attachments": {"run_id": child_run.info.run_id}}


In [None]:

def search(hyperparam_space: Dict[str, Any],
           df: pd.DataFrame,
           target: str,
           model: Type[RegressorMixin],
           dtypes: Dict[str, Callable],
           max_evals: int = 100,
           shuffle: bool = True) -> Tuple[Trials, Dict[str, Any]]:

    """
    Perform hyperparameter search using Hyperopt for a regression model.

    Parameters:
    - hyperparam_space (Dict[str, Any]): Dictionary specifying the hyperparameter search space.
    - df (pd.DataFrame): Input DataFrame with features and target variable.
    - target (str): Name of the target variable.
    - model (Type[RegressorMixin]): sklearn Regressor model class to be trained and evaluated.
        - note this can be changed, as long as the corresponding train_sklearn_regressor is changed
    - dtypes (Dict[str, Callable]): Dictionary mapping hyperparameter names to functions for datatype conversion.
    - max_evals (int): Maximum number of evaluations for the hyperparameter search. Default is 100.
    - shuffle (bool): Whether the 80/20 train/test split shuffles rows first. Default True
      (the previous behaviour). Pass False for time-ordered data so that the test set
      is strictly later than the training set.

    Returns:
    - trials (Trials): Hyperopt Trials object containing information about each evaluation.
    - best_params (Dict[str, Any]): fmin's raw result, keyed by hyperopt label. For hp.choice
      entries this is the index of the chosen option, not the option itself.

    Notes:
    - Each iteration of hyperopt's search is stored as a child run of the parent run model.__name__
    - The 'best run's' data is logged in the parent run; the logged best_* params are the
      resolved values (choice indices mapped back to options, dtypes applied)
    - The objective is train_sklearn_regressor wrapped in a functools.partial object, so hyperopt
      only varies the hyperparameters
    """
    from hyperopt import space_eval

    # get training and testing data
    X, y = df.drop(target, axis=1), df[target]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, shuffle=shuffle
    )

    # everything except the hyperparameters is fixed for the duration of the search
    objective = partial(
        train_sklearn_regressor,
        X_train=X_train, X_test=X_test, y_train=y_train, y_test=y_test, model=model, dtypes=dtypes
    )

    with mlflow.start_run(nested=False, run_name=model.__name__):

        # perform search, store trials in Trials object
        trials = Trials()
        best_params = fmin(
            fn=objective,
            space=hyperparam_space,
            algo=tpe.suggest,
            trials=trials,
            max_evals=max_evals
        )

        # get best run
        best_run_id = trials.trial_attachments(trials.best_trial)["run_id"]

        # download artifacts of the best child run
        client = mlflow.tracking.MlflowClient()
        model_dir = client.download_artifacts(best_run_id, path="model")

        # map fmin's raw result back to actual parameter values before logging
        resolved = space_eval(hyperparam_space, best_params)
        resolved = {name: dtypes[name](value) if name in dtypes else value
                    for name, value in resolved.items()}

        # log
        mlflow.log_param("best_run_id", best_run_id)
        mlflow.log_params({f"best_{p}": v for p, v in resolved.items()})
        mlflow.log_metric("best_mse", trials.best_trial["result"]["loss"])
        mlflow.log_artifacts(local_dir=model_dir, artifact_path="model")

        # log other metrics from training function
        run = mlflow.get_run(best_run_id)
        mlflow.log_metrics(run.data.metrics)

        # plot and log loss histogram
        fig = plt.figure()
        fig.add_subplot()
        hplt.main_plot_histogram(trials, do_show=False)
        mlflow.log_figure(fig, "loss_histogram.png")

        return trials, best_params


In [None]:
# Run a short (5-evaluation) hyperparameter search for each (model class, space, dtype casts) triple
for model, grid, dtype in func_obj:
    search(grid, data, 'Adj Close', model, dtype, max_evals=5)

In [None]:
# Connect to the Azure blob container that stores the dataset.
# os.environ[...] (rather than .get) fails immediately with the name of the
# missing variable instead of passing None on to the Azure client.
load_dotenv(find_dotenv())
connection_string = os.environ['AZURE_STORAGE_CONNECTION_STRING']
container = os.environ['CONTAINER_NAME']

blob_service_client = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service_client.get_container_client(container)

main_blob_name = 'ftse100data.csv'

In [None]:
def save_data_to_blob(blob_name: str, df: pd.DataFrame, overwrite: bool = True) -> None:
    """Serialise `df` to CSV text and upload it under `blob_name` via the notebook-level `container_client`."""
    csv_text = df.to_csv()
    container_client.upload_blob(name=blob_name, data=csv_text, overwrite=overwrite)

def read_data_from_blob(blob_name: str) -> pd.DataFrame:
    """Download `blob_name` as UTF-8 CSV text and return it as a DataFrame indexed by the parsed 'Date' column."""
    raw_bytes = container_client.get_blob_client(blob_name).download_blob().readall()
    frame = pd.read_csv(StringIO(raw_bytes.decode('utf-8')), parse_dates=['Date'])
    return frame.set_index('Date')


# Round-trip the dataset through blob storage: upload, read it back (replacing `data`), preview the end
save_data_to_blob(main_blob_name, data)
data = read_data_from_blob(main_blob_name)
data.tail()

In [59]:
# Print the name of every MLflow experiment in the workspace
experiments = mlflow.search_experiments()
for exp in experiments:
    print(exp.name)



train_model_credit_default_prediction
mslearn-auto-training
mslearn-auto-inference
mslearn-bike-rental
mslearn-diabetes-training
mslearn-diabetes-inference
mslearn-penguin-training
diabetes-training
auto-ml-class-dev
mlflow-experiment-diabetes
pipeline_diabetes
batch-05191126317467
dataset_profile
mslearn-diabates-drift-Monitor-Runs
demo4
Default


In [60]:
# Tabulate the runs recorded under one experiment, looked up by its name
exp = mlflow.get_experiment_by_name('mslearn-bike-rental')
mlflow.search_runs(experiment_ids=[exp.experiment_id])

Unnamed: 0,run_id,experiment_id,status,artifact_uri,start_time,end_time,metrics.explained_variance,metrics.normalized_median_absolute_error,metrics.median_absolute_error,metrics.root_mean_squared_error,...,tags.score_000,tags.model_explain_run,tags.mlflow.runName,tags.run_preprocessor_000,tags.iteration_000,tags.mlflow.source.type,tags.mlflow.parentRunId,tags.mlflow.source.name,tags.model_explanation,tags.model_explain_run_id
0,AutoML_cb7a19de-94bb-40e3-a109-6e2330f4023e,f4d5ac82-33a3-4228-8f3c-bfdbe351e81e,FINISHED,,2023-05-17 07:38:29.930000+00:00,2023-05-17 07:45:34.590000+00:00,0.832549,0.033129,112.904844,274.305927,...,0.08048882819856848;NaN;NaN;NaN;NaN;NaN;NaN;Na...,best_run,upbeat_grass_dk58hs52,MaxAbsScaler;;;;;;;;;MaxAbsScaler;;;;;;;;;;,0;7;13;6;14;17;16;15;9;1;2;18;12;19;8;5;11;4;3;10,,,,,
1,AutoML_cb7a19de-94bb-40e3-a109-6e2330f4023e_setup,f4d5ac82-33a3-4228-8f3c-bfdbe351e81e,FINISHED,,2023-05-17 07:40:57.094000+00:00,2023-05-17 07:42:37.759000+00:00,,,,,...,,,amiable_napa_x18490kl,,,JOB,AutoML_cb7a19de-94bb-40e3-a109-6e2330f4023e,setup_AutoML_cb7a19de-94bb-40e3-a109-6e2330f40...,,
2,AutoML_cb7a19de-94bb-40e3-a109-6e2330f4023e_fe...,f4d5ac82-33a3-4228-8f3c-bfdbe351e81e,FINISHED,,2023-05-17 07:42:38.489000+00:00,2023-05-17 07:44:33.999000+00:00,,,,,...,,,tender_brick_gl2rwrdn,,,,AutoML_cb7a19de-94bb-40e3-a109-6e2330f4023e,,,
3,AutoML_cb7a19de-94bb-40e3-a109-6e2330f4023e_fe...,f4d5ac82-33a3-4228-8f3c-bfdbe351e81e,FINISHED,,2023-05-17 07:42:49.162000+00:00,2023-05-17 07:44:33.746000+00:00,,,,,...,,,ivory_cheetah_yyv0nyjj,,,JOB,AutoML_cb7a19de-94bb-40e3-a109-6e2330f4023e_fe...,featurize_AutoML_cb7a19de-94bb-40e3-a109-6e233...,,
4,AutoML_cb7a19de-94bb-40e3-a109-6e2330f4023e_0,f4d5ac82-33a3-4228-8f3c-bfdbe351e81e,FINISHED,,2023-05-17 07:44:55.867000+00:00,2023-05-17 07:45:23.949000+00:00,0.832549,0.033129,112.904844,274.305927,...,,,boring_feijoa_lbmtxkvv,,,,AutoML_cb7a19de-94bb-40e3-a109-6e2330f4023e,,True,AutoML_cb7a19de-94bb-40e3-a109-6e2330f4023e_Mo...
5,AutoML_cb7a19de-94bb-40e3-a109-6e2330f4023e_1,f4d5ac82-33a3-4228-8f3c-bfdbe351e81e,KILLED,,2023-05-17 07:45:24.443000+00:00,2023-05-17 07:45:26.482000+00:00,,,,,...,,,musing_basket_smplpmxf,,,,AutoML_cb7a19de-94bb-40e3-a109-6e2330f4023e,,,
6,AutoML_cb7a19de-94bb-40e3-a109-6e2330f4023e_2,f4d5ac82-33a3-4228-8f3c-bfdbe351e81e,KILLED,,2023-05-17 07:45:26.618000+00:00,2023-05-17 07:45:26.618000+00:00,,,,,...,,,mango_bird_b9t7wh7c,,,,AutoML_cb7a19de-94bb-40e3-a109-6e2330f4023e,,,
7,AutoML_cb7a19de-94bb-40e3-a109-6e2330f4023e_3,f4d5ac82-33a3-4228-8f3c-bfdbe351e81e,KILLED,,2023-05-17 07:45:27.885000+00:00,2023-05-17 07:45:27.885000+00:00,,,,,...,,,musing_rocket_hcwcdtg1,,,,AutoML_cb7a19de-94bb-40e3-a109-6e2330f4023e,,,
8,AutoML_cb7a19de-94bb-40e3-a109-6e2330f4023e_4,f4d5ac82-33a3-4228-8f3c-bfdbe351e81e,KILLED,,2023-05-17 07:45:27.746000+00:00,2023-05-17 07:45:27.746000+00:00,,,,,...,,,nifty_tent_6w0jb5mh,,,,AutoML_cb7a19de-94bb-40e3-a109-6e2330f4023e,,,
9,AutoML_cb7a19de-94bb-40e3-a109-6e2330f4023e_5,f4d5ac82-33a3-4228-8f3c-bfdbe351e81e,KILLED,,2023-05-17 07:45:27.340000+00:00,2023-05-17 07:45:27.340000+00:00,,,,,...,,,patient_flower_xxclq9j4,,,,AutoML_cb7a19de-94bb-40e3-a109-6e2330f4023e,,,


Step 2: registering the model

In [61]:
# Print the name of every model in the registry
client = mlflow.tracking.MlflowClient()
for model in client.search_registered_models():
    print(model.name)

azureml_boring_cassava_smcw7bv63h_output_mlflow_log_model_548350309
azureml_boring_cassava_smcw7bv63h_output_mlflow_log_model_1700531275
credit_defaults_model
amlstudio-predict-auto-price
azureml_AutoML_cb7a19de-94bb-40e3-a109-6e2330f4023e_0_output_mlflow_log_model_805459951
AutoMLcb7a19de90
amlstudio-predict-diabetes
azureml_5a48cbc5-ac10-4512-bb9f-4d6ac06c4468_output_mlflow_log_model_503845975
azureml_loving_machine_3r8228hz83_output_mlflow_log_model_503845975
diabetes-mlflow
azureml_9f297f68-d9fa-4714-a98b-6546413102d5_output_mlflow_log_model_1899327965
azureml_9f297f68-d9fa-4714-a98b-6546413102d5_output_mlflow_log_model_1938168045
azureml_6651ba31-eeac-4b4d-a021-39659af5ba27_output_mlflow_log_model_613564256
azureml_e84dddfe-1b66-4fa8-a8a0-c1c720cceed4_output_mlflow_log_model_613564256
azureml_e0299110-ad86-4156-9a20-575cf5c8730d_output_mlflow_log_model_613564256
azureml_e01e8cb3-4cc6-4d0e-8afd-ecc644103688_output_mlflow_log_model_613564256
azureml_d947dee0-32e9-4bbf-bffc-4030fca49

In [62]:
# Load a registered model through the generic pyfunc interface and list its bound methods
diab_model_uri = "models:/diabetes-mlflow/None"
diab_model = mlflow.pyfunc.load_model(diab_model_uri)
inspect.getmembers(diab_model, predicate=inspect.ismethod)


``mlflow.tracking.client.MlflowClient.get_latest_versions`` is deprecated since 2.9.0. Model registry stages will be removed in a future major release. To learn more about the deprecation of model registry stages, see our migration guide here: https://mlflow.org/docs/2.9.2/model-registry.html#migrating-from-stages

Downloading artifacts: 100%|██████████| 5/5 [00:00<00:00,  8.84it/s]
 - cloudpickle (current: 3.0.0, required: cloudpickle==2.2.0)
 - psutil (current: 5.9.7, required: psutil==5.8.0)
 - scikit-learn (current: 1.3.2, required: scikit-learn==0.24.1)
To fix the mismatches, call `mlflow.pyfunc.get_model_dependencies(model_uri)` to fetch the model's environment and install dependencies using the resulting environment file.

Trying to unpickle estimator LogisticRegression from version 0.24.1 when using version 1.3.2. This might lead to breaking code or invalid results. Use at your own risk. For more info please refer to:
https://scikit-learn.org/stable/model_persistence.html#secu

[('__eq__',
  <bound method PyFuncModel.__eq__ of mlflow.pyfunc.loaded_model:
    artifact_path: model
    flavor: mlflow.sklearn
    run_id: 36b85079-2e17-43da-b4b4-775944b9a2d3
  >),
 ('__init__',
  <bound method PyFuncModel.__init__ of mlflow.pyfunc.loaded_model:
    artifact_path: model
    flavor: mlflow.sklearn
    run_id: 36b85079-2e17-43da-b4b4-775944b9a2d3
  >),
 ('__repr__',
  <bound method PyFuncModel.__repr__ of mlflow.pyfunc.loaded_model:
    artifact_path: model
    flavor: mlflow.sklearn
    run_id: 36b85079-2e17-43da-b4b4-775944b9a2d3
  >),
 ('_predict_fn',
  <bound method _SklearnModelWrapper.predict of <mlflow.sklearn._SklearnModelWrapper object at 0x0000010ADD0D3E90>>),
 ('predict',
  <bound method PyFuncModel.predict of mlflow.pyfunc.loaded_model:
    artifact_path: model
    flavor: mlflow.sklearn
    run_id: 36b85079-2e17-43da-b4b4-775944b9a2d3
  >),
 ('unwrap_python_model',
  <bound method PyFuncModel.unwrap_python_model of mlflow.pyfunc.loaded_model:
    artifac

In [63]:
# List every registered version of the 'lstm' model
client.search_model_versions("name='lstm'")

[<ModelVersion: aliases=[], creation_timestamp=1705416370772, current_stage='Production', description='', last_updated_timestamp=1705502815871, name='lstm', run_id='5727663c-084e-4c18-a007-b8d69d67691d', run_link='', source='azureml://uksouth.api.azureml.ms/mlflow/v2.0/subscriptions/b501a57e-71d5-4887-b72c-a0c961a0f281/resourceGroups/uk-environment/providers/Microsoft.MachineLearningServices/workspaces/daadspocs/experiments/d2f4cd6e-b006-4000-9b48-2f1916d8f10a/runs/5727663c-084e-4c18-a007-b8d69d67691d/artifacts/model', status='READY', status_message='', tags={}, user_id='', version='2'>,
 <ModelVersion: aliases=[], creation_timestamp=1705341048135, current_stage='Staging', description='', last_updated_timestamp=1705398650887, name='lstm', run_id='5727663c-084e-4c18-a007-b8d69d67691d', run_link='', source='azureml://uksouth.api.azureml.ms/mlflow/v2.0/subscriptions/b501a57e-71d5-4887-b72c-a0c961a0f281/resourceGroups/uk-environment/providers/Microsoft.MachineLearningServices/workspaces/da

In [64]:
# Move version 1 of the 'lstm' model to the Staging stage.
# NOTE: MLflow reports this call as deprecated since 2.9.0 (registry stages are being removed).
client.transition_model_version_stage("lstm", version=1, stage="Staging")


``mlflow.tracking.client.MlflowClient.transition_model_version_stage`` is deprecated since 2.9.0. Model registry stages will be removed in a future major release. To learn more about the deprecation of model registry stages, see our migration guide here: https://mlflow.org/docs/2.9.2/model-registry.html#migrating-from-stages



<ModelVersion: aliases=[], creation_timestamp=1705341048135, current_stage='Staging', description='', last_updated_timestamp=1705505740172, name='lstm', run_id='5727663c-084e-4c18-a007-b8d69d67691d', run_link='', source='azureml://uksouth.api.azureml.ms/mlflow/v2.0/subscriptions/b501a57e-71d5-4887-b72c-a0c961a0f281/resourceGroups/uk-environment/providers/Microsoft.MachineLearningServices/workspaces/daadspocs/experiments/d2f4cd6e-b006-4000-9b48-2f1916d8f10a/runs/5727663c-084e-4c18-a007-b8d69d67691d/artifacts/model', status='READY', status_message='', tags={}, user_id='', version='1'>

Step 3: model deployment

In [65]:
# Generate a random endpoint name: a fixed prefix followed by 20 random decimal digits
random_arr = np.random.randint(low=0, high=10, size=20)
suffix = "".join(str(digit) for digit in random_arr)
endpoint_name = "ftse100demo-" + suffix
endpoint_name

'ftse100demo-65252619053028174895'

In [None]:
# Create a deployment client bound to the currently configured MLflow tracking URI
deployment_client = get_deploy_client(mlflow.get_tracking_uri())

In [66]:
# Read the endpoint and deployment names from the local .env file
# (either value is None when its variable is not set)
load_dotenv('.env')
ep_name = os.environ.get('ENDPOINT_NAME')
deployment_name = os.environ.get('DEPLOYMENT_NAME')
print(deployment_name, ep_name)

ftse100demo ftse100demo-12132267894507554523


In [None]:
# Create the online endpoint under the name read from the environment (ep_name)
endpoint = deployment_client.create_endpoint(ep_name)

In [67]:
# Fetch the endpoint description and pull out the URI that scoring requests are sent to
endpoint_info = deployment_client.get_endpoint(endpoint=ep_name)
scoring_uri = endpoint_info["properties"]["scoringUri"]
print(scoring_uri)

https://ftse100demo-12132267894507554523.uksouth.inference.ml.azure.com/score


In [68]:
# Where to write the deployment config. Falls back to the file name that the
# create_deployment cell passes, so an unset DEPLOYMENT_CONFIG no longer ends in open(None).
deploy_config_filepath = os.environ.get('DEPLOYMENT_CONFIG', 'deployment_config.json')
deploy_config = {
    "instance_type": "Standard_DS2_v2",
    "instance_count": 1,
}
with open(deploy_config_filepath, "w") as j:
    json.dump(deploy_config, j)

In [69]:
traffic_config_filepath = os.environ.get('TRAFFIC_CONFIG')
# route 100% of the endpoint's traffic to the deployment named in the environment
traffic_config = {"traffic": {deployment_name: 100}}

# NOTE(review): the file is written to the literal "traffic_config.json", which is the
# name the create_deployment cell passes; traffic_config_filepath above is read but not used.
with open("traffic_config.json", "w") as j:
    json.dump(traffic_config, j)

In [None]:
# Deploy the Production version of the registered 'lstm' model to the endpoint.
# The deployment config is referenced through deploy_config_filepath, i.e. the path it
# was actually written to, rather than a hard-coded name that may not match it.
dep = deployment_client.create_deployment(
    name=deployment_name,
    endpoint=ep_name,
    model_uri="models:/lstm/Production",
    config={
        "deploy-config-file": deploy_config_filepath,
        "endpoint-config-file": "traffic_config.json"
    },
)

In [70]:
# Download everything from 2021-12-02 onwards
newdata = yf.download("^FTSE", start="2021-12-02")
# drop the most recent row (no volume for the current day), then the 'Close' column
latest_row = newdata.tail(1).index
newdata = newdata.drop(index=latest_row).drop(columns='Close')
newdata

[*********************100%%**********************]  1 of 1 completed


Unnamed: 0_level_0,Open,High,Low,Adj Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
2021-12-02,7168.700195,7168.700195,7083.200195,7129.200195,724509800
2021-12-03,7129.200195,7196.100098,7105.299805,7122.299805,867111100
2021-12-06,7122.299805,7246.299805,7122.299805,7232.299805,637274000
2021-12-07,7232.299805,7344.700195,7232.299805,7339.899902,783615400
2021-12-08,7339.899902,7378.899902,7333.600098,7337.399902,776663000
...,...,...,...,...,...
2024-01-10,7684.000000,7684.000000,7647.399902,7651.799805,668838800
2024-01-11,7651.799805,7693.899902,7576.600098,7576.600098,1306895000
2024-01-12,7576.600098,7655.200195,7576.600098,7624.899902,794125500
2024-01-15,7624.899902,7637.799805,7578.299805,7594.899902,740769500


In [71]:
# Build a sample for the scoring request: the first 30 rows with the target column removed
sample = newdata.head(30).drop(columns='Adj Close')
json_sample = sample.to_json(orient="split", index=False)
json_sample

'{"columns":["Open","High","Low","Volume"],"data":[[7168.7001953125,7168.7001953125,7083.2001953125,724509800],[7129.2001953125,7196.1000976562,7105.2998046875,867111100],[7122.2998046875,7246.2998046875,7122.2998046875,637274000],[7232.2998046875,7344.7001953125,7232.2998046875,783615400],[7339.8999023438,7378.8999023438,7333.6000976562,776663000],[7337.1000976562,7366.1000976562,7303.0,615185100],[7321.2998046875,7331.6000976562,7284.7998046875,533854300],[7291.7998046875,7309.2001953125,7226.6000976562,1173948800],[7231.3999023438,7284.5,7218.6000976562,662325900],[7218.6000976562,7224.8999023438,7166.7998046875,813609300],[7170.7998046875,7273.2998046875,7170.7998046875,1155236100],[7260.6000976562,7286.5,7236.2001953125,1477578800],[7269.8999023438,7269.8999023438,7101.0,925062200],[7198.0,7297.3999023438,7198.0,683669700],[7297.3999023438,7341.7001953125,7273.5,514772900],[7341.7001953125,7385.3999023438,7331.1000976562,422671500],[7373.2998046875,7403.7001953125,7368.7998046875,

In [None]:
# Display the sample frame that is sent for scoring
sample

In [72]:
# Score the sample against the endpoint that was actually created and deployed to (ep_name).
# endpoint_name is only a freshly generated random name; no endpoint was created under it.
deployment_client.predict(endpoint=ep_name, df=sample)

AttributeError: 'NoneType' object has no attribute 'get'

In [None]:
# stack the new rows under the historical frame, then pass the combined frame
# to save_data_to_blob under main_blob_name
data_update = pd.concat(objs=[data, newdata], axis=0)
save_data_to_blob(main_blob_name, data_update)

In [None]:
def eval_feature_drift(reference_data: pd.DataFrame,
                       new_data: pd.DataFrame,
                       col_mapping: ColumnMapping) -> Tuple[Dict[str, float], Report]:
    
    """
    Evaluate feature drift between reference data and new data using Evidently

    Parameters:
    - reference_data (pd.DataFrame): The reference dataset for comparison.
    - new_data (pd.DataFrame): The new dataset for comparison.
    - col_mapping (ColumnMapping): An instance of ColumnMapping class specifying mapping between columns.
      Only the columns listed in its numerical_features / categorical_features are reported.

    Returns:
    Tuple[Dict[str, float], Report]: A tuple containing:
        - A dictionary with feature drift scores, where keys are feature names, plus a
          'drift_share' entry holding the dataset-level drift share.
        - The Report object containing detailed drift analysis results.

    Raises:
    KeyError: If a mapped feature has no entry in the report's per-column drift results.

    The function initializes a Data Drift report with a DataDriftPreset and runs it on the provided
    reference and new datasets using the specified column mapping. It extracts relevant drift scores
    from the report and returns them in a dictionary along with the full Report object.
    """

    # create report, dict of important metrics
    report = Report(metrics=[DataDriftPreset()])
    report.run(reference_data=reference_data, current_data=new_data, column_mapping=col_mapping)
    report_dict = report.as_dict()

    # dataset-level result is read from the first metric, per-column results from the second;
    # resolve both once instead of re-walking the nested dict for every feature
    dataset_result = report_dict["metrics"][0]["result"]
    drift_by_columns = report_dict["metrics"][1]["result"]["drift_by_columns"]

    num_features = col_mapping.numerical_features or [] #otherwise iterate over None
    cat_features = col_mapping.categorical_features or []

    # fetch feature drifts
    drifts = {
        feature: drift_by_columns[feature]["drift_score"]
        for feature in list(num_features) + list(cat_features)
    }

    # add drift share
    drifts['drift_share'] = dataset_result["drift_share"]
    return drifts, report

In [None]:
def _log_drift_report(reference_data: pd.DataFrame,
                      current_data: pd.DataFrame,
                      colmap: "ColumnMapping",
                      html_name: str) -> None:
    """Run eval_feature_drift on one slice and log its metrics and HTML report to the active MLflow run."""
    import tempfile

    drifts, report = eval_feature_drift(reference_data, current_data, colmap)

    # write the report into a throw-away directory so nothing is left behind (or overwritten)
    # in the working directory, even when logging fails part-way
    with tempfile.TemporaryDirectory() as tmp_dir:
        html_path = os.path.join(tmp_dir, html_name)
        report.save_html(html_path)
        mlflow.log_metrics(drifts)
        mlflow.log_artifact(html_path, artifact_path='reports')


def log_drifts(old_data: pd.DataFrame,
               new_data: pd.DataFrame,
               target: str,
               numerical_features: List[str] = [],
               categorical_features: List[str] = [],
               n_batch: int = 2) -> None:
    
    """
    Log data drift analysis for a time series dataset using MLflow.

    Opens one parent MLflow run, a nested run per time batch of `new_data`, and finally logs
    the drift of the whole of `new_data` on the parent run. Each run receives the drift
    metrics and an HTML report under the 'reports' artifact path.

    Parameters:
    - old_data (pd.DataFrame): The reference dataset for comparison.
    - new_data (pd.DataFrame): The new dataset for comparison; its index is used to form the batches.
    - target (str): The target column for the data drift analysis.
    - numerical_features (List[str], optional): List of numerical features for drift analysis.
    - categorical_features (List[str], optional): List of categorical features for drift analysis.
    - n_batch (int, optional): Number of batches to split the new data for incremental analysis.

    Returns:
    None

    Raises:
    AttributeError: If neither numerical nor categorical features are given.
    """
    
    if not numerical_features and not categorical_features:
        raise AttributeError("Features must be specified")
    
    # create a ColumnMapping object for drift analysis
    # (copies keep the caller's lists, and the shared defaults, out of the mapping object)
    colmap = ColumnMapping()
    if numerical_features: colmap.numerical_features = list(numerical_features)
    if categorical_features: colmap.categorical_features = list(categorical_features)
    colmap.target = target

    # define the time range for batches
    start, end = new_data.index.min(), new_data.index.max()
    batch_idx = pd.date_range(start=start, end=end, periods=n_batch + 1)
    last_batch = len(batch_idx) - 2

    # start a top-level mlflow run for the entire analysis
    with mlflow.start_run(run_name=f"data_drift_{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"):

        mlflow.log_param("new_data_start", start)
        mlflow.log_param("new_data_end", end)

        # iterate over batches
        for idx, (sdate, edate) in enumerate(zip(batch_idx, batch_idx[1:])):

            # batches are half-open [sdate, edate) so rows are never counted twice; the final
            # batch has no upper bound so the row at the last timestamp is not dropped
            in_batch = new_data.index >= sdate
            if idx != last_batch:
                in_batch = in_batch & (new_data.index < edate)
            batch = new_data[in_batch]

            # a window without observations has nothing to compare against the reference
            if batch.empty:
                continue

            # start a nested run for each batch
            with mlflow.start_run(nested=True, run_name=f"Batch {idx + 1}"):
                mlflow.log_param("batch_start", sdate)
                # log the last observation actually contained in the batch
                mlflow.log_param("batch_end", batch.index.max())

                # perform drift analysis on the batch, log metrics and html report
                _log_drift_report(old_data, batch, colmap, f'test{idx}.html')

        # perform drift analysis on the entire dataset, log metrics and html report
        _log_drift_report(old_data, new_data, colmap, 'main.html')

In [None]:
# log drift of the new observations against the historical data, with 'Adj Close' as target
log_drifts(
    old_data=data,
    new_data=newdata,
    target='Adj Close',
    numerical_features=['Open', 'High', 'Low', 'Volume'],
)

In [73]:
# !! Destructive clean-up: asks the deployment client to delete the deployment whose name
# is read from the ENDPOINT_NAME environment variable.
# NOTE(review): os.environ.get returns None when ENDPOINT_NAME is unset, and the prediction
# cell uses the `endpoint_name` variable instead — confirm both refer to the same resource
# and that a deployment (rather than an endpoint) name is what is expected here.
deployment_client.delete_deployment(os.environ.get('ENDPOINT_NAME'))