# Track Machine Learning experiments and models

A machine learning model is a file that has been trained to recognize certain types of patterns. You train a model over a set of data, providing it an algorithm that it can use to reason over and learn from those data. Once you have trained the model, you can use it to reason over data that it hasn't seen before, and make predictions about that data.

In this notebook, you will learn the basic steps to run an experiment, add a model version to track run metrics and parameters and register a model.


## Limitation alert

Right of the bat we confront a major limitation of MLLib. MLLib offers a multilayer Perceptron, however the output layer can not be adjusted to be a linear function. Therefore regression is not possible within the MLLib library. For that reason, we will unfortunately not be using the Power of Spark for the training of our Neural Net. Instead of **MLLib** we will be using **Pytorch**

#### Step 1: Import libraries


In [2]:
import mlflow
import mlflow.pytorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torch.nn.init as init
from sklearn.metrics import mean_squared_error, r2_score
import mlflow.sklearn
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, RandomForestRegressor
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator



StatementMeta(, f29bf580-53de-48f1-a7c1-2dc479c0524f, 4, Finished, Available)

#### Step 2: Setup ML FLow experiment

In [3]:

torch.set_default_dtype(torch.float32)
#mlflow.set_experiment("price_predictions_airbnb_neural_net")
mlflow.set_experiment("airbnb_blog_large_experiment")

StatementMeta(, f29bf580-53de-48f1-a7c1-2dc479c0524f, 5, Finished, Available)

2023/09/08 13:40:39 INFO mlflow.tracking.fluent: Experiment with name 'airbnb_blog_large_experiment' does not exist. Creating a new experiment.


<Experiment: artifact_location='', creation_time=1694180440967, experiment_id='665f1ca1-8479-49e8-b3d1-f4da6375457b', last_update_time=None, lifecycle_stage='active', name='airbnb_blog_large_experiment', tags={}>

#### Step 3: Import and prepare data for MLLib models



In [6]:
# Import the dataset

airbnb = spark.read.parquet("Files/transformed/opendatasoft/airbnb-listings").dropna()

# Split train & test
train_df, test_df = airbnb.randomSplit([0.7, 0.3])

#Define x features
x_features = [column for column in airbnb.columns if column != 'price']

# Store in vector
vector_assembler = VectorAssembler(inputCols=x_features, 
                                outputCol="features",
                                handleInvalid="skip")
train_df = vector_assembler.transform(train_df)
test_df = vector_assembler.transform(test_df)

StatementMeta(, f29bf580-53de-48f1-a7c1-2dc479c0524f, 8, Finished, Available)

#### Step 4: Generate function to run following ML algorithms

#### Linear Regression
#### Decision Tree
#### Random Forrest

In [7]:
# Generate function in which Ml Flow experiment is kicked off
from mlflow.models.signature import infer_signature
import numpy as np

def train_evaluate(model, name, train_df, test_df):
    with mlflow.start_run(run_name=name) as run:
        # Train the model
        mlflow.log_param("num_training_rows", train_df.count())
        model = model.setFeaturesCol("features").setLabelCol("price")
        trained_model = model.fit(train_df)
        
        # Make predictions
        predictions = trained_model.transform(test_df)

        input_sample = train_df.drop("price")
        output_sample = predictions.select("prediction")
        #signature = infer_signature(input_sample, output_sample)
        
        # Evaluate the model for mse
        evaluator_mse = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="mse")
        mse = evaluator_mse.evaluate(predictions)

        # Evaluate the model for mae
        evaluator_mae = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="mae")
        mae = evaluator_mae.evaluate(predictions)        

        # Evaluate r2 if model is linear regression
        if name == "Linear Regression":
            evaluator_r2 = RegressionEvaluator(labelCol="price", predictionCol="prediction", metricName="r2")
            r2 = evaluator_r2.evaluate(predictions)   
 
        #Store artifacts
        pred_array = np.array(predictions.select("prediction").limit(100).collect()).flatten()
        truth_array = np.array(predictions.select("price").limit(100).collect()).flatten()
        
        # Combine predictions and ground truth into a single array
        combined_array = np.stack((pred_array, truth_array), axis=-1)
        print(combined_array)

        # Save them to a .npy file
        np.save("combined_predictions_truth.npy", combined_array)
        
        # Log the combined numpy array as an artifact
        mlflow.log_artifact("combined_predictions_truth.npy")
        # Log metrics
        mlflow.log_metric("mse", mse)
        mlflow.log_metric("mae", mae)
        if name == "Linear Regression":
            mlflow.log_metric("r2", r2)

        mlflow.spark.log_model(trained_model, "price_predictions_airbnb")
        print("Model saved in run_id=%s" % run.info.run_id)

        #mlflow.register_model(
        #"runs:/{}/price_predictions_airbnb".format(run.info.run_id), name)
        
        print(f"{name} MSE: {mse}")

