# MLOPs Parsl workflow

This notebook is the stand-alone companion to the Parsl MLOPs workflow in `main.py` in this repository. This notebook is designed to be run directly on an HPC resource, while `main.py` uses `parsl_utils` to launch MLOPs applications from a central coordinating node (e.g. a laptop or the Parallel Works platform). This workflow simulates a typical MLOPs situation with the following tasks:
1. start an MLFlow tracking server
2. start DVC tracking within an archive repository + remote
3. download and preprocess training data
4. run training loop and store results on-the-fly with MLFlow
5. commit and push resulting models with DVC to repo + remote
6. use (and reuse) the model for inference and generate figures

The core ML training used here started as a copy of [Francois Chollet's VAE digits example](https://keras.io/examples/generative/vae/) but modified for online learning, MLOPs, and Parsl orchestration.

## Installs

Most of the install commands below are skipped (they only
run when `install_from_scratch=True`) since it is faster to
reconstruct a Conda environment from an exported env file
(.yaml) than to rebuild it from scratch. The reconstruction
command is kept active here since env files are distributed
with this notebook. Note that there are two different
environments (one for CPU and one for GPU), and TensorFlow
will not, in general, work if you use the environment built
for the other type of node. Once the command to reconstruct
the Conda environment has been run, you may need to tell
this notebook to use the kernel from that Conda environment
with the `Kernel > Change kernel...` option in the menu above.
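Because the CPU and GPU environment files differ, it can help to choose one programmatically. The sketch below is a hypothetical helper, not part of this repository: it assumes that `nvidia-smi` being on `PATH` indicates a GPU node, and it assumes a GPU env file named `./requirements/mlops-parsl-gpu.yaml` sitting next to the CPU file used in the cell below.

```python
import shutil


def choose_env_file(requirements_dir="./requirements"):
    """Return (env_name, env_file) for the current node.

    Hypothetical helper: treats the presence of `nvidia-smi` on PATH as a
    sign that NVIDIA GPUs are available. The GPU file name is an assumption.
    """
    flavor = "gpu" if shutil.which("nvidia-smi") else "cpu"
    env_name = f"mlops-parsl-{flavor}"
    return env_name, f"{requirements_dir}/{env_name}.yaml"


env_name, env_file = choose_env_file()
print(f"conda env update -f {env_file} --name {env_name}")
```

IPython's `!` syntax expands `{...}` expressions, so the two values could be passed directly as `! conda env update -f {env_file} --name {env_name}`.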

In [None]:
install_from_scratch=False

if (install_from_scratch):
    # Use a specific version of Python due to
    # compatibility with TensorFlow and Keras
    #! conda create --name mlops-parsl-gpu python=3.9
    
    # To use a Jupyter notebook with a
    # specific conda environment:
    ! conda install -y requests
    ! conda install -y ipykernel
    ! conda install -y -c anaconda jinja2
    
    # Conda installs
    ! conda install -y -c conda-forge matplotlib
    ! conda install -y -c conda-forge pandas
    ! conda install -y -c conda-forge dvc 
    
    # pip installs
    ! pip install --upgrade pip
    ! pip install tensorflow # For CPU nodes, will work on GPU nodes but will be slow
    #! pip install tensorflow[and-cuda] # Replace the above with this line for GPU nodes
    ! pip install tensorflow-plugin-profile
    ! pip install mlflow
    ! pip install 'parsl[monitoring, visualization]' # Conda does not install monitoring, so use pip

    # The environment was then exported with:
    # ! conda env export --name mlops-parsl-cpu > ./requirements/mlops-parsl-cpu.yaml
else:
    # You can rebuild the environment with:
    ! conda env update -f ./requirements/mlops-parsl-cpu.yaml --name mlops-parsl-cpu

# Note that there are two exported Conda environments
# - *cpu and *gpu - because the installation of TensorFlow 
# will automatically detect the presence of GPUs and 
# change which version is installed accordingly.

## Imports

The imports below follow the instructions in the [Parsl Tutorial](https://parsl.readthedocs.io/en/latest/1-parsl-introduction.html).

In [None]:
import os
import numpy as np
import pandas as pd

os.environ["KERAS_BACKEND"] = "tensorflow"

# ml dependencies
import tensorflow as tf
import keras
from keras import ops
from keras import layers

# mlflow dependencies
import mlflow
from mlflow import MlflowClient

# parsl dependencies
import parsl
import logging
from parsl.app.app import python_app, bash_app
from parsl.config import Config
from parsl.executors import HighThroughputExecutor # we want to use monitoring, so we must use HTEX
from parsl.monitoring.monitoring import MonitoringHub
from parsl.addresses import address_by_hostname

#=================================================
# Log everything to stdout (ends up in pink boxes 
# in the notebook). This information is logged anyway
# in ./runinfo/<run_id>/parsl.log
#parsl.set_stream_logger() # <-- log everything to stdout
#==================================================

print(parsl.__version__)

## Configure Parsl

This configuration must use the HighThroughputExecutor (HTEX) since we also want to enable [Parsl monitoring](https://parsl.readthedocs.io/en/latest/userguide/monitoring.html).

In [None]:
config = Config(
   executors=[
       HighThroughputExecutor(
           label="local_htex",
           cores_per_worker=0.1,
           max_workers_per_node=4,
           address=address_by_hostname(),
       )
   ],
   monitoring=MonitoringHub(
       hub_address=address_by_hostname(),
       hub_port=55055,
       monitoring_debug=False,
       resource_monitoring_interval=10,
   ),
   strategy='none'
)

# Loading the configuration starts a Parsl DataFlowKernel
dfk = parsl.load(config)

## Define Parsl apps

Parsl workflows are divided into the smallest unit of execution, the app. There are two types of Parsl apps:
1. Python apps are useful when launching pure Python code (e.g. TensorFlow)
2. Bash apps are useful when launching tasks on the command line (e.g. starting the MLFlow server)

Here, the applications are *defined* but not run.
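Calling a decorated app does not run it inline: Parsl returns an `AppFuture` immediately, and putting one app's future into another app's `inputs` makes the second app wait for the first. This is how the retraining loop in Experiment 1 chains its runs. The sketch below illustrates the same pattern with the standard library's `concurrent.futures` rather than Parsl (Parsl's `AppFuture` subclasses `concurrent.futures.Future`); here the dependency is resolved explicitly with `.result()`, which Parsl does for you. The function names are illustrative only.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def train_step(step, previous):
    """Stand-in for a training app; `previous` is the upstream result or 0."""
    return f"weights_{step}(from {previous})"


def launch(pool, step, upstream):
    # Wait for the upstream future inside the worker, mimicking how Parsl
    # holds back an app until the futures in its `inputs` have completed.
    def task():
        prev = upstream.result() if isinstance(upstream, Future) else upstream
        return train_step(step, prev)
    return pool.submit(task)


with ThreadPoolExecutor(max_workers=4) as pool:
    futures = []
    for step in range(1, 4):
        upstream = futures[-1] if futures else 0  # first run has no dependency
        futures.append(launch(pool, step, upstream))  # returns immediately
    print(futures[-1].result())
```

Because each launch returns at once, the loop finishes quickly and only the final `.result()` blocks, which is the same reason the notebook cells below return before training is done.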

### Python Apps

In [None]:
@python_app # make directory to keep all files associated with the ML model
def make_dir(model_dir):
    import os
    os.makedirs(model_dir, exist_ok = False)
    return 0

In [None]:
@python_app # get and preprocess training data
def preprocess_data(inputs = [], outputs = []):
    import keras
    import numpy as np
    (x_train, Y_train), (x_test, Y_test) = keras.datasets.mnist.load_data()
    mnist_digits = np.expand_dims(np.concatenate([x_train, x_test], axis=0), -1).astype("float32") / 255
    return mnist_digits

In [None]:
@python_app # build the model and then train the model
def build_train_model(inputs=[], outputs=[]): # inputs = [data, experiment, num, build]
    
    # imports ---------------------------------------------------------------------------------------------
    
    import os
    import numpy as np
    import pandas as pd

    with open('model_build_out.txt', 'a') as f:
        f.write("Starting training app...\n")
        f.write("Execution is in "+os.getcwd()+"\n")
    
    os.environ["KERAS_BACKEND"] = "tensorflow"

    # ml dependencies
    import tensorflow as tf
    import keras
    from keras import ops
    from keras import layers

    # mlflow dependencies
    import mlflow
    from mlflow import MlflowClient
    
    # definition library
    import sys
    sys.path.append(os.getcwd())
    from definitions import Sampling, VAE
    
    # -----------------------------------------------------------------------------------------------------
    
    with open('model_build_out.txt', 'a') as f:
        f.write("Building model...\n")
    
    # build encoder ---------------------------------------------------------------------------------------
    
    latent_dim = 2
    encoder_inputs = keras.Input(shape=(28, 28, 1))
    
    x = layers.Conv2D(32, 3, activation="relu", strides=2, padding="same")(encoder_inputs)
    x = layers.Conv2D(64, 3, activation="relu", strides=2, padding="same")(x)
    x = layers.Flatten()(x)
    x = layers.Dense(16, activation="relu")(x)
    
    z_mean = layers.Dense(latent_dim, name="z_mean")(x)
    z_log_var = layers.Dense(latent_dim, name="z_log_var")(x)
    z = Sampling()([z_mean, z_log_var])
    
    encoder = keras.Model(encoder_inputs, [z_mean, z_log_var, z], name="encoder")
    with open('model_build_out.txt', 'a') as f:
        encoder.summary(print_fn=lambda x: f.write(x + '\n'))
    
    # -----------------------------------------------------------------------------------------------------
    
    # build decoder ---------------------------------------------------------------------------------------
    
    latent_dim = 2
    latent_inputs = keras.Input(shape=(latent_dim,))
    
    x = layers.Dense(7 * 7 * 64, activation="relu")(latent_inputs)
    x = layers.Reshape((7, 7, 64))(x)
    x = layers.Conv2DTranspose(64, 3, activation="relu", strides=2, padding="same")(x)
    x = layers.Conv2DTranspose(32, 3, activation="relu", strides=2, padding="same")(x)
    
    decoder_outputs = layers.Conv2DTranspose(1, 3, activation="sigmoid", padding="same")(x)
    decoder = keras.Model(latent_inputs, decoder_outputs, name="decoder")
    with open('model_build_out.txt', 'a') as f:
        decoder.summary(print_fn=lambda x: f.write(x + '\n'))
    
    # -----------------------------------------------------------------------------------------------------
    
    # build model -----------------------------------------------------------------------------------------    
    
    vae = VAE(encoder, decoder)
    vae.compile(optimizer=keras.optimizers.Adam())
    
    # train model ----------------------------------------------------------------------------------------- 
    
    
    with open('model_build_out.txt', 'a') as f:
        f.write("Set up training context...\n")
    
    model_dir = './model_dir' 
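    # fit() receives no validation data, so monitor the training loss instead of
    # the default "val_loss" (assumes VAE.train_step reports a "loss" metric, as
    # in the Keras VAE example)
    early_stopping_cb = keras.callbacks.EarlyStopping(monitor = "loss", patience = 5, restore_best_weights = True)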
    
    # if the model has already been trained at least once, load that model
    if os.path.exists(os.path.join(model_dir, 'vae.weights.h5')): 
        with open('model_build_out.txt', 'a') as f:
            f.write("found weights\n")
        vae.load_weights(os.path.join(model_dir, 'vae.weights.h5'))
    
    mlflow.autolog() # start autologging
    
    run_name = f"{inputs[2]}_test" # define a run name for this iteration of training
    artifact_path = f"{inputs[2]}"  # define an artifact path that the model will be saved to
    
    # initiate the MLflow run context 
    # - training needs to happen inside of the mlflow run or you will run into problems with double logging
    with mlflow.start_run(run_name = run_name, experiment_id = inputs[1]) as run:
        
        with open('model_build_out.txt', 'a') as f:
            f.write("Core model training...\n")
        history = vae.fit(inputs[0], epochs=30, batch_size=128, callbacks = [early_stopping_cb])
        
        with open('model_build_out.txt', 'a') as f:
            f.write("Save model weights...\n")
        vae.save_weights(os.path.join(model_dir, 'vae.weights.h5')) # save model weights after training

        with open('model_build_out.txt', 'a') as f:
            f.write("Getting and saving model training history...\n")
        hist_pd = pd.DataFrame(history.history)
        hist_pd.to_csv(os.path.join(model_dir, f'history_{inputs[2]}.csv'), index = False)
    
    with open('model_build_out.txt', 'a') as f:
        f.write("Training app done.\n")
        
    return 1
    # figure out how to clear model data

### Bash Apps

In [None]:
@bash_app # start the parsl visualizer
def start_parsl_visualize(stdout='parsl_vis_app.stdout', stderr='parsl_vis_app.stderr'):
    return 'parsl-visualize --listen 127.0.0.1 --port 8080'

In [None]:
@bash_app # start the mlflow server
def start_mlflow(stdout='mlflow_app.stdout', stderr='mlflow_app.stderr'):
    return 'mlflow server --host 127.0.0.1 --port 8081'

In [None]:
@bash_app # grab your dvc repository
def add_submodule(stdout='dvc.stdout', stderr='dvc.stderr'):
    dvc_repo_link="git@github.com:oobielodan/digits_dvc.git" # <ssh link to the repo you set aside for dvc>
    
    return 'git submodule add --force %s' % (dvc_repo_link)

In [None]:
@bash_app # DVC initialization and storage set up
def dvc_setup(stdout='dvc.stdout', stderr='dvc.stderr'):
    dvc_storage="/demo-bucket" # <complete path to the mounted storage you have set up for dvc>
    dvc_repo="digits_dvc"
    
    return 'cd %s && dvc init -f && dvc remote add -d dvcstorage %s' % (dvc_repo, dvc_storage)

In [None]:
@bash_app # initial commit to git
def dvc_init(stdout='dvc.stdout', stderr='dvc.stderr'):
    dvc_repo="digits_dvc"
    
    return 'cd %s && git add . && git commit -m "loaded dependencies, mkdir -p, DVC init"' % (dvc_repo)

In [None]:
@bash_app
def execute_dvc(stdout='dvc.stdout', stderr='dvc.stderr'):    
    dvc_repo="digits_dvc"
    env_name="mlops-parsl-cpu" # <name of your env>
    model_dir="model_dir"
    
    return 'cp ./%s/vae.weights.h5 %s/experiment_2.weights.h5 && sh dvcgit.sh experiment_2.weights.h5 "digit experiment 2" %s %s' % (model_dir, dvc_repo, dvc_repo, env_name)

In [None]:
@bash_app
def rm_dvc(stdout='dvc.stdout', stderr='dvc.stderr'):
    dvc_repo="digits_dvc"
    model_dir="model_dir"
    
    return 'rm ./%s/experiment_2.weights.h5 && rm ./%s/vae.weights.h5' % (dvc_repo, model_dir)
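A Bash app's Python function only returns a command string, which Parsl then runs in a shell. When paths or commit messages are interpolated into that string, quoting them with `shlex.quote` avoids surprises from spaces or special characters. The sketch below is a hypothetical helper (not in this repository) that rebuilds the command returned by `execute_dvc` above; `dvcgit.sh` and its argument order are taken from that cell.

```python
import shlex


def dvc_commit_command(model_dir, dvc_repo, weights_name, message, env_name):
    """Build the shell command string a Bash app would return.

    Hypothetical helper mirroring the `execute_dvc` app; every
    interpolated value is shell-quoted.
    """
    q = shlex.quote
    copy = f"cp {q(model_dir + '/vae.weights.h5')} {q(dvc_repo + '/' + weights_name)}"
    commit = f"sh dvcgit.sh {q(weights_name)} {q(message)} {q(dvc_repo)} {q(env_name)}"
    return f"{copy} && {commit}"


print(dvc_commit_command("./model_dir", "digits_dvc",
                         "experiment_2.weights.h5", "digit experiment 2",
                         "mlops-parsl-cpu"))
```

Inside a `@bash_app`, the function would simply `return dvc_commit_command(...)`.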

## Start Parsl monitoring - Option 1 - direct shell invocation to background

This step can be done at any point provided that a database file exists. The default location of this file is `./runinfo/monitoring.db`, and it is created when the Parsl configuration is loaded. When the notebook kernel is restarted, information from each additional Parsl workflow run is appended to the monitoring data in `./runinfo`. It is possible to view this information "offline" (that is, with no active Parsl workflows; see Option 3 at the end of this notebook).
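Before launching `parsl-visualize`, it can be useful to confirm that the monitoring database exists and has been populated. The sketch below uses only Python's built-in `sqlite3` module and lists whatever tables are present without assuming anything about Parsl's schema; the default path is the `./runinfo/monitoring.db` location mentioned above.

```python
import os
import sqlite3
from contextlib import closing
from pathlib import Path


def monitoring_tables(db_path="./runinfo/monitoring.db"):
    """Return the sorted table names in a Parsl monitoring database.

    Returns an empty list if the file does not exist yet.
    """
    if not os.path.exists(db_path):
        return []
    # Open read-only so that inspecting the file cannot modify it.
    uri = Path(db_path).resolve().as_uri() + "?mode=ro"
    with closing(sqlite3.connect(uri, uri=True)) as conn:
        rows = conn.execute(
            "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name"
        ).fetchall()
    return [name for (name,) in rows]


print(monitoring_tables())
```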

This launch is commented out here since it is also possible to launch `parsl-visualize` from a Parsl app within the workflow, which is done below. This command is retained as a functional example. The advantage to running `parsl-visualize` as a Parsl app is that the visualization server is up and running while the workflow is running and then is shut down when the workflow is cleaned up. Otherwise, when `parsl-visualize` is launched via `os.system` the running child process can persist even after workflow shut down or notebook kernel restart.

In [None]:
# Launch Parsl 
# os.system('parsl-visualize 1> parsl_vis.stdout 2> parsl_vis.stderr &')

## Start MLFlow and start Parsl monitoring - Option 2 - Monitoring as a Parsl app

This approach is helpful if we want Parsl Monitoring processes to be cleaned up after the workflow is complete. This point is also the most natural time to start MLFlow. Note that both of these commands are tracked by Parsl and are considered to be part of the workflow.
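Because `start_mlflow()` returns a future immediately, the MLFlow server may not be accepting connections yet when later cells create the `MlflowClient`. The sketch below is a hypothetical standard-library helper that polls the server's TCP port; an open port is a reasonable, not perfect, signal that the server is ready to answer HTTP requests.

```python
import socket
import time


def wait_for_port(host, port, timeout=60.0, interval=1.0):
    """Poll until a TCP connection to (host, port) succeeds or timeout expires.

    Returns True if the port accepted a connection, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)


# e.g. after start_mlflow(), block until the server on 127.0.0.1:8081 is up:
# wait_for_port("127.0.0.1", 8081)
```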

In [None]:
# Start Parsl visualization in a
# separate cell since we only want
# to run this app one time. This
# invocation of parsl_visualize is
# technically part of the workflow.

parsl_vis_future = start_parsl_visualize()
mlflow_future = start_mlflow()

## Run the workflow

The workflow code below runs the applications.

In [None]:
# create the model directory that holds information on the model
model_dir = './model_dir' 
mkdir_future = make_dir(model_dir)

In [None]:
# point both the MlflowClient and the fluent mlflow API at the tracking server started above
client = MlflowClient(tracking_uri = "http://127.0.0.1:8081")
mlflow.set_tracking_uri("http://127.0.0.1:8081")

### Set up experiments for MLFlow

#### Experiment 1
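In Experiment 1, we train the Digit CVAE model on multiple datasets. To create these datasets, we split the combined MNIST dataset into five roughly equal parts with `np.array_split`. The split is sequential rather than shuffled, but because MNIST is not ordered by digit, each part contains a mix of all ten digits. After each training session, we save the weights and use them as the starting point for retraining the model on the next dataset.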
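If truly randomized parts are wanted, the rows can be permuted before splitting. A minimal sketch, using a seeded `numpy.random.default_rng` (the seed value is arbitrary) and a small synthetic array in place of `mnist_digits`; the helper name `random_parts` is hypothetical.

```python
import numpy as np


def random_parts(data, n_parts, seed=0):
    """Shuffle rows with a seeded generator, then split into n_parts chunks."""
    rng = np.random.default_rng(seed)
    shuffled = data[rng.permutation(len(data))]
    return np.array_split(shuffled, n_parts)


# Synthetic stand-in for mnist_digits: 12 "images" of shape (28, 28, 1),
# each filled with its own index so rows can be told apart.
data = np.arange(12, dtype="float32").reshape(12, 1, 1, 1) * np.ones((1, 28, 28, 1), dtype="float32")
parts = random_parts(data, 5)
print([len(p) for p in parts])  # [3, 3, 2, 2, 2]
```

`np.array_split` gives the first `len(data) % n_parts` chunks one extra row, which is why the sizes are uneven here.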

In [None]:
# provide an experiment description that will appear in the UI
experiment1_description = (
    "This is the digits forecasting project. "
    "This experiment contains the digit model for randomized numbers (0-9) trained separately."
)

# provide searchable tags for the experiment
experiment1_tags = {
    "project_name": "digit-forecasting",
    "model_type": "randomized",
    "team": "digit-ml",
    "project_quarter": "Q3-2024",
    "mlflow.note.content": experiment1_description,
}

# create the experiment and give it a unique name
digit_experiment1 = client.create_experiment(
    name="Randomize_Model", tags=experiment1_tags
)

#### Experiment 2
In Experiment 2, we train the Digit CVAE model on all digit samples simultaneously, without any subsequent retraining using the weights.

In [None]:
# provide an experiment description that will appear in the UI
experiment2_description = (
    "This is the digits forecasting project. "
    "This experiment contains the digit model for numbers (0-9) trained all together."
)

# provide searchable tags for the experiment
experiment2_tags = {
    "project_name": "digit-forecasting",
    "model_type": "all digits",
    "team": "digit-ml",
    "project_quarter": "Q3-2024",
    "mlflow.note.content": experiment2_description,
}

# create the experiment and give it a unique name
digit_experiment2 = client.create_experiment(
    name="Together_Model", tags=experiment2_tags
)

#### Experiment 3
In Experiment 3, we revisit the approach used in Experiment 1: initializing the model with the weights from a previous training session and retraining it from there. However, in this experiment, we train the Digit CVAE model sequentially on each of the 10 digits (0-9), one digit at a time. After each training session, we save the weights and use them to retrain the model on the next digit. This approach is expected to induce a 'forgetting' effect, where the model gradually loses its ability to reproduce earlier digits with each subsequent training session.
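Selecting one digit at a time comes down to a boolean mask over the label arrays returned by `keras.datasets.mnist.load_data()`. The sketch below performs the same filtering and scaling as the Experiment 3 loop, but on small synthetic arrays so that it runs without downloading MNIST; the helper name `digit_subset` is hypothetical.

```python
import numpy as np


def digit_subset(x_train, y_train, x_test, y_test, digit):
    """Return all train+test images of one digit, scaled to [0, 1] with a channel axis."""
    x = np.concatenate([x_train[y_train == digit], x_test[y_test == digit]], axis=0)
    return np.expand_dims(x, -1).astype("float32") / 255


# Synthetic stand-ins for the MNIST arrays (uint8 images, integer labels)
rng = np.random.default_rng(0)
x_train = rng.integers(0, 256, size=(20, 28, 28), dtype=np.uint8)
y_train = np.arange(20) % 10
x_test = rng.integers(0, 256, size=(10, 28, 28), dtype=np.uint8)
y_test = np.arange(10)

sevens = digit_subset(x_train, y_train, x_test, y_test, 7)
print(sevens.shape)  # (3, 28, 28, 1)
```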

In [None]:
# provide an experiment description that will appear in the UI
experiment3_description = (
    "This is the digits forecasting project. "
    "This experiment contains the digit model for each of the numbers (0-9) trained separately."
)

# provide searchable tags that define characteristics of the runs that will be in this experiment
experiment3_tags = {
    "project_name": "digit-forecasting",
    "model_type": "sequential",
    "team": "digit-ml",
    "project_quarter": "Q3-2024",
    "mlflow.note.content": experiment3_description,
}

# create the experiment and give it a unique name
digit_experiment3 = client.create_experiment(
    name="Sequenced_Model", tags=experiment3_tags
)

In [None]:
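# fetch each experiment's metadata: set_experiment() returns an Experiment
# object (with .experiment_id) and also makes that experiment the active one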
digit_experiment1 = mlflow.set_experiment("Randomize_Model")
digit_experiment2 = mlflow.set_experiment("Together_Model")
digit_experiment3 = mlflow.set_experiment("Sequenced_Model")

### Run experiments

In [None]:
# get data with the preprocess_data() app
mnist_digits = preprocess_data().result()

#### Experiment 1

In [None]:
# retraining the model n times
count = 0
build = []

for arr in np.array_split(mnist_digits, 5):
    count += 1
    
    if (count > 1):
        print('Launching retraining...')
        # Note that we increment the counter above
        # for the next training iteration, but the
        # .append() below only happens after the
        # app inside the () has been launched. This
        # means that we need to reference build[count-2]
        # (and not build[count-1]) because the future
        # for the current iteration has not been
        # appended yet, so we need the index that
        # corresponds to the future list before
        # the launch happens.
        enforce = build[count-2]
    else:
        print('Launching first training...')
        enforce = 0
    
    # Launch training
    build.append(
        build_train_model(
            inputs=[
                arr, 
                digit_experiment1.experiment_id, 
                f"rand_{count}", enforce]))
    
    # Print the future status of the launched app
    print(build[count-1])

In [None]:
future_add1 = add_submodule()

In [None]:
future_setup1 = dvc_setup()

In [None]:
future_init1 = dvc_init()

In [None]:
build[-1].result()  # wait for the last retraining run to save its weights
future_execute1 = execute_dvc()

In [None]:
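future_execute1.result()  # the weights must be versioned before they are removed
rm_dvc().result()  # block so the next experiment starts without the old weights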

#### Experiment 2

In [None]:
# train all numbers at the same time
build = build_train_model(inputs=[mnist_digits, digit_experiment2.experiment_id, "all", 0])

In [None]:
future_add2 = add_submodule()

In [None]:
future_setup2 = dvc_setup()

In [None]:
future_init2 = dvc_init()

In [None]:
build.result()  # wait for training to save its weights
future_execute2 = execute_dvc()

In [None]:
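future_execute2.result()  # the weights must be versioned before they are removed
rm_dvc().result()  # block so the next experiment starts without the old weights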

#### Experiment 3

In [None]:
# training one number at a time
# preprocess_data() returns only the images, so reload the labelled
# MNIST splits here in order to select one digit at a time
(x_train, Y_train), (x_test, Y_test) = keras.datasets.mnist.load_data()

count = 0
build = []

for num in np.arange(10):
    count += 1
    
    train_filter = np.where(Y_train == num)
    test_filter = np.where(Y_test == num)
    
    x_trn = x_train[train_filter]
    x_tst = x_test[test_filter]
    
    digits = np.expand_dims(np.concatenate([x_trn, x_tst], axis=0), -1).astype("float32") / 255
    
    if (count > 1):
        print('Launching retraining...')
        enforce = build[count-2]
    else:
        print('Launching first training...')
        enforce = 0
    
    # Launch training and keep its future so the next digit can depend on it
    build.append(
        build_train_model(
            inputs=[digits, digit_experiment3.experiment_id, num, enforce]))
    
    # Print the future status of the launched app
    print(build[count-1])

In [None]:
future_add3 = add_submodule()

In [None]:
future_setup3 = dvc_setup()

In [None]:
future_init3 = dvc_init()

In [None]:
build[-1].result()  # wait for the last digit's training run to save its weights
future_execute3 = execute_dvc()

In [None]:
future_execute3.result()  # the weights must be versioned before they are removed
future_rm3 = rm_dvc()


## Stop Parsl

The cells above can be rerun any number of times; this will simply send more and more apps to be run by Parsl. When the workflow is truly complete, it is time to call the cleanup() command. This command runs implicitly when a `main.py` script finishes executing, but it is *not* run in a notebook unless it is explicitly called as it is below.

In [None]:
dfk.cleanup()

## Clean up some log files

In [None]:
# script for cleaning .stdout and .stderr files
! sh clean_logs

In [None]:
# Application logs
! rm model_build_out.txt

# This directory contains Parsl monitoring along with other logs
! rm -rf runinfo

# This directory contains the saved model files
# ! rm -rf model_dir

# This directory contains the databases for MLFlow
! rm -rf mlruns

## Start Parsl Monitoring - Option 3 - Post workflow manual invocation

Once the Parsl `./runinfo/monitoring.db` is created, it is possible to start Parsl Monitoring and browse the results of the workflow offline. In this scenario, `parsl-visualize` can be started on the command line provided that a Conda env with `parsl[visualization]` installed is activated. For example:
```
source pw/.miniconda3/etc/profile.d/conda.sh
conda activate base
parsl-visualize sqlite:///${HOME}/mlops-parsl-workflow/runinfo/monitoring.db
```
(You may need to adjust the path to the Conda environment, its name, and the path to `monitoring.db`.)
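The number of slashes in an SQLite URL is easy to get wrong: the URL is `sqlite:///` followed by the path, so an absolute POSIX path yields four slashes in total. A small sketch that builds the URL from any path (POSIX paths assumed; the helper name is hypothetical):

```python
import os


def monitoring_db_url(db_path):
    """Return a sqlite:/// URL for an absolute version of db_path (POSIX paths)."""
    return "sqlite:///" + os.path.abspath(db_path)


print(monitoring_db_url("/home/user/mlops-parsl-workflow/runinfo/monitoring.db"))
# sqlite:////home/user/mlops-parsl-workflow/runinfo/monitoring.db
```

The resulting string can be passed to `parsl-visualize` as its positional argument.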