# Kubeflow Pipelines, Metadata, Artifacts, Tasks,...

In [None]:
!pip install -q google-cloud-aiplatform kfp

In [None]:
import matplotlib.pyplot as plt
import pandas as pd

from kfp import compiler, dsl
from kfp.dsl import pipeline, component, Artifact, Dataset, Input, Metrics, Model, Output, InputPath, OutputPath

from google.cloud import aiplatform

## Pipeline Specification

In [None]:
PROJECT_ID = "YOUR PROJECT"
BUCKET_NAME = "YOUR BUCKET"
REGION = "YOUR REGION"
PIPELINE_ROOT = f"gs://{BUCKET}/pipeline_root/"

In [None]:
@component(
    packages_to_install=["google-cloud-bigquery", "pandas", "pyarrow", "db-dtypes"],
    base_image="python:3.9",
    output_component_file="create_dataset.yaml"
)
def get_dataframe(
    bq_table: str,
    output_data_path: OutputPath("Dataset")
):
    from google.cloud import bigquery
    import pandas as pd
    import os

    project_number = os.environ["CLOUD_ML_PROJECT_ID"]
    bqclient = bigquery.Client(project=project_number)
    table = bigquery.TableReference.from_string(
        bq_table
    )
    rows = bqclient.list_rows(
        table
    )
    dataframe = rows.to_dataframe(
        create_bqstorage_client=True,
    )
    dataframe = dataframe.sample(frac=1, random_state=2)
    dataframe.to_csv(output_data_path)

In [None]:
@component(
    packages_to_install=["scikit-learn==1.3.2", "pandas", "joblib", "db-dtypes"],
    base_image="python:3.10",
    output_component_file="beans_model_component.yaml",
)
def sklearn_train(
    dataset: Input[Dataset],
    metrics: Output[Metrics],
    model: Output[Model]
):
    from sklearn.tree import DecisionTreeClassifier
    from sklearn.metrics import roc_curve
    from sklearn.model_selection import train_test_split
    from joblib import dump

    import pandas as pd
    df = pd.read_csv(dataset.path)
    labels = df.pop("Class").tolist()
    data = df.values.tolist()
    x_train, x_test, y_train, y_test = train_test_split(data, labels)

    skmodel = DecisionTreeClassifier()
    skmodel.fit(x_train,y_train)
    score = skmodel.score(x_test,y_test)
    print('accuracy is:',score)

    metrics.log_metric("accuracy",(score * 100.0))
    metrics.log_metric("framework", "Scikit Learn")
    metrics.log_metric("dataset_size", len(df))
    dump(skmodel, model.path + ".joblib")

In [None]:
@component(
    packages_to_install=["google-cloud-aiplatform"],
    base_image="python:3.10",
    output_component_file="beans_deploy_component.yaml",
)
def deploy_model(
    model: Input[Model],
    project: str,
    region: str,
    vertex_endpoint: Output[Artifact],
    vertex_model: Output[Model]
):
    from google.cloud import aiplatform

    aiplatform.init(project=project, location=region)

    deployed_model = aiplatform.Model.upload(
        display_name="beans-model-pipeline",
        artifact_uri = model.uri.replace("model", ""),
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.1-3:latest"
    )
    endpoint = deployed_model.deploy(machine_type="n1-standard-4")

    # Save data to the output params
    vertex_endpoint.uri = endpoint.resource_name
    vertex_model.uri = deployed_model.resource_name

In [None]:
@pipeline(
    # Default pipeline root. You can override it when submitting the pipeline.
    pipeline_root=PIPELINE_ROOT,
    # A name for the pipeline.
    name="mlmd-pipeline",
)
def pipeline(
    bq_table: str = "",
    output_data_path: str = "data.csv",
    project: str = PROJECT_ID,
    region: str = REGION
):
    dataset_task = get_dataframe(
        bq_table=bq_table
    )

    model_task = sklearn_train(
        dataset=dataset_task.output
    )

    deploy_task = deploy_model(
        model=model_task.outputs["model"],
        project=project,
        region=region
    )

In [None]:
compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="mlmd_pipeline.json"
)

## Pipeline Execution

In [None]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

In [None]:
run1 = aiplatform.PipelineJob(
    display_name="mlmd-pipeline",
    template_path="mlmd_pipeline.json",
    job_id="mlmd-pipeline-small-{0}".format(TIMESTAMP),
    parameter_values={"bq_table": "sara-vertex-demos.beans_demo.small_dataset"},
    enable_caching=True
)

In [None]:
run2 = aiplatform.PipelineJob(
    display_name="mlmd-pipeline",
    template_path="mlmd_pipeline.json",
    job_id="mlmd-pipeline-large-{0}".format(TIMESTAMP),
    parameter_values={"bq_table": "sara-vertex-demos.beans_demo.large_dataset"},
    enable_caching=True,
)

In [None]:
run1.submit()

In [None]:
run2.submit()