In [None]:
# from kfp.components.types.type_annotations import OutputPath, InputPath
# from kfp.components.component_factory import create_component_from_func
# import kfp

from kfp.components import InputPath, OutputPath, create_component_from_func

def load_tensorflow_dataset_component(dataset_str: str,
                                      train_test_split: bool,
                                      data_file_output: OutputPath('tf.data.Dataset')):
    """Load a TFDS dataset and save its training portion as a ``tf.data.Dataset``.

    This function is packaged as a standalone KFP component, so every import
    and helper it needs is defined inside its body.

    Args:
        dataset_str: TFDS dataset name passed to ``tfds.load`` (e.g. 'mnist').
        train_test_split: If true, load the first 20% of the 'train' split plus
            the 'test' split; otherwise load the 'all' split.
        data_file_output: Directory the training dataset is written to with
            ``Dataset.save``.
    """
    import tensorflow_datasets as tfds

    def load_tensorflow_dataset(dataset_str: str, train_test_split: bool = True):
        """Return ``{'train': ds, 'test': ds_or_None}`` of supervised (input, label) tuples."""
        # NOTE(review): only the first 20% of the training data is used —
        # presumably to keep runs short; confirm before relying on it in production.
        split = ['train[:20%]', 'test'] if train_test_split else 'all'

        data, _ds_info = tfds.load(
            dataset_str,
            split=split, shuffle_files=True,
            as_supervised=True,
            with_info=True,
        )

        if train_test_split:
            # A list of splits yields one dataset per requested split, in order.
            train_ds, test_ds = data
            return {'train': train_ds, 'test': test_ds}
        # With split='all' there is no held-out set.
        return {'train': data, 'test': None}

    data = load_tensorflow_dataset(dataset_str=dataset_str,
                                   train_test_split=train_test_split)

    # The component declares a single output, so only the training portion is
    # persisted; the test split (when loaded) is dropped here.
    data['train'].save(data_file_output)


# Package the loader as a KFP component; the listed packages are pip-installed
# into the python:3.9 image before the function runs.
load_op = create_component_from_func(
    load_tensorflow_dataset_component,
    base_image='python:3.9',
    packages_to_install=['tensorflow', 'tensorflow_datasets'],
)

In [None]:
def preprocess_component(data_file_input: InputPath('tf.data.Dataset'),
                         pp_data_file_output: OutputPath('tf.data.Dataset')):
    """Turn a saved image dataset into batched, MobileNetV2-sized tensors.

    Each image is made 3-channel, cast to float32 and divided by 255 (presumably
    uint8 pixels, giving values in [0, 1]), then padded/resized to 224x224. The
    result is batched and saved. This function is packaged as a standalone KFP
    component, so its imports live inside the body.

    Args:
        data_file_input: Directory of a ``tf.data.Dataset`` written with
            ``Dataset.save``; presumably (image, label) pairs from the load step.
        pp_data_file_output: Directory the preprocessed dataset is saved to.
    """
    import tensorflow as tf

    target_size = 224  # MobileNetV2 input resolution
    batch_size = 128

    def preprocess_mnist_tfds(image, label=None):
        # Dataset.map traces this function into a graph, so it must use TensorFlow
        # ops only: NumPy calls such as np.dstack cannot consume symbolic tensors.
        if image.shape.rank == 2:
            # No channel axis: add one so grayscale_to_rgb can replicate it.
            image = tf.expand_dims(image, axis=-1)
        if image.shape[-1] == 1:
            # Transfer-learning backbones expect 3 channels.
            image = tf.image.grayscale_to_rgb(image)
        image = tf.cast(image, tf.float32) / 255.
        image = tf.image.resize_with_pad(image,
                                         target_height=target_size,
                                         target_width=target_size)
        return image, label

    data = tf.data.Dataset.load(data_file_input)
    data = data.map(preprocess_mnist_tfds, num_parallel_calls=tf.data.AUTOTUNE)
    data = data.batch(batch_size)
    data.save(pp_data_file_output)


# Package the preprocessing step as a KFP component on a plain Python image.
preprocess_op = create_component_from_func(
    preprocess_component,
    base_image='python:3.9',
    packages_to_install=['numpy', 'tensorflow'],
)

