In [32]:
# Install the pinned Kubeflow Pipelines / KServe client stack used by this notebook.
# The resolver output recorded under this cell shows the pins disagree on
# typing-extensions (kfp 1.8.12 wants <4, while 4.11.0 ends up installed),
# which is where the ERROR lines come from.
!pip install pyOpenSSL==22.0.0 --quiet
!pip install kfp-server-api==1.8.2 --quiet
!pip install kfp==1.8.12 --quiet
# NOTE(review): the serving component further down installs kserve==0.11.2, not 0.10.0 — confirm which version is intended.
!pip install kserve==0.10.0 --quiet
# NOTE(review): lightgbm is the only install here without a version pin or --quiet.
!pip install lightgbm

[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
asgiref 3.8.1 requires typing-extensions>=4; python_version < "3.11", but you have typing-extensions 3.10.0.2 which is incompatible.
azure-core 1.30.1 requires typing-extensions>=4.6.0, but you have typing-extensions 3.10.0.2 which is incompatible.
kfserving 0.5.1 requires azure-storage-blob<=2.1.0,>=1.3.0, but you have azure-storage-blob 12.9.0 which is incompatible.[0m[31m
[0m[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
kfp 1.8.12 requires typing-extensions<4,>=3.7.4; python_version < "3.9", but you have typing-extensions 4.11.0 which is incompatible.
kfserving 0.5.1 requires azure-storage-blob<=2.1.0,>=1.3.0, but you have azure-storage-blob 12.9.0 which is incompatible.

In [8]:
from minio import Minio

# Client for the in-cluster MinIO service (secure=False: no TLS).
minio_client = Minio("minio-service.kubeflow:9000", access_key="minio", secret_key="minio123", secure=False)
minio_bucket = "mlpipeline"

# (object key inside the bucket, local file next to the notebook)
seed_uploads = [
    ("heart-disease/data/heart.csv", "heart.csv"),
    ("heart-disease/data/heart_cleveland_upload.csv", "heart_cleveland_upload.csv"),
    ("heart-disease/data/concat_data_processed_heart.csv", "concat_data_processed_heart.csv"),
]

# Nothing is uploaded unless the bucket is already there.
if not minio_client.bucket_exists(minio_bucket):
    print("my-bucket does not exist")
else:
    print("my-bucket exists")
    for object_key, local_file in seed_uploads:
        minio_client.fput_object(minio_bucket, object_key, local_file)
    print("Put object")

my-bucket exists
Put object


In [7]:
import kfp
from kfp import dsl
import kfp.components as components
from typing import NamedTuple

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

from sklearn.model_selection import KFold, StratifiedKFold, GridSearchCV
import lightgbm as lgb
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.model_selection import train_test_split
from sklearn import svm

import time
import pickle

In [9]:
def get_data():
    """
    Download the two raw heart-disease CSVs from MinIO, merge them and upload the result.

    Each file is de-duplicated and loses the 'trestbps', 'chol' and 'fbs'
    columns; the Cleveland file additionally has 'condition' renamed to
    'target'. The row-wise concatenation is written to
    /tmp/concat_data_processed_heart.csv and stored in the "mlpipeline"
    bucket under heart-disease/data/.
    """
    print("getting data")
    from minio import Minio
    import numpy as np
    import pandas as pd

    bucket = "mlpipeline"
    store = Minio("minio-service.kubeflow:9000", access_key="minio", secret_key="minio123", secure=False)

    # Pull both raw files into /tmp before touching them with pandas.
    store.fget_object(bucket, "heart-disease/data/heart.csv", "/tmp/heart.csv")
    store.fget_object(bucket, "heart-disease/data/heart_cleveland_upload.csv", "/tmp/heart_cleveland_upload.csv")

    def load_data(data, drop_columns=False, rename_columns=False):
        # Read, keep the first of any duplicated rows, then optionally drop / rename columns.
        frame = pd.read_csv(data).drop_duplicates(keep='first')
        if drop_columns:
            frame = frame.drop(drop_columns, axis=1)
        if rename_columns:
            frame = frame.rename(columns=rename_columns)
        return frame

    dropped_columns = ['trestbps', 'chol', 'fbs']
    heart = load_data('/tmp/heart.csv', drop_columns=dropped_columns)
    cleveland = load_data(
        '/tmp/heart_cleveland_upload.csv',
        drop_columns=dropped_columns,
        rename_columns={'condition':'target'},
    )

    merged = pd.concat([heart, cleveland], axis=0, ignore_index=True)
    merged_path = '/tmp/concat_data_processed_heart.csv'
    merged.to_csv(merged_path, index=False)
    store.fput_object(bucket, "heart-disease/data/concat_data_processed_heart.csv", merged_path)

    print(f"Data shape: {merged.shape}")

def model_building(n_loops:int = 10, methods:str = "SVM"):
    """
    Train a heart-disease classifier and upload the best fitted model to MinIO.

    The processed dataset is downloaded from the "mlpipeline" bucket; every
    column but the last is used as features and the last column as the label.
    For each of ``n_loops`` repetitions a shuffled 5-fold StratifiedKFold
    (seeded with the repetition index) is run, one classifier is fitted per
    fold, and the classifier with the highest test-fold accuracy is kept.
    That classifier is pickled to /tmp/heart-disease.pickle and stored as
    models/heart-disease/heart-disease.pickle in the bucket.

    Args:
        n_loops: Number of repetitions of the 5-fold cross-validation.
        methods: Name of the single algorithm to train: 'SVM', 'RandomForest',
            'Adaboost' or 'LightGBM'.

    Raises:
        ValueError: If ``methods`` is not one of the supported names.
        RuntimeError: If no fold produced a model (for example ``n_loops`` < 1),
            so there is nothing valid to upload.
    """
    # Validate before any heavy import or network call: previously an unknown
    # method only printed a message and a pickled ``None`` was uploaded as the model.
    supported_methods = ('SVM', 'RandomForest', 'Adaboost', 'LightGBM')
    if methods not in supported_methods:
        raise ValueError(f'Method {methods} is not supported! Choose one of {supported_methods}.')

    # Imports stay inside the function: it is packaged with
    # create_component_from_func, so it cannot rely on notebook-level imports.
    import pickle
    import time
    from minio import Minio
    import numpy as np
    import pandas as pd
    from sklearn.model_selection import StratifiedKFold
    import lightgbm as lgb
    from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier
    from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
    from sklearn import svm

    minio_client = Minio(
        "minio-service.kubeflow:9000",
        access_key="minio",
        secret_key="minio123",
        secure=False
    )
    minio_bucket = "mlpipeline"
    model_path = "/tmp/heart-disease.pickle"

    def build_classifier(method):
        # Unfitted estimator for the requested algorithm (defaults, linear kernel for SVM).
        if method == 'SVM':
            return svm.SVC(kernel='linear')
        if method == 'RandomForest':
            return RandomForestClassifier()
        if method == 'Adaboost':
            return AdaBoostClassifier()
        if method == 'LightGBM':
            return lgb.LGBMClassifier()
        raise ValueError(f'Method {method} is not supported!')

    def run_ML(X, y, n_loops, method):
        # Repeated stratified CV; records every train/test metric and tracks the
        # classifier with the best test-fold accuracy seen so far.
        ml = []; values = []; metrics = []; data = []
        best_acc = 0; best_model = [None, None, None]; idx = [None, None]
        for i in range(n_loops):
            cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=i)
            for train_idx, test_idx in cv.split(X, y):
                X_train, y_train = X.iloc[train_idx, :], y.iloc[train_idx]
                X_test, y_test = X.iloc[test_idx, :], y.iloc[test_idx]

                clf = build_classifier(method).fit(X_train, y_train)

                for split_name, X_check, y_check in (('train', X_train, y_train),
                                                     ('test', X_test, y_test)):
                    y_predict = clf.predict(X_check)
                    scores = {
                        'acc': accuracy_score(y_check, y_predict),
                        'precision': precision_score(y_check, y_predict),
                        'recall': recall_score(y_check, y_predict),
                        'f1_score': f1_score(y_check, y_predict),
                    }
                    for metric_name, value in scores.items():
                        ml.append(method); values.append(value)
                        metrics.append(metric_name); data.append(split_name)

                    # Only the held-out fold decides which model is kept.
                    if split_name == 'test' and scores['acc'] > best_acc:
                        best_acc = scores['acc']
                        best_model = [method, clf, scores]
                        print(best_acc)
                        idx = [train_idx, test_idx]

        return best_model, ml, values, metrics, data, idx

    def run_method(X, y, n_loops, methods=methods):
        # Times one run_ML call and wraps its results in single-element lists
        # plus a long-format metrics table.
        t = time.time()
        print('Run in method:', methods)
        model, ml, values, metrics, data, idx = run_ML(X, y, n_loops, methods)
        summary = pd.DataFrame({'ML': ml, 'values': values, 'metrics': metrics, 'data': data})
        print('Running time:', np.round(time.time() - t, 0))
        print('-------------------------\n')

        return [model], [idx], summary

    minio_client.fget_object(minio_bucket,"heart-disease/data/concat_data_processed_heart.csv","/tmp/concat_data_processed_heart.csv")
    df_concat = pd.read_csv("/tmp/concat_data_processed_heart.csv")
    best_model, idxs, summary = run_method(X=df_concat.iloc[:, :-1], y=df_concat.iloc[:, -1], n_loops=n_loops, methods=methods)

    best_clf = best_model[0][1]
    if best_clf is None:
        raise RuntimeError(f'No model was trained (n_loops={n_loops}); nothing to upload.')

    ### Save model to minIO
    # Close (and therefore flush) the pickle before it is uploaded.
    with open(model_path, 'wb') as model_file:
        pickle.dump(best_clf, model_file)
    minio_client.fput_object(minio_bucket,"models/heart-disease/heart-disease.pickle",model_path)

    print("Saved models to minIO")

def model_serving():
    """
    Create the 'heart-disease' KServe InferenceService.

    The service is declared with an sklearn predictor whose storage_uri is
    s3://mlpipeline/models/heart-disease/, service_account_name
    'sa-minio-kserve', and the 'sidecar.istio.io/inject' annotation set to
    'false'. It is created in the namespace returned by
    utils.get_default_target_namespace().
    """
    from datetime import datetime
    from kubernetes import client
    from kserve import (
        KServeClient,
        V1beta1InferenceService,
        V1beta1InferenceServiceSpec,
        V1beta1PredictorSpec,
        V1beta1SKLearnSpec,
        constants,
        utils,
    )

    target_namespace = utils.get_default_target_namespace()
    service_name = 'heart-disease'
    api_version = constants.KSERVE_GROUP + '/' + 'v1beta1'

    # Assemble the resource from the inside out: model source -> predictor -> service.
    sklearn_spec = V1beta1SKLearnSpec(storage_uri="s3://mlpipeline/models/heart-disease/")
    predictor_spec = V1beta1PredictorSpec(
        service_account_name="sa-minio-kserve",
        sklearn=sklearn_spec,
    )
    metadata = client.V1ObjectMeta(
        name=service_name,
        namespace=target_namespace,
        annotations={'sidecar.istio.io/inject':'false'},
    )
    inference_service = V1beta1InferenceService(
        api_version=api_version,
        kind=constants.KSERVE_KIND,
        metadata=metadata,
        spec=V1beta1InferenceServiceSpec(predictor=predictor_spec),
    )

    KServeClient().create(inference_service)

In [10]:
import kfp
from kfp import dsl
import kfp.components as components

# All three components are built on the same notebook-server image.
COMPONENT_BASE_IMAGE = "public.ecr.aws/j1r0q0g6/notebooks/notebook-servers/jupyter-tensorflow-full:v1.5.0"

comp_get_data = components.create_component_from_func(
    get_data,
    base_image=COMPONENT_BASE_IMAGE,
)
# Training needs lightgbm installed on top of the base image.
comp_model_building = components.create_component_from_func(
    model_building,
    base_image=COMPONENT_BASE_IMAGE,
    packages_to_install=['lightgbm'],
)
# Serving needs the kserve SDK installed on top of the base image.
comp_model_serving = components.create_component_from_func(
    model_serving,
    base_image=COMPONENT_BASE_IMAGE,
    packages_to_install=['kserve==0.11.2'],
)

@dsl.pipeline(
    name='heart-disease',
    description='Train a heart-disease classifier and deploy it with KServe'
)
def output_test(n_loops,methods):
    """Three-step pipeline: prepare the data, train the model, then serve it.

    Args:
        n_loops: Forwarded to the model-building step (cross-validation repetitions).
        methods: Forwarded to the model-building step (algorithm name).
    """
    step1 = comp_get_data()

    # The steps hand data to each other through MinIO, not through pipeline
    # outputs, so the execution order has to be declared explicitly.
    step2 = comp_model_building(n_loops,methods)
    step2.after(step1)

    step3 = comp_model_serving()
    step3.after(step2)

In [11]:
if __name__ == "__main__":
    client = kfp.Client()

    # Parameter values used when the pipeline is started straight from here.
    arguments = {"n_loops": 10, "methods": "SVM"}

    # 1 -> start a run immediately; any other value -> compile and upload the package.
    run_directly = 0

    if run_directly != 1:
        package_path = 'output_test.yaml'
        kfp.compiler.Compiler().compile(pipeline_func=output_test, package_path=package_path)
        client.upload_pipeline(pipeline_package_path=package_path, pipeline_name="heart_disease_test_1")
    else:
        client.create_run_from_pipeline_func(output_test, arguments=arguments, experiment_name="test")

ERROR:root:Failed to read a token from file '/var/run/secrets/kubeflow/pipelines/token' ([Errno 2] No such file or directory: '/var/run/secrets/kubeflow/pipelines/token').


In [12]:
from kubernetes import client 
from kserve import KServeClient
from kserve import constants
from kserve import utils
from kserve import V1beta1InferenceService
from kserve import V1beta1InferenceServiceSpec
from kserve import V1beta1PredictorSpec
from kserve import V1beta1TFServingSpec
from datetime import datetime

In [13]:
# Namespace that the KServe calls in later cells will target, as reported by kserve.utils.
namespace = utils.get_default_target_namespace()

In [36]:
from kubernetes import client 
from kserve import KServeClient
from kserve import constants
from kserve import utils
from kserve import V1beta1InferenceService
from kserve import V1beta1InferenceServiceSpec
from kserve import V1beta1PredictorSpec
from kserve import V1beta1SKLearnSpec
from datetime import datetime

namespace = utils.get_default_target_namespace()

name = 'heart-disease'
kserve_version = 'v1beta1'
api_version = constants.KSERVE_GROUP + '/' + kserve_version

# Assemble the resource from the inside out: model source -> predictor -> service.
sklearn_spec = V1beta1SKLearnSpec(storage_uri="s3://mlpipeline/models/heart-disease/")
predictor_spec = V1beta1PredictorSpec(
    service_account_name="sa-minio-kserve",
    sklearn=sklearn_spec,
)
isvc_metadata = client.V1ObjectMeta(
    name=name,
    namespace=namespace,
    annotations={'sidecar.istio.io/inject':'false'},
)

isvc = V1beta1InferenceService(
    api_version=api_version,
    kind=constants.KSERVE_KIND,
    metadata=isvc_metadata,
    spec=V1beta1InferenceServiceSpec(predictor=predictor_spec),
)

# Submit the resource; the call is the cell's last expression so its response is displayed.
KServe = KServeClient()
KServe.create(isvc)

{'apiVersion': 'serving.kserve.io/v1beta1',
 'kind': 'InferenceService',
 'metadata': {'annotations': {'sidecar.istio.io/inject': 'false'},
  'creationTimestamp': '2024-04-30T19:47:07Z',
  'generation': 1,
  'managedFields': [{'apiVersion': 'serving.kserve.io/v1beta1',
    'fieldsType': 'FieldsV1',
    'fieldsV1': {'f:metadata': {'f:annotations': {'.': {},
       'f:sidecar.istio.io/inject': {}}},
     'f:spec': {'.': {},
      'f:predictor': {'.': {},
       'f:serviceAccountName': {},
       'f:sklearn': {'.': {}, 'f:name': {}, 'f:storageUri': {}}}}},
    'manager': 'OpenAPI-Generator',
    'operation': 'Update',
    'time': '2024-04-30T19:47:07Z'}],
  'name': 'heart-disease',
  'namespace': 'kubeflow-user-example-com',
  'resourceVersion': '26522628',
  'uid': '52e42844-4e53-48b7-afa1-87d2e144834c'},
 'spec': {'predictor': {'model': {'modelFormat': {'name': 'sklearn'},
    'name': '',
    'resources': {},
    'storageUri': 's3://mlpipeline/models/heart-disease/'},
   'serviceAccount

In [15]:
# Client used to talk to KServe, constructed with its default arguments.
# The InferenceService cell above binds the same name the same way.
KServe = KServeClient()

In [None]:
# Submit the `isvc` object built earlier. This repeats the create call already made
# in the cell that builds `isvc`; this cell has no execution count in the saved notebook.
KServe.create(isvc)

In [None]:
import requests
import base64
import librosa
import io

def send_data_for_prediction(audio_file_path, url, timeout=None):
    """
    POST a file to a prediction endpoint as a base64-encoded ``audio_content`` instance.

    NOTE(review): the audio-oriented naming looks like a leftover from another
    notebook; the heart-disease service in this notebook is queried with numeric rows.

    Args:
        audio_file_path: Path of the file whose raw bytes are sent.
        url: Prediction endpoint that receives the JSON request.
        timeout: Optional timeout in seconds forwarded to ``requests.post``.
            ``None`` (the default) keeps the previous behaviour of no timeout.

    Returns:
        The decoded JSON body when the server answers 200, otherwise ``None``
        (after printing the error details).
    """
    # Read the payload and release the file before the network round-trip.
    with open(audio_file_path, 'rb') as file:
        audio_content = file.read()

    base64_data = base64.b64encode(audio_content).decode('utf-8')
    inference_input = {
        'instances': [{'audio_content': base64_data}]
    }

    response = requests.post(url, json=inference_input, timeout=timeout)

    if response.status_code == 200:
        return response.json()

    # Error bodies are not guaranteed to be JSON (e.g. an HTML page from a proxy);
    # fall back to the raw text instead of raising while reporting the failure.
    try:
        error_body = response.json()
    except ValueError:
        error_body = response.text
    print(error_body)
    print("Error:", response.status_code)
    print(response)
    return None

In [19]:
from kubernetes import client 
from kserve import KServeClient
from kserve import constants
from kserve import utils
from kserve import V1beta1InferenceService
from kserve import V1beta1InferenceServiceSpec
from kserve import V1beta1PredictorSpec
from kserve import V1beta1SKLearnSpec
import numpy as np
import pandas as pd
import requests
import json

In [28]:
# One sample row with 10 feature values.
# NOTE(review): presumably the columns left after get_data drops trestbps/chol/fbs,
# in training order — confirm. The variable name looks like a leftover from another notebook.
x_number_five = np.array([[52,1,0,1,168,0,1.0,2,2,3]])
KServe = KServeClient()

namespace = utils.get_default_target_namespace()
model_name = "heart-disease"
isvc_resp = KServe.get(model_name, namespace=namespace)
isvc_url = isvc_resp['status']['address']['url']
print(isvc_url)

t = x_number_five
print(t)

inference_input = {
  'instances': t.tolist()
}

# Build the predict endpoint from the address the service reports instead of a
# hard-coded cluster URL, so the cell keeps working in another namespace or cluster.
predict_url = f"{isvc_url.rstrip('/')}/v1/models/{model_name}:predict"
response = requests.post(predict_url, json=inference_input)
print(response)
r = json.loads(response.text)
print(r)
# print("Predicted: {}".format(r))

http://heart-disease.kubeflow-user-example-com.svc.cluster.local
[[ 52.   1.   0.   1. 168.   0.   1.   2.   2.   3.]]
<Response [200]>
{'predictions': [1]}


In [17]:
from kubernetes import client 
from kserve import KServeClient
from kserve import constants
from kserve import utils
from kserve import V1beta1InferenceService
from kserve import V1beta1InferenceServiceSpec
from kserve import V1beta1PredictorSpec
from kserve import V1beta1SKLearnSpec
from datetime import datetime

namespace = utils.get_default_target_namespace()

name = 'heart-disease'
kserve_version = 'v1beta1'
api_version = constants.KSERVE_GROUP + '/' + kserve_version

# Same InferenceService definition as before, built step by step and only printed
# here (nothing is submitted in this cell).
sklearn_spec = V1beta1SKLearnSpec(storage_uri="s3://mlpipeline/models/heart-disease/")
predictor_spec = V1beta1PredictorSpec(
    service_account_name="sa-minio-kserve",
    sklearn=sklearn_spec,
)
isvc_metadata = client.V1ObjectMeta(
    name=name,
    namespace=namespace,
    annotations={'sidecar.istio.io/inject':'false'},
)

isvc = V1beta1InferenceService(
    api_version=api_version,
    kind=constants.KSERVE_KIND,
    metadata=isvc_metadata,
    spec=V1beta1InferenceServiceSpec(predictor=predictor_spec),
)
print(isvc)

{'api_version': 'serving.kserve.io/v1beta1',
 'kind': 'InferenceService',
 'metadata': {'annotations': {'sidecar.istio.io/inject': 'false'},
              'creation_timestamp': None,
              'deletion_grace_period_seconds': None,
              'deletion_timestamp': None,
              'finalizers': None,
              'generate_name': None,
              'generation': None,
              'labels': None,
              'managed_fields': None,
              'name': 'heart-disease',
              'namespace': 'kubeflow-user-example-com',
              'owner_references': None,
              'resource_version': None,
              'self_link': None,
              'uid': None},
 'spec': {'explainer': None,
          'predictor': {'active_deadline_seconds': None,
                        'affinity': None,
                        'automount_service_account_token': None,
                        'batcher': None,
                        'canary_traffic_percent': None,
                       