In [7]:
%%capture
# Install the packages this notebook uses; %%capture suppresses pip's output.
# NOTE(review): google-cloud-aiplatform is pinned to 1.0.0 on the next line but
# installed again further down with an unpinned --upgrade, so the pin likely
# does not hold after this cell finishes -- confirm which version is intended.
!pip3 install google-cloud-aiplatform==1.0.0 --upgrade
# Kubeflow Pipelines SDK plus the Google Cloud pipeline components (pinned to 0.1.1).
!pip3 install kfp google-cloud-pipeline-components==0.1.1 --upgrade
!pip3 install scikit-learn
# NOTE(review): second, unpinned install of the package pinned above.
!pip3 install google-cloud-aiplatform --upgrade
!pip3 install pandas
!pip3 install imbalanced-learn

In [8]:
import uuid
import kfp
from google.cloud import aiplatform
from kfp.v2.dsl import component
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output, OutputPath, component)
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

In [9]:
#https://stackoverflow.com/a/54028874
%load_ext dotenv
%dotenv
import os
PROJECT_ID = os.environ['PROJECT_ID']
BUCKET_NAME = os.environ['BUCKET']

PIPELINE_ROOT = 'gs://{}/pipeline_root'.format(BUCKET_NAME)
REGION = 'us-central1'

print(PROJECT_ID)
print(BUCKET_NAME)
print(PIPELINE_ROOT)

The dotenv extension is already loaded. To reload it, use:
  %reload_ext dotenv
kubeflow-demos
user-group-demo
gs://user-group-demo/pipeline_root


In [10]:
@component(packages_to_install=["pandas", "google-cloud-aiplatform", "google-cloud-bigquery-storage","google-cloud-bigquery","pyarrow"])
def preprocess(output_csv_path: OutputPath('CSV')):
    """Dump the IMDB review sentiment table from BigQuery into a CSV file.

    Args:
        output_csv_path: Path the query result is written to as CSV.
    """
    import google.auth
    from google.cloud import bigquery

    # Use application-default credentials. The project they resolve to is
    # discarded because the client is pinned to 'kubeflow-demos' explicitly.
    credentials, _ = google.auth.default()
    bq_client = bigquery.Client(project='kubeflow-demos', credentials=credentials)

    query = """
            SELECT * FROM `kubeflow-demos.imdb.imdb_review_sentiment_strings`
    """
    print(query)

    # Materialise the whole result set as a pandas DataFrame and log a preview.
    reviews = bq_client.query(query).to_dataframe()
    print(reviews.head())

    # Written with pandas' defaults, so the DataFrame index is included as a column.
    reviews.to_csv(output_csv_path)
    print("done")
       