In [None]:
def production_train_component(pp_data_file_input: InputPath('tf.data.Dataset'),
                               experiment_name: str) -> str:
    """Train the MNIST transfer-learning classifier and log/register it in MLflow.

    This function is packaged as a standalone KFP component, so every import,
    helper and class it needs is defined inside its body.

    Args:
        pp_data_file_input: Directory of a ``tf.data.Dataset`` written with
            ``Dataset.save``; presumably the batched (image, label) output of
            the preprocess step — TODO confirm.
        experiment_name: MLflow experiment name; also used as the registered
            model name.

    Returns:
        The artifact URI of the logged model (exposed as the component output).
    """
    import time

    import mlflow
    import tensorflow as tf
    import tensorflow_hub as hub
    from mlflow.exceptions import MlflowException

    def set_mlflow_experiment(experiment_name: str, artifact_location: str = None):
        """Create the MLflow experiment (or reuse an existing one) and return its id.

        Experiment names should follow the "project-task-version" convention.
        """
        try:
            experiment_id = mlflow.create_experiment(experiment_name,
                                                     artifact_location=artifact_location)
        except MlflowException:
            # Catch MLflow errors only (e.g. the name is already taken); a bare
            # except would also swallow unrelated errors such as KeyboardInterrupt.
            print(f'Experiment already exists, setting experiment to {experiment_name}')
            experiment_id = mlflow.set_experiment(experiment_name).experiment_id
        experiment = mlflow.get_experiment(experiment_id)
        print("---------------------")
        print('Experiment details are:')
        print("Name: {}".format(experiment.name))
        print("Experiment_id: {}".format(experiment.experiment_id))
        print("Artifact Location: {}".format(experiment.artifact_location))
        print("Creation timestamp: {}".format(experiment.creation_time))
        return experiment_id

    class MNIST(mlflow.pyfunc.PythonModel):
        """Frozen MobileNetV2 embedding followed by a dense softmax head (10 classes)."""

        def __init__(self, mlflow_registered_model_name: str = None):
            self._model = None
            self._model_base = None
            self._train_history = None
            self._mlflow_registered_model_name = mlflow_registered_model_name

        def _build(self, hyperparameters):
            """Build and compile a fresh Keras model.

            Reads the 'l1', 'l2', 'num_hidden' and 'learning_rate' keys of
            ``hyperparameters``.
            """
            # class names for mnist hardcoded
            class_names = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

            # set layer regularization for DNN
            regularizer = tf.keras.regularizers.l1_l2(hyperparameters['l1'], hyperparameters['l2'])

            # load in mobilenetv2 weights and instantiate dense classification head
            base_model = "https://tfhub.dev/google/tf2-preview/mobilenet_v2/feature_vector/4"
            layers = [
                hub.KerasLayer(
                    base_model,
                    input_shape=(224, 224, 3),
                    trainable=False,
                    name='mobilenet_embedding'),
                tf.keras.layers.Dense(hyperparameters['num_hidden'],
                                      kernel_regularizer=regularizer,
                                      activation='relu',
                                      name='dense_hidden'),
                tf.keras.layers.Dense(len(class_names),
                                      kernel_regularizer=regularizer,
                                      activation='softmax',
                                      name='mnist_prob')
            ]

            self._model = tf.keras.Sequential(layers, name='mnist-classification')

            # The head ends in softmax, so the loss receives probabilities, not logits.
            self._model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=hyperparameters['learning_rate']),
                                loss=tf.keras.losses.SparseCategoricalCrossentropy(
                                    from_logits=False),
                                metrics=['accuracy'])

            # base model logging
            self._model_base = base_model

        def fit_hp_search(self, xy_train, xy_test, hyperparameters):
            """Build a fresh model and fit it on ``xy_train``, validating on ``xy_test``."""
            self._build(hyperparameters)
            self._train_history = self._model.fit(xy_train,
                                                  epochs=hyperparameters['epochs'],
                                                  validation_data=xy_test)

        def fit_production(self, xy_train, hyperparameters):
            """Build a fresh model and fit it on all of ``xy_train`` (no validation)."""
            self._build(hyperparameters)
            self._train_history = self._model.fit(xy_train,
                                                  epochs=hyperparameters['epochs'])

    hyperparameters = {
        'learning_rate': 0.01,
        'l1': 0.0,
        'l2': 0.0,
        'num_hidden': 16,
        'epochs': 10}

    # instantiate model and load data
    mnist_model = MNIST()
    ds_train = tf.data.Dataset.load(pp_data_file_input)

    experiment_id = set_mlflow_experiment(experiment_name=experiment_name)
    mlflow_run_name = f'production-{time.strftime("%Y%m%d-%H%M%S")}'
    with mlflow.start_run(experiment_id=experiment_id,
                          run_name=mlflow_run_name) as run:
        # Log params before training so they are recorded even if fitting fails.
        mlflow.log_params(params=hyperparameters)

        mnist_model.fit_production(xy_train=ds_train,
                                   hyperparameters=hyperparameters)

        # Keras History.history maps metric name -> list of per-epoch values;
        # MLflow steps are 1-based epochs.
        history = mnist_model._train_history.history
        for epoch in range(len(history['loss'])):
            mlflow.log_metrics(metrics={name: values[epoch] for name, values in history.items()},
                               step=epoch + 1)

        # Log the trained Keras model with the TensorFlow flavor under
        # run_id/artifacts/model and register it so it can be served.
        # (`python_model=` belongs to mlflow.pyfunc.log_model, not this flavor.)
        model_info = mlflow.tensorflow.log_model(mnist_model._model,
                                                 artifact_path='model',
                                                 registered_model_name=experiment_name)

        uri = f'{run.info.artifact_uri}/{model_info.artifact_path}'
        # Leaving the `with` block ends the run (FINISHED, or FAILED on error).

    return uri