StatementMeta(, f29bf580-53de-48f1-a7c1-2dc479c0524f, 9, Finished, Available)

#### Step 5: Run experiment

In [8]:
# Test multiple regression models

models = {
    "Linear Regression": LinearRegression(),
    "Decision Tree": DecisionTreeRegressor(),
    "Random Forest": RandomForestRegressor(numTrees = 1000)
}

for name, model in models.items():
    train_evaluate(model, name, train_df, test_df)

StatementMeta(, f29bf580-53de-48f1-a7c1-2dc479c0524f, 10, Finished, Available)

[[ 73.44974366  42.        ]
 [ 90.22275998  76.        ]
 [ 74.45575443  55.        ]
 [-12.082187    28.        ]
 [ 97.12712308  30.        ]
 [ 97.01676881  35.        ]
 [ 94.49072732  76.        ]
 [  9.74574921  50.        ]
 [ 94.29745068  40.        ]
 [ 87.62425004  95.        ]
 [ 15.15980688  44.        ]
 [ 67.97486533  60.        ]
 [ 98.90566625  30.        ]
 [  8.59555472  99.        ]
 [ 46.68746299 149.        ]
 [ 15.67566958  60.        ]
 [ 95.12875765  55.        ]
 [ 12.75488982  30.        ]
 [105.27492839  45.        ]
 [  0.57401764  48.        ]
 [100.51081239  60.        ]
 [ 74.55361061  37.        ]
 [ 66.60124427  75.        ]
 [ 44.51344898  55.        ]
 [ 83.74016356  45.        ]
 [106.05530496  66.        ]
 [ 70.39531747  32.        ]
 [ 70.39531747  32.        ]
 [-12.5699937   18.        ]
 [ 90.26743225  59.        ]
 [104.2098675   39.        ]
 [106.54758821  69.        ]
 [106.54758821  69.        ]
 [106.54758821  69.        ]
 [ 96.55528533



#### Train a neural net

In the same experiment we would like to compare the performance of the Machine Learning algorithms from the MLLib library with a Pytorch neural net

#### Step 1: Create neural net architecture for regression

In [9]:
# 1. Create MLP class


class MLP(nn.Module):
    def __init__(self, input_dim, l1_neurons, l2_neurons):
        super(MLP, self).__init__()
        self.layer1 = nn.Linear(input_dim, l1_neurons)
        self.relu1 = nn.ReLU()
        self.layer2 = nn.Linear(l1_neurons, l2_neurons)
        self.relu2 = nn.ReLU()
        self.layer3 = nn.Linear(l2_neurons, 1)  

        self._initialize_weights()

    def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Linear):
                init.kaiming_uniform_(m.weight, nonlinearity='relu')
                if m.bias is not None: # Check if bias exists before initializing
                    init.constant_(m.bias, 0)
    
    def forward(self, x):
        x = self.layer1(x)
        x = self.relu1(x)
        x = self.layer2(x)
        x = self.relu2(x)
        x = self.layer3(x)
        
        return x

StatementMeta(, f29bf580-53de-48f1-a7c1-2dc479c0524f, 11, Finished, Available)

#### Step 2: Create ML FLow experiment function

In [10]:
# Generate function in which Ml Flow experiment is kicked off
from mlflow.models.signature import infer_signature
import numpy as np

def train_evaluate(input_dim, batchsize, learning_rate,
                num_epochs, train_df, test_x, test_y,
                l1_neurons, l2_neurons):
    with mlflow.start_run(run_name="mlp_regression") as run:
        # Log model parameters
        mlflow.log_param("num_training_rows", len(train_df))
        mlflow.log_param("batch size", batchsize)
        mlflow.log_param("learning rate", learning_rate)
        mlflow.log_param("num_epochs", num_epochs)
        
        # Activate dataloader
        dataloader = DataLoader(train_df, batch_size=batchsize, shuffle=True)
        
        # Initialize model, loss and optimizer
        model = MLP(input_dim=input_dim, l1_neurons=l1_neurons, l2_neurons=l2_neurons)
        criterion = nn.MSELoss()
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)
        
        # Execute model training
        for epoch in range(num_epochs):
            for batch_x, batch_y in dataloader:
                # Forward pass
                outputs = model.forward(batch_x)
                outputs = outputs.squeeze(1)
                loss = criterion(outputs, batch_y)
                
                # Backward pass and optimization
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
            
        
        # Make predictions
        predictions = model(test_x)

        # Calculate MSE and MAE
        mse_loss = nn.MSELoss()
        #mae_loss = nn.L1Loss() 

        mse = mse_loss(predictions, test_y)
        #mae = mae_loss(predictions, test_y)
 
        #Store artifacts
        pred_array = predictions[:100].detach().numpy().flatten()
        truth_array = test_y[:100].detach().numpy().flatten()
        
        # Combine predictions and ground truth into a single array
        combined_array = np.stack((pred_array, truth_array), axis=-1)
        print(combined_array)

        # Save them to a .npy file
        np.save("combined_predictions_truth.npy", combined_array)
        
        # Log the combined numpy array as an artifact
        mlflow.log_artifact("combined_predictions_truth.npy")
        # Log metrics
        mlflow.log_metric("mse", mse)
        #mlflow.log_metric("mae", mae)

        mlflow.pytorch.log_model(model, "price_predictions_airbnb_mlp")
        print("Model saved in run_id=%s" % run.info.run_id)
        artifact_uri = mlflow.get_artifact_uri("model")
        print(artifact_uri)

        #mlflow.register_model(
        #"runs:/{}/price_predictions_airbnb".format(run.info.run_id), name)
        
        print(f"""Neural Net parameters:
                  batchsize: {batchsize} \n
                  learning_rate: {learning_rate} \n
                  num_epochs: {num_epochs} \n
                  Neurons: {l1_neurons} l2_neurons \n
                  MSE: {mse}""")

