In [1]:
import os
import time
import numpy as np
import tensorflow as tf
import tensorflow_hub as hub
import tensorflow_datasets as tfds
import mlflow

  from .autonotebook import tqdm as notebook_tqdm


## MLFlow
MLFLow has 4 different applications: Tracking, Registry, Models, Projects. In this script, we'll demo all 4 via a local environment. 
1. Tracking: we will log hyper-parameters used and metrics for a transfer-learning model tuned to MNIST.  
2. Registry: after our tuned model is identified, we will register our model. 
3. Models: a stored model must be served to deliver value. Models will be leveraged for inference. 
4. Projects: DS code is packaged to reproduce runs on any platform/machine via Projects.

In [2]:
mlflow.get_tracking_uri()  
# mlflow.set_tracking_uri('http://mlflow-server.kubeflow.svc.cluster.local:5000')  
# mlflow.set_registry_uri('http://minio.kubeflow.svc.cluster.local:9000')  

'file:///Users/jcheung/Documents/GitHub/bookshelf/deployment_demos/mlruns'

### First we'll build all the functions for our ML pipeline.
This'll include functions for data loading, preprocessing, and model training.

In [3]:
def load_mnist_tfds():
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
    return (x_train, y_train), (x_test, y_test)

In [4]:
def preprocess_mnist_tfds(image, label=None):
    # reshape and upsample to 3 channel for transfer learning models
    # ... for when no channel information is present
    if len(image.shape) != 3:
        image = np.dstack((image, image, image))
    # ... for when channel is only 1 dimension
    if image.shape[2] == 1:
        image = tf.image.grayscale_to_rgb(image)
    # normalize pixel values
    image = tf.cast(image, tf.float32) / 255.
    # resize with pad for mobilenetv2
    image = tf.image.resize_with_pad(image, target_height=224, target_width=224)
    return image, label

In [84]:
class MNIST(mlflow.pyfunc.PythonModel):     
    def fit(self, xy_tuple_train, xy_tuple_test, hyperparameters):
        ## Build model
        # class names for mnist hardcoded
        class_names = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    
        # set layer regularization for DNN
        regularizer = tf.keras.regularizers.l1_l2(hyperparameters['l1'], hyperparameters['l2'])

        # load in mobilenetv2 weights and instantiate dense classification head 
        base_model = "https://tfhub.dev/google/tf2-preview/mobilenet_v2/feature_vector/4"
        layers = [
            hub.KerasLayer(
                base_model,
                input_shape=(224, 224, 3),
                trainable=False,
                name='mobilenet_embedding'),
            tf.keras.layers.Dense(hyperparameters['num_hidden'],
                                  kernel_regularizer=regularizer,
                                  activation='relu',
                                  name='dense_hidden'),
            tf.keras.layers.Dense(len(class_names),
                                  kernel_regularizer=regularizer,
                                  activation='softmax',
                                  name='mnist_prob')
        ]

        self._model = tf.keras.Sequential(layers, name='mnist-classification')

        # compile model 
        self._model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=hyperparams['learning_rate']),
                            loss=tf.keras.losses.SparseCategoricalCrossentropy(
                            from_logits=False),
                            metrics=['accuracy'])
                      
        ## Fit model
        # fit model and save history to model store
        self._train_history = self._model.fit(xy_tuple_train, epochs=hyperparameters['epochs'], validation_data=xy_tuple_test)
        self._model_base = base_model
        
    def predict(self, context, model_input: np.ndarray) -> np.ndarray:
        image, _ = preprocess_mnist_tfds(model_input)
        image = tf.reshape(image, [1, 224, 224, 3])
        return self._model.predict(image).argmax()


                            


### Tracking your model
#### MLFlow has two levels for organizing projects:
1. At the top level we have "experiments". These should be named as "project-task-version" (e.g. mnist-classification)
2. At the lower level we have "runs". A run consists of logging hyperparameters and metrics when models are trained. Multiple runs can be stored within an experiment. 

In [85]:
# mlflow Tracking requires definition of experiment name AND logged params
# Experiment names they should be defined as "project-task-version"
experiment_name = "mnist-classification"

try:
    experiment_id = mlflow.create_experiment(
        experiment_name,
        tags={"version": "v0.1"},
    )
except mlflow.exceptions.MlflowException as e: 
    if str(e) == f"Experiment '{experiment_name}' already exists.":
        print(f'Experiment already exists, setting experiment to {experiment_name}')
        experiment_info = mlflow.set_experiment(experiment_name)
        experiment_id = experiment_info.experiment_id

experiment = mlflow.get_experiment(experiment_id)
print("---------------------")
print('Experiment details are:')
print("Name: {}".format(experiment.name))
print("Experiment_id: {}".format(experiment.experiment_id))
print("Artifact Location: {}".format(experiment.artifact_location))
print("Tags: {}".format(experiment.tags))
print("Lifecycle_stage: {}".format(experiment.lifecycle_stage))
print("Creation timestamp: {}".format(experiment.creation_time))



