### Installation
Install the packages required for executing this notebook.

In [1]:
# Install the packages
! pip3 install --user --no-cache-dir --upgrade "kfp>2" "google-cloud-pipeline-components>2" \
                                        google-cloud-aiplatform

Collecting kfp>2
  Downloading kfp-2.14.6-py3-none-any.whl.metadata (4.4 kB)
Collecting google-cloud-pipeline-components>2
  Downloading google_cloud_pipeline_components-2.21.0-py3-none-any.whl.metadata (5.7 kB)
Collecting google-cloud-aiplatform
  Downloading google_cloud_aiplatform-1.122.0-py2.py3-none-any.whl.metadata (44 kB)
Collecting kfp-pipeline-spec<3,>=2.14.3 (from kfp>2)
  Downloading kfp_pipeline_spec-2.14.6-py3-none-any.whl.metadata (433 bytes)
Downloading kfp-2.14.6-py3-none-any.whl (374 kB)
Downloading kfp_pipeline_spec-2.14.6-py3-none-any.whl (9.6 kB)
Downloading google_cloud_pipeline_components-2.21.0-py3-none-any.whl (1.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.5/1.5 MB[0m [31m72.6 MB/s[0m  [33m0:00:00[0m
[?25hDownloading google_cloud_aiplatform-1.122.0-py2.py3-none-any.whl (8.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.1/8.1 MB[0m [31m202.5 MB/s[0m  [33m0:00:00[0m
[?25hInstalling collected packages: kfp

## Restart the kernel
Once you've installed the additional packages, you need to restart the notebook kernel so it can find the packages.

In [2]:
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

## Check the versions of the packages you installed. The KFP SDK version should be >2.

In [1]:
! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
! pip3 freeze | grep aiplatform
! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

KFP SDK version: 2.14.6
google-cloud-aiplatform==1.122.0
google_cloud_pipeline_components version: 2.21.0


In [14]:
import kfp
import typing
from typing import Dict
from typing import NamedTuple
from kfp import dsl
from kfp.dsl import (Artifact,
                        Dataset,
                        Input,
                        Model,
                        Output,
                        Metrics,
                        ClassificationMetrics,
                        component, 
                        OutputPath, 
                        InputPath)
import google.cloud.aiplatform as aip
from google_cloud_pipeline_components.types import artifact_types

#### Project and Pipeline Configurations

In [3]:
#The Google Cloud project that this pipeline runs in.
PROJECT_ID = "atomic-wall-471907-n1"
# The region that this pipeline runs in
REGION = "us-central1"
# Specify a Cloud Storage URI that your pipelines service account can access. The artifacts of your pipeline runs are stored within the pipeline root.
PIPELINE_ROOT = "gs://ass1_temp_bucket/"

### Train Model

In [28]:
@dsl.component(
    packages_to_install=["pandas","tensorflow","scikit-learn", "fsspec","gcsfs"],
    base_image="python:3.10.7-slim"
)
def train_model(data_bucket:str, output_model: Output[Model]):
    """
    Function takes data file from the data bucket and trains a simple MLP model on it.
    """
    import random
    import pandas as pd
    from tensorflow import keras
    from sklearn.model_selection import train_test_split
    random.seed(67)

    # 1. Load dataset from data bucket
    #url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00267/data_banknote_authentication.txt"
    cols = ["variance", "skewness", "curtosis", "entropy", "class"]
    #df = pd.read_csv(url, header=None, names=cols)
    
    df = pd.read_csv(f"gs://{data_bucket}/data_banknote_authentication.txt", header=None, names=cols)

    X = df[["variance", "skewness", "curtosis", "entropy"]].values
    y = df["class"].values

    # 2. Split data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # 3. Normalize inside the model (so we don’t need a separate scaler)
    normalizer = keras.layers.Normalization()
    normalizer.adapt(X_train)

    # 4. Build simple MLP model
    model = keras.Sequential([
        normalizer,
        keras.layers.Dense(8, activation="relu"),
        keras.layers.Dense(4, activation="relu"),
        keras.layers.Dense(1, activation="sigmoid")
    ])

    model.compile(optimizer="adam", loss="binary_crossentropy", metrics=["accuracy"])

    # 5. Train model
    model.fit(X_train, y_train, epochs=15, batch_size=8, validation_split=0.2, verbose=1)

    # 6. Evaluate model
    loss, acc = model.evaluate(X_test, y_test, verbose=0)
    print(f" Test accuracy: {acc:.3f}")

    # 7. Save model in the same folder
    model.save("model.keras")
    print(" Model saved as model.keras")
    
    # 8. Add metadata
    metadata = {
        "accuracy": acc,
        "algo": "MLP",
        "file_type": ".keras"
    }
        
    # 9. Attach metadata to Vertex artifact (for pipelines)
    output_model.metadata.update(metadata)

In [29]:
# quick test
from kfp import local
local.init(runner=local.DockerRunner())
train_model(
    data_bucket = 'ass1_data_bucket'
)

13:00:13.799 - INFO - Executing task [96m'train-model'[0m
13:00:13.801 - INFO - Streamed logs:

    Found image 'python:3.10.7-slim'

    
    [notice] A new release of pip available: 22.2.2 -> 25.3
    [notice] To update, run: pip install --upgrade pip
    [KFP Executor 2025-10-29 13:01:16,880 INFO]: Looking for component `train_model` in --component_module_path `/tmp/tmp.SZsU6BZTM1/ephemeral_component.py`
    [KFP Executor 2025-10-29 13:01:16,880 INFO]: Loading KFP component "train_model" from /tmp/tmp.SZsU6BZTM1/ephemeral_component.py (directory "/tmp/tmp.SZsU6BZTM1" and module name "ephemeral_component")
    [KFP Executor 2025-10-29 13:01:16,881 INFO]: Got executor_input:
    {
        "inputs": {
            "parameterValues": {
                "data_bucket": "ass1_data_bucket"
            }
        },
        "outputs": {
            "artifacts": {
                "output_model": {
                    "artifacts": [
                        {
                            "name": 

<kfp.dsl.pipeline_task.PipelineTask at 0x7f5aee4aafb0>

### Compare models

In [37]:
@dsl.component(
    packages_to_install=["google-cloud-stroage"],
    base_image="python:3.10.7-slim"
)
def compare_model(new_model: Input[Model], model_bucket_metadata: str):
    """
    Function compares local model to existing one in model bucket
    :returns: "NEW" if accuracy of the new model is better, "EXISTING" if the old one is better.
    """
    import json, tempfile, os
    from google.cloud import storage
    
    # Get new model accuracy
    new_accuracy = float(new_model.metadata.get("accuracy", 0))

    # Get old model accuracy
    bucket_name, blob_path = model_bucket_metadata.replace("gs://", "").split("/", 1)
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_path)
    
    # Temporarily download the older model from the model bucket
    tmp = tempfile.NamedTemporaryFile(delete=False)
    blob.download_to_filename(tmp.name)
    with open(tmp.name, "r") as f:
        old_metadata = json.load(f)
        
    old_accuracy = float(old_metadata["accuracy"])
    
    os.remove(tmp.name)
    # Check whether the new model outperforms the old one
    decision = "NEW" if new_accuracy > old_accuracy else "EXISTING"
    return decision


### Upload Model and Metrics to Google Bucket 

In [9]:
@dsl.component(
    packages_to_install=["google-cloud-storage"],
    base_image="python:3.10.7-slim"
)
def upload_model_to_gcs(project_id: str, model_repo: str):
    '''upload model to gsc'''
    from google.cloud import storage
    from urllib.parse import urlparse
    import os
    
    # Parse bucket and prefix
    parsed = urlparse(model_repo)
    bucket_name = parsed.netloc
    prefix = parsed.path.lstrip("/")

    client = storage.Client(project=project_id)
    bucket = client.bucket(bucket_name)

    # --- Upload model.keras ---
    model_path = "model.keras"
    model_blob_name = os.path.join(prefix, os.path.basename(model_path))
    bucket.blob(model_blob_name).upload_from_filename(model_path)

    # --- Upload model_metadata.json ---
    metadata_path = "model_metadata.json"
    metadata_blob_name = os.path.join(prefix, os.path.basename(metadata_path))
    bucket.blob(metadata_blob_name).upload_from_filename(metadata_path)



#### Define the Pipeline

In [38]:
# Define the workflow of the pipeline.
@kfp.dsl.pipeline(
    name="banknote-authentication-training-pipeline")
def pipeline(project_id: str, data_bucket: str, model_repo: str, model_bucket_metadata: str):
    """
    Building the pipeline
    """
    # New stuff
    training_mlp_job_run_op = train_model(
        data_bucket = data_bucket
    )
    
    compare_model_job_run_op = compare_model(
        new_model = training_mlp_job_run_op.outputs["output_model"],
        model_bucket_metadata = model_bucket_metadata
    )
    
    with dsl.If(compare_model_job_run_op.outputs["decision"] == "NEW"):
        upload_model_to_gcs(
            project_id = project_id,
            model_repo = model_repo
        )
    """
    # Old stuff
    
    di_op = download_data(
        project_id=project_id,
        bucket=data_bucket,
        file_name=trainset_filename
    )

 
    training_mlp_job_run_op = train_mlp(
        features=di_op.outputs["dataset"]
    )
    
     
    training_lr_job_run_op = train_lr(
        features=di_op.outputs["dataset"]
    )
    
    pre_di_op = download_data(
        project_id=project_id,
        bucket=data_bucket,
        file_name=testset_filename
    ).after(training_mlp_job_run_op, training_lr_job_run_op)
        
        
    comp_model__op = compare_model(mlp_metrics=training_mlp_job_run_op.outputs["metrics"],
                                       lr_metrics=training_lr_job_run_op.outputs["metrics"]).after(training_mlp_job_run_op, training_lr_job_run_op)  
    
    # defining the branching condition
    with dsl.If(comp_model__op.output=="MLP"):
        predict_mlp_job_run_op = predict_mlp(
            model=training_mlp_job_run_op.outputs["out_model"],      
            features=pre_di_op.outputs["dataset"]
        )
        upload_model_mlp_to_gc_op = upload_model_to_gcs(
            project_id=project_id,
            model_repo=model_repo,
            model=training_mlp_job_run_op.outputs['out_model']
        ).after(predict_mlp_job_run_op)
        
    with dsl.If(comp_model__op.output=="LR"):
        predict_lr_job_run_op = predict_lr(
            model=training_lr_job_run_op.outputs["out_model"],     
            features=pre_di_op.outputs["dataset"]
        )
        upload_model_lr_to_gc_op = upload_model_to_gcs(
            project_id=project_id,
            model_repo=model_repo,
            model=training_lr_job_run_op.outputs['out_model']
        ).after(predict_lr_job_run_op) 
    """

KeyError: 'decision'

#### Compile the pipeline into a JSON file

In [12]:
from kfp import compiler
compiler.Compiler().compile(pipeline_func=pipeline,
        package_path='cloudbuild.yaml')

#### Submit the pipeline run

In [None]:
import google.cloud.aiplatform as aip

# Before initializing, make sure to set the GOOGLE_APPLICATION_CREDENTIALS
# environment variable to the path of your service account.
aip.init(
    project=PROJECT_ID,
    staging_bucket=PIPELINE_ROOT,
)

# Prepare the pipeline job
job = aip.PipelineJob(
    display_name="banknote-authentication-training-pipeline"",
    enable_caching=False,
    template_path="cloudbuild.yaml",
    pipeline_root=PIPELINE_ROOT,
    location=REGION,
    parameter_values={
        'project_id': PROJECT_ID, # makesure to use your project id 
        'data_bucket': 'ass1_data_bucket',  # makesure to use your data bucket name 
        'model_repo':'ass1_model_bucket' # makesure to use your model bucket name
        'model_bucket_metadata': 'gs://ass1_model_bucket/model_metadata.json'
    }
)

job.run()