@component(packages_to_install=["pandas", "imbalanced-learn", "google-cloud-aiplatform", "pyarrow"])
def train(input_csv_path: InputPath('CSV'), saved_model: Output[Model], artifact_uri: OutputPath(str), experiment_name: str, run_name: str, num_epochs: int, project_id: str, bucket: str):
    """Train a linear SVM sentiment classifier on the review CSV and save it with joblib.

    Args:
        input_csv_path: CSV with at least the columns `review` and `sentiment`.
        saved_model: Output model artifact; `model.joblib` is written next to
            its path (the path with "saved_model" removed).
        artifact_uri: File that receives the URI of the directory holding
            `model.joblib`, for consumption by a later pipeline step.
        experiment_name: Experiment passed to `aiplatform.init`.
        run_name: Run started with `aiplatform.start_run`.
        num_epochs: Logged as the `epochs` run parameter only; the SVC fit
            below does not use it.
        project_id: Project passed to `aiplatform.init`.
        bucket: Staging bucket passed to `aiplatform.init`.
    """
    # A function-based component runs on its own inside the container, so every
    # module it touches has to be imported here. `os` was previously missing.
    import os

    import joblib
    import pandas as pd
    from google.cloud import aiplatform
    from imblearn.under_sampling import RandomUnderSampler
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.model_selection import train_test_split
    from sklearn.svm import SVC

    aiplatform.init(
        project=project_id,
        location='us-central1',
        staging_bucket=bucket,
        experiment=experiment_name,
    )

    aiplatform.start_run(run_name)
    parameters = {"epochs": num_epochs}
    aiplatform.log_params(parameters)

    df_review = pd.read_csv(input_csv_path)
    print(len(df_review))

    # Build a deliberately imbalanced sample (at most 9000 positive and 1000
    # negative rows), then rebalance it by random undersampling.
    df_positive = df_review[df_review['sentiment']=='positive'][:9000]
    df_negative = df_review[df_review['sentiment']=='negative'][:1000]

    df_review_imb = pd.concat([df_positive, df_negative])
    # Log the class balance; as a bare expression this result was discarded.
    print(df_review_imb.value_counts(['sentiment']))

    print(len(df_review_imb))
    rus = RandomUnderSampler(random_state=0)
    resampled_reviews, resampled_sentiment = rus.fit_resample(df_review_imb[['review']], df_review_imb['sentiment'])
    df_review_bal = resampled_reviews
    df_review_bal['sentiment'] = resampled_sentiment

    print(len(df_review_bal))

    # Named *_df so the locals do not shadow this function's own name.
    train_df, test_df = train_test_split(df_review_bal, test_size=0.33, random_state=42)
    train_x, train_y = train_df['review'], train_df['sentiment']
    test_x, test_y = test_df['review'], test_df['sentiment']

    print("train x values count")
    print(len(train_x))
    print("train y values count")
    print(train_y.value_counts())

    # Fit TF-IDF on the training split only and reuse that vocabulary for test.
    tfidf = TfidfVectorizer(stop_words='english')
    train_x_vector = tfidf.fit_transform(train_x)
    test_x_vector = tfidf.transform(test_x)

    print(train_x_vector)

    svc = SVC(kernel='linear')
    svc.fit(train_x_vector, train_y)

    accuracy = svc.score(test_x_vector, test_y)
    print(accuracy)

    # Metric logging is currently disabled:
    # aiplatform.log_metrics({"accuracy": accuracy})

    # The model is written beside the artifact path rather than at it, and the
    # matching directory URI is handed on through `artifact_uri`.
    model_dir = saved_model.path.replace("saved_model","")
    joblib.dump(svc, os.path.join(model_dir, 'model.joblib'))
    print(" saved_model.path: "+ saved_model.path)
    print(" saved_model.uri: "+ saved_model.uri)
    with open(artifact_uri, 'w') as f:
        f.write(saved_model.uri.replace("saved_model",""))

    print(saved_model.uri)

In [11]:
from datetime import datetime

# Second-resolution timestamp (YYYYMMDDHHMMSS) used to make names unique per notebook run.
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"

In [12]:
@kfp.dsl.pipeline(name="train-scikit" + str(uuid.uuid4()))
def pipeline(
    project: str = PROJECT_ID,
    bucket: str = BUCKET_NAME
):
    """Preprocess the data, train the model, then upload it and deploy it to a new endpoint.

    Args:
        project: Defaults to PROJECT_ID.
        bucket: Defaults to BUCKET_NAME.

    NOTE(review): the steps below read the notebook constants PROJECT_ID and
    BUCKET_NAME directly, so overriding `project` or `bucket` at submission
    time currently has no effect on them.
    """
    preprocess_task = preprocess()

    # The config cell defines BUCKET_NAME; the name BUCKET used here before is
    # not defined anywhere in the notebook and fails on a fresh kernel.
    train_task = train(
        preprocess_task.output,
        experiment_name="test-"+TIMESTAMP,
        run_name="test-run-"+TIMESTAMP,
        num_epochs=1,
        project_id=PROJECT_ID,
        bucket=BUCKET_NAME,
    )

    model_upload_op = gcc_aip.ModelUploadOp(
        project=PROJECT_ID,
        display_name="model"+TIMESTAMP,
        artifact_uri=train_task.outputs["artifact_uri"], # GCS location of model
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest",
    )

    endpoint_create_op = gcc_aip.EndpointCreateOp(
        project=PROJECT_ID,
        display_name="pipelines"+TIMESTAMP,
    )

    model_deploy_op = gcc_aip.ModelDeployOp(
        project=PROJECT_ID,
        endpoint=endpoint_create_op.outputs["endpoint"],
        model=model_upload_op.outputs["model"],
        deployed_model_display_name="model_display_name",
        machine_type="n1-standard-4",
    )


In [13]:
from kfp.v2 import compiler

# Compile the pipeline function into a timestamped JSON job spec file.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path=f"dag-{TIMESTAMP}.json",
)

In [14]:
from kfp.v2.google.client import AIPlatformClient

# Client for the configured project and region; used to submit the pipeline run.
api_client = AIPlatformClient(project_id=PROJECT_ID, region=REGION)

In [15]:
# Submit the compiled job spec; no parameter values are overridden.
response = api_client.create_run_from_job_spec(
    f"dag-{TIMESTAMP}.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={},
)