In [1]:
!pip install  --quiet kfp==1.8.22 torch

In [2]:
# Restart the kernel after the install above so the newly installed kfp version is the one imported

In [3]:
import kfp
from kfp import dsl
import kfp.components as components
# from kfp.components import InputPath, InputTextFile, OutputPath, OutputTextFile

from typing import NamedTuple

In [18]:


def put_public_mnist_to_local_minio(batch_size=32, test_batch_size=64, bucket_name: str = "datapipeline-028") :
    """Download the public MNIST dataset and upload it to the in-cluster MinIO.

    Runs as a KFP lightweight component: create_component_from_func ships only
    this function's source, so every import and helper is defined inside it.

    The files torchvision writes under its download root are uploaded to
    ``data/original/...`` in ``bucket_name`` (created if missing), keeping the
    relative directory layout so ``datasets.MNIST`` can later read a local
    copy of ``data/original``. The local download directory is always removed.

    Args:
        batch_size: unused; kept for interface compatibility.
        test_batch_size: unused; kept for interface compatibility.
        bucket_name: MinIO bucket that receives the data.
    """
    import os
    import glob
    import shutil
    import tempfile
    from minio import Minio
    from torchvision import datasets

    def upload_local_directory_to_minio(minio_client, local_path, bucket_name, minio_path):
        """Recursively upload every non-hidden file under local_path to minio_path."""
        # '*' matches only direct children (dotfiles excluded); subdirectories
        # are handled by recursing with the extended remote prefix.
        for local_file in glob.glob(os.path.join(glob.escape(local_path), '*')):
            local_file = local_file.replace(os.sep, "/")  # Replace \ with / on Windows
            remote_path = minio_path + "/" + os.path.basename(local_file)
            if os.path.isfile(local_file):
                minio_client.fput_object(bucket_name, remote_path, local_file)
            else:
                upload_local_directory_to_minio(minio_client, local_file, bucket_name, remote_path)

    minio_url = "{}:{}".format(
        os.environ["MINIO_SERVICE_SERVICE_HOST"], os.environ["MINIO_SERVICE_SERVICE_PORT_HTTP"])
    print("minio url:", minio_url)
    minio_client = Minio(
        minio_url,
        # Defaults match the previously hard-coded credentials; override via env.
        access_key=os.environ.get("MINIO_ACCESS_KEY", "minio"),
        secret_key=os.environ.get("MINIO_SECRET_KEY", "minio123"),
        secure=False,
    )

    print("try to find bucket {}".format(bucket_name))
    found = minio_client.bucket_exists(bucket_name)
    print("found", found)
    if not found:
        minio_client.make_bucket(bucket_name)
    else:
        print("Bucket '{}' already exists".format(bucket_name))

    mnist_data_dirpath = tempfile.mkdtemp()
    try:
        datasets.MNIST(mnist_data_dirpath, train=True, download=True)
        # Built without download=True: relies on the train download above having
        # fetched the test files too, and fails early if it did not.
        datasets.MNIST(mnist_data_dirpath, train=False)
        upload_local_directory_to_minio(minio_client, mnist_data_dirpath, bucket_name, "data/original")
    finally:
        # Clean up even when the download or upload fails.
        shutil.rmtree(mnist_data_dirpath, ignore_errors=True)
    

In [None]:

#-> NamedTuple("Output", [("MnistInputDirPath", str)])
# mnist_input_dir_path: OutputPath(str)
def reshape_mnist_and_put_back(bucket_name: str = "datapipeline-024") :
    """Reshape and rescale the ``x_train``/``x_test`` .npy arrays and re-upload them.

    For each of ``data/original/x_train.npy`` and ``data/original/x_test.npy``
    in ``bucket_name``: download it, reshape to ``(-1, 28, 28, 1)``, divide by
    255 and upload the result as ``data/original/<name>.reshaped.npy``.
    Local working files live in a temporary directory that is always removed.

    NOTE(review): ``put_public_mnist_to_local_minio`` uploads torchvision's
    MNIST files rather than these .npy arrays, and this step is not wired into
    the pipeline -- confirm the .npy objects exist before using it.

    Args:
        bucket_name: MinIO bucket holding the arrays (default is the bucket
            this function previously hard-coded).
    """
    import os
    import tempfile
    import numpy as np
    from minio import Minio

    minio_url = "{}:{}".format(
        os.environ["MINIO_SERVICE_SERVICE_HOST"], os.environ["MINIO_SERVICE_SERVICE_PORT_HTTP"])
    print("minio url:", minio_url)
    minio_client = Minio(
        minio_url,
        # Defaults match the previously hard-coded credentials; override via env.
        access_key=os.environ.get("MINIO_ACCESS_KEY", "minio"),
        secret_key=os.environ.get("MINIO_SECRET_KEY", "minio123"),
        secure=False,
    )

    with tempfile.TemporaryDirectory() as work_dir:
        for file_name in ["x_train", "x_test"]:
            original_path = os.path.join(work_dir, file_name + ".npy")
            reshaped_path = os.path.join(work_dir, file_name + ".reshaped.npy")
            minio_client.fget_object(bucket_name, "data/original/{}.npy".format(file_name), original_path)
            reshaped = np.load(original_path).reshape(-1, 28, 28, 1) / 255
            np.save(reshaped_path, reshaped)
            minio_client.fput_object(bucket_name, "data/original/{}.reshaped.npy".format(file_name), reshaped_path)
    

