In [129]:
from typing import NamedTuple
from kfp.components import InputPath, OutputPath

def load_data(
             url:str,
             out_path: OutputPath("PandasDataFrame")
    ):
    """Read a CSV file and store its contents as a Parquet artifact.

    Args:
        url: Location of the CSV file; handed unchanged to ``pandas.read_csv``.
        out_path: Destination file path for the Parquet copy of the data.
    """
    import pandas as pd

    df = pd.read_csv(url)
    # Log the row count; printing ``df.index`` would show the index object instead.
    print("No of records", len(df.index))
    df.to_parquet(out_path)
    
def process_data(
                batch_size: int,
                train_size: float,
                val_size: float,
                test_size: float,
                pandas_parquet: InputPath("PandasDataFrame"),
                out_train_path: OutputPath("TF_DataSet"),
                out_test_path: OutputPath("TF_DataSet"),
                out_validate_path: OutputPath("TF_DataSet")
    ):
    """Scale the features, one-hot encode the label and save three disjoint datasets.

    Args:
        batch_size: Number of samples per batch in each saved dataset.
        train_size: Fraction of the rows assigned to the training split.
        val_size: Fraction of the rows assigned to the validation split.
        test_size: Fraction of the rows assigned to the test split.
        pandas_parquet: Path of the Parquet file holding the raw DataFrame.
        out_train_path: Where the training dataset is saved.
        out_test_path: Where the test dataset is saved.
        out_validate_path: Where the validation dataset is saved.

    Each fraction is turned into a row count with ``int()``, so rounding can
    leave a couple of rows unassigned.
    """
    import pandas as pd
    import tensorflow as tf

    from sklearn.preprocessing import StandardScaler
    from tensorflow import keras

    # Must match the input shape declared in create_nn_model.
    NUM_FEATURES = 13
    NUM_CLASSES = 2

    df = pd.read_parquet(pandas_parquet)
    SAMPLE_SIZE = len(df.index)

    # Features are every column up to and including "thall"; the label is "output".
    X = df.loc[:, :"thall"]
    y = df.loc[:, ["output"]]
    print(X.head())
    print(y.head())
    print("shape of X",X.to_numpy().shape)
    print("shape of y",y.to_numpy().shape)

    # Standardise the feature columns.
    # NOTE(review): the scaler is fitted on all rows before the split, so
    # validation/test statistics influence the training features.
    scaler = StandardScaler().fit(X)
    X_s = scaler.transform(X)
    # One-hot encode the binary label.
    y_s = keras.utils.to_categorical(y, NUM_CLASSES)

    print("shape of X_s",X_s.shape)
    print("shape of y_s",y_s.shape)
    print("X_s",X_s[1])
    print("y_s",y_s[1])
    # Fix the array shapes and use float32 for TensorFlow.
    X_s = X_s.reshape(SAMPLE_SIZE, NUM_FEATURES).astype("float32")
    y_s = y_s.reshape(SAMPLE_SIZE, NUM_CLASSES).astype("float32")
    print("shape of X_s",X_s.shape)
    print("shape of y_s",y_s.shape)
    print("X_s",X_s[1])
    print("y_s",y_s[1])

    dataset = tf.data.Dataset.from_tensor_slices((X_s, y_s))

    train_count = int(train_size * SAMPLE_SIZE)
    val_count = int(val_size * SAMPLE_SIZE)
    test_count = int(test_size * SAMPLE_SIZE)
    print("train_size=",train_count,"val_size=",val_count,"test_size=",test_count)

    # Shuffle once with a fixed order. With the default
    # reshuffle_each_iteration=True every take()/skip() below would see a
    # different permutation and the splits could share samples.
    dataset = dataset.shuffle(SAMPLE_SIZE, reshuffle_each_iteration=False)

    # Carve three consecutive, non-overlapping ranges out of the shuffled data.
    train_dataset = dataset.take(train_count)
    remainder = dataset.skip(train_count)
    test_dataset = remainder.take(test_count)
    val_dataset = remainder.skip(test_count).take(val_count)

    train_dataset = train_dataset.shuffle(buffer_size=1024).batch(batch_size)
    test_dataset = test_dataset.shuffle(buffer_size=1024).batch(batch_size)
    val_dataset = val_dataset.shuffle(buffer_size=1024).batch(batch_size)
    print("Created Train, Test and Validate Data Set")
    tf.data.experimental.save(train_dataset,out_train_path)
    tf.data.experimental.save(test_dataset,out_test_path)
    tf.data.experimental.save(val_dataset,out_validate_path)

