# MLOps Walkthrough - Model Development


key principles / tasks supported by RSEs
* reusable components
* running at scale
* keeping track of experiments for reproducibility



In [None]:
import warnings
# Suppress every warning for the rest of the session to keep the notebook
# output compact. NOTE(review): this also hides deprecation and data-quality
# warnings raised by the libraries used below.
warnings.filterwarnings('ignore')

In [1]:
import pathlib
import datetime
import os
import functools
import math

In [2]:
import matplotlib
%matplotlib inline

In [3]:
import numpy
import pandas
import dask

In [4]:
import sklearn
import sklearn.preprocessing
import sklearn.model_selection

In [5]:
import tensorflow

import tensorflow.keras
import tensorflow.keras.layers
import tensorflow.keras.models
import tensorflow.keras.optimizers
import tensorflow.keras.metrics
import tensorflow.keras.layers
import tensorflow.keras.constraints

In [6]:
import tensorboard

In [7]:
# Load the TensorBoard notebook extension
%load_ext tensorboard

In [8]:
import mlflow
# Switch on MLflow's TensorFlow autologging before any model is built, so the
# Keras training run further down is recorded without explicit log calls.
mlflow.tensorflow.autolog()



In [9]:
import intake

In [10]:
# Local ports reserved for the three dashboards used in this walkthrough
# (MLflow tracking UI, TensorBoard, Ray dashboard).
(
    mlflow_dash_port,
    tensorboard_dash_port,
    ray_dash_port,
) = (5001, 5002, 5003)

### Load the data

In [None]:
# Resolve the root data directory. The RSE22_ROOT_DATA_DIR environment
# variable takes precedence; otherwise fall back to ~/data/ukrse2022.
try:
    rse_root_data_dir = pathlib.Path(os.environ['RSE22_ROOT_DATA_DIR'])
    print('reading from environment variable')
except KeyError:
    # pathlib.Path.home() instead of os.environ['HOME']: HOME is not
    # guaranteed to be set (e.g. on Windows), and a missing HOME would raise
    # a second KeyError from inside this handler.
    rse_root_data_dir = pathlib.Path.home() / 'data' / 'ukrse2022'
    print('using default path')
rse_root_data_dir

In [None]:
# Open the intake catalog file that sits in the root data directory; the
# bare name on the last line displays the catalog in the notebook.
rotors_catalog = intake.open_catalog(rse_root_data_dir / 'rotors_catalog.yml')
rotors_catalog 

In [None]:
# Read the 'rotors' catalog entry in full.
# NOTE(review): presumably yields an in-memory pandas DataFrame - confirm
# against the catalog's driver.
rotors_df = rotors_catalog['rotors'].read()

In [None]:
rotors_df

In [None]:
# Quality control: keep only rows in which all four observed quantities are
# non-negative.
# NOTE(review): presumably negative values are missing-data sentinels - confirm.
rotors_df = rotors_df[
    (rotors_df['wind_speed_obs'] >= 0.0)
    & (rotors_df['air_temp_obs'] >= 0.0)
    & (rotors_df['wind_direction_obs'] >= 0.0)
    & (rotors_df['dewpoint_obs'] >= 0.0)
]
# Parse the timestamp column. pandas.to_datetime is used because only
# `import dask` is executed above, so the dask.dataframe submodule is not
# guaranteed to be loaded. assign() returns a new frame, which avoids setting
# a column on the filtered slice created just above.
# NOTE(review): assumes rotors_df is an in-memory pandas DataFrame (it is fed
# to row-wise operations and scikit-learn later) - confirm.
rotors_df = rotors_df.assign(DTG=pandas.to_datetime(rotors_df['DTG']))
# One row per timestamp, then drop rows without a timestamp.
rotors_df = rotors_df.drop_duplicates(subset=['DTG'])
rotors_df = rotors_df[rotors_df['DTG'].notnull()]

