In [1]:
!pip install "cython<3.0.0" wheel
!pip install "pyyaml==5.4.1" --no-build-isolation
!pip install kfp==1.8.18
!pip install urllib3==1.26.15

Collecting cython<3.0.0
  Downloading Cython-0.29.37-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_24_x86_64.whl.metadata (3.1 kB)
Downloading Cython-0.29.37-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_24_x86_64.whl (1.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.9/1.9 MB[0m [31m17.9 MB/s[0m eta [36m0:00:00[0m00:01[0m0:01[0m
[?25hInstalling collected packages: cython
Successfully installed cython-0.29.37
[0mCollecting pyyaml==5.4.1
  Downloading PyYAML-5.4.1.tar.gz (175 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m175.1/175.1 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25h  Preparing metadata (pyproject.toml) ... [?25ldone
[?25hBuilding wheels for collected packages: pyyaml
  Building wheel for pyyaml (pyproject.toml) ... [?25ldone
[?25h  Created wheel for pyyaml: filename=PyYAML-5.4.1-cp310-cp310-linux_x86_64.whl size=45658 sha256=5ab5dd4f0493d73b4998c57ac335a

In [2]:
import kfp
import kfp.components as comp
import requests
import kfp.dsl as dsl

# Functions to run

In [3]:
def prepare_data():
    """Download the Iris CSV, drop incomplete rows and write ``final_df.csv``.

    The output file is written relative to the current working directory.
    The pandas import is kept inside the function because the function is
    later wrapped with ``create_component_from_func`` and must be
    self-contained.
    """
    import pandas as pd

    print("---- Inside prepare_data component ----")

    source_url = "https://raw.githubusercontent.com/TripathiAshutosh/dataset/main/iris.csv"
    # Read the raw table and discard every row that contains a missing value.
    cleaned = pd.read_csv(source_url).dropna()
    cleaned.to_csv('final_df.csv', index=False)

In [4]:
def train_test_split():
    """Split ``final_df.csv`` into stratified train/test arrays saved as ``.npy``.

    Reads ``final_df.csv`` from the current working directory, label-encodes
    the ``class`` column, performs a 70/30 stratified split with
    ``random_state=47`` and writes ``X_train.npy``, ``X_test.npy``,
    ``y_train.npy`` and ``y_test.npy`` next to it. Each split is also printed.
    """
    import pandas as pd
    import numpy as np
    # Aliased so the sklearn helper does not shadow this component's own name.
    from sklearn.model_selection import train_test_split as sk_train_test_split
    from sklearn.preprocessing import LabelEncoder

    print("---- Inside train_test_split component ----")
    final_data = pd.read_csv('final_df.csv')
    target_column = 'class'

    X = final_data.drop(columns=[target_column])
    # Pass the target as a 1-D Series: LabelEncoder expects 1-D input and warns
    # (DataConversionWarning) when handed a single-column DataFrame.
    y = LabelEncoder().fit_transform(final_data[target_column])

    X_train, X_test, y_train, y_test = sk_train_test_split(
        X, y, test_size=0.3, stratify=y, random_state=47
    )

    splits = {
        'X_train': X_train,
        'X_test': X_test,
        'y_train': y_train,
        'y_test': y_test,
    }

    # Persist every split first, then echo them to the step log.
    for split_name, values in splits.items():
        np.save(f'{split_name}.npy', values)

    for split_name, values in splits.items():
        print(f"\n---- {split_name} ----")
        print("\n")
        print(values)

In [5]:
def training_basic_classifier():
    """Train a small dense network on the saved split and checkpoint the best model.

    Loads ``X_train.npy``/``y_train.npy`` for training and
    ``X_test.npy``/``y_test.npy`` as validation data from the current working
    directory, fits the model for 10 epochs and, through ``ModelCheckpoint``,
    keeps the model with the highest validation accuracy under ``checkpoint``.
    """
    import numpy as np
    import tensorflow as tf

    print("---- Inside training_basic_classifier component ----")
    # Query the device list once and reuse it for all three log lines.
    gpus = tf.config.list_physical_devices('GPU')
    print("Using GPU: ", len(gpus) > 0)
    print("Num GPUs Available: ", len(gpus))
    print("GPU list: ", gpus)

    # NOTE(review): allow_pickle=True permits unpickling object arrays, which
    # can execute arbitrary code; acceptable only while these files come from
    # the upstream split step. Drop it if the arrays are always numeric.
    X_train = np.load('X_train.npy', allow_pickle=True)
    y_train = np.load('y_train.npy', allow_pickle=True)

    # using test set as val set just for demonstration
    X_val = np.load('X_test.npy', allow_pickle=True)
    y_val = np.load('y_test.npy', allow_pickle=True)

    model = tf.keras.Sequential([
        tf.keras.layers.Flatten(input_shape=(4,)),
        tf.keras.layers.Dense(128, activation='relu'),
        # No activation: the layer emits logits, matching from_logits=True below.
        tf.keras.layers.Dense(3)
    ])

    model.compile(optimizer='adam',
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                  metrics=['accuracy'])

    # Only overwrite the checkpoint when validation accuracy improves.
    model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
        filepath='checkpoint',
        monitor='val_accuracy',
        mode='max',
        save_best_only=True)

    model.fit(X_train, y_train,
              validation_data=(X_val, y_val),
              callbacks=[model_checkpoint_callback],
              epochs=10)

In [6]:
def predict_on_test_data():
    """Load the checkpointed model, predict classes for the test set and save them.

    Reads the model from ``checkpoint`` and the features from ``X_test.npy``
    in the current working directory, takes the arg-max over the model outputs
    and writes the predicted class indices to ``y_pred.npy``.
    """
    import numpy as np
    import tensorflow as tf

    print("---- Inside predict_on_test_data component ----")
    # Query the device list once and reuse it for all three log lines.
    gpus = tf.config.list_physical_devices('GPU')
    print("Using GPU: ", len(gpus) > 0)
    print("Num GPUs Available: ", len(gpus))
    print("GPU list: ", gpus)

    loaded_model = tf.keras.models.load_model('checkpoint')

    # NOTE(review): allow_pickle=True permits unpickling object arrays; safe
    # only while the file is produced by the upstream split step.
    X_test = np.load('X_test.npy', allow_pickle=True)

    # The model returns one score per class; keep the index of the largest one.
    class_scores = loaded_model.predict(X_test)
    y_pred = np.argmax(class_scores, axis=1)
    np.save('y_pred.npy', y_pred)

    print("\n---- Predicted classes ----")
    print("\n")
    print(y_pred)
    

In [7]:
def get_metrics():
    """Print a classification report and summary metrics for the saved predictions.

    Loads ``y_test.npy`` and ``y_pred.npy`` from the current working
    directory, prints sklearn's classification report, then prints accuracy
    and micro-averaged precision and recall rounded to two decimals.
    """
    import numpy as np
    from sklearn.metrics import (
        accuracy_score,
        classification_report,
        precision_score,
        recall_score,
    )

    print("---- Inside get_metrics component ----")
    # Load each array once; the same arrays feed both the scores and the report.
    y_test = np.load('y_test.npy', allow_pickle=True)
    y_pred = np.load('y_pred.npy', allow_pickle=True)

    acc = accuracy_score(y_test, y_pred)
    # NOTE(review): for single-label multiclass data micro-averaged precision
    # and recall coincide with accuracy; consider 'macro' if per-class
    # behaviour matters.
    prec = precision_score(y_test, y_pred, average='micro')
    recall = recall_score(y_test, y_pred, average='micro')

    print(classification_report(y_test, y_pred))

    print("\n Model Metrics:", {'accuracy': round(acc, 2), 'precision': round(prec, 2), 'recall': round(recall, 2)})

# Create Steps

In [8]:
# Wrap prepare_data as a pipeline component that runs in the image below.
# NOTE(review): packages_to_install is empty, so the image itself must already
# provide pandas, which prepare_data imports — confirm.
create_step_prepare_data = comp.create_component_from_func(
    prepare_data,
    base_image='nvcr.io/nvidia/tensorflow:24.01-tf2-py3',
    packages_to_install=[],
)

In [9]:
# Wrap train_test_split as a pipeline component that runs in the image below.
# NOTE(review): packages_to_install is empty, so the image itself must already
# provide pandas and scikit-learn, which the function imports — confirm.
create_step_train_test_split = comp.create_component_from_func(
    train_test_split,
    base_image='nvcr.io/nvidia/tensorflow:24.01-tf2-py3',
    packages_to_install=[],
)

In [10]:
# Wrap training_basic_classifier as a pipeline component.
# The image tag is aligned with the other steps (24.01): the prediction step
# loads the checkpoint written here, so both must run the same TensorFlow
# release for the saved model to be read back reliably.
create_step_training_basic_classifier = kfp.components.create_component_from_func(
    func=training_basic_classifier,
    base_image='nvcr.io/nvidia/tensorflow:24.01-tf2-py3',
    packages_to_install=[]
)

In [11]:
# Wrap predict_on_test_data as a pipeline component that runs in the image
# below; no additional pip packages are requested.
create_step_predict_on_test_data = comp.create_component_from_func(
    predict_on_test_data,
    base_image='nvcr.io/nvidia/tensorflow:24.01-tf2-py3',
    packages_to_install=[],
)

In [12]:
# Wrap get_metrics as a pipeline component that runs in the image below.
# NOTE(review): packages_to_install is empty, so the image itself must already
# provide scikit-learn, which get_metrics imports — confirm.
create_step_get_metrics = comp.create_component_from_func(
    get_metrics,
    base_image='nvcr.io/nvidia/tensorflow:24.01-tf2-py3',
    packages_to_install=[],
)

# Create Pipeline

In [13]:
# Define the pipeline
@dsl.pipeline(
   name='IRIS classifier Kubeflow Demo Pipeline',
   description='A sample pipeline that performs IRIS classifier task'
)
def iris_classifier_pipeline():
    """Chain the five Iris components into a strictly sequential pipeline.

    Order: prepare data -> split -> train -> predict -> metrics. Every step
    mounts the same PVC at ``/workspace``, gets identical CPU/memory limits and
    has caching disabled (``max_cache_staleness = "P0D"``). The training and
    prediction steps additionally request one GPU on an ``NVIDIA-L40S`` node.
    The pipeline takes no parameters.
    """
    volume_name = 'mlops-demo-workspace'
    data_path = '/workspace'
    memory_limit = '64G'
    cpu_limit = '8.0'
    gpu_product = 'NVIDIA-L40S'

    # NOTE(review): the components read and write relative file names, so the
    # steps only share data if the container's working directory is the
    # mount point `data_path` — confirm for the chosen base image.

    def _configure(task, use_gpu=False):
        """Apply the shared volume, resource limits and caching policy to ``task``."""
        task.add_pvolumes({data_path: dsl.PipelineVolume(pvc=volume_name)})
        task.set_memory_limit(memory_limit)
        task.set_cpu_limit(cpu_limit)
        if use_gpu:
            task.set_gpu_limit(1)
            task.add_node_selector_constraint(
                label_name='nvidia.com/gpu.product',
                value=gpu_product,
            )
        # "P0D" (zero days) means a cached result is never considered fresh.
        task.execution_options.caching_strategy.max_cache_staleness = "P0D"
        return task

    prepare_data_task = _configure(create_step_prepare_data())

    # Steps exchange data only through files on the volume, so the ordering
    # has to be declared explicitly with .after().
    split_task = _configure(
        create_step_train_test_split().after(prepare_data_task)
    )

    classifier_training = _configure(
        create_step_training_basic_classifier().after(split_task),
        use_gpu=True,
    )

    log_predicted_class = _configure(
        create_step_predict_on_test_data().after(classifier_training),
        use_gpu=True,
    )

    log_metrics_task = _configure(
        create_step_get_metrics().after(log_predicted_class)
    )

In [14]:
# Compile the pipeline function into a workflow package at pipeline.yaml.
pipeline_compiler = kfp.compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=iris_classifier_pipeline,
    package_path='pipeline.yaml',
)