In [None]:
import os
import numpy as np
import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_datasets as tfds
from dotenv import load_dotenv
import mlflow

  from .autonotebook import tqdm as notebook_tqdm


## MLflow
MLflow has four components: Tracking, Model Registry, Models, and Projects. In this notebook we demo all four against a local environment.
1. Tracking: we log the hyperparameters and metrics of a transfer-learning model fine-tuned on MNIST.
2. Registry: once the best tuned model is identified, we register it.
3. Models: a stored model only delivers value once it is served, so we load the registered model and use it for inference.
4. Projects: data-science code is packaged so that runs can be reproduced on any platform or machine.

In [17]:
# Connection settings for the local MLflow / artifact-store / Postgres stack.
# setdefault keeps any value already exported in the environment, so real
# credentials never need to be written into the notebook; the literals below are
# only fallbacks for the local development stack.
_LOCAL_DEFAULTS = {
    # mlflow tracking server
    'MLFLOW_TRACKING_URI': "http://0.0.0.0:5000",
    # mlflow artifact/model store (S3-compatible endpoint + its credentials)
    'MLFLOW_S3_ENDPOINT_URL': "http://localhost:9000",
    'AWS_ACCESS_KEY_ID': 'minio_user',
    'AWS_SECRET_ACCESS_KEY': "minio_pass",
    # optuna hyperparam store (postgres)
    'DB_USER': 'postgres',
    'DB_PASSWORD': 'postgres_pw',
    'OPTUNA_DB_NAME': 'optunadb',
}
for _env_key, _env_default in _LOCAL_DEFAULTS.items():
    os.environ.setdefault(_env_key, _env_default)

Experiment already exists, setting experiment to mnist-hyperparam-local
---------------------
Experiment details are:
Name: mnist-hyperparam-local
Experiment_id: 3
Artifact Location: s3://mlflow/3
Creation timestamp: 1688063866244


### First, we build the functions for our ML pipeline.
These cover data loading, preprocessing, and model training.

In [None]:
def load_tensorflow_dataset_training(dataset_str: str):
    """Load the train and test splits of a TFDS dataset as (input, label) pairs.

    Args:
        dataset_str: TFDS dataset name, e.g. 'mnist'.

    Returns:
        A ``(train, test)`` tuple of datasets. The dataset info object is requested
        from ``tfds.load`` but not returned.
    """
    load_options = dict(
        split=['train', 'test'],
        shuffle_files=True,
        as_supervised=True,
        with_info=True,
    )
    splits, _dataset_info = tfds.load(dataset_str, **load_options)
    train_split, test_split = splits
    return train_split, test_split

In [None]:
def preprocess_mnist_tfds(image, label=None):
    """Turn one MNIST image into a 224x224x3 float32 tensor for MobileNetV2.

    Usable both eagerly (e.g. on a numpy array decoded from a JPEG) and inside
    ``tf.data.Dataset.map``.

    Args:
        image: a single 2-D (H, W) or 3-D (H, W, C) image; single-channel input is
            replicated to 3 channels. Pixel values are assumed to be in [0, 255]
            (TODO confirm for non-MNIST input).
        label: optional label, passed through unchanged.

    Returns:
        ``(image, label)`` where ``image`` is float32, divided by 255 and resized
        with padding to 224x224.
    """
    # No channel axis: add one so the grayscale->RGB branch below handles it.
    # tf.expand_dims works on symbolic tensors inside Dataset.map, where numpy
    # stacking (np.dstack) cannot convert the tensor.
    if len(image.shape) != 3:
        image = tf.expand_dims(image, axis=-1)
    # single channel -> 3 identical channels for the ImageNet-pretrained backbone
    if image.shape[-1] == 1:
        image = tf.image.grayscale_to_rgb(image)
    # normalize pixel values
    image = tf.cast(image, tf.float32) / 255.
    # resize with pad for mobilenetv2
    image = tf.image.resize_with_pad(image, target_height=224, target_width=224)
    return image, label

