In [None]:
import kfp
import typing
from typing import Dict
from typing import NamedTuple
from kfp import dsl
from kfp.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        OutputPath, 
                        InputPath)
import google.cloud.aiplatform as aip
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.v1.endpoint import (EndpointCreateOp,ModelDeployOp)
from google_cloud_pipeline_components.types import artifact_types

In [None]:
# Settings shared by the cells that submit the pipeline to Vertex AI.
# NOTE(review): Google Cloud project IDs are normally lowercase — confirm this
# value before running.
PROJECT_ID = "DE2024"  # Replace with your Google Cloud project ID
REGION = "us-central1"  # Adjust the region as needed
# Root location under which pipeline runs store their artifacts. It must be a
# folder-like prefix inside the bucket, not the dataset file itself; the dataset
# location is passed separately through the `dataset_uri` pipeline parameter.
PIPELINE_ROOT = "gs://mlops_team4_de2024/pipeline_root"  # Replace with your GCS bucket URI


In [None]:
# Train Test Split
@dsl.component(
    packages_to_install=["pandas", "scikit-learn==1.3.2"],
    base_image="python:3.10.7-slim"
)
def train_test_split(
    dataset: Input[Dataset],
    dataset_train: Output[Dataset],
    dataset_test: Output[Dataset],
    test_size: float = 0.3,
    random_state: int = 42,
):
    '''Split a CSV dataset into a training part and a testing part.

    Args:
        dataset: Input artifact whose ``path`` is read as a CSV file.
        dataset_train: Output artifact; the training rows are written to
            ``dataset_train.path + ".csv"``.
        dataset_test: Output artifact; the testing rows are written to
            ``dataset_test.path + ".csv"``.
        test_size: Fraction of rows assigned to the testing part. Defaults to
            0.3, the value that was previously hard-coded.
        random_state: Seed for the shuffle so that repeated runs yield the same
            split. Defaults to 42.
    '''
    import logging
    import sys

    import pandas as pd
    from sklearn.model_selection import train_test_split as tts

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)

    # Load data from the dataset
    alldata = pd.read_csv(dataset.path, index_col=None)

    # A fixed seed keeps training and evaluation rows stable across runs, so
    # metrics from different runs are comparable.
    train, test = tts(alldata, test_size=test_size, random_state=random_state)
    logging.info("Split %d rows into %d train / %d test rows", len(alldata), len(train), len(test))

    # Save the splits; the ".csv" suffix is part of the file name that the
    # other components in this notebook read back.
    train.to_csv(dataset_train.path + ".csv", index=False, encoding='utf-8-sig')
    test.to_csv(dataset_test.path + ".csv", index=False, encoding='utf-8-sig')


In [None]:
# Training Component
@dsl.component(
    packages_to_install=["pandas", "scikit-learn==1.3.2"],
    base_image="python:3.10.7-slim"
)
def train_regression_model(features: Input[Dataset], model: Output[Model]):
    '''Fit a linear regression on the training data and pickle the fitted estimator.

    Args:
        features: Input artifact; the CSV at ``features.path + ".csv"`` is used
            as training data and must contain a ``median_house_value`` column.
        model: Output artifact; the fitted estimator is pickled to
            ``model.path + ".pkl"``.
    '''
    import pickle

    import pandas as pd
    from sklearn.linear_model import LinearRegression

    # Name of the column to predict; every other column is used as a predictor.
    target_column = 'median_house_value'

    training_frame = pd.read_csv(features.path + ".csv")
    predictors = training_frame.drop(target_column, axis=1)
    target = training_frame[target_column]

    # fit() returns the estimator itself, so construction and fitting chain.
    regressor = LinearRegression().fit(predictors, target)

    # Persist the fitted estimator next to the artifact path.
    with open(model.path + ".pkl", 'wb') as model_file:
        pickle.dump(regressor, model_file)