In [None]:
# Column names of the profile features: one column per level (1..22) for
# each of the four variables, named '<prefix>_<level>'.
(
    temp_feature_names,
    humidity_feature_names,
    wind_direction_feature_names,
    wind_speed_feature_names,
) = (
    [f'{prefix}_{level}' for level in range(1, 23)]
    for prefix in ('air_temp', 'sh', 'winddir', 'windspd')
)
# Column holding the label the model is trained to predict.
target_feature_name = 'rotors_present'

In [None]:
def get_v_wind(wind_dir_name, wind_speed_name, row1):
    """Return ``cos(direction) * speed`` for a single row.

    ``row1`` is indexed by name: ``row1[wind_dir_name]`` is interpreted as an
    angle in degrees and ``row1[wind_speed_name]`` as the speed.
    """
    direction_rad = math.radians(row1[wind_dir_name])
    speed = row1[wind_speed_name]
    return math.cos(direction_rad) * speed

def get_u_wind(wind_dir_name, wind_speed_name, row1):
    """Return ``sin(direction) * speed`` for a single row.

    ``row1`` is indexed by name: ``row1[wind_dir_name]`` is interpreted as an
    angle in degrees and ``row1[wind_speed_name]`` as the speed.
    """
    direction_rad = math.radians(row1[wind_dir_name])
    speed = row1[wind_speed_name]
    return math.sin(direction_rad) * speed

In [None]:
%%time
# Derive u/v wind component columns for every level from the matching wind
# speed / wind direction columns. The level index is taken from the numeric
# suffix of the wind-speed column name.
u_feature_template = 'u_wind_{level_ix}'
v_feature_template = 'v_wind_{level_ix}'
u_wind_feature_names = []
v_wind_features_names = []
for wsn1, wdn1 in zip(wind_speed_feature_names, wind_direction_feature_names):
    level_ix = int(wsn1.split('_')[1])
    u_feature = u_feature_template.format(level_ix=level_ix)
    v_feature = v_feature_template.format(level_ix=level_ix)
    u_wind_feature_names.append(u_feature)
    v_wind_features_names.append(v_feature)
    # Same formulas as get_u_wind / get_v_wind (sin/cos of the direction in
    # degrees, times the speed), but evaluated on whole columns with numpy
    # instead of one Python call per row via DataFrame.apply.
    wind_dir_rad = numpy.radians(rotors_df[wdn1])
    rotors_df[u_feature] = numpy.sin(wind_dir_rad) * rotors_df[wsn1]
    rotors_df[v_feature] = numpy.cos(wind_dir_rad) * rotors_df[wsn1]

### Train/test split

Split based on year to avoid correlations between train and test sets.

In [None]:
# Chronological split: everything before 2020 is used for training,
# everything from 2020-01-01 00:00 onwards for validation. The validation
# side uses >= so that a row stamped exactly at the split time is not
# silently dropped from both subsets.
split_time = datetime.datetime(2020, 1, 1, 0, 0)
train_df = rotors_df[rotors_df['DTG'] < split_time]
val_df = rotors_df[rotors_df['DTG'] >= split_time]

In [None]:
# Full ordered list of model inputs: temperature, humidity, then the derived
# u and v wind components. This order fixes the column order of the input
# matrix built later.
input_feature_names = [
    *temp_feature_names,
    *humidity_feature_names,
    *u_wind_feature_names,
    *v_wind_features_names,
]

In [None]:
# One StandardScaler per input feature, keyed by column name. Every scaler
# is fitted on the training subset only, so no statistics from the
# validation data enter the preprocessing.
preproc_dict = {
    if1: sklearn.preprocessing.StandardScaler() for if1 in input_feature_names
}
for if1, scaler1 in preproc_dict.items():
    scaler1.fit(train_df[[if1]])

In [None]:
# Fit the label encoder on the training targets only.
# LabelEncoder works on a 1-D array of labels, so the column is selected as
# a Series (single brackets) rather than as a one-column DataFrame, which
# scikit-learn would have to flatten with a DataConversionWarning.
target_encoder = sklearn.preprocessing.LabelEncoder()
target_encoder.fit(train_df[target_feature_name])