In [None]:
class MNIST(mlflow.pyfunc.PythonModel):
    """MobileNetV2 transfer-learning classifier for MNIST, packaged as an MLflow pyfunc model.

    ``fit_hp_search`` / ``fit_production`` build a fresh Keras model and train it;
    ``load`` instead fetches a model from the MLflow Model Registry; ``predict``
    classifies a single image.
    """

    def __init__(self, mlflow_registered_model_name: str = None):
        """
        Args:
            mlflow_registered_model_name: name of a model in the MLflow Model Registry.
                When given, ``load`` is attempted immediately; when None, no registry
                lookup is made and the model must be trained with a ``fit_*`` method.
        """
        self._model = None
        # set by _build / fit_*; initialized so they exist on an untrained instance
        self._model_base = None
        self._train_history = None
        self._mlflow_registered_model_name = mlflow_registered_model_name
        self.load()

    def _build(self, hyperparameters):
        """Build and compile a new Keras model into ``self._model``.

        Args:
            hyperparameters: dict providing 'l1', 'l2', 'num_hidden' and 'learning_rate'.
        """
        # class names for mnist hardcoded
        class_names = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

        # L1/L2 penalty shared by both dense layers
        regularizer = tf.keras.regularizers.l1_l2(hyperparameters['l1'], hyperparameters['l2'])

        # frozen mobilenetv2 feature extractor followed by a dense classification head
        base_model = "https://tfhub.dev/google/tf2-preview/mobilenet_v2/feature_vector/4"
        layers = [
            hub.KerasLayer(
                base_model,
                input_shape=(224, 224, 3),
                trainable=False,
                name='mobilenet_embedding'),
            tf.keras.layers.Dense(hyperparameters['num_hidden'],
                                  kernel_regularizer=regularizer,
                                  activation='relu',
                                  name='dense_hidden'),
            tf.keras.layers.Dense(len(class_names),
                                  kernel_regularizer=regularizer,
                                  activation='softmax',
                                  name='mnist_prob')
        ]

        self._model = tf.keras.Sequential(layers, name='mnist-classification')

        # softmax output layer -> from_logits=False; integer labels -> sparse loss
        self._model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=hyperparameters['learning_rate']),
                            loss=tf.keras.losses.SparseCategoricalCrossentropy(
                                from_logits=False),
                            metrics=['accuracy'])

        # keep the backbone URL for logging
        self._model_base = base_model

    def fit_hp_search(self, xy_train, xy_test, hyperparameters):
        """Build a new model and train it, validating on ``xy_test`` every epoch.

        Args:
            xy_train: training dataset, passed straight to ``keras.Model.fit``.
            xy_test: validation dataset; its metrics appear as ``val_*`` in the history.
            hyperparameters: the keys used by ``_build`` plus 'epochs'.
        """
        self._build(hyperparameters)
        self._train_history = self._model.fit(xy_train,
                                              epochs=hyperparameters['epochs'],
                                              validation_data=xy_test)

    def fit_production(self, xy_train, hyperparameters):
        """Build a new model and train it on ``xy_train`` only (no validation data).

        Args:
            xy_train: training dataset, e.g. all available data for the final model.
            hyperparameters: the keys used by ``_build`` plus 'epochs'.
        """
        self._build(hyperparameters)
        self._train_history = self._model.fit(xy_train,
                                              epochs=hyperparameters['epochs'])

    def load(self):
        """Load the registered model named at construction into ``self._model``.

        Uses the first entry of the registered model's ``latest_versions``. If no
        name was given, or no matching model/version exists, ``self._model`` is None.

        Returns:
            self, on every path.
        """
        if self._mlflow_registered_model_name is None:
            # nothing to look up: skip the registry round-trip, which would otherwise
            # search for a model literally named "None" (and fail without a server)
            self._model = None
            return self
        try:
            results = mlflow.search_registered_models(
                filter_string=f'name = "{self._mlflow_registered_model_name}"')
            latest_model_details = results[0].latest_versions[0]
        except IndexError:
            print('No models found.')
            self._model = None
            return self
        self._model = mlflow.pyfunc.load_model(
            model_uri=f'{latest_model_details.source}')
        return self

    def predict(self, context, model_input: np.ndarray) -> np.ndarray:
        """Classify a single image.

        Args:
            context: MLflow pyfunc context (unused).
            model_input: one image in a form accepted by ``preprocess_mnist_tfds``.

        Returns:
            The argmax over the model's output, i.e. the index into the 0-9 class list.
        """
        image, _ = preprocess_mnist_tfds(model_input)
        # add the leading batch axis the Keras model expects
        image = tf.reshape(image, [1, 224, 224, 3])
        return self._model.predict(image).argmax()

### Tracking your model
#### MLflow has two levels for organizing projects:
1. At the top level are "experiments". These should be named "project-task-version" (e.g. `mnist-classification-v1`).
2. At the lower level are "runs". A run records the hyperparameters and metrics logged while a model is trained. An experiment can contain many runs.

In [None]:
# MLflow Tracking requires an experiment to log runs (params, metrics, artifacts) into.
# Experiment names should be defined as "project-task-version".

