In [None]:
# TODO: use an importer to get the task.py file to the second component

In [None]:
# Create run

In [1]:
%%capture
# Install the Vertex AI SDK, the Kubeflow Pipelines SDK, the Google Cloud
# pipeline components and a few helper libraries.
# The %%capture magic on the first line captures pip's output instead of
# displaying it.
!pip3 install google-cloud-aiplatform==1.0.0 --upgrade
!pip3 install kfp google-cloud-pipeline-components==0.1.1 --upgrade
!pip3 install scikit-learn
# NOTE(review): this unpinned upgrade runs after the ==1.0.0 install above, so
# that pin does not decide the final installed version — confirm which is intended.
!pip3 install google-cloud-aiplatform --upgrade
!pip3 install pandas
# presumably needed for the `%load_ext dotenv` / `%dotenv` magics used in the
# configuration cell — TODO confirm
!pip3 install python-dotenv

In [2]:
import uuid
from kfp import dsl
import kfp
from google.cloud import aiplatform
from kfp.v2.dsl import component
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output, OutputPath, component, ClassificationMetrics, Metrics)
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

In [8]:
#https://stackoverflow.com/a/54028874
%load_ext dotenv
%dotenv

import os
PROJECT_ID = os.environ['PROJECT_ID']
BUCKET_NAME = os.environ['BUCKET']

PIPELINE_ROOT = 'gs://{}/pipeline_root'.format(BUCKET_NAME)
REGION = 'us-central1'

print(PROJECT_ID)
print(BUCKET_NAME)
print(PIPELINE_ROOT)

kubeflow-demos
user-group-demo
gs://user-group-demo/pipeline_root


In [22]:
#test-pkl/task.py
@component(packages_to_install=["google-cloud-storage"])
def download_file(bucket_name: str, source_blob_name: str, output_file_path: OutputPath()):
    """Copy one object from a Cloud Storage bucket to a local file.

    Args:
        bucket_name: Name of the bucket that holds the object.
        source_blob_name: Name of the object (blob) inside the bucket.
        output_file_path: Local path the object's contents are written to.
    """
    # Imported inside the function body, next to the code that uses it.
    from google.cloud import storage

    gcs_blob = storage.Client().bucket(bucket_name).blob(source_blob_name)
    gcs_blob.download_to_filename(output_file_path)

    message_template = "Downloaded storage object {} from bucket {} to local file {}."
    print(message_template.format(source_blob_name, bucket_name, output_file_path))


In [34]:
@component(packages_to_install=["pandas", "google-cloud-aiplatform", "google-cloud-storage", "pillow", "numpy"])
def train(input_file_path: InputPath()):
    """Launch a Vertex AI custom training job that runs the given script.

    Wraps the script at ``input_file_path`` in an
    ``aiplatform.CustomTrainingJob`` that uses a PyTorch GPU training image
    and runs it with the replica, machine and accelerator settings defined
    below. Project, bucket, region, image and machine shape are all
    hard-coded inside this function.

    Args:
        input_file_path: Local path of the training script; handed to
            ``CustomTrainingJob`` as ``script_path``.
    """
    # All imports live inside the body so the function is self-contained.
    from datetime import datetime

    from google.cloud import aiplatform
    from google.cloud.aiplatform import gapic as aip

    PROJECT_ID = "kubeflow-demos"
    BUCKET_NAME = "gs://test-pkl"  # @param {type:"string"}
    REGION = "us-central1"

    aiplatform.init(project=PROJECT_ID, location=REGION, staging_bucket=BUCKET_NAME)

    # Accelerator type and the count passed to job.run as accelerator_count.
    TRAIN_GPU, TRAIN_NGPU = (aip.AcceleratorType.NVIDIA_TESLA_V100, 2)
    TRAIN_VERSION = "pytorch-gpu.1-7"
    TRAIN_IMAGE = "us-docker.pkg.dev/vertex-ai/training/{}:latest".format(TRAIN_VERSION)

    print("Training:", TRAIN_IMAGE, TRAIN_GPU, TRAIN_NGPU)

    MACHINE_TYPE = "n1-standard"

    VCPU = "16"
    TRAIN_COMPUTE = MACHINE_TYPE + "-" + VCPU
    print("Train machine type", TRAIN_COMPUTE)

    # replica_count for the GPU run is the sum of these two values.
    TRAIN_NCOMPUTE_MASTER = 1
    TRAIN_NCOMPUTE_WORKER = 2

    TIMESTAMP = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
    print (TIMESTAMP)

    JOB_NAME = "cifar10_resnet_custom_job_" + TIMESTAMP

    # Command-line arguments forwarded to the training script.
    ARGS = [
        "--dist-url=" + "env://",
        "--multiprocessing-distributed",
        "--num_epochs=2"
    ]

    # Each job stages its files under its own timestamped folder.
    base_output_dir = '{}/jobs/{}'.format(BUCKET_NAME, JOB_NAME)

    job = aiplatform.CustomTrainingJob(
        display_name=JOB_NAME,
        script_path=input_file_path,
        container_uri=TRAIN_IMAGE,
        staging_bucket=base_output_dir,
    )

    # Logged for reference only; see the note before job.run below.
    MODEL_DISPLAY_NAME = "cifar10-pytorch-" + TIMESTAMP
    print(MODEL_DISPLAY_NAME)

    # Settings common to both hardware configurations.
    run_kwargs = {"args": ARGS, "machine_type": TRAIN_COMPUTE}
    if TRAIN_GPU:
        run_kwargs.update(
            replica_count=TRAIN_NCOMPUTE_WORKER + TRAIN_NCOMPUTE_MASTER,
            accelerator_type=TRAIN_GPU.name,
            accelerator_count=TRAIN_NGPU,
        )
    else:
        # CPU-only fallback: a single replica with no accelerators.
        run_kwargs.update(replica_count=1, accelerator_count=0)

    # model_display_name is deliberately not passed: the job above is created
    # without model_serving_container_image_uri, and the SDK rejects a model
    # display name in that case (see CustomTrainingJob.run).
    job.run(**run_kwargs)

In [35]:
from datetime import datetime

# Compact timestamp (YYYYMMDDHHMMSS) used in the compiled job-spec file name.
TIMESTAMP = format(datetime.now(), "%Y%m%d%H%M%S")

In [36]:
@kfp.dsl.pipeline(name=f"download-file{uuid.uuid4()}")
def pipeline(baseline_accuracy: float = 70.0):
    """Fetch ``task.py`` from the ``test-pkl`` bucket and pass it to ``train``.

    Args:
        baseline_accuracy: Declared pipeline parameter; neither step reads it.
    """
    fetch_script_task = download_file(bucket_name='test-pkl', source_blob_name='task.py')
    # The downloaded file is the training step's only input.
    train(fetch_script_task.output)

In [37]:
from kfp.v2 import compiler

# Compile the pipeline function into a timestamped JSON job spec.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path=f"dag-{TIMESTAMP}.json",
)

In [38]:
from kfp.v2.google.client import AIPlatformClient

# Pipeline client bound to the configured project and region.
api_client = AIPlatformClient(project_id=PROJECT_ID, region=REGION)



In [39]:
# Submit the compiled job spec, overriding the default baseline_accuracy.
response = api_client.create_run_from_job_spec(
    f"dag-{TIMESTAMP}.json",
    parameter_values={"baseline_accuracy": 80.0},
    pipeline_root=PIPELINE_ROOT,
)