In [74]:
def train_model(bucket_name=None, device_name="cpu", epochs:int = 1, optimizer: str = "adam", model_save_prefix:str="models/trained/detect-digits", version:str = "1", lr=0.03, gamma=0.7, batch_size=64, test_batch_size=1000, log_interval=100) :
    """Train the MNIST CNN on data stored in MinIO and upload the trained model.

    Runs as a KFP lightweight component: create_component_from_func ships only
    this function's source, so every import and helper is defined inside it.

    Downloads all objects under ``data/original`` in ``bucket_name`` to
    ``/tmp/data/original`` and loads them with ``datasets.MNIST``. Trains for
    ``epochs`` epochs with Adadelta + StepLR, evaluating on the test split
    after each epoch. Then uploads ``mnist.pt`` (state dict) and
    ``model_scripted.pt`` (TorchScript) to ``{model_save_prefix}/{version}``
    in the same bucket.

    Args:
        bucket_name: MinIO bucket with the data; also receives the model.
            Required -- the ``None`` default only keeps the signature stable.
        device_name: torch device string, e.g. ``"cpu"``.
        epochs: number of training epochs.
        optimizer: accepted for interface compatibility but ignored;
            Adadelta is always used.
        model_save_prefix: object-key prefix for the uploaded model files.
        version: appended to ``model_save_prefix`` to form the model key.
        lr: initial learning rate.
        gamma: StepLR decay factor applied after every epoch.
        batch_size: training batch size.
        test_batch_size: evaluation batch size.
        log_interval: print the training loss every ``log_interval`` batches.

    Raises:
        ValueError: if ``bucket_name`` is empty or the bucket does not exist.
    """
    import os
    import glob
    from minio import Minio
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    import torch.optim as optim
    from torchvision import datasets, transforms
    from torch.optim.lr_scheduler import StepLR

    # Parameters without a type annotation may reach a KFP component as
    # strings; coerce them so arithmetic and DataLoader sizes work. This is a
    # no-op when the function is called directly with numbers.
    epochs = int(epochs)
    lr = float(lr)
    gamma = float(gamma)
    batch_size = int(batch_size)
    test_batch_size = int(test_batch_size)
    log_interval = max(1, int(log_interval))  # avoid modulo-by-zero in train()
    if not bucket_name:
        raise ValueError("bucket_name is required")

    class Net(nn.Module):
        def __init__(self):
            super(Net, self).__init__()
            self.conv1 = nn.Conv2d(1, 32, 3, 1)
            self.conv2 = nn.Conv2d(32, 64, 3, 1)
            self.dropout1 = nn.Dropout(0.25)
            self.dropout2 = nn.Dropout(0.5)
            self.fc1 = nn.Linear(9216, 128)
            self.fc2 = nn.Linear(128, 10)

        def forward(self, x):
            x = self.conv1(x)
            x = F.relu(x)
            x = self.conv2(x)
            x = F.relu(x)
            x = F.max_pool2d(x, 2)
            x = self.dropout1(x)
            x = torch.flatten(x, 1)
            x = self.fc1(x)
            x = F.relu(x)
            x = self.dropout2(x)
            x = self.fc2(x)
            output = F.log_softmax(x, dim=1)
            return output

    def train(model, device, train_loader, torch_optimizer, epoch, log_interval):
        """Run one training epoch, printing the loss every log_interval batches."""
        model.train()
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            torch_optimizer.zero_grad()
            loss = F.nll_loss(model(data), target)
            loss.backward()
            torch_optimizer.step()
            if batch_idx % log_interval == 0:
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch, batch_idx * len(data), len(train_loader.dataset),
                    100. * batch_idx / len(train_loader), loss.item()))

    def test(model, device, test_loader):
        """Print average NLL loss and accuracy of model on test_loader."""
        model.eval()
        test_loss = 0
        correct = 0
        with torch.no_grad():
            for data, target in test_loader:
                data, target = data.to(device), target.to(device)
                output = model(data)
                test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
                pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
                correct += pred.eq(target.view_as(pred)).sum().item()

        test_loss /= len(test_loader.dataset)
        print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
            test_loss, correct, len(test_loader.dataset),
            100. * correct / len(test_loader.dataset)))

    def upload_local_directory_to_minio(minio_client, local_path, bucket_name, minio_path):
        """Recursively upload every non-hidden file under local_path to minio_path."""
        for local_file in glob.glob(os.path.join(glob.escape(local_path), '*')):
            local_file = local_file.replace(os.sep, "/")  # Replace \ with / on Windows
            remote_path = minio_path + "/" + os.path.basename(local_file)
            if os.path.isfile(local_file):
                minio_client.fput_object(bucket_name, remote_path, local_file)
            else:
                upload_local_directory_to_minio(minio_client, local_file, bucket_name, remote_path)

    minio_url = "{}:{}".format(
        os.environ["MINIO_SERVICE_SERVICE_HOST"], os.environ["MINIO_SERVICE_SERVICE_PORT_HTTP"])
    minio_client = Minio(
        minio_url,
        # Defaults match the previously hard-coded credentials; override via env.
        access_key=os.environ.get("MINIO_ACCESS_KEY", "minio"),
        secret_key=os.environ.get("MINIO_SECRET_KEY", "minio123"),
        secure=False,
    )

    # Mirror data/original/... from the bucket into /tmp/data/original/...
    train_data_saved_path = "/tmp"
    model_data_remote_path = "data/original"
    if not minio_client.bucket_exists(bucket_name):
        raise ValueError("Bucket '{}' does not exist".format(bucket_name))
    for item in minio_client.list_objects(bucket_name, model_data_remote_path, recursive=True):
        local_name = train_data_saved_path + "/" + item.object_name
        print("remote name:", item.object_name)
        print("local name:", local_name)
        minio_client.fget_object(bucket_name, item.object_name, local_name)

    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    local_mnist_data_rootdir = train_data_saved_path + "/" + model_data_remote_path
    print("local_mnist_data_rootdir:", os.listdir(local_mnist_data_rootdir))

    dataset1 = datasets.MNIST(local_mnist_data_rootdir, train=True, download=False, transform=transform)
    dataset2 = datasets.MNIST(local_mnist_data_rootdir, train=False, download=False, transform=transform)
    train_loader = torch.utils.data.DataLoader(dataset1, batch_size=batch_size)
    test_loader = torch.utils.data.DataLoader(dataset2, batch_size=test_batch_size)

    device = torch.device(device_name)
    model = Net().to(device)
    # The `optimizer` argument is not consulted: Adadelta is always used.
    torch_optimizer = optim.Adadelta(model.parameters(), lr=lr)
    scheduler = StepLR(torch_optimizer, step_size=1, gamma=gamma)
    for epoch in range(1, epochs + 1):
        train(model, device, train_loader, torch_optimizer, epoch, log_interval)
        test(model, device, test_loader)
        scheduler.step()

    model_remote_path = "{}/{}".format(model_save_prefix, version)
    model_save_dir = "/tmp/model_{}/{}".format(model_save_prefix, version)
    os.makedirs(model_save_dir, exist_ok=True)

    torch.save(model.state_dict(), "{}/{}".format(model_save_dir, "mnist.pt"))
    # Export in eval mode (dropout off) even when epochs == 0 skipped test().
    model.eval()
    torch.jit.script(model).save("{}/{}".format(model_save_dir, "model_scripted.pt"))

    upload_local_directory_to_minio(minio_client, model_save_dir, bucket_name, model_remote_path)
    
    
    



