In [None]:
# Uncomment to install the Kubeflow Pipelines SDK into the kernel's environment:
# %pip install kfp

In [None]:
from google.cloud import aiplatform
# NOTE(review): PROJECT_ID and REGION are not defined in any visible cell; they must
# exist in the kernel before this call or a fresh "Restart & Run All" will fail here.
aiplatform.init(project=PROJECT_ID, location=REGION)

# KFP compiler plus the decorators and artifact annotations used by the cells below.
from kfp import compiler
from kfp.dsl import pipeline, component, InputPath, OutputPath, Input, Output, Dataset, Artifact, Model, Metrics
# The component functions below re-import what they need inside their own bodies.
import joblib, gcsfs, fsspec
import pandas as pd
import numpy as np

In [None]:
@component(packages_to_install=["pandas", "numpy","fsspec", "gcsfs"])
def loading_data(input_dataset_path:str, output_dataset_path: OutputPath('Dataset')):
    """Copy a CSV dataset into a pipeline-managed Dataset output.

    Args:
        input_dataset_path: Location of the source CSV, handed directly to
            ``pandas.read_csv``.
        output_dataset_path: Destination path supplied by the pipeline runtime;
            the CSV is rewritten there without the index column.
    """
    import pandas as pd

    # Treat only empty fields as missing so that literal text such as "None"
    # is kept as a string instead of being converted to NaN.
    read_options = {"keep_default_na": False, "na_values": [""]}
    raw_frame = pd.read_csv(input_dataset_path, **read_options)

    raw_frame.to_csv(output_dataset_path, index=False)

In [None]:
@component(packages_to_install=["pandas", 'numpy'])
def preprocess_data(input_data_path: InputPath('Dataset'),
                    output_data_path: OutputPath('Dataset')):
    """Impute, filter and feature-engineer the raw dataset.

    Steps, in order: mode-impute two columns, drop rows whose ``Age`` is not
    <= 100, ordinal-encode ``Exercise Frequency``, derive ``BMI_Category`` and
    ``Age_Group``, one-hot encode the categorical columns, add ``LOS_Log`` and
    drop the columns that are no longer needed.

    Args:
        input_data_path: Path of the input CSV.
        output_data_path: Path where the processed CSV is written (no index).

    Raises:
        KeyError: If an expected column is absent from the input.
    """
    import pandas as pd
    import numpy as np

    # Only empty fields are missing; the literal string "None" is a valid
    # 'Exercise Frequency' value and must not be parsed as NaN.
    df = pd.read_csv(input_data_path,keep_default_na=False,na_values=[""])

    # 1. Handling Missing Values: fill gaps with the most frequent value.
    for column in ('Number of Prior Visits', 'Medications Prescribed'):
        df[column] = df[column].fillna(df[column].mode()[0])

    # Remove age outliers. Rows with a missing Age are dropped as well, because
    # NaN <= 100 evaluates to False. The explicit .copy() makes the filtered
    # frame independent of the original, so the column assignments below do not
    # write to a slice (which triggers pandas' SettingWithCopyWarning).
    df = df[df['Age'] <= 100].copy()

    # 2. Feature Engineering
    # Values outside this mapping become NaN in 'Exercise_Encoded'.
    exercise_map = {'None': 0, 'Occasional': 1, 'Regular': 2}
    df['Exercise_Encoded'] = df['Exercise Frequency'].map(exercise_map)

    def bmi_category(bmi):
        # NOTE(review): a missing BMI fails every comparison and is therefore
        # labelled 'Obese'; confirm BMI is never missing at this point.
        if bmi < 18.5:
            return 'Underweight'
        elif bmi < 25:
            return 'Normal'
        elif bmi < 30:
            return 'Overweight'
        else:
            return 'Obese'
    df['BMI_Category'] = df['BMI'].apply(bmi_category)

    def age_group(age):
        if age < 40:
            return '<40'
        elif age < 65:
            return '40-64'
        else:
            return '65+'
    df['Age_Group'] = df['Age'].apply(age_group)

    # One-hot encode.
    # NOTE(review): the dummy columns are derived from the categories present in
    # this file; confirm they match the columns the scaler/model were fitted on.
    df = pd.get_dummies(df, columns=[
        'Gender', 'Ethnicity', 'Diet Type', 'Type of Treatment',
        'BMI_Category', 'Age_Group'
    ], drop_first=True)

    # 3. Skewed variable: log(1 + x) transform of the length of stay.
    df['LOS_Log'] = np.log1p(df['Length of Stay'])

    # 4. Feature Selection: drop identifiers and columns replaced by derived features.
    df = df.drop(columns=[
        'Hospital ID', 'Adjusted Weight (kg)', 'Weight (kg)', 'Exercise Frequency', 'Length of Stay'
    ])

    df.to_csv(output_data_path, index=False)


