In [1]:
# Make the parent of the working directory importable so that the
# project-level packages (e.g. the config module) can be loaded from here.
import os
import sys

# Directory the notebook/script is being run from
current_directory = os.getcwd()

# One level up from the working directory
parent_directory = os.path.dirname(current_directory)

# Register the parent directory on sys.path only once: this cell may be
# re-run, and an unconditional append would add a duplicate entry each time.
if parent_directory not in sys.path:
    sys.path.append(parent_directory)

In [2]:
# %%writefile ../config/config.py

import os
from dotenv import load_dotenv

# Read the .env file so its entries are available as environment variables
load_dotenv()

_env = os.environ

# Settings indexed with [] are mandatory (KeyError when unset); settings
# read with .get() are optional and come back as None when unset.
pipeline_root = _env["PIPELINE_ROOT"]
base_image = _env.get("CONTAINER_IMAGE")
serving_image = _env.get("SERVING_IMAGE")
project_id = _env["PROJECT_ID"]
region = _env["REGION"]
service_account = _env["SERVICE_ACCOUNT"]
artifact_repo = _env["ARTIFACT_REPO"]
gcs_url = _env["GCS_URL"]
# Environment values are strings; the ratio is converted to a float here
train_ratio = float(_env["TRAIN_RATIO"])
bucket_name = _env["BUCKET_NAME"]

In [None]:
from kfp.v2 import dsl
from typing import NamedTuple

# NOTE(review): the decorator is used without base_image/packages_to_install;
# confirm the default component image provides google-cloud-storage,
# google-cloud-bigquery, pandas, joblib and the library the model was built with.
@dsl.component
def batch_predict(
    model_gcs_path: str,
    input_data_gcs_path: str,
    project: str,
    dataset_id: str,
    table_id: str,
    region: str,
    target_column: str = ""
) -> NamedTuple("Outputs", [("bigquery_table", str)]):
    """
    Load a model from GCS, preprocess input data, make predictions, and append them to BigQuery.

    Args:
        model_gcs_path (str): GCS URI to the joblib model file (gs://<bucket>/<object>).
        input_data_gcs_path (str): GCS URI to the input CSV file (gs://<bucket>/<object>).
            It may live in a different bucket than the model.
        project (str): Google Cloud project ID used for the BigQuery client and table.
        dataset_id (str): BigQuery dataset ID.
        table_id (str): BigQuery table ID.
        region (str): Google Cloud region. Currently not used by this component.
        target_column (str): Name of the target column, if present in the input data.
            It is dropped before prediction; an empty string disables this.

    Returns:
        NamedTuple: Single field ``bigquery_table`` holding the
        ``<project>.<dataset_id>.<table_id>`` reference the predictions were appended to.

    Raises:
        ValueError: If a GCS path does not contain both a bucket and an object name.
    """

    # Import libraries (kept local to the component function)
    from google.cloud import storage, bigquery
    import pandas as pd
    import joblib

    def _split_gcs_uri(uri: str):
        """Split 'gs://<bucket>/<object>' into (bucket, object)."""
        scheme = "gs://"
        # Strip the scheme only when it is a prefix, never from the object name
        path = uri[len(scheme):] if uri.startswith(scheme) else uri
        bucket_part, _, object_part = path.partition("/")
        if not bucket_part or not object_part:
            raise ValueError(
                f"Expected a GCS URI of the form gs://<bucket>/<object>, got {uri!r}"
            )
        return bucket_part, object_part

    storage_client = storage.Client()

    # Load model from GCS
    model_bucket_name, model_path = _split_gcs_uri(model_gcs_path)
    model_filename = "/tmp/model.joblib"
    model_blob = storage_client.bucket(model_bucket_name).blob(model_path)
    model_blob.download_to_filename(model_filename)
    # NOTE: joblib.load unpickles the file, which can execute arbitrary code;
    # only point this component at trusted model artifacts.
    model = joblib.load(model_filename)

    # Load input data for prediction from its OWN bucket (previously the
    # model's bucket was reused and the bucket in this URI was ignored)
    input_bucket_name, input_data_path = _split_gcs_uri(input_data_gcs_path)
    input_data_filename = "/tmp/input_data.csv"
    input_blob = storage_client.bucket(input_bucket_name).blob(input_data_path)
    input_blob.download_to_filename(input_data_filename)
    input_data = pd.read_csv(input_data_filename)

    # Preprocess input data: remove the label column if one was named
    if target_column:
        input_data = input_data.drop(columns=[target_column], errors='ignore')

    # Convert categorical (object-typed) columns to 'category' data type
    categorical_cols = input_data.select_dtypes(include=['object']).columns
    if len(categorical_cols) > 0:
        input_data[categorical_cols] = input_data[categorical_cols].astype('category')

    # Make predictions
    predictions = model.predict(input_data)

    # Write predictions to BigQuery (appended to any existing rows)
    bigquery_client = bigquery.Client(project=project)
    table_ref = f"{project}.{dataset_id}.{table_id}"
    job_config = bigquery.LoadJobConfig(
        schema=[
            bigquery.SchemaField("prediction", "FLOAT"),
        ],
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )
    job = bigquery_client.load_table_from_dataframe(
        pd.DataFrame({"prediction": predictions}),
        table_ref,
        job_config=job_config,
    )
    job.result()  # Wait for the job to complete

    # The output is named bigquery_table, so hand back the table reference
    # itself rather than a human-readable sentence.
    return (table_ref,)


In [None]:
# %%writefile ../mlplatform_pipeline.py

# Integrate the components into a self-contained script for automated runs

from kfp.v2 import dsl, compiler
from kfp.v2.dsl import pipeline
from components.load_data import load_data
from components.preprocess_data import preprocess_data
from components.train_random_forest import train_random_forest
from components.train_decision_tree import train_decision_tree
from components.evaluate_model import evaluate_model
from components.deploy_model import deploy_model
from config.config import gcs_url, train_ratio, project_id, region, serving_image, service_account, pipeline_root

# Pipeline definition. NOTE: only the load_data step is wired in below; the
# train_ratio parameter is accepted but not consumed by any step here.
@pipeline(
    name="ml-platform-pipeline",
    description="A pipeline that performs data loading, preprocessing, model training, evaluation, and deployment",
    pipeline_root=pipeline_root,
)
def mlplatform_pipeline(
    gcs_url: str = gcs_url,
    train_ratio: float = train_ratio,
):
    # Run the load_data component against the configured GCS URL
    load_task = load_data(gcs_url=gcs_url)
    

if __name__ == "__main__":
    # Step 1: compile the pipeline function into a JSON spec on disk
    pipeline_filename = "mlplatform_pipeline.json"
    pipeline_compiler = compiler.Compiler()
    pipeline_compiler.compile(
        pipeline_func=mlplatform_pipeline,
        package_path=pipeline_filename,
    )

    # Step 2: submit the compiled spec as a Vertex AI pipeline job
    from google.cloud import aiplatform

    aiplatform.init(project=project_id, location=region)

    # Runtime values for the pipeline's declared parameters
    run_parameters = {
        "gcs_url": gcs_url,
        "train_ratio": train_ratio,
    }
    pipeline_job = aiplatform.PipelineJob(
        display_name="ml-platform-pipeline",
        template_path=pipeline_filename,
        parameter_values=run_parameters,
        enable_caching=True,
    )
    # The job runs under the configured service account
    _ = pipeline_job.submit(service_account=service_account)