In [None]:
# Model Evaluation
@dsl.component(
    packages_to_install=["pandas", "scikit-learn==1.3.2", "numpy"],
    base_image="python:3.10.7-slim"
)
def evaluate_model(
    test_set: Input[Dataset],
    model: Input[Model],
    metrics: Output[Metrics],
    thresholds_dict_str: str = "{}",
) -> NamedTuple("outputs", [("approval", bool)]):
    '''Evaluate the trained regression model and decide whether it is approved.

    Args:
        test_set: Input artifact; the CSV at ``test_set.path + ".csv"`` is used
            as evaluation data and must contain a ``median_house_value`` column.
        model: Input artifact; the pickled estimator is read from
            ``model.path + ".pkl"``.
        metrics: Output artifact that receives ``mean_absolute_error`` and
            ``root_mean_squared_error``.
        thresholds_dict_str: JSON object mapping a metric key (``"mae"`` and/or
            ``"rmse"``) to the maximum acceptable value, e.g. ``'{"mae": 5000}'``.
            An empty object (the default) means no threshold is enforced.

    Returns:
        A named tuple with a single ``approval`` field: True when every
        configured threshold is met (metric <= threshold), False otherwise.

    Raises:
        ValueError: If ``thresholds_dict_str`` is not a JSON object or names a
            metric this component does not compute.
    '''
    import json
    import pickle
    from typing import NamedTuple

    import numpy as np
    import pandas as pd
    from sklearn.metrics import mean_absolute_error, mean_squared_error

    # Load test data and model
    data = pd.read_csv(test_set.path + ".csv")
    X_test = data.drop('median_house_value', axis=1)
    y_test = data['median_house_value']

    # NOTE(security): pickle.load executes code embedded in the file. The file
    # is expected to come from the training component of this pipeline; do not
    # point this component at untrusted model artifacts.
    with open(model.path + ".pkl", 'rb') as model_file:
        loaded_model = pickle.load(model_file)

    # Predictions
    y_pred = loaded_model.predict(X_test)

    # Calculate metrics (cast to plain floats so they serialize cleanly)
    mae = float(mean_absolute_error(y_test, y_pred))
    rmse = float(np.sqrt(mean_squared_error(y_test, y_pred)))

    # Log metrics
    metrics.log_metric("mean_absolute_error", mae)
    metrics.log_metric("root_mean_squared_error", rmse)

    # Compare the computed errors with the caller-supplied upper bounds.
    computed = {"mae": mae, "rmse": rmse}
    thresholds = json.loads(thresholds_dict_str) if thresholds_dict_str else {}
    if not isinstance(thresholds, dict):
        raise ValueError("thresholds_dict_str must be a JSON object, e.g. '{\"mae\": 5000}'")

    # Reject unknown keys instead of ignoring them: a typo must not silently
    # disable a check and approve the model.
    unknown = sorted(set(thresholds) - set(computed))
    if unknown:
        raise ValueError(f"Unsupported threshold keys {unknown}; supported keys: {sorted(computed)}")

    # Both metrics are errors, so lower is better.
    approval = all(computed[name] <= float(limit) for name, limit in thresholds.items())

    outputs = NamedTuple("outputs", [("approval", bool)])
    return outputs(approval)


In [None]:
# Uploading model and metrics to Google Bucket
@dsl.component(
    packages_to_install=["google-cloud-storage"],
    base_image="python:3.10.7-slim"
)
def upload_model_to_gcs(project_id: str, model_repo: str, model: Input[Model]):
    '''Copy the pickled model file into a Cloud Storage bucket as ``model.pkl``.

    Args:
        project_id: Project passed to the storage client.
        model_repo: Name of the destination bucket.
        model: Input artifact; the file at ``model.path + '.pkl'`` is uploaded.
    '''
    import logging

    from google.cloud import storage

    logging.basicConfig(level=logging.INFO)

    # Local file to upload.
    source_file_name = model.path + '.pkl'

    # Resolve the destination object: client -> bucket -> blob.
    destination_blob = (
        storage.Client(project=project_id)
        .bucket(model_repo)
        .blob('model.pkl')
    )
    destination_blob.upload_from_filename(source_file_name)

    print(f"File {source_file_name} uploaded to {model_repo}.")


In [None]:
# Defining pipeline
@kfp.dsl.pipeline(
    name="california-housing-training-pipeline")