def set_mlflow_experiment(experiment_name: str, artifact_location: str = None):
    """Create the named MLflow experiment, or reuse it if it already exists.

    Prints the experiment's details either way.

    Args:
        experiment_name: name of the experiment.
        artifact_location: artifact root; only used when the experiment is created.

    Returns:
        The experiment id.

    Raises:
        Any exception from ``mlflow.create_experiment`` that is not an
        ``MlflowException`` (e.g. KeyboardInterrupt) propagates instead of being
        treated as "already exists".
    """
    try:
        experiment_id = mlflow.create_experiment(experiment_name,
                                                 artifact_location=artifact_location)
    except mlflow.exceptions.MlflowException:
        # creation failed (typically the name is taken): fall back to the existing
        # experiment; if the failure was something else, set_experiment raises too
        print(f'Experiment already exists, setting experiment to {experiment_name}')
        experiment_info = mlflow.set_experiment(experiment_name)
        experiment_id = experiment_info.experiment_id
    experiment = mlflow.get_experiment(experiment_id)
    print("---------------------")
    print('Experiment details are:')
    print("Name: {}".format(experiment.name))
    print("Experiment_id: {}".format(experiment.experiment_id))
    print("Artifact Location: {}".format(experiment.artifact_location))
    print("Creation timestamp: {}".format(experiment.creation_time))
    return experiment_id

# Experiment that every run in this notebook is logged to.
experiment_name = "mnist-classification-notebook"
experiment_id = set_mlflow_experiment(experiment_name)

In [None]:
# Log a baseline model trained with fixed (untuned) hyperparameters.
hyperparams = {
    'learning_rate': 0.01,
    'l1': 0.0,
    'l2': 0.0,
    'num_hidden': 16,
    'epochs': 3}

# Good practice to explicitly define experiment_id and run_name.
# experiment_id comes from set_mlflow_experiment above; a run name describes the run
# (e.g. "Linear Regression Default Hyperparams").
mlflow_run_name = 'base_model'
with mlflow.start_run(experiment_id=experiment_id,
                      run_name=mlflow_run_name) as run:
    # Logging is done manually (rather than via mlflow.tensorflow.autolog()) so we
    # control exactly which params and metrics are recorded.

    # load, preprocess and batch the train/test splits
    ds_train, ds_test = load_tensorflow_dataset_training('mnist')
    ds_train = ds_train.map(preprocess_mnist_tfds, num_parallel_calls=tf.data.AUTOTUNE).batch(128)
    ds_test = ds_test.map(preprocess_mnist_tfds, num_parallel_calls=tf.data.AUTOTUNE).batch(128)

    # train with the test split as validation data so val_* metrics are recorded
    model = MNIST()
    model.fit_hp_search(ds_train, ds_test, hyperparams)

    # MLflow Tracking parameters
    mlflow.log_params(params=hyperparams)

    # MLflow Tracking metrics: the Keras history maps metric name -> per-epoch values.
    # The epoch count is taken from the history itself, logged as 1-based steps.
    training_history = model._train_history.history
    n_epochs = len(next(iter(training_history.values()), []))
    for epoch in range(n_epochs):
        mlflow.log_metrics(
            metrics={metric: values[epoch] for metric, values in training_history.items()},
            step=epoch + 1)

    # MLflow tracking artifact: logs the MNIST wrapper and its details under
    # run_id/artifacts. Leaving the `with` block ends the run.
    mlflow.pyfunc.log_model(python_model=model,
                            artifact_path="")

In [None]:
# Hyperparameter search using Optuna (the same code lives in train-pipeline.py).
import optuna
from optuna.integration import MLflowCallback


def objective(trial):
    """
    Optuna objective function for tuning transfer learning model.

    Samples a hyperparameter set, trains a fresh MNIST model with it and returns the
    last-epoch validation accuracy (the study maximizes it). Each trial builds its own
    model instance so trials running in parallel (n_jobs > 1) never overwrite each
    other's Keras model or training history.
    """
    hyperparams = {
        'learning_rate': trial.suggest_float('learning_rate', 0.00001, 0.1, log=True),
        'l1': trial.suggest_float('l1', 0.0, 0.05),
        'l2': trial.suggest_float('l2', 0.0, 0.05),
        'num_hidden': trial.suggest_int('num_hidden', 8, 64),
        'epochs': trial.suggest_int('epochs', 1, 3)
    }

    trial_model = MNIST()
    trial_model.fit_hp_search(ds_train, ds_test, hyperparams)
    return trial_model._train_history.history['val_accuracy'][-1]


# Optuna study storage (Postgres); credentials come from the environment variables set earlier
optuna_storage_url = "postgresql://{}:{}@localhost:5433/{}".format(
    os.environ["DB_USER"],
    os.environ["DB_PASSWORD"],
    os.environ["OPTUNA_DB_NAME"]
)

# Create the study, or resume the stored one with the same name.
# NOTE(review): HyperbandPruner only prunes trials that report intermediate values via
# trial.report(); the objective above does not, so no pruning happens yet.
study = optuna.create_study(
    study_name=experiment_name,
    storage=optuna_storage_url,
    pruner=optuna.pruners.HyperbandPruner(),
    direction='maximize',
    load_if_exists=True)