def create_nn_model(
                   optimizer: str,
                   loss: str,
                   mertics_key: str,
                   layers1: int,
                   layers2: int,
                   out_test: OutputPath("TF_Model")
    ):
    """Build, compile and save an untrained two-hidden-layer classifier.

    Args:
        optimizer: Optimizer identifier passed to ``model.compile``.
        loss: Loss identifier passed to ``model.compile``.
        mertics_key: Single metric identifier passed to ``model.compile``.
        layers1: Unit count of the first hidden Dense layer.
        layers2: Unit count of the second hidden Dense layer.
        out_test: Path where the compiled model is saved.
    """
    from tensorflow import keras

    network_input = keras.Input(shape=(13,), name="inputs")
    hidden = keras.layers.Flatten()(network_input)
    # Two ReLU hidden layers named "dense_1" and "dense_2".
    for position, units in enumerate((layers1, layers2), start=1):
        hidden = keras.layers.Dense(units, activation="relu", name=f"dense_{position}")(hidden)
    # Two-way softmax head.
    class_probabilities = keras.layers.Dense(2, activation="softmax", name="predictions")(hidden)

    model = keras.Model(inputs=network_input, outputs=class_probabilities)
    model.compile(loss=loss, optimizer=optimizer, metrics=[mertics_key])
    model.save(out_test)
    print("Saved the model")
    
def train_model(
               epochs: int,
               modelpath:InputPath("TF_Model"),
               train_dataset_path:InputPath("TF_DataSet"),
               validation_dataset_path:InputPath("TF_DataSet"),
               trained_h5_path:OutputPath("TF_Model")
    ):
    """Fit a saved Keras model on the saved training data and store the result.

    Args:
        epochs: Number of epochs passed to ``model.fit``.
        modelpath: Path of the compiled model to load.
        train_dataset_path: Path of the saved training dataset.
        validation_dataset_path: Path of the saved validation dataset.
        trained_h5_path: Path where the fitted model is saved.
    """
    import tensorflow as tf
    from tensorflow import keras

    # Restore the model and both datasets from their artifact paths.
    network = keras.models.load_model(modelpath)
    training_batches = tf.data.experimental.load(train_dataset_path)
    validation_batches = tf.data.experimental.load(validation_dataset_path)

    # verbose=0 keeps per-epoch progress out of the step log.
    network.fit(
        training_batches,
        validation_data=validation_batches,
        epochs=epochs,
        verbose=0,
    )
    network.save(trained_h5_path)
    print("Saved Model")

def model_evaluvate(
                    modelpath:InputPath("TF_Model"),
                    test_dataset_path:InputPath("TF_DataSet")
    )  -> NamedTuple('conf_m_result', [('mlpipeline_ui_metadata', 'UI_metadata'), ('mlpipeline_metrics', 'Metrics')]):
    """Evaluate a trained model on the test dataset and report accuracy plus a confusion matrix.

    Args:
        modelpath: Path of the trained Keras model to load.
        test_dataset_path: Path of the saved, batched test dataset.

    Returns:
        A ``conf_m_result`` named tuple of two JSON strings:
        ``mlpipeline_ui_metadata`` (inline confusion-matrix CSV) and
        ``mlpipeline_metrics`` (the accuracy score).
    """
    from tensorflow import keras
    import tensorflow as tf
    from sklearn.metrics import confusion_matrix
    import json
    import pandas as pd
    from collections import namedtuple

    # Load the model and the test data from their artifact paths.
    model = keras.models.load_model(modelpath)
    test_dataset = tf.data.experimental.load(test_dataset_path)
    score = model.evaluate(test_dataset, verbose = 1)
    print("Test loss:", score[0])
    print("Test accuracy:", score[1])
    # score is [loss, metric]; float() keeps the value JSON-serialisable.
    accuracy = float(score[1])

    metrics_accuracy = {
        "metrics": [{
          "name": "accuracy-score", # The name of the metric. Visualized as the column name in the runs table.
          "numberValue":  accuracy, # The value of the metric. Must be a numeric value.
          "format": "PERCENTAGE",   # The optional format of the metric. Supported values are "RAW" (displayed in raw format) and "PERCENTAGE" (displayed in percentage format).
        }]
    }

    # Predicted class index for every sample in the test dataset.
    y_pred = model.predict(test_dataset)
    print(y_pred.shape)
    predicted_categories = tf.argmax(y_pred, axis=1).numpy()

    # Collect the true class index from every batch, not only the last one,
    # so the label count always equals the prediction count.
    true_categories = []
    for _, label_batch in test_dataset:
        true_categories.extend(tf.argmax(label_batch, axis=1).numpy().tolist())
    print(len(true_categories))

    vocab = ["0-No HeartAttack", "1-Possibilty HeartAttack"]
    # sklearn expects (y_true, y_pred): rows are true classes, columns are
    # predictions. labels= pins the matrix to 2x2 even if a class is absent.
    cm = confusion_matrix(
        true_categories,
        predicted_categories,
        labels=list(range(len(vocab))),
    )

    # Flatten the matrix into (target, predicted, count) rows for the UI.
    rows = [
        (vocab[target_index], vocab[predicted_index], int(count))
        for target_index, target_row in enumerate(cm)
        for predicted_index, count in enumerate(target_row)
    ]
    cm_csv = pd.DataFrame(rows, columns=['target', 'predicted', 'count']).to_csv(header=False, index=False)

    print(cm_csv)

    metadata_confusion_matrix = {
        'outputs' : [{
          'type': 'confusion_matrix',
          'format': 'csv',
          # Schema names describe the three CSV columns, not the class labels.
          'schema': [
            {'name': 'target', 'type': 'CATEGORY'},
            {'name': 'predicted', 'type': 'CATEGORY'},
            {'name': 'count', 'type': 'NUMBER'},
          ],
          'storage': 'inline',
          'source': cm_csv,
          # Labels are passed as strings so they match the values in the CSV.
          'labels': list(map(str, vocab)),
        }]
    }

    print("Confusion Matrix\n", cm)
    print(json.dumps(metadata_confusion_matrix))

    conf_m_result = namedtuple('conf_m_result', ['mlpipeline_ui_metadata', 'mlpipeline_metrics'])
    return conf_m_result(json.dumps(metadata_confusion_matrix), json.dumps(metrics_accuracy))