In [None]:
@component(packages_to_install = ['pandas', 'numpy', 'scikit-learn', 'joblib', 'fsspec', 'gcsfs'])
def normalize_testing_data(dataset_path: InputPath('Dataset'),
                           scaler_path:str,
                            normalized_dataset_path: OutputPath('Dataset')
                            ):
    """Scale the feature columns with a previously saved scaler.

    ``PatientID`` is set aside before scaling and re-attached afterwards so it
    is passed through unchanged.

    Args:
        dataset_path: Path of the preprocessed CSV.
        scaler_path: Location of the joblib-serialized scaler, opened through
            ``gcsfs``.
        normalized_dataset_path: Path where the scaled CSV is written (no index).

    Raises:
        ValueError: If the dataset has no ``PatientID`` column.
    """
    import pandas as pd
    import joblib
    import gcsfs

    # Load the dataset and separate the identifier from the features.
    df = pd.read_csv(dataset_path)
    if 'PatientID' not in df.columns:
        raise ValueError('Testing data must contain PatientID column')
    patient_ids = df['PatientID']
    features = df.drop(columns=['PatientID'])

    # SECURITY: joblib.load unpickles the file and can execute arbitrary code;
    # scaler_path must point to a trusted artifact.
    with gcsfs.GCSFileSystem().open(scaler_path, 'rb') as f:
        scaler = joblib.load(f)

    # Apply the scaler. Reusing features.index keeps the rows aligned with
    # patient_ids when the identifier column is assigned back by index.
    df_scaled = pd.DataFrame(
        scaler.transform(features),
        columns=features.columns,
        index=features.index,
    )
    df_scaled['PatientID'] = patient_ids

    # Save the normalized dataset
    df_scaled.to_csv(normalized_dataset_path, index=False)

In [None]:
@component(packages_to_install = ['pandas', 'numpy', 'scikit-learn', 'joblib', 'fsspec', 'gcsfs'])
def predicting_model(testing_data_path: InputPath('Dataset'), model_path:str, prediction_dataset_path:OutputPath('Dataset')):
    """Score the normalized dataset with a saved model.

    Args:
        testing_data_path: Path of the CSV to score; must include ``PatientID``.
        model_path: Location of the joblib-serialized model, opened through
            ``gcsfs``.
        prediction_dataset_path: Path where a CSV with the columns
            ``PatientID`` and ``predicted_target`` is written.

    Raises:
        ValueError: If the dataset has no ``PatientID`` column.
    """
    import pandas as pd
    import joblib
    import gcsfs

    scoring_frame = pd.read_csv(testing_data_path)
    if 'PatientID' not in scoring_frame.columns:
        raise ValueError('Testing data must contain PatientID column')

    # Every column except the identifier is passed to the model.
    id_column = scoring_frame['PatientID']
    feature_frame = scoring_frame.drop(columns=['PatientID'])

    # joblib.load unpickles the file, so model_path must be a trusted artifact.
    with gcsfs.GCSFileSystem().open(model_path, 'rb') as model_file:
        estimator = joblib.load(model_file)

    predictions = pd.DataFrame(
        {
            'PatientID': id_column,
            'predicted_target': estimator.predict(feature_frame),
        }
    )
    predictions.to_csv(prediction_dataset_path, index=False)