# Package the training step as a KFP component; the listed packages are
# pip-installed (unpinned) into the python:3.9 image before the function runs.
# NOTE(review): hub.KerasLayer inside tf.keras.Sequential may not work with the
# Keras 3 shipped by newer TensorFlow releases — consider pinning versions. TODO confirm.
production_train_op = create_component_from_func(
    production_train_component,
    base_image='python:3.9',
    packages_to_install=['numpy', 'tensorflow', 'tensorflow_hub', 'mlflow'],
)



In [None]:
import kfp.dsl as dsl
from kubernetes.client.models import V1EnvVar

@dsl.pipeline(name="production-pipeline",
              description="production training pipeline")
def load_pp_pipeline(dataset_str: str,
                     train_test_split: bool,
                     experiment_name: str):
    # Production pipeline: load -> preprocess -> train, where each step consumes
    # the previous step's saved dataset. Only the training step talks to MLflow,
    # so only it receives the in-cluster tracking URI.
    mlflow_tracking_env = V1EnvVar(name='MLFLOW_TRACKING_URI',
                                   value='http://mlflow.mlflow.svc.cluster.local')

    load_task = load_op(dataset_str=dataset_str, train_test_split=train_test_split)

    loaded_dataset = load_task.outputs['data_file_output']
    preprocess_task = preprocess_op(data_file_input=loaded_dataset)

    preprocessed_dataset = preprocess_task.outputs['pp_data_file_output']
    production_train_task = production_train_op(pp_data_file_input=preprocessed_dataset,
                                                experiment_name=experiment_name)
    production_train_task.add_env_variable(mlflow_tracking_env)


In [None]:
import kfp
from pathlib import Path

# The compiler opens the package path for writing directly, so the target
# directory must exist on a fresh checkout.
PIPELINE_PACKAGE = Path('pipelines') / 'production-pipeline.yaml'
PIPELINE_PACKAGE.parent.mkdir(parents=True, exist_ok=True)
kfp.compiler.Compiler().compile(load_pp_pipeline, str(PIPELINE_PACKAGE))


In [None]:
# Module-level settings for the hyperparameter-search experiment.
experiment_name = "mnist-classification"
mlflow_run_name = "test_run"
hyperparams = dict(
    learning_rate=0.01,
    l1=0.0,
    l2=0.0,
    num_hidden=16,
    epochs=10,
)

