In [1]:
from time import time
from typing import List, Dict, Any, Tuple

import mlflow
from mlflow.store.artifact.runs_artifact_repo import RunsArtifactRepository
from mlflow import MlflowClient
import torch
from torch.utils.data.dataloader import DataLoader
from torch import nn
from torch import optim
from torchvision import datasets, transforms

### MLflow Tracking and MinIO

In [2]:
def load_images(batch_size: int, data_dir: str = './mnistdata') -> Tuple[DataLoader, DataLoader, int, int, float]:
    """Download (if necessary) and load the MNIST training and test sets.

    Args:
        batch_size: Number of images per batch for both loaders.
        data_dir: Directory where torchvision stores / looks up the MNIST files.

    Returns:
        A 5-tuple ``(train_loader, test_loader, train_size, test_size, load_time_sec)``
        where the sizes are the number of images in each dataset and
        ``load_time_sec`` is the wall-clock time spent inside this function.
    """
    # Start of load time.
    start_time = time()

    # Convert images to tensors, then normalize the single channel with mean 0.5 / std 0.5.
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5,), (0.5,)),
    ])

    # Download and load the training and test data.
    train_dataset = datasets.MNIST(data_dir, download=True, train=True, transform=transform)
    test_dataset = datasets.MNIST(data_dir, download=True, train=False, transform=transform)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    # Evaluation results do not depend on sample order, so the test set is left
    # unshuffled; this also makes "the first test batch" deterministic.
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    load_time_sec = time() - start_time
    return train_loader, test_loader, len(train_dataset), len(test_dataset), load_time_sec

In [3]:
class MNISTModel(nn.Module):
    """Fully connected classifier with four hidden layers.

    Every hidden layer is followed by a ReLU; the final linear layer is followed
    by a log-softmax over dimension 1, so the network returns log-probabilities.
    """

    def __init__(self, input_size: int, hidden_sizes: List[int], output_size: int):
        """Build the five linear layers.

        Args:
            input_size: Number of features in each input row.
            hidden_sizes: Widths of the hidden layers; the first four entries are used.
            output_size: Number of output classes.
        """
        super().__init__()

        # Layers are created in network order so parameter registration
        # (and therefore state-dict key order) is lin1 .. lin5.
        self.lin1 = nn.Linear(in_features=input_size, out_features=hidden_sizes[0])
        self.lin2 = nn.Linear(in_features=hidden_sizes[0], out_features=hidden_sizes[1])
        self.lin3 = nn.Linear(in_features=hidden_sizes[1], out_features=hidden_sizes[2])
        self.lin4 = nn.Linear(in_features=hidden_sizes[2], out_features=hidden_sizes[3])
        self.lin5 = nn.Linear(in_features=hidden_sizes[3], out_features=output_size)
        self.activation = nn.ReLU()
        self.output_activation = nn.LogSoftmax(dim=1)

    def forward(self, x):
        """Return log-probabilities for a batch ``x`` of flattened inputs."""
        hidden = x
        for layer in (self.lin1, self.lin2, self.lin3, self.lin4):
            hidden = self.activation(layer(hidden))
        return self.output_activation(self.lin5(hidden))

In [4]:
def train_model(model: nn.Module, loader: DataLoader, params: Dict[str, Any]) -> Dict[str, Any]:
    """Train ``model`` in place with SGD and log the per-epoch loss to MLflow.

    Each image batch is flattened to ``(batch, features)`` before the forward
    pass. The loss is ``nn.NLLLoss``, which expects the model to output
    log-probabilities. After every epoch the mean batch loss is logged as the
    MLflow metric ``training_loss`` (step = 1-based epoch number) and printed.

    Args:
        model: Network to train; its parameters are updated in place.
        loader: Yields ``(images, labels)`` batches and supports ``len()``.
        params: Must contain ``'lr'``, ``'momentum'`` and ``'epochs'``.

    Returns:
        ``{'training_time_sec': <wall-clock seconds spent training>}``.

    Raises:
        ValueError: If ``loader`` contains no batches (the mean loss would be undefined).
    """
    start_time = time()

    num_batches = len(loader)
    if num_batches == 0:
        raise ValueError('loader has no batches; cannot compute an average training loss.')

    loss_func = nn.NLLLoss()
    optimizer = optim.SGD(model.parameters(), lr=params['lr'], momentum=params['momentum'])
    training_metrics = {}

    # Make sure layers with train/eval-specific behavior are in training mode.
    model.train()
    for epoch in range(params['epochs']):
        total_loss = 0
        for images, labels in loader:
            # Flatten each image into a single feature vector.
            images = images.view(images.shape[0], -1)

            # Training pass
            optimizer.zero_grad()
            output = model(images)
            loss = loss_func(output, labels)

            # Backpropagate, then update the weights.
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        epoch_loss = total_loss / num_batches
        mlflow.log_metric('training_loss', epoch_loss, step=epoch+1)
        print("Epoch {} - Training loss: {}".format(epoch+1, epoch_loss))

    training_time_sec = (time()-start_time)
    training_metrics['training_time_sec'] = training_time_sec
    print("\nTraining Time (in seconds) =",training_time_sec)
    return training_metrics

In [5]:
def test_model(model: MNISTModel, loader: DataLoader) -> Dict[str, Any]:
    correct_count, total_count = 0, 0
    for images, labels in loader:
        for i in range(len(labels)):
            img = images[i].view(1, 784)
            # Turn off gradients to speed up this part
            with torch.no_grad():
                logps = model(img)

            # Output of the network are log-probabilities, need to take exponential for probabilities
            ps = torch.exp(logps)
            probab = list(ps.numpy()[0])
            pred_label = probab.index(max(probab))
            true_label = labels.numpy()[i]
            if(true_label == pred_label):
                correct_count += 1
            total_count += 1
    
    testing_metrics = {
        'incorrect_count': total_count-correct_count,
        'correct_count': correct_count,
        'accuracy': (correct_count/total_count)
    }
    print("Number Of Images Tested =", total_count)
    print("\nModel Accuracy =", (correct_count/total_count))
    return testing_metrics

