In [1]:
from typing import NamedTuple

import kfp
from google.cloud import aiplatform
from google_cloud_pipeline_components.v1.custom_job.utils import create_custom_training_job_op_from_component
from kfp import compiler, dsl
from kfp.dsl import (
    Artifact, Dataset, Input, InputPath,
    Model, Output, OutputPath, component,
    ParallelFor
)
from kfp.registry import RegistryClient

import components

In [2]:
# Cloud Storage / Vertex AI configuration shared by the cells below.
BUCKET_URI = "gs://protocell"  # plain string: nothing to interpolate
PIPELINE_ROOT = f"{BUCKET_URI}/pipeline_root"  # where KFP writes pipeline artifacts
LOCATION = "europe-central2"
PROJECT = "protocell-404013"
DATA = f"{BUCKET_URI}/data"  # prefix for input datasets (see the pipeline's default `dataset`)

In [3]:
# Configure the Vertex AI SDK for this project/region, staging artifacts in the
# bucket, then bind the existing TensorBoard instance (referenced by its ID)
# that the training job in the pipeline is pointed at.
# https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.Tensorboard
aiplatform.init(project=PROJECT, location=LOCATION, staging_bucket=BUCKET_URI)
tensorboard = aiplatform.Tensorboard(
    tensorboard_name="2373397003624251392",
    project=PROJECT,
    location=LOCATION,
)

In [4]:
@dsl.pipeline(
    pipeline_root=PIPELINE_ROOT,
    name="classification",
)
def pipeline(dataset: str = f"{DATA}/classification.npz", epochs: int = 10, foo: Input[Dataset] = None):
    """Import a dataset, split it, train a classifier as a Vertex AI custom job, and visualize it.

    Args:
        dataset: URI of the dataset file, imported as a ``Dataset`` artifact.
        epochs: Forwarded as ``epochs`` to the custom training job op.
        foo: Unused artifact input. NOTE(review): looks like a leftover from
            experimentation; kept only so the compiled pipeline's input
            signature is unchanged -- TODO confirm and remove.
    """
    # Register the existing object at `dataset` as a Dataset artifact.
    importer = dsl.importer(
        artifact_uri=dataset,
        artifact_class=Dataset,
        reimport=False,
    )

    # NOTE(review): ratio=0.1 is presumably the held-out fraction -- confirm in
    # components.split_data. Its "train" output feeds the training step below.
    data = components.split_data(ratio=0.1, dataset=importer.output)

    # Wrap the training component so it runs as a Vertex AI custom training job
    # attached to the TensorBoard instance resolved at notebook level.
    train_classifier_op = create_custom_training_job_op_from_component(
        component_spec=components.train_classifier,
        display_name="train_classifier_display_name",
        tensorboard=tensorboard.resource_name,
        base_output_directory=PIPELINE_ROOT,
        # NOTE(review): hard-coded service account; consider moving it to the
        # configuration cell next to PROJECT.
        service_account="429426973958-compute@developer.gserviceaccount.com",
    )

    train_classifier = train_classifier_op(
        epochs=epochs,
        dataset=data.outputs["train"],
        location=LOCATION,
    )

    components.visualize(model=train_classifier.outputs["classifier"])
    

## Build the pipeline

In [5]:
# Serialize the pipeline definition to a YAML template; the same file is
# uploaded to the registry and submitted as a PipelineJob below.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(pipeline_func=pipeline, package_path="classifier.yaml")

In [6]:
# Client for the Artifact Registry (KFP-format) repository that stores the
# compiled pipeline templates. RegistryClient is imported in the import cell.
repo = f"https://{LOCATION}-kfp.pkg.dev/{PROJECT}/kubeflows"
client = RegistryClient(host=repo)

In [7]:
# Push the compiled template to the registry under the "latest" tag and unpack
# the returned (template, version) names.
upload_result = client.upload_pipeline(
    file_name="classifier.yaml",
    tags=["latest"],
    extra_headers={"description": "testing"},
)
templateName, versionName = upload_result

In [8]:
# Submit the locally compiled template as a Vertex AI pipeline run and wait for
# it (the cell output shows the state being polled). Keep a handle on the job
# so its state and outputs can be inspected after the run completes.
pipeline_job = aiplatform.PipelineJob(
    display_name="PipelineJob_display_name",
    template_path="classifier.yaml",
    pipeline_root=PIPELINE_ROOT,
    enable_caching=False,
)
pipeline_job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/429426973958/locations/europe-central2/pipelineJobs/classification-20231231013255
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/429426973958/locations/europe-central2/pipelineJobs/classification-20231231013255')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/europe-central2/pipelines/runs/classification-20231231013255?project=429426973958
PipelineJob projects/429426973958/locations/europe-central2/pipelineJobs/classification-20231231013255 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/429426973958/locations/europe-central2/pipelineJobs/classification-20231231013255 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/429426973958/locations/europe-central2/pipelineJobs/classification-20231231013255 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/429426973958/locations/europe-ce