In [None]:
! pip install kfp
!pip install google-cloud-pipeline-components
!pip install gcsfs
!pip install scikit-learn
!pip install fsspec

#  Set parameters, initialize aiplatform client library, and import needed libraries

In [None]:
# Imports (each name listed once)
from google.cloud import aiplatform
from kfp.v2.dsl import pipeline, component, InputPath, OutputPath, Dataset
import joblib
import gcsfs
import fsspec
import pandas as pd
import numpy as np

# Set parameters
project_id = 'my-final-project-ise-543'
location = 'us-central1'

# Point the Vertex AI client library at the project/region used by every job below
aiplatform.init(project=project_id, location=location)

  from kfp.v2.dsl import pipeline, component, component, InputPath, OutputPath, Dataset


## Configure and test connections to training pipeline

### Set up paths to artifacts

In [None]:
# Artifact URIs from earlier training-pipeline runs, grouped by the step that wrote them.
# The imputation artifacts and the model come from two different runs.
_artifact_root = "gs://heart_prediction/757245801734"
_impute_step_dir = f"{_artifact_root}/heart-disease-prediction-pipeline-20240502233246/impute-median-training_-7069055405124485120"
_random_forest_step_dir = f"{_artifact_root}/heart-disease-prediction-pipeline-20240503004309/train-random-forest_8381106066523422720"

features_path = f"{_impute_step_dir}/features"
median = f"{_impute_step_dir}/median"
scaler_path = f"{_impute_step_dir}/scaler_path"
model = f"{_random_forest_step_dir}/trained_model_artifact/model.joblib"

### Test access to prediction output and median values

In [None]:
# Sanity check of GCS access: read the CSV stored at the `predictions_path` output
# of an earlier inference-pipeline run.
df=pd.read_csv("gs://heart_prediction/757245801734/vs-heart-predictions-inference-pipeline-20240503055909/perform-predictions_-6807723481434685440/predictions_path")

In [None]:
df

Unnamed: 0,male,age,BPMeds,prevalentStroke,prevalentHyp,diabetes,sysBP,diaBP,glucose,a1c,TenYearCHD
0,1.155825,-0.187500,-0.176959,-0.079876,1.497834,-0.156893,0.259796,0.599806,-0.423779,-0.197943,0.0
1,1.155825,-0.996540,-0.176959,-0.079876,-0.667631,-0.156893,0.304873,0.432332,-0.743051,-0.432508,0.0
2,1.155825,0.505964,-0.176959,-0.079876,1.497834,-0.156893,0.665486,-0.739982,0.260376,0.269658,0.0
3,1.155825,-1.458849,-0.176959,-0.079876,1.497834,-0.156893,-0.348739,0.809148,0.077935,-0.025319,0.0
4,-0.865183,1.546158,-0.176959,-0.079876,1.497834,-0.156893,1.093714,0.809148,-0.104507,0.089513,0.0
...,...,...,...,...,...,...,...,...,...,...,...
409,-0.865183,-0.996540,-0.176959,-0.079876,1.497834,-0.156893,0.800716,0.516069,0.488428,0.819439,0.0
410,-0.865183,-1.458849,-0.176959,-0.079876,-0.667631,-0.156893,-1.408040,-1.158665,-0.378168,-0.473937,0.0
411,-0.865183,1.546158,-0.176959,-0.079876,1.497834,-0.156893,1.702249,1.855856,0.260376,0.209872,0.0
412,1.155825,-0.765386,-0.176959,-0.079876,-0.667631,-0.156893,-0.123355,0.432332,-0.286948,-0.001806,0.0


In [None]:
median=pd.read_csv(median)


In [None]:
median

Unnamed: 0,Column,Median
0,totChol,234.0
1,BMI,25.42
2,heartRate,75.0
3,a1c,4.126046
4,glucose,78.0


## Retrieve features

In [None]:
# Create a GCS file system object
fs = gcsfs.GCSFileSystem()

# joblib.load unpickles the artifact, which can execute arbitrary code:
# only load objects from a bucket you trust.
with fs.open(features_path, 'rb') as f:
    features = joblib.load(f)

# Show the loaded object as the cell output
features

['male',
 'age',
 'BPMeds',
 'prevalentStroke',
 'prevalentHyp',
 'diabetes',
 'sysBP',
 'diaBP',
 'glucose',
 'a1c']

## Retrieve scaler

In [None]:
# Create a GCS file system object
fs = gcsfs.GCSFileSystem()

# joblib.load unpickles the artifact, which can execute arbitrary code:
# only load objects from a bucket you trust.
with fs.open(scaler_path, 'rb') as f:
    scaler = joblib.load(f)

# Show the loaded object as the cell output
scaler

https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


## Retrieve model

In [None]:
# Create a GCS file system object
fs = gcsfs.GCSFileSystem()

# Bind the estimator to its own name: assigning it back to `model` would replace
# the URI string and make this cell fail when it is run a second time.
# joblib.load unpickles the artifact, which can execute arbitrary code:
# only load objects from a bucket you trust.
with fs.open(model, 'rb') as f:
    trained_model = joblib.load(f)