In [6]:
# Setup parameters
params = {
    'batch_size': 64,
    'epochs': 10,
    'input_size': 784,
    'hidden_sizes': [1024, 1024, 1024, 1024],
    'lr': 0.025,
    'momentum': 0.5,
    'output_size': 10
    }

# Setup mlflow to point to our server.
experiment_name = 'MNIST Learning Rate Experiments'
run_name = f'Hidden state ={params["hidden_sizes"]}'
mlflow.set_tracking_uri('http://localhost:5001/')
active_experiment = mlflow.set_experiment(experiment_name)

# The run started here is only ended in a later cell. If a previous execution
# failed before reaching it, a run is still active and start_run() would raise,
# so close any leftover run first to keep this cell re-runnable.
if mlflow.active_run() is not None:
    mlflow.end_run(status='KILLED')
active_run = mlflow.start_run(run_name=run_name)

# Log parameters
mlflow.log_params(params)

# Load the data and log loading metrics.
train_loader, test_loader, train_size, test_size, load_time_sec = load_images(params['batch_size'])
mlflow.log_metric('train_size', train_size)
mlflow.log_metric('test_size', test_size)
mlflow.log_metric('load_time_sec', load_time_sec)

# Train the model and log training metrics.
model = MNISTModel(params['input_size'], params['hidden_sizes'], params['output_size'])
training_metrics = train_model(model, train_loader, params)
mlflow.log_metrics(training_metrics)

# Test the model and log the accuracy as a metric.
testing_metrics = test_model(model, test_loader)
mlflow.log_metrics(testing_metrics)

# Log the raw data.
#mlflow.log_artifacts('./mnistdata', artifact_path='mnistdata')



Epoch 1 - Training loss: 0.6336258619801322
Epoch 2 - Training loss: 0.19468905425656324
Epoch 3 - Training loss: 0.12938457320847396
Epoch 4 - Training loss: 0.09959393369852067
Epoch 5 - Training loss: 0.07963338740535382
Epoch 6 - Training loss: 0.06545253556970355
Epoch 7 - Training loss: 0.054836851275607404
Epoch 8 - Training loss: 0.04588503119357522
Epoch 9 - Training loss: 0.03869035498235129
Epoch 10 - Training loss: 0.031787062595190825

Training Time (in seconds) = 81.01446604728699
Number Of Images Tested = 10000

Model Accuracy = 0.9795


In [7]:
# Log the trained model.
# Take the first image of one test batch and flatten it to a (1, 784) row so it
# can serve as the example input for signature inference.
images, labels = next(iter(test_loader))
img = images[0].view(1, 784)
label = labels[0]
# Sanity check of the example's shapes (label is a 0-dimensional tensor).
print(img.size())
print(label.size())
# Gradients are not needed to produce an example output.
with torch.no_grad():
    logps = model(img)

# Infer the model signature from the example input/output pair and log the
# model under the current run at the artifact path 'mnistmodel'.
model_signature = mlflow.models.infer_signature(img.numpy(), logps.numpy())
model_artifact_path = 'mnistmodel'
mlflow.pytorch.log_model(model, artifact_path=model_artifact_path, signature=model_signature)

# End the run
mlflow.end_run()

torch.Size([1, 784])
torch.Size([])
🏃 View run Hidden state =[1024, 1024, 1024, 1024] at: http://localhost:5001/#/experiments/3/runs/ef8189d30fa64c98a16f2cc75274a39a
🧪 View experiment at: http://localhost:5001/#/experiments/3


### MLflow Model Registry and MinIO

In [None]:
# Display the value returned by mlflow.set_experiment() for this notebook's experiment.
active_experiment

In [None]:
# Run a forward pass on the single flattened sample image and display the output tensor.
model(img)

In [10]:
#model_version = mlflow.register_model(run_uri, model_name, tags=version_tags)

In [None]:

model_name = 'mnist_lr_optimized'
client = MlflowClient()

# Create the parent registered-model entry only when no model with this name exists yet.
filter_string = f"name='{model_name}'"
results = client.search_registered_models(filter_string=filter_string)
if not results:
    model_tags = {'framework': 'Pytorch'}
    model_description = 'Testing MNIST model with schema.'
    client.create_registered_model(
        name=model_name,
        tags=model_tags,
        description=model_description,
    )

# Register the model logged by the tracked run as a new version of that entry.
run_id = active_run.info.run_id
run_uri = f'runs:/{run_id}/{model_artifact_path}'
# Resolve the runs:/ URI to the underlying artifact location used as the version source.
model_source = RunsArtifactRepository.get_underlying_uri(run_uri)
version_tags = {'layers': len(params['hidden_sizes'])}
version_description = f'Hidden sizes: {params["hidden_sizes"]}'
model_version = client.create_model_version(
    name=model_name,
    source=model_source,
    run_id=run_id,
    tags=version_tags,
    description=version_description,
)



In [None]:
# Summarize the newly created model version, one field per line.
version_summary = (
    "Name: {}".format(model_version.name),
    "Version: {}".format(model_version.version),
    "Description: {}".format(model_version.description),
    "Status: {}".format(model_version.status),
    "Stage: {}".format(model_version.current_stage),
)
for summary_line in version_summary:
    print(summary_line)

In [None]:
# Display the input/output signature that was inferred for the logged model.
model_signature