## Running in Vertex Pipelines - Answers

Building the Docker container for exercise 1:

In [None]:
# Per-user suffix: passed to `make` below and interpolated into the component
# base image names and the pipeline job's display name further down.
USER_NAME = "julian"
# Run the `docker-push` make target from two directories up with USER_NAME set.
! make -C ../../ USER_NAME=$USER_NAME docker-push

Building and running the full pipeline for exercises 2 and 3:

In [None]:
# Create the local output directory (no error if it already exists) that the
# component YAML files and the compiled pipeline JSON are written to.
! mkdir -p _artifacts

In [None]:
from typing import NamedTuple

import kfp

from kfp.v2.dsl import (
    component,
    Input,
    InputPath,
    OutputPath,
    Output,
    Dataset,
    Metrics,
    Model
)

# GCP region: used as the host prefix of the component base images and as the
# location of the submitted pipeline job.
GCP_REGION = "europe-west3"

@component(
    base_image=f"{GCP_REGION}-docker.pkg.dev/gdd-cb-vertex/docker/fancy-fashion-{USER_NAME}",
    output_component_file="_artifacts/train.yaml",
)
def train(train_data_path: str, model: Output[Model]) -> None:
    """Fit a model on the training data and persist it as ``model.pkl``.

    Args:
        train_data_path: Location of the training data; it is resolved with
            ``local_gcs_path`` before being handed to ``train_model``.
        model: Output artifact. Its ``path`` is used as a directory into which
            the fitted model is dumped with joblib.
    """

    # All imports are function-local so they are resolved when the component
    # body runs rather than when this cell is evaluated.
    import joblib
    from pathlib import Path

    from fancy_fashion.util import local_gcs_path
    from fancy_fashion.model import train_model

    fitted = train_model(local_gcs_path(train_data_path))

    # The artifact path is treated as a directory holding a single pickle file.
    target = Path(model.path) / "model.pkl"
    target.parent.mkdir(parents=True, exist_ok=True)
    joblib.dump(fitted, target)


@component(
    base_image=f"{GCP_REGION}-docker.pkg.dev/gdd-cb-vertex/docker/fancy-fashion-{USER_NAME}",
    output_component_file="_artifacts/evaluate.yaml",
)
def evaluate(
    test_data_path: str, model: InputPath("Model"), metrics: Output[Metrics]
) -> NamedTuple("Outputs", [("loss", float), ("accuracy", float)]):
    """Evaluate the trained model on the test data (exercise 3).

    Args:
        test_data_path: Location of the test data; it is resolved with
            ``local_gcs_path`` before being handed to ``evaluate_model``.
        model: Path of the model artifact directory, which is expected to
            contain ``model.pkl``.
        metrics: Output artifact on which ``loss`` and ``accuracy`` are logged.

    Returns:
        A named tuple ``(loss, accuracy)`` matching the declared outputs.
    """
    from collections import namedtuple
    from pathlib import Path
    import joblib

    from fancy_fashion.model import evaluate_model
    from fancy_fashion.util import local_gcs_path

    # Use a separate name so the `model` path parameter is not shadowed.
    trained_model = joblib.load(Path(model) / "model.pkl")

    loss, accuracy = evaluate_model(trained_model, local_gcs_path(test_data_path))

    # The outputs are declared as `float`; convert explicitly in case
    # evaluate_model returns another numeric scalar type (not visible here).
    loss, accuracy = float(loss), float(accuracy)

    metrics.log_metric("loss", loss)
    metrics.log_metric("accuracy", accuracy)

    # Return the declared outputs; previously the function fell off the end
    # and returned None, so `loss` and `accuracy` were never produced.
    outputs = namedtuple("Outputs", ["loss", "accuracy"])
    return outputs(loss, accuracy)


@component(
    base_image=f"{GCP_REGION}-docker.pkg.dev/gdd-cb-vertex/docker/fancy-fashion-{USER_NAME}",
    output_component_file="_artifacts/predict.yaml",
)
def predict(
    validation_data_path: str, model: InputPath("Model"), predictions: Output[Dataset]
) -> None:
    """Generate predictions for the validation data and store them as parquet.

    The only output is the ``predictions`` artifact. The function computes no
    loss or accuracy, so it declares no return outputs.

    Args:
        validation_data_path: Location of the validation data; it is resolved
            with ``local_gcs_path`` before being handed to
            ``generate_predictions``.
        model: Path of the model artifact directory, which is expected to
            contain ``model.pkl``.
        predictions: Output artifact. Its ``path`` is used as a directory into
            which ``predictions.parquet`` is written.
    """
    from pathlib import Path
    import joblib

    import pandas as pd

    from fancy_fashion.model import generate_predictions
    from fancy_fashion.util import local_gcs_path

    # Use a separate name so the `model` path parameter is not shadowed.
    trained_model = joblib.load(Path(model) / "model.pkl")

    predicted = generate_predictions(trained_model, local_gcs_path(validation_data_path))

    # The artifact path is treated as a directory holding a single parquet file.
    predictions_dir = Path(predictions.path)
    predictions_dir.mkdir(parents=True, exist_ok=True)
    pd.DataFrame.from_records(predicted).to_parquet(predictions_dir / "predictions.parquet")
    

# The pipeline name follows USER_NAME, like the image names and the job's
# display name, instead of hard-coding one user's name.
@kfp.dsl.pipeline(name=f"fancy-fashion-{USER_NAME}")
def pipeline(train_path: str, test_path: str, validation_path: str):
    """Train a model, evaluate it on the test set, then generate predictions.

    Args:
        train_path: Location of the training data, passed to ``train``.
        test_path: Location of the test data, passed to ``evaluate``.
        validation_path: Location of the validation data, passed to ``predict``.
    """
    # Keyword arguments make the wiring explicit and do not depend on the
    # parameter order of the component functions.
    train_task = train(train_data_path=train_path)
    evaluate_task = evaluate(
        test_data_path=test_path, model=train_task.outputs["model"]
    )
    predict_task = predict(
        validation_data_path=validation_path, model=train_task.outputs["model"]
    )
    # predict only needs the trained model, so force it to run after evaluate.
    predict_task.after(evaluate_task)

In [None]:
from kfp.v2 import compiler

# Compile the pipeline function into the JSON job spec that is submitted below.
compiler.Compiler().compile(pipeline_func=pipeline, package_path="_artifacts/pipeline.json")

In [None]:
from google.cloud.aiplatform.pipeline_jobs import PipelineJob

# Describe a pipeline run from the compiled spec. Caching is disabled for this
# job via enable_caching=False.
job = PipelineJob(
    display_name=f"fancy-fashion-{USER_NAME}",
    enable_caching=False,
    template_path="_artifacts/pipeline.json",
    # Keys must match the parameter names of the `pipeline` function.
    parameter_values={
        "train_path": "gs://gdd-cb-vertex-fashion-inputs/train",
        "test_path": "gs://gdd-cb-vertex-fashion-inputs/test",
        "validation_path": "gs://gdd-cb-vertex-fashion-inputs/validation",
    },
    # Plain string literals: these values have no placeholders, so the `f`
    # prefix was unnecessary.
    pipeline_root="gs://gdd-cb-vertex-fashion-artifacts/pipelines",
    location=GCP_REGION,
)

# Submit the job under the given service account.
job.run(
    service_account="vmd-fashion@gdd-cb-vertex.iam.gserviceaccount.com"
)