### Install packages locally

In [4]:
%%capture
!pip3 install google-cloud-aiplatform==1.0.0 --upgrade
!pip3 install kfp google-cloud-pipeline-components==0.1.1 --upgrade
!pip3 install google-cloud-aiplatform --upgrade

### Relevant imports

In [72]:
import uuid
from kfp import dsl
import kfp
from google.cloud import aiplatform
from kfp.v2.dsl import component
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output, OutputPath, component, ClassificationMetrics, Metrics)
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

In [73]:
# Project and bucket used by this notebook; PROJECT_ID is handed to the
# pipeline client and BUCKET_NAME determines where pipeline artifacts are rooted.
PROJECT_ID = "kubeflow-demos"
BUCKET_NAME = "test-pkl"

# Region passed to the pipeline client, and the GCS prefix used as pipeline root.
REGION = "us-central1"
PIPELINE_ROOT = f"gs://{BUCKET_NAME}/pipeline_root"

# Echo the effective settings, one per line, for a quick sanity check.
print(PROJECT_ID, BUCKET_NAME, PIPELINE_ROOT, sep="\n")

kubeflow-demos
test-pkl
gs://test-pkl/pipeline_root


### Add the `@component` decorator with a `packages_to_install` list
Simulate a preprocessing step that creates and returns a pickled pandas **DataFrame**

In [74]:
@component(packages_to_install=["pandas", "google-cloud-aiplatform", "google-cloud-bigquery-storage","google-cloud-bigquery","numpy","pyarrow"])
def preprocess(artifact_uri: OutputPath()):
    """Query the churn table in BigQuery and pickle the first five rows.

    Args:
        artifact_uri: Destination path for the pickled pandas DataFrame. It is
            declared as an ``OutputPath`` so the pipeline can hand the written
            file to a downstream step.
    """
    # Imports stay inside the function: a function-based component has to be
    # self-contained and cannot rely on names defined elsewhere in the notebook.
    from google.cloud import bigquery
    import google.auth

    # Only the credentials are needed; the project is set explicitly below.
    creds, _ = google.auth.default()
    client = bigquery.Client(project='kubeflow-demos', credentials=creds)

    query = """
            SELECT * FROM `kubeflow-demos.telco.churn`
    """

    # NOTE(review): the whole table is downloaded even though only head() is
    # kept; consider limiting the query if the table is large.
    dataframe = client.query(query).to_dataframe().head()

    # DataFrame.to_pickle handles serialisation itself; no `pickle` import needed.
    dataframe.to_pickle(artifact_uri)
    print(artifact_uri)

    

In [75]:
@component(packages_to_install=["pandas", "google-cloud-aiplatform", "google-cloud-bigquery-storage","google-cloud-bigquery","numpy","pyarrow"])
def train(artifact_uri: InputPath()):
    """Load a pickled pandas DataFrame and print its first rows.

    Args:
        artifact_uri: Path of the pickled DataFrame to read. It is declared as
            an ``InputPath`` so the pipeline supplies the file produced by an
            upstream step.
    """
    # Imported locally because a function-based component must be self-contained.
    import pandas

    # NOTE: unpickling can execute arbitrary code. This is acceptable only as
    # long as the artifact comes from a trusted step of the same pipeline.
    dataframe = pandas.read_pickle(artifact_uri)
    print(dataframe.head())
    

#### Use kubeflow wiring to pass data between steps

### Compile and run

In [84]:
from datetime import datetime

# Second-resolution timestamp (YYYYMMDDHHMMSS) used to give each compiled
# job spec a distinct file name.
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"

@kfp.dsl.pipeline(name="propensity-model")
def pipeline(
    project: str = PROJECT_ID,
    bucket: str = BUCKET_NAME,
    baseline_accuracy: float = 70.0
):
    # Two-step pipeline: `preprocess` writes an artifact and `train` reads it.
    # NOTE: `project`, `bucket` and `baseline_accuracy` are declared as pipeline
    # parameters but no step below consumes them.
    preprocess_task = preprocess()
    preprocess_task.set_caching_options(True)

    # Passing the upstream output is what wires the two steps together.
    train_task = train(preprocess_task.output)
    train_task.set_caching_options(True)

from kfp.v2 import compiler

# The compiled job spec is written to, and then submitted from, this one file.
job_spec_path = "dag-" + TIMESTAMP + ".json"

compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path=job_spec_path,
)

from kfp.v2.google.client import AIPlatformClient

api_client = AIPlatformClient(project_id=PROJECT_ID, region=REGION)

# Submit the compiled spec, overriding the pipeline's default baseline_accuracy.
response = api_client.create_run_from_job_spec(
    job_spec_path,
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"baseline_accuracy": 80.0},
)