Experiment already exists, setting experiment to mnist-classification
---------------------
Experiment details are:
Name: mnist-classification
Experiment_id: 661060572347012195
Artifact Location: file:///Users/jcheung/Documents/GitHub/bookshelf/deployment_demos/mlruns/661060572347012195
Tags: {'version': 'v0.1'}
Lifecycle_stage: active
Creation timestamp: 1684258817271


In [86]:
# preprocess and define batch sizes for tensorflow 
(ds_train, ds_test), ds_info = tfds.load(
    'mnist',
    split=['train', 'test'],
    shuffle_files=True,
    as_supervised=True,
    with_info=True,
)

model = MNIST()
ds_train = ds_train.map(preprocess_mnist_tfds, num_parallel_calls=tf.data.AUTOTUNE)
ds_train = ds_train.batch(128)
ds_test = ds_test.map(preprocess_mnist_tfds, num_parallel_calls=tf.data.AUTOTUNE)
ds_test = ds_test.batch(128) 


In [88]:
ds_test

<BatchDataset element_spec=(TensorSpec(shape=(None, 224, 224, 3), dtype=tf.float32, name=None), TensorSpec(shape=(None,), dtype=tf.int64, name=None))>

In [89]:
# log a base model 
hyperparams = {
    'learning_rate': 0.01,
    'l1': 0.0,
    'l2': 0.0, 
    'num_hidden': 16,
    'epochs': 10}

# Good practice to explicitly define experiment_id and run_name. 
# Experiment_id can be extracted from above. 
# Run name examples (e.g. Linear Regression Default Hyperparams).
mlflow_run_name='MNIST Xfer Learning Base'
with mlflow.start_run(experiment_id=experiment_id, 
                      run_name=mlflow_run_name) as run:
    # You can set autolog for tensorflow model.
    # Note that autolog does not allow logging of any additional params and metrics.
    # We'll choose to do manual logging.
    # mlflow.tensorflow.autolog()

#     model.fit(ds_train, ds_test, hyperparams)

    # MLFlow Tracking parameters
    mlflow.log_params(params=hyperparams)
    
    # MLFlow Tracking metrics 
    # Logging metrics for each epoch (housed in dictionary)
    training_history = model._train_history.history
    for epoch in range(0, hyperparams['epochs']):
        insert = {}
        for metric, value in training_history.items():
            insert[metric] = training_history[metric][epoch]
        mlflow.log_metrics(metrics=insert, step=epoch+1)

    # MLFlow tracking artifact (e.g. model file)
    # this will log the model and all its details under run_id/artifacts
    mlflow.pyfunc.log_model(python_model=model,
                           artifact_path="")

    # Close out MLFlow run to prevent any log contamination.
    mlflow.end_run(status='FINISHED')

2023/05/16 13:58:49 INFO mlflow.types.utils: Unsupported type hint: <class 'numpy.ndarray'>, skipping schema inference
2023/05/16 13:58:49 INFO mlflow.types.utils: Unsupported type hint: <class 'numpy.ndarray'>, skipping schema inference


INFO:tensorflow:Assets written to: ram://c175ea1e-cf62-4a4f-ac47-906c7be8bcc1/assets


INFO:tensorflow:Assets written to: ram://c175ea1e-cf62-4a4f-ac47-906c7be8bcc1/assets


In [None]:
import optuna
# hyperparameters search using Optuna
# can scale Optuna with Kubeflow https://medium.com/optuna/parallel-hyperparameter-tuning-with-optuna-and-kubeflow-pipelines-4ef05ce614ae
def objective(trial): 
    """
    Optuna objective function for tuning transfer learning model
    """
    hyperparams = {
        'learning_rate': trial.suggest_float('learning_rate', 0.00001, 0.1, log=True),
        'l1': trial.suggest_float('l1', 0.0, 0.1),
        'l2': trial.suggest_float('l2', 0.0, 0.1),
        'num_hidden': trial.suggest_int('num_hidden', 8, 64),
        'epochs': trial.suggest_int('epochs', 2, 5)
    }

    model.train(ds_train, ds_test, hyperparams)
    training_history = model._train_history.history
    validation_accuracy = training_history['val_accuracy'][-1]
    return validation_accuracy

study = optuna.create_study(study_name='mnist-classification',
                            pruner=optuna.pruners.HyperbandPruner(),
                            direction='maximize')
study.optimize(objective, n_trials=6, n_jobs=3)