In [7]:
from typing import NamedTuple

def test_model_and_save_metrics(model_save_prefix:str="models/trained/detect-digits", version:str="1", bucket_name: str = "datapipeline-024") -> NamedTuple('Output', [('mlpipeline_ui_metadata', 'UI_metadata'),('mlpipeline_metrics', 'Metrics')]) :
    """Evaluate a saved Keras model on the MNIST test set and report to the KFP UI.

    Downloads every object under ``{model_save_prefix}/{version}`` plus
    ``data/original/x_test.npy`` and ``data/original/y_test.npy`` from
    ``bucket_name`` into a temporary directory (removed afterwards). Loads the
    model with ``tf.keras.models.load_model`` and evaluates it.

    NOTE(review): ``train_model`` uploads PyTorch ``.pt`` files under this
    prefix, which a Keras loader presumably cannot read. This step is not
    wired into the pipeline and appears to predate the PyTorch version.

    Args:
        model_save_prefix: object-key prefix of the saved model.
        version: appended to ``model_save_prefix`` to form the model key.
        bucket_name: MinIO bucket holding the model and the test arrays.

    Returns:
        Output(mlpipeline_ui_metadata, mlpipeline_metrics): JSON strings with
        a confusion matrix plus markdown model summary, and the test accuracy
        and loss metrics.
    """
    import os
    import json
    import tempfile
    from collections import namedtuple

    import numpy as np
    import pandas as pd
    import tensorflow as tf
    from minio import Minio

    def gen_cm_csv(y_test=None, test_predictions=None):
        """Return the confusion matrix as headerless ``target,predicted,count`` CSV."""
        confusion_matrix = tf.math.confusion_matrix(labels=y_test, predictions=test_predictions).numpy()
        # tf sizes the matrix as max(label, prediction) + 1, so row/column i is
        # class i. Indexing np.unique(y_test) instead raises IndexError whenever
        # a class is predicted but absent from y_test.
        rows = [
            (target, predicted, count)
            for target, target_row in enumerate(confusion_matrix)
            for predicted, count in enumerate(target_row)
        ]
        df_cm = pd.DataFrame(rows, columns=['target', 'predicted', 'count'])
        return df_cm.to_csv(header=False, index=False)

    class NpJsonEncoder(json.JSONEncoder):
        """Serializes numpy objects as json."""

        def default(self, obj):
            if isinstance(obj, np.integer):
                return int(obj)
            elif isinstance(obj, np.bool_):
                return bool(obj)
            elif isinstance(obj, np.floating):
                if np.isnan(obj):
                    return None  # Serialized as JSON null.
                return float(obj)
            elif isinstance(obj, np.ndarray):
                return obj.tolist()
            else:
                return super().default(obj)

    minio_url = "{}:{}".format(
        os.environ["MINIO_SERVICE_SERVICE_HOST"], os.environ["MINIO_SERVICE_SERVICE_PORT_HTTP"])
    minio_client = Minio(
        minio_url,
        # Defaults match the previously hard-coded credentials; override via env.
        access_key=os.environ.get("MINIO_ACCESS_KEY", "minio"),
        secret_key=os.environ.get("MINIO_SECRET_KEY", "minio123"),
        secure=False,
    )

    model_remote_path = "{}/{}".format(model_save_prefix, version)
    with tempfile.TemporaryDirectory() as work_dir:
        for item in minio_client.list_objects(bucket_name, model_remote_path, recursive=True):
            local_name = os.path.join(work_dir, item.object_name)
            print("remote name:", item.object_name)
            print("local name:", local_name)
            minio_client.fget_object(bucket_name, item.object_name, local_name)

        model_saved_path = os.path.join(work_dir, model_save_prefix, version)
        if os.path.isdir(model_saved_path):
            print("dir content:", os.listdir(model_saved_path))
        print("loading model in path:", model_saved_path)
        model = tf.keras.models.load_model(model_saved_path)

        test_arrays = {}
        for file_name in ["x_test", "y_test"]:
            local_name = os.path.join(work_dir, file_name + ".npy")
            minio_client.fget_object(bucket_name, "data/original/{}.npy".format(file_name), local_name)
            test_arrays[file_name] = np.load(local_name)
        x_test, y_test = test_arrays["x_test"], test_arrays["y_test"]

        # Loss and metric values in test mode; assumes the model was compiled
        # with exactly one metric (accuracy) -- TODO confirm.
        model_loss, model_accuracy = model.evaluate(x=x_test, y=y_test)
        # Predicted class = index of the highest score per sample.
        test_predictions = np.argmax(model.predict(x=x_test), axis=1)
        cm_csv = gen_cm_csv(y_test=y_test, test_predictions=test_predictions)

        summary_lines = []
        model.summary(print_fn=lambda x: summary_lines.append(x))
        metric_model_summary = "\n".join(summary_lines)

    output_confusion_matrix = {
        "type": "confusion_matrix",
        "format": "csv",
        "schema": [
            {'name': 'target', 'type': 'CATEGORY'},
            {'name': 'predicted', 'type': 'CATEGORY'},
            {'name': 'count', 'type': 'NUMBER'},
        ],
        # Must name columns from the schema above.
        "target_col": "target",
        "predicted_col": "predicted",
        "source": cm_csv,
        "storage": "inline",
        # String labels, as in the KFP confusion-matrix example; 0..9 digits.
        "labels": [str(label) for label in range(10)],
    }
    output_model_summary = {
                'type': 'markdown',
                'storage': 'inline',
                'source': f'''# Model Overview
## Model Summary

```
{metric_model_summary}
```

## Model Performance

**Accuracy**: {model_accuracy}
**Loss**: {model_loss}

'''
            }

    metadata = {"outputs": [output_confusion_matrix, output_model_summary]}
    metrics = {
        'metrics': [{
            'name': 'model_accuracy',
            'numberValue': float(model_accuracy),
            'format': "PERCENTAGE"
        }, {
            'name': 'model_loss',
            'numberValue': float(model_loss),
            # Loss is not a fraction of 1; show it unscaled.
            'format': "RAW"
        }]}

    output = namedtuple('Output', ['mlpipeline_ui_metadata', 'mlpipeline_metrics'])
    return output(json.dumps(metadata, cls=NpJsonEncoder), json.dumps(metrics, cls=NpJsonEncoder))