In [None]:
@pipeline(name='healthcare_readmissions_testing_pipeline')
def healthcare_readmissions_testing_pipeline(
    healthcare_readmissions_dataset_path: str,
    scaler_uri: str,
    model_uri: str
):
    """Chain the four components: load, preprocess, normalize, predict.

    Args:
        healthcare_readmissions_dataset_path: Location of the CSV to score.
        scaler_uri: Location of the serialized scaler used for normalization.
        model_uri: Location of the serialized model used for prediction.
    """
    # Each step consumes the single output of the step before it.
    raw_step = loading_data(
        input_dataset_path=healthcare_readmissions_dataset_path,
    )
    engineered_step = preprocess_data(input_data_path=raw_step.output)
    scaled_step = normalize_testing_data(
        dataset_path=engineered_step.output,
        scaler_path=scaler_uri,
    )

    # Final step: nothing downstream reads its output, so the task is not bound to a name.
    predicting_model(
        testing_data_path=scaled_step.output,
        model_path=model_uri,
    )


In [None]:
# Use the compiler from the top-level `kfp` package, matching the `kfp` / `kfp.dsl`
# imports at the top of the notebook. The `kfp.v2` namespace is a deprecated alias
# in the KFP 2.x SDK.
from kfp import compiler

# Compile the pipeline function into a local spec file that PipelineJob can submit.
compiler.Compiler().compile(
    pipeline_func=healthcare_readmissions_testing_pipeline,
    package_path='healthcare_readmissions_testing_pipeline.json'  # This is the output file
)

In [None]:
from google.cloud import aiplatform

# NOTE(review): `healthcare_readmissions_dataset`, `scaler_uri` and `model_uri` are
# not defined in the visible cells; they must already exist in the kernel.
test_csv_uri = f'{healthcare_readmissions_dataset}/healthcare_readmissions_dataset_test.csv'

# Runtime values for the three pipeline parameters.
job_parameters = {
    'healthcare_readmissions_dataset_path': test_csv_uri,
    'scaler_uri': scaler_uri,
    'model_uri': model_uri,
}

# template_path is the spec file written by the compile step; the dataset
# location is also used as the pipeline root.
pipeline_job = aiplatform.PipelineJob(
    display_name='healthcare_readmissions_testing_pipeline',
    template_path='healthcare_readmissions_testing_pipeline.json',
    pipeline_root=healthcare_readmissions_dataset,
    parameter_values=job_parameters,
    enable_caching=True,
)

In [None]:
# Submit the job to Vertex AI; the log output below shows the SDK creating the
# PipelineJob and then reporting its state.
pipeline_job.run()

INFO:google.cloud.aiplatform.pipeline_jobs:Creating PipelineJob
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob created. Resource name: projects/652395460584/locations/us-central1/pipelineJobs/healthcare-readmissions-testing-pipeline-20250506234007
INFO:google.cloud.aiplatform.pipeline_jobs:To use this PipelineJob in another session:
INFO:google.cloud.aiplatform.pipeline_jobs:pipeline_job = aiplatform.PipelineJob.get('projects/652395460584/locations/us-central1/pipelineJobs/healthcare-readmissions-testing-pipeline-20250506234007')
INFO:google.cloud.aiplatform.pipeline_jobs:View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/healthcare-readmissions-testing-pipeline-20250506234007?project=652395460584
INFO:google.cloud.aiplatform.pipeline_jobs:PipelineJob projects/652395460584/locations/us-central1/pipelineJobs/healthcare-readmissions-testing-pipeline-20250506234007 current state:
PipelineState.PIPELINE_STATE_RUNNING
INFO:google.cloud