In [None]:
!pip install kfp
!pip install google-cloud-pipeline-components
!pip install gcsfs
!pip install scikit-learn

Collecting kfp
  Downloading kfp-2.7.0.tar.gz (441 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m441.8/441.8 kB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting kfp-pipeline-spec==0.3.0 (from kfp)
  Downloading kfp_pipeline_spec-0.3.0-py3-none-any.whl (12 kB)
Collecting kfp-server-api<2.1.0,>=2.0.0 (from kfp)
  Downloading kfp-server-api-2.0.5.tar.gz (63 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m63.4/63.4 kB[0m [31m8.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting kubernetes<27,>=8.0.0 (from kfp)
  Downloading kubernetes-26.1.0-py2.py3-none-any.whl (1.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m15.0 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting protobuf<5,>=4.21.1 (from kfp)
  Downloading protobuf-4.25.3-cp37-abi3-manylinux2014_x86_64.whl (294 kB)
[2K     [90m━━━━━━━━━━━━━

Collecting google-cloud-pipeline-components
  Downloading google_cloud_pipeline_components-2.14.0-py3-none-any.whl (1.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.4/1.4 MB[0m [31m6.2 MB/s[0m eta [36m0:00:00[0m
[31mERROR: Operation cancelled by user[0m[31m


In [29]:
from kfp.v2.dsl import pipeline, component, InputPath, OutputPath, Dataset, Output, Model, Metrics
from kfp.v2 import compiler
from google.cloud import aiplatform

# Vertex AI project and region; aiplatform.init makes them the defaults for the
# PipelineJob created further down.
project_id = 'ise543-final-project-421906'
location = 'us-central1'
aiplatform.init(location=location, project=project_id)



In [30]:
from kfp.v2.dsl import Artifact
from kfp.v2.dsl import Input
from kfp.v2.dsl import Model

@component(packages_to_install=["pandas", "numpy", "fsspec", "gcsfs"])
def impute_multiple_features_training(training_dataset_path: str,
                                      imputed_dataset_path: OutputPath('Dataset'),
                                      feature_medians: Output[Artifact]):
    """Impute missing values in the training split and record the fill values.

    Numeric features are filled with their training median and categorical
    features with their training mode. The fill values are stored in
    ``feature_medians.metadata`` under the keys ``'medians'`` and ``'modes'``,
    so the validation split can be imputed with training statistics.

    Args:
        training_dataset_path: CSV location passed to ``pd.read_csv`` (the
            pipeline supplies a ``gs://`` URI, hence fsspec/gcsfs).
        imputed_dataset_path: Output path for the imputed CSV.
        feature_medians: Artifact whose metadata receives the fill values.

    Raises:
        KeyError: if an expected feature column is missing.
        ValueError: if a categorical feature has no non-missing values.
    """
    import pandas as pd

    def _to_builtin(value):
        # Artifact metadata is serialized to JSON; numpy scalars such as np.int64
        # are not JSON-serializable, so convert them to plain Python values.
        return value.item() if hasattr(value, "item") else value

    df = pd.read_csv(training_dataset_path)

    features_to_impute = ['cigsPerDay', 'totChol', 'glucose', 'BMI', 'heartRate']
    medians = {feature: _to_builtin(df[feature].median()) for feature in features_to_impute}

    categorical_features_to_impute = ['education', 'BPMeds']
    modes = {}
    for feature in categorical_features_to_impute:
        mode_values = df[feature].mode()
        if mode_values.empty:
            raise ValueError(f"Cannot impute '{feature}': column has no non-missing values")
        modes[feature] = _to_builtin(mode_values.iloc[0])

    # Reassign instead of df[col].fillna(..., inplace=True): that chained in-place
    # call modifies a temporary under pandas Copy-on-Write and leaves df unchanged.
    df = df.fillna({**medians, **modes})

    feature_medians.metadata['medians'] = medians
    feature_medians.metadata['modes'] = modes

    df.to_csv(imputed_dataset_path, index=False)


  return component_factory.create_component_from_func(


In [31]:
@component(packages_to_install=["pandas", "numpy", "fsspec", "gcsfs"])
def impute_multiple_features_validation(validation_dataset_path: str,
                                        imputed_dataset_path: OutputPath('Dataset'),
                                        feature_medians: Input[Artifact]):
    """Impute missing values in the validation split using training statistics.

    Reads the ``'medians'`` and ``'modes'`` mappings (column -> fill value) from
    ``feature_medians.metadata`` and fills the corresponding columns.

    Args:
        validation_dataset_path: CSV location passed to ``pd.read_csv``.
        imputed_dataset_path: Output path for the imputed CSV.
        feature_medians: Artifact produced by the training imputation step.

    Raises:
        KeyError: if a column named in the fill values is absent from the CSV.
    """
    import pandas as pd

    df = pd.read_csv(validation_dataset_path)
    medians = feature_medians.metadata['medians']
    modes = feature_medians.metadata['modes']
    fill_values = {**medians, **modes}

    # DataFrame.fillna(dict) silently skips unknown columns; fail loudly instead
    # so a schema drift between splits is not hidden.
    missing = [column for column in fill_values if column not in df.columns]
    if missing:
        raise KeyError(f"Columns expected for imputation are missing: {missing}")

    # Reassign instead of df[col].fillna(..., inplace=True): that chained in-place
    # call does not update df under pandas Copy-on-Write.
    df = df.fillna(fill_values)

    df.to_csv(imputed_dataset_path, index=False)

In [32]:
@component(packages_to_install=["pandas", "numpy", "fsspec", "gcsfs"])
def perform_initial_data_preparation(input_dataset_path: InputPath('Dataset'),
                                     output_dataset_path: OutputPath(Dataset)):
    """Apply fixed column transforms, one-hot encode, and write the result.

    Steps: drop ``a1c``; cap ``totChol`` at 700 then take log(x + 1); cap
    ``BMI`` at 50; take log(x + 1) of ``income``; set ``cigsPerDay`` to 0 where
    ``currentSmoker == 0``; ``pd.get_dummies(..., drop_first=True)``.

    NOTE(review): this component runs separately on the training and validation
    splits, so ``get_dummies`` can emit different dummy columns per split if a
    category value appears in only one of them — confirm the splits share
    categories or align columns downstream.
    """
    import pandas as pd
    import numpy as np

    prepared = pd.read_csv(input_dataset_path).drop(columns=["a1c"])

    # Cap outliers before the log transform; BMI is capped but not logged.
    prepared["totChol"] = np.log(prepared["totChol"].clip(upper=700) + 1)
    prepared["BMI"] = prepared["BMI"].clip(upper=50)
    prepared["income"] = np.log(prepared["income"] + 1)

    # Rows flagged as non-smokers get cigsPerDay forced to 0.
    is_non_smoker = prepared['currentSmoker'] == 0
    prepared.loc[is_non_smoker, 'cigsPerDay'] = 0

    encoded = pd.get_dummies(prepared, drop_first=True)
    encoded.to_csv(output_dataset_path, index=False)

In [33]:
@component(packages_to_install=["pandas", "numpy", "scikit-learn", "imbalanced-learn==0.11.0"])
def perform_SMOTE(input_df_path:  InputPath('Dataset'),
                  output_df_path: OutputPath('Dataset')):
    """Oversample the ``TenYearCHD`` classes with SMOTE and write the result.

    ``patientID`` (when present) is kept out of the SMOTE feature space and
    re-attached afterwards: original rows keep their IDs, synthetic rows get
    NaN. The output has the same columns, in the same order, as the input.

    Args:
        input_df_path: CSV containing a ``TenYearCHD`` target column.
        output_df_path: Output path for the oversampled CSV.
    """
    import pandas as pd
    import numpy as np
    from imblearn.over_sampling import SMOTE

    target_column = 'TenYearCHD'
    id_column = 'patientID'
    # Fixed seed so reruns (and Vertex AI's cached results) are reproducible;
    # matches the random_state=42 used by the tree-based trainers.
    random_state = 42

    df = pd.read_csv(input_df_path)

    # SMOTE interpolates between nearest neighbours. An identifier column would
    # distort those distances and produce fractional, meaningless synthetic IDs.
    id_columns = [id_column] if id_column in df.columns else []
    X = df.drop(columns=[target_column] + id_columns)
    y = df[target_column]

    X_smote, y_smote = SMOTE(random_state=random_state).fit_resample(X, y)

    oversampled_df = pd.DataFrame(X_smote, columns=X.columns).reset_index(drop=True)
    oversampled_df[target_column] = np.asarray(y_smote)

    if id_columns:
        # imbalanced-learn's SMOTE returns the original samples first and appends
        # the synthetic ones after them, so index alignment gives the original
        # rows their IDs and leaves synthetic rows as NaN.
        oversampled_df[id_column] = df[id_column].reset_index(drop=True)

    # Keep the input's column order so downstream feature order is unchanged.
    oversampled_df = oversampled_df[list(df.columns)]

    oversampled_df.to_csv(output_df_path, index=False)

In [34]:
@component(packages_to_install=["pandas", "scikit-learn", "joblib"])
def train_logistic_regression(training_dataset_path: InputPath('Dataset'),
                              trained_model_artifact: Output[Model]):
    """Fit a logistic regression (max_iter=1000) on the training CSV.

    ``TenYearCHD`` is the label; ``patientID`` is excluded from the features.
    The fitted estimator is written with joblib to
    ``<trained_model_artifact.path>/model.joblib``.
    """
    import os

    import joblib
    import pandas as pd
    from sklearn.linear_model import LogisticRegression

    label_column = 'TenYearCHD'
    data = pd.read_csv(training_dataset_path)
    features = data.drop([label_column, 'patientID'], axis=1)
    labels = data[label_column]

    classifier = LogisticRegression(max_iter=1000).fit(features, labels)

    model_dir = trained_model_artifact.path
    os.makedirs(model_dir, exist_ok=True)
    joblib.dump(classifier, os.path.join(model_dir, "model.joblib"))

In [35]:
@component(packages_to_install=["pandas", "scikit-learn", "joblib"])
def train_random_forest(training_dataset_path: InputPath('Dataset'),
                        trained_model_artifact: Output[Model]):
    """Fit a random forest (500 trees, max_depth=4, seed 42) on the training CSV.

    ``TenYearCHD`` is the label; ``patientID`` is excluded from the features.
    The fitted estimator is written with joblib to
    ``<trained_model_artifact.path>/model.joblib``. This is the same artifact
    type and layout the other trainers use and that ``evaluate_model`` loads.
    """
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    import joblib
    import os

    train_df = pd.read_csv(training_dataset_path)
    X_train = train_df.drop(['TenYearCHD', 'patientID'], axis=1)
    y_train = train_df['TenYearCHD']

    trained_model = RandomForestClassifier(n_estimators=500, max_depth=4, random_state=42)
    trained_model.fit(X_train, y_train)

    # Output[Model] (instead of a bare OutputPath string) keeps this component's
    # signature and path handling consistent with the sibling trainers.
    os.makedirs(trained_model_artifact.path, exist_ok=True)
    joblib.dump(trained_model, os.path.join(trained_model_artifact.path, "model.joblib"))


In [36]:
@component(packages_to_install=["pandas", "scikit-learn", "joblib"])
def train_gradient_boosting(training_dataset_path: InputPath('Dataset'),
                            trained_model_artifact: Output[Model]):
    """Fit a gradient-boosting classifier on the training CSV.

    Hyperparameters: 500 estimators, learning_rate 0.05, max_features 5,
    max_depth 4, random_state 42. ``TenYearCHD`` is the label and
    ``patientID`` is excluded. The estimator is written with joblib to
    ``<trained_model_artifact.path>/model.joblib``.
    """
    import os

    import joblib
    import pandas as pd
    from sklearn.ensemble import GradientBoostingClassifier

    label_column = 'TenYearCHD'
    data = pd.read_csv(training_dataset_path)
    features = data.drop([label_column, 'patientID'], axis=1)
    labels = data[label_column]

    booster = GradientBoostingClassifier(
        n_estimators=500,
        learning_rate=0.05,
        max_features=5,
        max_depth=4,
        random_state=42,
    ).fit(features, labels)

    model_dir = trained_model_artifact.path
    os.makedirs(model_dir, exist_ok=True)
    joblib.dump(booster, os.path.join(model_dir, "model.joblib"))


In [43]:
@component(packages_to_install=["pandas", "scikit-learn", "joblib", "numpy"])
def evaluate_model(test_dataset_path: InputPath('Dataset'),
                   model: Input[Model],
                   metrics: Output[Metrics]):
    """Score a trained classifier on a held-out CSV and log metrics.

    Loads ``<model.path>/model.joblib`` and logs:
      * ``accuracy`` — accuracy of ``predict``.
      * ``f1_score`` — support-weighted F1 of ``predict``.
      * ``AUC_ROC`` — ROC AUC of the positive-class probability.
      * ``AUC_PR`` — average precision (precision-recall summary) of the
        positive-class probability.

    ``TenYearCHD`` is the label and ``patientID`` is excluded from the features.
    """
    import os

    import joblib
    import pandas as pd
    from sklearn.metrics import (accuracy_score, average_precision_score,
                                 f1_score, roc_auc_score)

    test_df = pd.read_csv(test_dataset_path)
    X_test = test_df.drop(['TenYearCHD', 'patientID'], axis=1)
    y_test = test_df['TenYearCHD']

    # joblib.load unpickles the file; only load artifacts produced by this pipeline.
    trained_model = joblib.load(os.path.join(model.path, "model.joblib"))

    y_pred = trained_model.predict(X_test)
    # Column 1 of predict_proba is the second entry of classes_ (the positive
    # class for 0/1 labels).
    y_score = trained_model.predict_proba(X_test)[:, 1]

    metrics.log_metric("accuracy", float(accuracy_score(y_test, y_pred)))
    metrics.log_metric("f1_score", float(f1_score(y_test, y_pred, average='weighted')))
    # The previous "AUC_PR" value was actually ROC AUC. Log ROC AUC under its own
    # name and a genuine precision-recall summary as AUC_PR.
    metrics.log_metric("AUC_ROC", float(roc_auc_score(y_test, y_score)))
    metrics.log_metric("AUC_PR", float(average_precision_score(y_test, y_score)))


  return component_factory.create_component_from_func(


In [44]:
@pipeline(name='final-project-pipeline')
def final_project_pipeline(training_dataset_path: str, validation_dataset_path: str):
    """Impute, prepare, oversample, then train and evaluate three classifiers.

    Validation data is imputed with the fill values learned on the training
    split. Only the training split is oversampled with SMOTE. Each model is
    evaluated on the prepared validation split.
    """
    # Imputation: the validation step consumes the training fill-value artifact.
    train_imputed = impute_multiple_features_training(training_dataset_path=training_dataset_path)
    val_imputed = impute_multiple_features_validation(
        validation_dataset_path=validation_dataset_path,
        feature_medians=train_imputed.outputs['feature_medians'],
    )

    # Feature preparation on each imputed split.
    train_prepared = perform_initial_data_preparation(
        input_dataset_path=train_imputed.outputs['imputed_dataset_path'])
    val_prepared = perform_initial_data_preparation(
        input_dataset_path=val_imputed.outputs['imputed_dataset_path'])

    # Class balancing on the training split only.
    train_balanced = perform_SMOTE(input_df_path=train_prepared.outputs['output_dataset_path'])

    balanced_training_data = train_balanced.outputs['output_df_path']
    prepared_validation_data = val_prepared.outputs['output_dataset_path']

    # One train -> evaluate pair per model. The tasks are created in the same
    # order as before, so KFP assigns the same task names.
    for train_component in (train_random_forest, train_logistic_regression, train_gradient_boosting):
        trained = train_component(training_dataset_path=balanced_training_data)
        evaluate_model(test_dataset_path=prepared_validation_data,
                       model=trained.outputs['trained_model_artifact'])


In [45]:
# Serialize the pipeline definition to the JSON spec submitted to Vertex AI below.
compiler.Compiler().compile(
    package_path='final_project_pipeline.json',
    pipeline_func=final_project_pipeline,
)

In [46]:
# Job definition: the compiled spec, a GCS root for run artifacts, and the two
# dataset URIs bound to the pipeline's parameters. Caching lets unchanged steps
# reuse earlier results.
pipeline_job = aiplatform.PipelineJob(
    display_name='final_project_pipeline',
    template_path='final_project_pipeline.json',
    pipeline_root='gs://final-bucket-2',
    enable_caching=True,
    parameter_values=dict(
        training_dataset_path='gs://final-bucket-2/train_data.csv',
        validation_dataset_path='gs://final-bucket-2/val_data.csv',
    ),
)

In [47]:
# Submit the job to Vertex AI Pipelines; the captured output below shows run()
# logging state updates while the job is running.
pipeline_job.run()


INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/795244162107/locations/us-central1/pipelineJobs/final-project-pipeline-20240502235419
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/795244162107/locations/us-central1/pipelineJobs/final-project-pipeline-20240502235419')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/final-project-pipeline-20240502235419?project=795244162107
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/795244162107/locations/us-central1/pipelineJobs/final-project-pipeline-20240502235419 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/795244162107/locations/us