StatementMeta(, f29bf580-53de-48f1-a7c1-2dc479c0524f, 12, Finished, Available)

#### Step 3: Prepare data for pytorch

In [15]:
# Convert the PySpark DataFrame to Pandas DataFrame
pandas_train_df = train_df.drop("features").toPandas()
pandas_test_df = test_df.drop("features").toPandas()

# Split train in x and y
train_x = torch.from_numpy(pandas_train_df.drop("price", axis = 1).values).to(torch.float32)
train_y = torch.from_numpy(pandas_train_df["price"].values).to(torch.float32)
dataset_train = TensorDataset(train_x, train_y)

# Split test in x and y
test_x = torch.from_numpy(pandas_test_df.drop("price", axis = 1).values).to(torch.float32)
test_y = torch.from_numpy(pandas_test_df["price"].values).to(torch.float32).to(torch.float32)


print(train_x.shape)
# # Assemble the features into a single vector
# vector_assembler = VectorAssembler(inputCols=data.feature_names, outputCol="features")
# train_df = vector_assembler.transform(train_df)
# test_df = vector_assembler.transform(test_df)

StatementMeta(, f29bf580-53de-48f1-a7c1-2dc479c0524f, 17, Finished, Available)

torch.Size([186633, 78])


In [16]:
# Perform sanity checks

# Check for nan values
has_nan = torch.isnan(train_x).any()
print(f"Contains NaN values: {has_nan.item()}")