In [135]:
from kfp.components import create_component_from_func
# Container image shared by every component.
COMPONENT_BASE_IMAGE = "tensorflow/tensorflow:2.6.0"
# Extra packages pip-installed when a component starts.
# "scikit-learn" is the real PyPI distribution; the "sklearn" alias is
# deprecated and installing it now fails.
COMPONENT_PACKAGES = ["pandas==0.24", "scikit-learn", "numpy", "pyarrow"]

# Each call gets its own copy of the package list.
load_data_component = create_component_from_func(
    load_data,
    base_image=COMPONENT_BASE_IMAGE,
    packages_to_install=list(COMPONENT_PACKAGES),
)
process_data_component = create_component_from_func(
    process_data,
    base_image=COMPONENT_BASE_IMAGE,
    packages_to_install=list(COMPONENT_PACKAGES),
)
# Model creation only needs what the base image already provides.
create_nn_model_component = create_component_from_func(
    create_nn_model,
    base_image=COMPONENT_BASE_IMAGE,
)
train_model_component = create_component_from_func(
    train_model,
    base_image=COMPONENT_BASE_IMAGE,
    packages_to_install=list(COMPONENT_PACKAGES),
)
model_evaluvate_component = create_component_from_func(
    model_evaluvate,
    base_image=COMPONENT_BASE_IMAGE,
    packages_to_install=list(COMPONENT_PACKAGES),
)

In [142]:
import kfp.dsl as dsl
@dsl.pipeline(
  name="Get and Process Training Data",
  description="Get and Process Training data"
)
def getdata_and_process_pipeline(
        url:str="https://raw.githubusercontent.com/alexcpn/neuralnetwork_learn/main/data/heart-attack-prediction/heart.csv",

        optimizer:str="adam",
        loss:str="categorical_crossentropy",
        mertics_key:str="accuracy",
        layers1:int=64,
        layers2:int=64,

        batch_size:int=128,
        train_size:float=0.7,
        val_size:float=0.15,
        test_size:float=0.15,

        epochs:int=100
    ):
    # Wires the five components together:
    # load -> split, build model, train (needs model + split), evaluate.

    # Fetch the CSV; its single output is the Parquet artifact.
    load_task = load_data_component(url=url)

    # Build and compile the untrained network.
    build_task = create_nn_model_component(
        optimizer=optimizer,
        loss=loss,
        mertics_key=mertics_key,
        layers1=layers1,
        layers2=layers2,
    )

    # Produce the train / test / validation datasets from the Parquet artifact.
    split_task = process_data_component(
        batch_size=batch_size,
        train_size=train_size,
        val_size=val_size,
        test_size=test_size,
        pandas_parquet=load_task.output,
    )

    # Train on the train split, validating against the validation split.
    fit_task = train_model_component(
        epochs=epochs,
        modelpath=build_task.output,
        train_dataset=split_task.outputs["out_train"],
        validation_dataset=split_task.outputs["out_validate"],
    )

    # Score the trained model on the held-out test split.
    model_evaluvate_component(
        modelpath=fit_task.output,
        test_dataset=split_task.outputs["out_test"],
    )

In [143]:
import kfp
# Connect to Kubeflow Pipelines with the client's default settings.
client = kfp.Client()

# The experiment under which the run submitted later is filed.
experiment = client.create_experiment(name="getdata and process pipeline", description="demo pipeline")

In [144]:
# Submit the pipeline as a run; the empty arguments dict leaves every
# pipeline parameter at its declared default.
client.create_run_from_pipeline_func(
    getdata_and_process_pipeline,
    arguments={},
    experiment_name=experiment.name,
)

RunPipelineResult(run_id=6943513e-fe82-4bff-8fc9-ddeb1284fcf3)