In [None]:
def preproc_input(data_subset, pp_dict):
    """Scale each input column with its own transformer and stack the results.

    data_subset: table indexable by a list of column names.
    pp_dict: mapping of column name -> object with a ``transform`` method; the
        iteration order of this mapping is the column order of the result.

    Returns the per-column transform outputs concatenated along axis 1.
    """
    scaled_columns = []
    for feature_name, fitted_scaler in pp_dict.items():
        single_column = data_subset[[feature_name]]
        scaled_columns.append(fitted_scaler.transform(single_column))
    return numpy.concatenate(scaled_columns, axis=1)

def preproc_target(data_subset, enc1):
    """Encode the target column of ``data_subset`` with ``enc1``.

    data_subset: table containing the column named by the module-level
        ``target_feature_name``.
    enc1: fitted encoder exposing ``transform``.

    Returns whatever ``enc1.transform`` returns for the target column.
    """
    # Pass the labels as a 1-D column (single brackets): label encoders
    # expect a 1-D array, and a one-column 2-D frame only works because
    # scikit-learn flattens it with a DataConversionWarning.
    return enc1.transform(data_subset[target_feature_name])

In [None]:
# Training inputs: the scaled feature matrix.
X_train = preproc_input(train_df, preproc_dict)
# Training targets as two columns: the encoded label and its complement
# (1 - label), matching the two-unit softmax output of the network.
train_labels = preproc_target(train_df, target_encoder).reshape((-1, 1))
y_train = numpy.concatenate([train_labels, 1.0 - train_labels], axis=1)

In [None]:
# Validation inputs: scaled with the scalers fitted on the training data.
X_val = preproc_input(val_df, preproc_dict)
# Validation targets as two columns: the encoded label and its complement
# (1 - label), matching the two-unit softmax output of the network.
val_labels = preproc_target(val_df, target_encoder).reshape((-1, 1))
y_val = numpy.concatenate([val_labels, 1.0 - val_labels], axis=1)

### Set up experiment tracking - MLflow

In [None]:
# Name of the MLflow experiment under which all runs of this walkthrough are grouped.
rse_rotors_experiment_name = 'rse_mlops_demo_rotors'

In [None]:
# str.format template that renders a datetime passed as ``dt`` in the compact
# form YYYYMMDDTHHMMSS (e.g. 20220812T162357).
timestamp_template = '{dt.year:04d}{dt.month:02d}{dt.day:02d}T{dt.hour:02d}{dt.minute:02d}{dt.second:02d}'

In [None]:
# Run-name template: expects ``network_name`` and ``dt`` when formatted, so
# each run name carries the network type and its start timestamp.
rse_run_name_template = 'rse_rotors_{network_name}_' + timestamp_template

In [None]:
# Build the URI of the MLflow tracking server: localhost on the port
# reserved for the MLflow dashboard above. The bare name on the last line
# displays the resulting URI in the notebook.
mlflow_server_address = '127.0.0.1'
mlflow_server_port = mlflow_dash_port
mlflow_server_uri = f'http://{mlflow_server_address}:{mlflow_server_port:d}'
mlflow_server_uri

In [None]:
# Point the MLflow client at the tracking server configured above.
# NOTE(review): presumably the server must already be running on that port - confirm.
mlflow.set_tracking_uri(mlflow_server_uri)

In [None]:
# Get the experiment for this walkthrough, creating it on first use.
# Looking it up by name first (the lookup returns None when nothing matches)
# means 'creating experiment' is printed only when an experiment is actually
# created, and no exception is needed for the common "already exists" case.
rse_rotors_exp = mlflow.get_experiment_by_name(rse_rotors_experiment_name)
if rse_rotors_exp is None:
    print('creating experiment')
    rse_rotors_exp_id = mlflow.create_experiment(rse_rotors_experiment_name)
    rse_rotors_exp = mlflow.get_experiment(rse_rotors_exp_id)
rse_rotors_exp



### Set up model architecture hyperparameters