In [75]:

import kfp
from kfp import dsl
import kfp.components as components

# Image for the PyTorch-based steps; the TF-based evaluation step below would
# need a TensorFlow image instead.
PYTORCH_BASE_IMAGE = "kubeflownotebookswg/jupyter-pytorch-full"

step_put_public_mnist_to_local_minio = components.create_component_from_func(put_public_mnist_to_local_minio, base_image=PYTORCH_BASE_IMAGE)
step_train_model = components.create_component_from_func(train_model, base_image=PYTORCH_BASE_IMAGE)
# step_test_model_and_save_metrics = components.create_component_from_func(test_model_and_save_metrics, base_image="kubeflownotebookswg/jupyter-tensorflow-full")


@dsl.pipeline(
    name='kfp-pipeline-digits-clsfier',
    description='Classify digits'
)
def kfp_pipeline_start(bucket_name, device_name, epochs, optimizer, model_save_prefix, batch_size, test_batch_size, version, lr, gamma, seed, log_interval):
    # Trains on MNIST data already present in `bucket_name`. The upload step is
    # not run; to (re)populate the bucket first, create
    # `upload = step_put_public_mnist_to_local_minio(...)` and call
    # `step2.after(upload)`. `seed` is accepted but not forwarded: train_model
    # has no seed parameter.
    #
    # Keyword arguments: this signature orders its parameters differently from
    # train_model, so positional passing silently depends on matching order.
    step2 = step_train_model(
        bucket_name=bucket_name,
        device_name=device_name,
        epochs=epochs,
        optimizer=optimizer,
        model_save_prefix=model_save_prefix,
        version=version,
        lr=lr,
        gamma=gamma,
        batch_size=batch_size,
        test_batch_size=test_batch_size,
        log_interval=log_interval,
    )