trained_model

# Define components

## Common dataset preparation steps

In [None]:
@component(packages_to_install=["pandas", "numpy", "fsspec", "gcsfs"])
def perform_initial_data_preparation(input_dataset_path: str, output_dataset_path: OutputPath(Dataset)):
    """Apply the cleaning steps that do not depend on training statistics.

    Args:
        input_dataset_path: Location of the raw CSV, passed straight to
            ``pandas.read_csv``.
        output_dataset_path: Path the prepared CSV is written to (without the
            index column).
    """
    import pandas as pd
    import numpy as np

    df = pd.read_csv(input_dataset_path)

    # A missing cigarette count on a non-smoker row (currentSmoker == 0) becomes 0.
    non_smoker = df['currentSmoker'] == 0
    df.loc[non_smoker, 'cigsPerDay'] = df.loc[non_smoker, 'cigsPerDay'].fillna(0)

    # Missing education gets its own label, 0.
    df['education'] = df['education'].fillna(0)

    # Cap totChol at 500 to remove outliers.
    df['totChol'] = df['totChol'].clip(upper=500)

    # Log transformation: log(x + 1) on each listed column.
    log_columns = ['glucose', 'income', 'a1c']
    df[log_columns] = np.log(df[log_columns] + 1)

    df.to_csv(output_dataset_path, index=False)



  return component_factory.create_component_from_func(


## Imputing values



In [None]:
@component(packages_to_install=["pandas", "numpy", "scikit-learn", "scipy", "joblib","fsspec", "gcsfs"])
def impute_median_validation(
        validation_dataset_path: InputPath('Dataset'),
        median_path: str,  # medians from training
        scaler_path: str,  # scaler from training
        imputed_validation_dataset_path: OutputPath('Dataset'),
        FS_dataset_path: str):
    """Impute, scale and feature-select a dataset using artifacts saved by training.

    Args:
        validation_dataset_path: CSV to process.
        median_path: CSV with ``Column`` and ``Median`` columns; each listed
            column has its missing values filled with the given median.
        scaler_path: GCS path of a joblib-serialized scaler; its ``transform``
            is applied to every column that remains after imputation.
        imputed_validation_dataset_path: Path the resulting CSV is written to.
            It holds the selected, scaled feature columns followed by the
            unscaled ``patientID`` column.
        FS_dataset_path: GCS path of a joblib-serialized list of the feature
            names to keep.
    """
    import pandas as pd
    import joblib
    import gcsfs

    # Load the validation dataset
    df = pd.read_csv(validation_dataset_path)

    # Fixed fill values for columns that are not covered by the median table.
    df['cigsPerDay'] = df['cigsPerDay'].fillna(20)
    df['BPMeds'] = df['BPMeds'].fillna(1)

    # Fill missing values with the medians computed on the training dataset.
    # NOTE(review): in this pipeline the input has already had log(x + 1) applied
    # to glucose/income/a1c; confirm the stored medians are on that same scale.
    median_df = pd.read_csv(median_path)
    for column, median_value in zip(median_df['Column'], median_df['Median']):
        df[column] = df[column].fillna(median_value)

    # Rows that still contain a missing value are dropped; the index is reset so
    # the scaled frame built below lines up with `df` row by row.
    df = df.dropna().reset_index(drop=True)

    # One file-system handle serves both artifact reads.
    # joblib.load unpickles, so these paths must point at trusted artifacts.
    fs = gcsfs.GCSFileSystem()
    with fs.open(FS_dataset_path, 'rb') as f:
        selected_features_names = joblib.load(f)
    with fs.open(scaler_path, 'rb') as f:
        scaler = joblib.load(f)

    # The scaler is applied to all columns first; feature selection happens on
    # the scaled frame.
    scaled_df = pd.DataFrame(scaler.transform(df), columns=df.columns)

    # Selected scaled features plus the original (unscaled) patient identifier.
    selected_test_df = pd.concat(
        [scaled_df[selected_features_names], df['patientID']], axis=1)

    # Save the imputed dataframe to the output path
    selected_test_df.to_csv(imputed_validation_dataset_path, index=False)


  return component_factory.create_component_from_func(


## Predictions

In [None]:
@component(packages_to_install=["pandas", "numpy", "scikit-learn", "joblib", "fsspec", "gcsfs"])
def perform_predictions(dataset_for_prediction_path: InputPath('Dataset'),
                   model_path: str,
                   predictions_path: OutputPath('Dataset')):
    """Score a prepared dataset with a trained model and write the predictions.

    Args:
        dataset_for_prediction_path: CSV holding the model's feature columns
            plus a ``patientID`` column.
        model_path: GCS path of the joblib-serialized trained model.
        predictions_path: Path the output CSV is written to; it has two
            columns, ``patientID`` and ``TenYearCHD``.
    """
    import pandas as pd
    import joblib
    import gcsfs

    # Load the trained model.
    # joblib.load unpickles, so model_path must point at a trusted artifact.
    fs = gcsfs.GCSFileSystem()
    with fs.open(model_path, 'rb') as f:
        trained_model = joblib.load(f)

    # Load the dataset to score
    pred_df = pd.read_csv(dataset_for_prediction_path)

    # Drop the identifier by name rather than slicing a fixed number of columns,
    # so this keeps working when the number of selected features changes.
    feature_df = pred_df.drop(columns=['patientID'])

    # Make predictions and pair each one with its patient identifier
    final_df = pd.DataFrame({
        'patientID': pred_df['patientID'],
        'TenYearCHD': trained_model.predict(feature_df),
    })

    # Save the predictions
    final_df.to_csv(predictions_path, index=False)

# Define Pipeline

In [None]:
# Training-run artifacts used as the default parameter values of the pipeline below.
features_path= "gs://heart_prediction/757245801734/heart-disease-prediction-pipeline-20240507023953/impute-median-training_-2540672785663918080/features"
median= "gs://heart_prediction/757245801734/heart-disease-prediction-pipeline-20240507023953/impute-median-training_-2540672785663918080/median"
scaler_path= "gs://heart_prediction/757245801734/heart-disease-prediction-pipeline-20240507023953/impute-median-training_-2540672785663918080/scaler_path"
# Alternative trained models; to use one, assign its URI to `model` instead of the line below.
# Logistic regression: model= "gs://heart_prediction/757245801734/heart-disease-prediction-pipeline-20240504043722/train-logistic-regression_-3582196170191011840/trained_model_artifact/model.joblib"
# KNN: model="gs://heart_prediction/757245801734/heart-disease-prediction-pipeline-20240504043722/train-knn_5641175866663763968/trained_model_artifact/model.joblib"
# Naive Bayes: model="gs://heart_prediction/757245801734/heart-disease-prediction-pipeline-20240507023953/train-naive-bayes_4809201806204731392/trained_model_artifact/model.joblib"
model="gs://heart_prediction/757245801734/heart-disease-prediction-pipeline-20240507023953/train-random-forest_-4414170230650044416/trained_model_artifact/model.joblib"
# NOTE(review): labelled "voting", but this URI points at a train-random-forest artifact from run 20240503004309 - confirm.
# Voting: model="gs://heart_prediction/757245801734/heart-disease-prediction-pipeline-20240503004309/train-random-forest_8381106066523422720/trained_model_artifact/model.joblib"
@pipeline(name='vs-heart_predictions-inference-pipeline')
def heart_disease_prediction_pipeline(dataset_for_predictions_path: str,
                               features_uri: str = features_path,
                               median_uri:  str = median,
                               scaler_uri:  str = scaler_path,
                               model_uri: str = model):
    """Inference pipeline: prepare the dataset, impute/scale it, then predict.

    Args:
        dataset_for_predictions_path: Location of the CSV to score.
        features_uri: GCS path of the saved list of selected feature names.
        median_uri: Location of the CSV of training medians.
        scaler_uri: GCS path of the saved scaler.
        model_uri: GCS path of the saved trained model.
    """
    # Process dataset - initial data preparation
    initial_prepared_dataset = perform_initial_data_preparation(
        input_dataset_path=dataset_for_predictions_path)

    # Impute, scale and select features using the training artifacts
    imputed_dataset = impute_median_validation(
        validation_dataset_path=initial_prepared_dataset.outputs['output_dataset_path'],
        median_path=median_uri,
        scaler_path=scaler_uri,
        FS_dataset_path=features_uri,
    )

    # `predictions_path` is declared as an OutputPath on the component, so it is
    # not accepted as a keyword argument here (passing it raises
    # 'unexpected keyword argument "predictions_path"').
    perform_predictions(
        dataset_for_prediction_path=imputed_dataset.outputs['imputed_validation_dataset_path'],
        model_path=model_uri,
    )


TypeError: perform-predictions() got an unexpected keyword argument "predictions_path".

In [None]:

from kfp.v2 import compiler

# The compiled spec is written once and then read back as the job template;
# one variable keeps the two paths from drifting apart.
pipeline_package_path = "heart_disease_prediction_pipeline.json"

# Compile the pipeline
compiler.Compiler().compile(
    pipeline_func=heart_disease_prediction_pipeline,
    package_path=pipeline_package_path,
)

# Create the job from the compiled spec; only the dataset location is supplied,
# the remaining pipeline parameters keep their defaults.
pipeline_job = aiplatform.PipelineJob(
    display_name='heart_disease_prediction',
    template_path=pipeline_package_path,
    pipeline_root='gs://heart_prediction',
    parameter_values={
        'dataset_for_predictions_path': 'gs://heart_prediction/Final Project Evaluation Dataset - Student(1) (1).csv'
    },
    enable_caching=True,
)

pipeline_job.run()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/757245801734/locations/us-central1/pipelineJobs/vs-heart-predictions-inference-pipeline-20240507033322
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/757245801734/locations/us-central1/pipelineJobs/vs-heart-predictions-inference-pipeline-20240507033322')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/vs-heart-predictions-inference-pipeline-20240507033322?project=757245801734
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/757245801734/locations/us-central1/pipelineJobs/vs-heart-predictions-inference-pipeline-20240507033322 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aip