In [None]:
def build_ffnn_model(hyperparameters, input_shape):
    """
    Build a feed-forward neural network in TensorFlow/Keras for predicting the
    occurrence of rotors (turbulent, orographically driven wind gusts).

    hyperparameters: mapping from which 'drop_out_rate', 'n_layers', 'n_nodes'
        and 'activation' are read.
    input_shape: forwarded as ``input_shape`` to the first (dropout) layer.

    Returns the assembled, not yet compiled, Sequential model.
    """
    keras_layers = tensorflow.keras.layers
    dropout_rate = hyperparameters['drop_out_rate']

    model = tensorflow.keras.models.Sequential()
    # Dropout is applied directly to the inputs before the first hidden layer.
    model.add(keras_layers.Dropout(dropout_rate, input_shape=input_shape))

    # n_layers identical hidden blocks: max-norm constrained Dense + Dropout.
    for _ in numpy.arange(0, hyperparameters['n_layers']):
        hidden_layer = keras_layers.Dense(
            hyperparameters['n_nodes'],
            activation=hyperparameters['activation'],
            kernel_constraint=tensorflow.keras.constraints.max_norm(3),
        )
        model.add(hidden_layer)
        model.add(keras_layers.Dropout(dropout_rate))

    # Output layer: two softmax units.
    output_layer = keras_layers.Dense(2, activation='softmax')
    model.add(output_layer)
    return model


In [None]:
# Shape of one input sample: a flat vector with one entry per feature
# column of the training matrix.
input_shape = (X_train.shape[1],)
(nx,) = input_shape

In [None]:
# Hyperparameters for the baseline network and its training run; the keys
# are read by build_ffnn_model and by the training cell below.
hyperparameters_dict = dict(
    initial_learning_rate=1.0e-4,
    drop_out_rate=0.2,
    n_epochs=100,
    batch_size=1000,
    n_nodes=1000,
    n_layers=4,
    activation='relu',
    loss='mse',
)

### Train model and setup monitoring

Tools:
* tensorboard
* tensorflow
* mlflow


In [None]:
# Directory for TensorBoard event files, created on first use. The bare name
# on the last line displays the path in the notebook.
log_dir_tensorboard = rse_root_data_dir / 'log_tensorboard'
if not log_dir_tensorboard.is_dir():
    # parents=True: the root data directory itself may not exist yet, in
    # which case a plain mkdir() raises FileNotFoundError.
    # exist_ok=True: harmless if the directory appears between the check and
    # the call.
    log_dir_tensorboard.mkdir(parents=True, exist_ok=True)
    print(f'created tensorboard log directory {log_dir_tensorboard}')
log_dir_tensorboard

In [None]:
# Keras callback that writes TensorBoard logs into the directory prepared
# above, with histogram_freq=1.
tensorboard_callback = tensorflow.keras.callbacks.TensorBoard(
    histogram_freq=1,
    log_dir=log_dir_tensorboard,
)

In [None]:
log_dir_tensorboard

In [None]:
%tensorboard --logdir /Users/stephen.haddad/data/ukrse2022/log_tensorboard

In [None]:
%time 
current_run_name = rse_run_name_template.format(network_name='ffnn',
                                                dt=datetime.datetime.now()
                                               )
with mlflow.start_run(experiment_id=rse_rotors_exp.experiment_id, run_name=current_run_name) as current_run:
    rotors_ffnn_model = build_ffnn_model(hyperparameters=hyperparameters_dict,
                                     input_shape=input_shape,
                                    )
    rotors_ffnn_optimizer = tensorflow.optimizers.Adam(
        learning_rate=hyperparameters_dict['initial_learning_rate'])  
    
    rotors_ffnn_model.compile(optimizer=rotors_ffnn_optimizer, 
                          loss=hyperparameters_dict['loss'], 
                          metrics=[tensorflow.keras.metrics.RootMeanSquaredError()])
    
    history=rotors_ffnn_model.fit(
        X_train, 
        y_train, 
        validation_data=(X_val, 
                          y_val), 
        epochs=hyperparameters_dict['n_epochs'], 
        batch_size=hyperparameters_dict['batch_size'], 
        shuffle=True,
        verbose=0,
        callbacks=[tensorboard_callback],
    )    
    

# Example: hyperparameter tuning with Ray

https://docs.ray.io/en/latest/tune/examples/tune_mnist_keras.html#tune-mnist-keras


In [11]:
import ray
import ray.tune
import ray.tune.schedulers 
import ray.tune.integration.keras

