In [1]:
! pip install minio



In [None]:
# 
# Creating a ML pipeline with the MNIST digits dataset
# KFPv1 Example with lightweight Python components only
#

import os
import requests

from typing import NamedTuple
def get_data_batch() -> NamedTuple('Outputs', [('datapoints_training', float),('datapoints_test', float),('dataset_version', str)]):
    """
    Function to get dataset and load it to minio bucket
    """
    print("getting data")
    from tensorflow import keras
    from minio import Minio
    import numpy as np
    import json

    minio_client = Minio(
        "172.17.0.35:9000",
        access_key="minio",
        secret_key="minio123",
        secure=False
    )
    minio_bucket = "mlpipeline"
    
    minio_client.fget_object(minio_bucket,"mnist.npz","/tmp/mnist.npz")
    
    def load_data():
        with np.load("/tmp/mnist.npz", allow_pickle=True) as f:
            x_train, y_train = f["x_train"], f["y_train"]
            x_test, y_test = f["x_test"], f["y_test"]

        return (x_train, y_train), (x_test, y_test)
    
    # Get MNIST data directly from library
    (x_train, y_train), (x_test, y_test) = load_data()

    # save to numpy file, store in Minio
    np.save("/tmp/x_train.npy",x_train)
    minio_client.fput_object(minio_bucket,"x_train","/tmp/x_train.npy")

    np.save("/tmp/y_train.npy",y_train)
    minio_client.fput_object(minio_bucket,"y_train","/tmp/y_train.npy")

    np.save("/tmp/x_test.npy",x_test)
    minio_client.fput_object(minio_bucket,"x_test","/tmp/x_test.npy")

    np.save("/tmp/y_test.npy",y_test)
    minio_client.fput_object(minio_bucket,"y_test","/tmp/y_test.npy")
    
    dataset_version = "1.0"
    
    print(f"x_train shape: {x_train.shape}")
    print(f"y_train shape: {y_train.shape}")

    print(f"x_test shape: {x_test.shape}")
    print(f"y_test shape: {y_test.shape}")
    
    from collections import namedtuple
    divmod_output = namedtuple('Outputs', ['datapoints_training', 'datapoints_test', 'dataset_version'])
    return [float(x_train.shape[0]),float(x_test.shape[0]),dataset_version]
    
def get_latest_data():
    """
    Dummy functions for showcasing
    """
    print("Adding latest data")
    
    
def reshape_data():
    """
    Reshape the data for model building
    """
    print("reshaping data")
    
    from minio import Minio
    import numpy as np
    import os

    minio_client = Minio(
        "172.17.0.35:9000",
        access_key="minio",
        secret_key="minio123",
        secure=False
    )
    minio_bucket = "mlpipeline"

    # load data from minio
    minio_client.fget_object(minio_bucket,"x_train","/tmp/x_train.npy")
    x_train = np.load("/tmp/x_train.npy")
    
    minio_client.fget_object(minio_bucket,"x_test","/tmp/x_test.npy")
    x_test = np.load("/tmp/x_test.npy")
    
    # reshaping the data
    # reshaping pixels in a 28x28px image with greyscale, canal = 1. This is needed for the Keras API
    x_train = x_train.reshape(-1,28,28,1)
    x_test = x_test.reshape(-1,28,28,1)

    # normalizing the data
    # each pixel has a value between 0-255. Here we divide by 255, to get values from 0-1
    x_train = x_train / 255
    x_test = x_test / 255
    
    # save data from minio
    np.save("/tmp/x_train.npy",x_train)
    minio_client.fput_object(minio_bucket,"x_train","/tmp/x_train.npy")
    
    np.save("/tmp/x_test.npy",x_test)
    minio_client.fput_object(minio_bucket,"x_test","/tmp/x_test.npy")