In [1]:
import datetime
# google_cloud_pipeline_components includes pre-built KFP components for interfacing with Vertex AI services.
from google_cloud_pipeline_components import aiplatform as gcc_aip
from google.cloud import aiplatform as vertexai
from kfp.v2 import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import component

In [2]:
PROJECT_ID = "fast-planet-243821"
REGION = "us-central1"
GCS_BUCKET = f"gs://{PROJECT_ID}_pipeline-artifacts"
TIMESTAMP=datetime.datetime.now().strftime('%Y%m%d%H%M%S')
DISPLAY_NAME = "cifar-{}".format(TIMESTAMP)
MODEL_DIR = "image-estimator"
MODEL_DISPLAY_NAME = "image-estimator-{}".format(TIMESTAMP)
GCS_BASE_OUTPUT_DIR= f"{GCS_BUCKET}/{MODEL_DIR}-{TIMESTAMP}"

In [3]:
@component(
    packages_to_install=["tensorflow","tensorflow_datasets"]
)
def train_cifar_model(output_model_uri: str) -> str:
    ##import subprocess
    ## Install dependencies from requirements.txt
    #subprocess.run(["pip", "install", "-r", "requirements.txt"])
    
    import tensorflow_datasets as tfds
    import tensorflow as tf
    print(f"input: {output_model_uri}")
    # Your training code here
    # Use the provided notebook code or replace with your custom training logic
    BUFFER_SIZE = 10000
    BATCH_SIZE = 64

    model = tf.keras.Sequential([
        tf.keras.layers.Conv2D(32, 3, activation='relu', input_shape=(32, 32, 3)),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Conv2D(16, 3, activation='relu'),
        tf.keras.layers.MaxPooling2D(),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(10, activation='softmax')
    ])

    model.compile(
        loss=tf.keras.losses.sparse_categorical_crossentropy,
        optimizer=tf.keras.optimizers.SGD(learning_rate=0.01),
        metrics=['accuracy'])

    checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(output_model_uri + '/checkpoint',
                                                             save_freq='epoch')

    def scale(image, label):
        image = tf.cast(image, tf.float32)
        image /= 255.0
        return image, label

    datasets, info = tfds.load(name='cifar10', with_info=True, as_supervised=True)
    train_dataset = datasets['train'].map(scale).shuffle(BUFFER_SIZE).repeat().batch(BATCH_SIZE)

    model.fit(x=train_dataset, epochs=10, steps_per_epoch=200,
              callbacks=[checkpoint_callback])
    model.save(output_model_uri)
    return output_model_uri

In [4]:
@component(
    packages_to_install=["google-cloud-aiplatform"]
)
def upload_model_to_vertex(model_uri: str, project: str, region: str, model_display_name: str) -> str:
    # Upload the trained model to Vertex AI
    from google.cloud import aiplatform
    
    aiplatform.init(project=project, location=region)
    model = aiplatform.Model.upload(
        display_name=model_display_name,
        artifact_uri=model_uri,
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-2:latest",
    )

    print(f"Model uploaded to Vertex AI: {model.resource_name}")
    return model.resource_name

In [5]:
@component(
    packages_to_install=["google-cloud-aiplatform"]
)
def deploy_model_to_endpoint(model_uri: str, project: str, region: str, endpoint_display_name: str) -> str:
    # Deploy the model to a Vertex AI endpoint
    from google.cloud import aiplatform
    
    aiplatform.init(project=project, location=region)
    model = aiplatform.Model(model_name=model_uri, project=project)
    endpoint = model.deploy()
    
    print(f"Model deployed to Vertex AI endpoint: {endpoint.resource_name}")
    return endpoint.resource_name

In [6]:
# Define the pipeline
@dsl.pipeline(
    name="cifar-model-training",
    description="A pipeline for training a CIFAR model and deploying it to Vertex AI.",
)
def cifar_pipeline(
    output_model_uri: str,
    project: str,
    region: str,
    model_display_name: str,
    endpoint_display_name: str,
    requirements_file_path:str = 'requirements.txt',
):
    print(f"input: {output_model_uri}")
    train_task = train_cifar_model(output_model_uri)
    upload_task = upload_model_to_vertex(train_task.output, project, region, model_display_name)
    deploy_task = deploy_model_to_endpoint(upload_task.output, project, region, endpoint_display_name)

# Compile the pipeline
compiler.Compiler().compile(
    pipeline_func=cifar_pipeline,
    package_path='cifar_pipeline_job.json'
)

input: {{pipelineparam:op=;name=output_model_uri}}




In [7]:
# Initialize Vertex AI client
vertexai.init(project=PROJECT_ID, location=REGION, staging_bucket=GCS_BUCKET)

In [8]:
# Specify pipeline parameters
pipeline_params = {
    "output_model_uri": GCS_BASE_OUTPUT_DIR,
    "project": PROJECT_ID,
    "region": REGION,
    "model_display_name": MODEL_DISPLAY_NAME,
    "endpoint_display_name": MODEL_DISPLAY_NAME,
}

In [9]:
# Create a pipeline run
job = vertexai.PipelineJob(
    display_name="cifar-pipeline-job",
    template_path=f"cifar_pipeline_job.json",
    parameter_values=pipeline_params,
)
# Run the pipeline and wait for completion
job.run(sync=True)

Creating PipelineJob
PipelineJob created. Resource name: projects/515007598211/locations/us-central1/pipelineJobs/cifar-model-training-20231208215153
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/515007598211/locations/us-central1/pipelineJobs/cifar-model-training-20231208215153')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/cifar-model-training-20231208215153?project=515007598211
PipelineJob projects/515007598211/locations/us-central1/pipelineJobs/cifar-model-training-20231208215153 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/515007598211/locations/us-central1/pipelineJobs/cifar-model-training-20231208215153 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/515007598211/locations/us-central1/pipelineJobs/cifar-model-training-20231208215153 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/515007598211/locatio