In [12]:
import rotors_hpt

In [13]:
import importlib

In [31]:
# Re-import the local rotors_hpt module so edits made to its file since the
# first import are picked up without restarting the kernel.
importlib.reload(rotors_hpt)

<module 'rotors_hpt' from '/Users/stephen.haddad/prog/ukrse_2022_mlops_walkthrough/rotors_hpt.py'>

In [15]:
# Upper bound on training iterations per trial; used as the
# "training_iteration" stop criterion of the tuning run below.
# NOTE(review): the name is misspelt ("terations"); it is referenced under
# this exact name by the tuning cell, so rename both together.
num_training_terations=20

In [16]:
# Start a local Ray instance with 4 CPUs and expose its dashboard on the
# port reserved at the top of the notebook.
# ignore_reinit_error=True makes the cell safe to re-run: without it a second
# call fails because Ray is already initialised in this kernel.
ray.init(
    num_cpus=4,
    dashboard_port=ray_dash_port,
    dashboard_host='127.0.0.1',
    ignore_reinit_error=True,
)


RayContext(dashboard_url='', python_version='3.8.13', ray_version='1.13.0', ray_commit='e4ce38d001dbbe09cd21c497fedd03d692b2be3e', address_info={'node_ip_address': '127.0.0.1', 'raylet_ip_address': '127.0.0.1', 'redis_address': None, 'object_store_address': '/tmp/ray/session_2022-08-12_16-23-57_765638_26561/sockets/plasma_store', 'raylet_socket_name': '/tmp/ray/session_2022-08-12_16-23-57_765638_26561/sockets/raylet', 'webui_url': '', 'session_dir': '/tmp/ray/session_2022-08-12_16-23-57_765638_26561', 'metrics_export_port': 58009, 'gcs_address': '127.0.0.1:61655', 'address': '127.0.0.1:61655', 'node_id': 'd77f56b269afeebbc697ba72e771ed6bbd2a60a4f0e564b05d2b6d5e'})

In [17]:
# rse_rotors_sched = 
# set up local ray cluster

In [18]:
# Search space for the tuning run: a uniformly sampled learning rate and
# randomly drawn integer values for layer width and layer count.
rotors_hpt_config = dict(
    initial_learning_rate=ray.tune.uniform(1e-5, 1e-3),
    n_nodes=ray.tune.randint(100, 500),
    n_layers=ray.tune.randint(2, 6),
)

In [34]:
# Run the hyperparameter search: 10 samples from rotors_hpt_config, each
# trial executing rotors_hpt.run_ml_pipeline on 2 CPUs.
rotors_hpt_analysis = ray.tune.run(
    rotors_hpt.run_ml_pipeline,
    name="rse_rotors_hpt",
    scheduler=ray.tune.schedulers.AsyncHyperBandScheduler(
        time_attr="training_iteration",
        max_t=400,
        # NOTE(review): trials are stopped after num_training_terations (20)
        # iterations, which equals this grace period, so the scheduler
        # presumably never stops a trial early - confirm the intended values.
        grace_period=20,
    ),
    metric="root_mean_squared_error",
    # RMSE is an error measure: lower is better, so the search must minimise
    # it. mode="max" would rank the worst trials as best.
    mode="min",
    # NOTE(review): the RMSE threshold looks carried over from an
    # accuracy-based example; presumably it ends a trial once its RMSE
    # reaches 0.99 - confirm it is still wanted.
    stop={"root_mean_squared_error": 0.99,
          "training_iteration": num_training_terations},
    num_samples=10,
    resources_per_trial={"cpu": 2,
                         "gpu": 0},
    config=rotors_hpt_config,
    progress_reporter=ray.tune.JupyterNotebookReporter(overwrite=True),
    )

TypeError: __init__() missing 1 required positional argument: 'overwrite'

# Example: save the model

* Use ONNX
* Use MLflow


In [None]:
# save model in ml flow

In [None]:
# stretch goal - save model in ONNX?

### Next Steps / Further Reading

### References
* mlflow
* scikit-learn
* tensorflow
* tensorboard