def hyperparameter_search(pp_data_file_input: InputPath('pickle'),
                          num_runs,
                          num_parallel):
    """Train the MNIST model on a pickled (train, test) pair and log the run to MLflow.

    Work in progress: despite its name, this currently trains one fixed
    configuration; ``num_runs`` and ``num_parallel`` are accepted but unused.

    Args:
        pp_data_file_input: Path to a pickle whose items 0 and 1 are the train
            and test datasets.
        num_runs: Intended number of search trials (currently unused).
        num_parallel: Intended number of concurrent trials (currently unused).

    Returns:
        The artifact URI of the logged model.
    """
    import pickle

    import mlflow
    from components.utils import set_mlflow_experiment
    from components.model_step import model

    experiment_name = "mnist-classification"
    mlflow_run_name = "deployment_run"
    hyperparams = {
        'learning_rate': 0.01,
        'l1': 0.0,
        'l2': 0.0,
        'num_hidden': 16,
        'epochs': 10}

    # SECURITY: pickle.load can execute arbitrary code; only load files
    # produced by this pipeline.
    with open(pp_data_file_input, "rb") as file:
        data = pickle.load(file)
    ds_train, ds_test = data[0], data[1]

    # NOTE(review): assumes components.model_step.model exposes an MNIST class with
    # fit_hp_search(xy_train, xy_test, hyperparameters), and that
    # components.utils.set_mlflow_experiment(name) returns an experiment id — TODO confirm.
    mnist_model = model.MNIST()
    experiment_id = set_mlflow_experiment(experiment_name)

    with mlflow.start_run(experiment_id=experiment_id,
                          run_name=mlflow_run_name) as run:
        # Manual logging rather than mlflow.tensorflow.autolog(), so additional
        # params and metrics can be logged alongside.
        mlflow.log_params(params=hyperparams)

        mnist_model.fit_hp_search(xy_train=ds_train,
                                  xy_test=ds_test,
                                  hyperparameters=hyperparams)

        # Per-epoch metrics; MLflow steps are 1-based epochs.
        training_history = mnist_model._train_history.history
        for epoch in range(len(training_history['loss'])):
            mlflow.log_metrics(metrics={name: values[epoch] for name, values in training_history.items()},
                               step=epoch + 1)

        # Log the model under run_id/artifacts/model.
        model_info = mlflow.pyfunc.log_model(python_model=mnist_model,
                                             artifact_path="model")

        # Read the URI from this run: mlflow.get_artifact_uri() outside an active
        # run would start a new one.
        artifact_uri = run.info.artifact_uri
        # Leaving the `with` block ends the run.

    return f"{artifact_uri}/{model_info.artifact_path}"


if __name__ == "__main__":
    # In a notebook __name__ is "__main__", so this runs on every execution of the cell.
    # It builds a component spec for the hyperparameter-search step and writes it to disk.
    # NOTE(review): hyperparameter_search imports from the local `components` package,
    # which is not in packages_to_install — the container needs it available. TODO confirm.
    train_op = kfp.components.create_component_from_func(
        func=hyperparameter_search,
        base_image='python:3.9',
        packages_to_install=['tensorflow', 'numpy', 'mlflow'],
        output_component_file='train_tensorflow_mnist.yaml')
    print('Wrote component spec to train_tensorflow_mnist.yaml')



In [None]:
# Load the pre-built component specs from YAML and wire them the way the pipeline
# does: the preprocess step consumes the load step's dataset output.
# NOTE(review): this rebinds `load_op`, replacing the component built from the
# Python function earlier; re-run that cell before recompiling the pipeline.
# NOTE(review): assumes the YAML specs name the output 'data_file_output' and the
# input 'data_file_input' — TODO confirm against the YAML files.
load_op = kfp.components.load_component_from_file('./components/load_step/load_component.yaml')
pp_op = kfp.components.load_component_from_file('./components/preprocess_step/preprocess_component.yaml')

load_task = load_op(dataset_str='mnist',
                    train_test_split=True)

# The preprocess component requires its input dataset; calling pp_op() with no
# arguments raises a TypeError.
pp_task = pp_op(data_file_input=load_task.outputs['data_file_output'])
pp_task



In [25]:
import os
import tensorflow as tf
# Load the dataset saved under components/preprocess_step/data/ (presumably from a
# local run of the preprocess step) for interactive inspection.
data_file_input = '/'.join([os.getcwd(), 'components', 'preprocess_step', 'data', ''])
data = tf.data.Dataset.load(data_file_input)


[libprotobuf ERROR external/com_google_protobuf/src/google/protobuf/text_format.cc:337] Error parsing text-format tensorflow.data.experimental.DistributedSnapshotMetadata: 1:1: Invalid control characters encountered in text.
[libprotobuf ERROR external/com_google_protobuf/src/google/protobuf/text_format.cc:337] Error parsing text-format tensorflow.data.experimental.DistributedSnapshotMetadata: 1:3: Expected identifier, got: 408422040865492794