# preprocess and batch the train/test splits shared by every trial
ds_train, ds_test = load_tensorflow_dataset_training('mnist')
ds_train = ds_train.map(preprocess_mnist_tfds, num_parallel_calls=tf.data.AUTOTUNE).batch(128)
ds_test = ds_test.map(preprocess_mnist_tfds, num_parallel_calls=tf.data.AUTOTUNE).batch(128)

# Each trial is logged as a run in the existing MLflow experiment (create_experiment=False),
# with the objective value recorded as "val_accuracy".
mlflow_kwargs = {'experiment_id': experiment_id}
study.optimize(objective,
               n_trials=2,
               n_jobs=2,
               callbacks=[MLflowCallback(metric_name="val_accuracy",
                                         create_experiment=False,
                                         mlflow_kwargs=mlflow_kwargs)])

In [None]:
import optuna

# Reload the study and take the best hyperparameters found by the search.
study = optuna.load_study(
    study_name=experiment_name,
    storage=optuna_storage_url,
)
hyperparameters = study.best_params

# Log the hyperparameter-tuned model.
# Set a run name here (e.g. Linear Regression Default Hyperparams).
mlflow_run_name = 'optuna_hyperparameter_tuned'
with mlflow.start_run(experiment_id=experiment_id,
                      run_name=mlflow_run_name) as run:
    # production training uses the train and test splits combined
    ds_full = tfds.load('mnist', split='train+test', shuffle_files=True, as_supervised=True)
    ds_full = ds_full.map(preprocess_mnist_tfds, num_parallel_calls=tf.data.AUTOTUNE).batch(128)

    # fresh model, fit without validation since all the data is used for training
    production_model = MNIST()
    production_model.fit_production(xy_train=ds_full,
                                    hyperparameters=hyperparameters)

    # MLflow Tracking parameters
    mlflow.log_params(params=hyperparameters)

    # MLflow Tracking metrics, one step per epoch (1-based). The epoch count comes from
    # the history itself, since the tuned 'epochs' can differ from the baseline's.
    training_history = production_model._train_history.history
    n_epochs = len(next(iter(training_history.values()), []))
    for epoch in range(n_epochs):
        mlflow.log_metrics(
            metrics={metric: values[epoch] for metric, values in training_history.items()},
            step=epoch + 1)

    # MLflow tracking artifact: the model trained in this run.
    # To register at log time instead of in the next section, add
    # registered_model_name=experiment_name. Leaving the `with` block ends the run.
    mlflow.pyfunc.log_model(python_model=production_model,
                            artifact_path="")

### Registering your model
Once you've run a few experiments (e.g. hyperparameter tuning), we can select the best model and register it. Registering a model means recording it in the MLflow Model Registry as the optimized version we want to keep and serve.

In [None]:
# Fetch the five best runs in this experiment, highest validation accuracy first.
run = mlflow.search_runs(
    experiment_names=[experiment_name],
    order_by=["metrics.val_accuracy DESC"],
    max_results=5,
)

run

In [None]:
# Row 0 of the sorted search above is the best run. Register the model logged at
# that run's artifact root (log_model was called with artifact_path="").
best_run_id = run.loc[0, 'run_id']

model_name = experiment_name
mv = mlflow.register_model(
    name=model_name,
    model_uri=f"runs:/{best_run_id}/",
)


### Serving the model
Now that the model is registered, we can load it from the registry and test inference.

In [None]:
# Look up the registered model and load the first of its latest versions,
# via that version's artifact source path, as a generic pyfunc model.
latest_model_details = mlflow.search_registered_models(
    filter_string=f'name = "{experiment_name}"'
)[0].latest_versions[0]
model = mlflow.pyfunc.load_model(model_uri=f'{latest_model_details.source}')

In [43]:
# Sample one MNIST JPEG from the test-image folder and classify it.
import random
from pathlib import Path

from PIL import Image

# Folder of sample JPEGs. The true digit is read as the first character after the
# last "_" in the file name (e.g. "..._2.jpg"). Set MNIST_TEST_IMAGES_DIR to use
# another folder instead of editing this path.
test_image_dir = Path(os.environ.get(
    'MNIST_TEST_IMAGES_DIR',
    '/Users/jcheung/Documents/GitHub/thin-ML-deployment/app/ml/test_images'))
# sorted listing + seeded RNG so a re-run picks the same image
files = sorted(p for p in test_image_dir.iterdir()
               if p.is_file() and p.suffix.lower() == '.jpg')
if not files:
    raise FileNotFoundError(f'No .jpg files found in {test_image_dir}')
filename = random.Random(42).choice(files)
# context manager closes the underlying file once the pixels are copied out
with Image.open(filename) as img:
    image = np.array(img)

# predict using custom mlflow model
predicted = model.predict(image)

# output and compare results
true = filename.name.split('_')[-1][0]

print(f'True:{true} and predicted:{predicted}')

True:2 and predicted:8