# Check for inf values
has_inf = torch.isinf(train_x).any()
print(f"Contains Inf values: {has_inf.item()}")

nan_count = torch.sum(torch.isnan(train_x)).item()
print(f"Number of NaN values: {nan_count}")

StatementMeta(, f29bf580-53de-48f1-a7c1-2dc479c0524f, 18, Finished, Available)

Contains NaN values: False
Contains Inf values: False
Number of NaN values: 0


#### Step 4: Run multiple combinations of hyperparameters

In [17]:
# Test multiple hyperparameters

import itertools

learning_rates = [ 0.1, 0.01, 0.001]
epochs = [100]
batch_sizes = [32, 64]
neurons = [32, 64]

# Get all combinations
combinations = list(itertools.product(learning_rates, epochs, batch_sizes, neurons))

for combo in combinations:
    learning_rate, epochs, batch_size, neuron = combo
    train_evaluate(78, batch_size, learning_rate,
                epochs, dataset_train, test_x, test_y,
                neuron, neuron)

StatementMeta(, f29bf580-53de-48f1-a7c1-2dc479c0524f, 19, Submitted, Running)

  return F.mse_loss(input, target, reduction=self.reduction)


[[152.47261  42.     ]
 [152.47261  76.     ]
 [152.47261  55.     ]
 [152.47261  28.     ]
 [152.47261  30.     ]
 [152.47261  35.     ]
 [152.47261  76.     ]
 [152.47261  50.     ]
 [152.47261  40.     ]
 [152.47261  95.     ]
 [152.47261  44.     ]
 [152.47261  60.     ]
 [152.47261  30.     ]
 [152.47261  99.     ]
 [152.47261 149.     ]
 [152.47261  60.     ]
 [152.47261  55.     ]
 [152.47261  30.     ]
 [152.47261  45.     ]
 [152.47261  48.     ]
 [152.47261  60.     ]
 [152.47261  37.     ]
 [152.47261  75.     ]
 [152.47261  55.     ]
 [152.47261  45.     ]
 [152.47261  66.     ]
 [152.47261  32.     ]
 [152.47261  32.     ]
 [152.47261  18.     ]
 [152.47261  59.     ]
 [152.47261  39.     ]
 [152.47261  69.     ]
 [152.47261  69.     ]
 [152.47261  69.     ]
 [152.47261  56.     ]
 [152.47261  50.     ]
 [152.47261  80.     ]
 [152.47261  80.     ]
 [152.47261  35.     ]
 [152.47261  50.     ]
 [152.47261  60.     ]
 [152.47261  60.     ]
 [152.47261  39.     ]
 [152.47261

In [11]:
# Load in Pytorch models

loaded_model = mlflow.pytorch.load_model('sds://lake.trident.com/2d2772c8-0868-471e-a900-374b70d8e1ce/f2f0946d-3623-46d6-a0ab-a7c5853ccb4a/ccfff665-0473-443a-833f-c4cf3b7bbb7c/artifact/model')


StatementMeta(, 0bffae6c-d183-4707-9c3b-5028e5efadfb, 13, Finished, Available)

2023-09-08:12:51:43,544 ERROR    [synapse_mlflow_utils.py:398] [fabric mlflow plugin]: <class 'synapse.ml.mlflow.artifact_repo.TridentMLflowArtifactRepository'>.list_artifacts exception
2023-09-08:12:51:43,546 ERROR    [synapse_mlflow_utils.py:398] [fabric mlflow plugin]: <class 'mlflow.store.artifact.artifact_repo.ArtifactRepository'>._is_directory exception
2023-09-08:12:51:43,548 ERROR    [synapse_mlflow_utils.py:398] [fabric mlflow plugin]: <class 'mlflow.store.artifact.artifact_repo.ArtifactRepository'>.download_artifacts exception


ResourceNotFoundError: The specified path does not exist.
RequestId:550928b8-201f-002b-5753-e2022e000000
Time:2023-09-08T12:51:43.5413722Z
ErrorCode:PathNotFound