In [76]:
import os
host, port = os.environ["ML_PIPELINE_PORT_8888_TCP_ADDR"], os.environ["ML_PIPELINE_SERVICE_PORT_HTTP"]
kfp_endpoint = "http://{}:{}".format(host, port)

kfp_client = kfp.Client(host=kfp_endpoint)
# Keys must match kfp_pipeline_start's parameters. Each key appears once: a
# repeated key in a dict literal silently keeps only the last value.
pipeline_arguments = {
    "bucket_name": "datapipeline-028",
    "device_name": "cpu",
    "epochs": 1,
    "optimizer": "adam",  # not consulted by train_model, which always uses Adadelta
    "model_save_prefix": "models/trained/detect-digits",
    "batch_size": 64,
    "test_batch_size": 1000,
    "version": "14",
    "lr": 0.03,
    "gamma": 0.7,
    "seed": 1,
    "log_interval": 100,
}
pipeline_run = kfp_client.create_run_from_pipeline_func(kfp_pipeline_start, arguments=pipeline_arguments)


In [77]:
import os
from IPython.display import IFrame
# Link to this run in the hosted KFP UI and embed it below the cell.
cluster_id = os.environ["MEDDASH_CLUSTER_ID"]
run_id = pipeline_run.run_id
print("pipeline.run_id:", run_id)
pipeline_vis_endpoint = (
    f"http://kf-ml-pipeline-ui.{cluster_id}.hosted.meddash.cloud"
    f"/#/runs/details/{run_id}"
)
IFrame(pipeline_vis_endpoint, width=1200, height=600)

pipeline.run_id: 84cf42ec-eea0-4017-ac84-9d08679c6798