In [None]:
# log the hyper parameter tuned model 
# No need to set experiment ID if you've set above. 
# Set a run name here(e.g. Linear Regression Default Hyperparams).
mlflow_run_name='MNIST Xfer Learning Hyperparam tuned'
with mlflow.start_run(experiment_id=experiment_id, 
                      run_name=mlflow_run_name) as run:
    
    model.train(ds_train, ds_test, hyperparams)

    # MLFlow Tracking parameters
    mlflow.log_params(params=hyperparams)
    
    # MLFlow Tracking metrics 
    # Logging metrics for each epoch (housed in dictionary)
    training_history = model._train_history.history
    for epoch in range(0, hyperparams['epochs']):
        insert = {}
        for metric, value in training_history.items():
            insert[metric] = training_history[metric][epoch]
        mlflow.log_metrics(metrics=insert, step=epoch+1)

    # MLFlow tracking artifact (e.g. model file)
    # mlflow.log_artifact(self._model)

    # we need to define how we use tags for testing or if we even need them...
    mlflow.set_tag(key="test",
                   value="manual-logging")

    mlflow.end_run(status='FINISHED')

### Registering your model
Once you've run a couple experiments (e.g. hyper parameter tuning) we can select the best model and register that. Model registering just means that we decide this our optimized model to save. 

In [90]:
# search runs 
filtering_clause = 'params.epochs = "10" and params.learning_rate = "0.01"'
run = mlflow.search_runs(
    experiment_names=['mnist-classification'],
    filter_string=filtering_clause,
    max_results=5,
    order_by=["metrics.val_accuracy DESC"],
)

# best performing model run_id. This'll be used to register our model
best_run_id = run['run_id'][0]

In [91]:
best_run_id

'6970ace0f7764536a6125e05008c130e'

In [92]:
# register model after a scanning a couple different runs from your experiment 
model_name = f'{experiment.name}'
# mlflow.tensorflow.log_model
mv = mlflow.register_model(model_uri=f"runs:/{best_run_id}/",
                           name=model_name)


Registered model 'mnist-classification' already exists. Creating a new version of this model...
2023/05/16 14:00:04 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: mnist-classification, version 2
Created version '2' of model 'mnist-classification'.


### Serving model
After we've registered our model, we can now test inference for that model. 

In [19]:
f'{mlflow.get_tracking_uri()}/{best_run_id}/artifacts/mnist-classification'

'file:///Users/jcheung/Documents/GitHub/bookshelf/deployment_demos/mlruns/3a2c4fb424004539b8e79f4074145851/artifacts/mnist-classification'

In [100]:
# load directly from registry 
# model_version=1
# model_name = f'{experiment.name}'
# model = mlflow.pyfunc.load_model(model_uri=f"models:/{model_name}/{model_version}")

# load using artifact path directory
results = mlflow.search_registered_models(filter_string='name = "mnist-classification"')
latest_model_details = results[0].latest_versions[0]
model = mlflow.pyfunc.load_model(model_uri=f'{latest_model_details.source[7:]}')

In [128]:
# randomly sample and load a MNIST JPEG
import os 
from random import sample
from PIL import Image
fp = '/Users/jcheung/Documents/GitHub/thin-ML-deployment/app/ml/test_images'
files = [f'{fp}/{x}' for x in os.listdir(fp) if x.split('.')[-1] == 'jpg']
filename = sample(files, 1)[0]
image = np.array(Image.open(filename))

# predict using custom mlflow model
predicted = model.predict(image)

# output and compare results
true = filename.split('/')[-1].split('_')[-1][0]

print(f'True:{true} and predicted:{predicted}')

True:6 and predicted:6


In [129]:
model.predict(ds_test)

AttributeError: 'BatchDataset' object has no attribute 'shape'

In [None]:
import kubeflow.katib as katib

# Step 1. Create an objective function.
def objective(hyperparams):
    training_history = model.train(ds_train, ds_test, hyperparams)
    validation_accuracy = training_history.history['val_accuracy'][-1]
    # Katib parses metrics in this format: <metric-name>=<metric-value>.
    print(f"result={result}")

# Step 2. Create HyperParameter search space.
hyperparams = {
    'learning_rate': trial.suggest_float('learning_rate', 0.00001, 0.1, log=True),
    'l1': trial.suggest_float('l1', 0.0, 1),
    'l2': trial.suggest_float('l2', 0.0, 1),
    'num_hidden': katib.search.int(min=8, max=64),
    'epochs': katib.search.int(min=5, max=10)
}

hyperparams = {
    "a": katib.search.int(min=10, max=20),
    "b": katib.search.double(min=0.1, max=0.2)
}

# Step 3. Create Katib Experiment.
katib_client = katib.KatibClient()
name = "tune-experiment"
katib_client.tune(
    name=name,
    objective=objective,
    parameters=parameters,
    objective_metric_name="result",
    max_trial_count=12
)

# Step 4. Get the best HyperParameters.
print(katib_client.get_optimal_hyperparameters(name))