def pipeline(
    project_id: str,
    data_bucket: str,
    dataset_uri: str,
    model_repo: str,
    thresholds_dict_str: str,
    model_repo_uri: str,
):
    # Pipeline outline: import the dataset, split it, train, evaluate, and,
    # only when the evaluation step approves the model, publish it to Cloud
    # Storage, register it in Vertex AI and deploy it to a new endpoint.
    #
    # `data_bucket` is accepted but not referenced anywhere in this body.

    # Import the dataset located at `dataset_uri` as a Dataset artifact.
    raw_dataset = kfp.dsl.importer(
        artifact_uri=dataset_uri,
        artifact_class=Dataset,
        reimport=False,
    )

    # Step 1: split into training and testing sets.
    split_task = train_test_split(dataset=raw_dataset.output)

    # Step 2: train on the training split.
    train_task = train_regression_model(features=split_task.outputs["dataset_train"])

    # Step 3: evaluate on the testing split against the supplied thresholds.
    evaluation_task = evaluate_model(
        test_set=split_task.outputs["dataset_test"],
        model=train_task.outputs["model"],
        thresholds_dict_str=thresholds_dict_str,
    )

    # Serving container attached to the unmanaged model artifact.
    serving_metadata = {
        "containerSpec": {
            "imageUri": "us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-3:latest",
        },
    }

    # The explicit `== True` comparison builds the condition handed to dsl.If.
    with dsl.If(evaluation_task.outputs["approval"] == True, name="approve-model"):
        # Step 4: copy the pickled model into the model bucket.
        publish_task = upload_model_to_gcs(
            project_id=project_id,
            model_repo=model_repo,
            model=train_task.outputs['model'],
        )

        # Step 5: wrap the uploaded location as an unmanaged container model.
        # There is no data dependency on the upload, so order it explicitly.
        unmanaged_model = dsl.importer(
            artifact_uri=model_repo_uri,
            artifact_class=artifact_types.UnmanagedContainerModel,
            metadata=serving_metadata,
        ).after(publish_task)

        # Step 6: register the model in Vertex AI.
        registered_model = ModelUploadOp(
            project=project_id,
            display_name="california-housing-model",
            unmanaged_container_model=unmanaged_model.outputs["artifact"],
        ).after(unmanaged_model)

        # Step 7: create the serving endpoint once the model is registered.
        endpoint_task = EndpointCreateOp(
            project=project_id,
            display_name="california-housing-service",
        ).after(registered_model)

        # Step 8: deploy the registered model to the endpoint on one replica.
        ModelDeployOp(
            model=registered_model.outputs["model"],
            endpoint=endpoint_task.outputs['endpoint'],
            deployed_model_display_name="california-housing-model",
            dedicated_resources_machine_type="n1-standard-4",
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            traffic_split={"0": 100},
        ).after(endpoint_task)


In [None]:
# Compiling pipeline
from kfp import compiler

# Compile the pipeline function into a package file written to
# 'california_housing_training_pipeline.yaml' (relative to the working directory).
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    package_path='california_housing_training_pipeline.yaml',
    pipeline_func=pipeline,
)


In [None]:
# Submitting pipeline
import google.cloud.aiplatform as aip

# Point the Vertex AI SDK at the target project and region.
aip.init(project=PROJECT_ID, location=REGION)

# Runtime arguments for the run; the keys are the pipeline function's parameter names.
pipeline_parameters = {
    'project_id': PROJECT_ID,
    'data_bucket': 'mlops_team4_de2024',
    'dataset_uri': 'gs://your-data-bucket-name/path/to/California_Houses.csv',
    'model_repo': 'your-model-bucket-name',  # TODO: model bucket name
    'thresholds_dict_str': '{"mae":5000}',  # Can set MAE threshold for approval
    'model_repo_uri': 'gs://your-model-bucket-name',  # TODO: GCS path for model repository
}

# Describe the job from the compiled package; caching is disabled for this run.
job = aip.PipelineJob(
    display_name="california-housing-predictor",
    template_path="california_housing_training_pipeline.yaml",
    pipeline_root=PIPELINE_ROOT,
    location=REGION,
    enable_caching=False,
    parameter_values=pipeline_parameters,
)

# Submit the job.
job.run()
