### Running training on a Vertex AI pipeline

In [10]:
# ! pip3 install --upgrade --quiet google-cloud-aiplatform \
#                                  google-cloud-storage \
#                                  google-cloud-pipeline-components \
#                                  kfp

In [2]:
# import IPython

# app = IPython.Application.instance()
# app.kernel.do_shutdown(True)

In [3]:
# Project and region settings used by later cells.
# NOTE(review): this value is the numeric project number; the job logs saved in
# this notebook pair it with the project id "sdo-fm-2024" — confirm which form
# each consumer of PROJECT_ID expects.
PROJECT_ID = "371403503716"  # @param {type:"string"}

# Point the gcloud CLI at this project so later shell commands target it.
! gcloud config set project {PROJECT_ID} --quiet
REGION = "us-central1"  # @param {type: "string"}

Updated property [core/project].


To take a quick anonymous survey, run:
  $ gcloud survey



In [4]:
# Cloud Storage bucket that PIPELINE_ROOT (and therefore the model directory) is built from.
BUCKET_URI = "gs://sdofm-vertexai"  # @param {type:"string"}
# One-time bucket creation, intentionally left disabled:
# ! gsutil mb -l {REGION} -p {PROJECT_ID} {BUCKET_URI} # ONLY RUN IF MAKING NEW BUCKET

In [5]:
import sys

SERVICE_ACCOUNT = "[your-service-account]"  # @param {type:"string"}
IS_COLAB = "google.colab" in sys.modules
if (
    SERVICE_ACCOUNT == ""
    or SERVICE_ACCOUNT is None
    or SERVICE_ACCOUNT == "[your-service-account]"
):
    # Get your service account from gcloud
    if not IS_COLAB:
        shell_output = !gcloud auth list 2>/dev/null
        SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()

    else:  # IS_COLAB:
        shell_output = ! gcloud projects describe  $PROJECT_ID
        project_number = shell_output[-1].split(":")[1].strip().replace("'", "")
        SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"

    print("Service Account:", SERVICE_ACCOUNT)

Service Account: 371403503716-compute@developer.gserviceaccount.com


In [12]:
# ! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectCreator $BUCKET_URI
# ! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/storage.objectViewer $BUCKET_URI

In [8]:
import os
from typing import Any, Dict, List

import google.cloud.aiplatform as aip
import kfp
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1.custom_job.component import (
    custom_training_job as CustomTrainingJobOp,
)
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp, ModelDeployOp
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from kfp import compiler
from kfp.dsl import importer_node

In [12]:
# import google.cloud.vertex as vai
import vertexai as vai

In [13]:
# GCS prefix for this pipeline; later handed to PipelineJob as `pipeline_root`
# and used as the parent of the model directory.
PIPELINE_ROOT = f"{BUCKET_URI}/pipeline_root/tpu_sdofm_pipeline"

In [15]:
from google.cloud.aiplatform import gapic

# Show every accelerator type known to the installed SDK. A plain loop is used
# rather than a list comprehension so the cell does not also echo a list of
# `None` values (the return value of each print call).
for accelerator_type in gapic.AcceleratorType:
    print(accelerator_type)

AcceleratorType.ACCELERATOR_TYPE_UNSPECIFIED
AcceleratorType.NVIDIA_TESLA_K80
AcceleratorType.NVIDIA_TESLA_P100
AcceleratorType.NVIDIA_TESLA_V100
AcceleratorType.NVIDIA_TESLA_P4
AcceleratorType.NVIDIA_TESLA_T4
AcceleratorType.NVIDIA_TESLA_A100
AcceleratorType.NVIDIA_A100_80GB
AcceleratorType.NVIDIA_L4
AcceleratorType.NVIDIA_H100_80GB
AcceleratorType.TPU_V2
AcceleratorType.TPU_V3
AcceleratorType.TPU_V4_POD
AcceleratorType.TPU_V5_LITEPOD


[None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None,
 None]

In [16]:
# Training accelerator: TPU V5 litepod ('v5litepod-4') with an accelerator count of 4.
TRAIN_TPU = gapic.AcceleratorType.TPU_V5_LITEPOD
TRAIN_NTPU = 4

# Accelerator attached to the deployed model: one NVIDIA Tesla K80.
DEPLOY_GPU = gapic.AcceleratorType.NVIDIA_TESLA_K80
DEPLOY_NGPU = 1

In [17]:
# "single" when no accelerator count is set or it is below 2; otherwise "tpu".
TRAIN_STRATEGY = "single" if (not TRAIN_NTPU or TRAIN_NTPU < 2) else "tpu"

# TPU topology string placed in the worker pool's machine spec.
TRAIN_TPU_TOPOLOGY = "4x8"

In [18]:
# GCS location for the model directory (exposed to the trainer as AIP_MODEL_DIR
# and imported from by the pipeline).
WORKING_DIR = f"{PIPELINE_ROOT}/model"
# Local folder name the notebook later changes into with %cd.
CONTAINER_ARTIFACTS_DIR = "tpu-container-artifacts"

# Directory creation is disabled; the folder must already exist for the %cd to succeed.
# !mkdir {CONTAINER_ARTIFACTS_DIR}

### Create image

In [9]:
# TRAIN_VERSION = "pytorch-tpu.2-1.cp310"
# TRAIN_IMAGE = f"us-docker.pkg.dev/vertex-ai/training/{TRAIN_VERSION}:latest"
# dockerfile = f"""FROM {TRAIN_IMAGE}

# WORKDIR /home/walsh/repos/SDO-FM
# COPY . /src
# RUN pip install -r src/requirements.txt
# ENTRYPOINT ["python3", "/src/scripts/main.py"]
# """

# with open(os.path.join(CONTAINER_ARTIFACTS_DIR, "Dockerfile"), "w") as f:
#     f.write(dockerfile)

In [19]:
!gcloud auth configure-docker us-central1-docker.pkg.dev --quiet


{
  "credHelpers": {
    "gcr.io": "gcloud",
    "us.gcr.io": "gcloud",
    "eu.gcr.io": "gcloud",
    "asia.gcr.io": "gcloud",
    "staging-k8s.gcr.io": "gcloud",
    "marketplace.gcr.io": "gcloud"
  }
}
Adding credentials for: us-central1-docker.pkg.dev
Docker configuration file updated.


In [20]:
# Artifact Registry repository and image name; both are interpolated into TRAIN_IMAGE.
REPOSITORY = "tpu-training-repository"
IMAGE = "tpu-train"
# One-time repository creation, left disabled:
# !gcloud artifacts repositories create $REPOSITORY --repository-format=docker --location=us-central1 --description="Vertex TPU training repository" --project "sdo-fm-2024"

In [22]:
# Project id used in the Artifact Registry path.
PROJECT_NAME = "sdo-fm-2024"
# The same image URI is used both for training and as the serving container.
TRAIN_IMAGE = DEPLOY_IMAGE = (
    f"{REGION}-docker.pkg.dev/{PROJECT_NAME}/{REPOSITORY}/{IMAGE}:latest"
)
TRAIN_IMAGE

'us-central1-docker.pkg.dev/sdo-fm-2024/tpu-training-repository/tpu-train:latest'

us-central1-docker.pkg.dev/sdo-fm-2024/sdofm-artifactregistry-docker/sdofm:v0.1.7

In [12]:
# Echo the image URI that the build and pipeline cells will use.
# NOTE(review): the saved output under this cell names project 371403503716,
# whereas TRAIN_IMAGE is defined with "sdo-fm-2024" — the output looks stale;
# re-run after a kernel restart to confirm.
TRAIN_IMAGE
# us-central1-docker.pkg.dev/sdo-fm-2024/tpu-training-repository

'us-central1-docker.pkg.dev/371403503716/tpu-training-repository/tpu-train:latest'

In [44]:
# Change the kernel's working directory to the container-artifacts folder.
# The path is relative, so the result depends on the current directory, and the
# folder must already exist (the mkdir that would create it is commented out).
%cd $CONTAINER_ARTIFACTS_DIR

/home/walsh/repos/SDO-FM/notebooks/tpu-container-artifacts


In [24]:
# Build the training image and tag it with TRAIN_IMAGE.
# The build context is an absolute path, so the preceding %cd has no effect on it.
# NOTE(review): this path is machine-specific and differs from the
# /home/walsh/repos/SDO-FM tree shown in the %cd output — confirm which checkout
# is meant and consider making it configurable.
!docker build \
    --tag={TRAIN_IMAGE} \
    /home/walsh/SDO-FM

Sending build context to Docker daemon  87.48MB
Step 1/4 : FROM us-docker.pkg.dev/vertex-ai/training/pytorch-tpu.2-1.cp310:latest
 ---> 24ae74b067ec
Step 2/4 : COPY . /src
 ---> 695d49096ba0
Step 3/4 : RUN pip install -r /src/requirements.txt
 ---> Running in 9b6d858e6a3f
Collecting lightning[extra]
  Downloading lightning-2.3.3-py3-none-any.whl (808 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 808.5/808.5 KB 3.5 MB/s eta 0:00:00
Collecting timm
  Downloading timm-1.0.7-py3-none-any.whl (2.3 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 2.3/2.3 MB 4.6 MB/s eta 0:00:00
Collecting wandb
  Downloading wandb-0.17.4-py3-none-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (6.9 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 6.9/6.9 MB 3.9 MB/s eta 0:00:00
Collecting omegaconf
  Downloading omegaconf-2.3.0-py3-none-any.whl (79 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 79.5/79.5 KB 4.3 MB/s eta 0:00:00
Collecting hydra-core
  Downloading

In [25]:
# !docker push {TRAIN_IMAGE}
!docker push 'us-central1-docker.pkg.dev/sdo-fm-2024/tpu-training-repository/tpu-train'

Using default tag: latest
The push refers to repository [us-central1-docker.pkg.dev/sdo-fm-2024/tpu-training-repository/tpu-train]

[1B96748a4a: Preparing 
[1Bc9617923: Preparing 
[1B86dfddbb: Preparing 
[1B25554830: Preparing 
[1B782f05d5: Preparing 
[1Be3413336: Preparing 
[1B8a94167e: Preparing 
[1Bb93c8531: Preparing 
[1B5bceeda1: Preparing 
[1Bfaa3ab68: Preparing 
[1B9d99e7a9: Preparing 
[1B1f16d39c: Preparing 
[1Bda9cf345: Preparing 
[1B081bcd40: Preparing 
[1B55027fac: Preparing 
[1B6bb27888: Preparing 
[1Bf2c8e538: Preparing 
[1Bc4b29d5e: Preparing 
[1B227dffc9: Preparing 
[1Bcf6cd56f: Preparing 
[1Bccd89e31: Preparing 
[1B01a9bf0a: Preparing 
[1Bff998c0a: Preparing 
[24B6748a4a: Pushed    5.87GB/5.859GB[18A[2K[16A[2K[15A[2K[11A[2K[8A[2K[7A[2K[3A[2K[23A[2K[23A[2K[23A[2K[23A[2K[23A[2K[23A[2K[24A[2K[23A[2K[24A[2K[23A[2K[24A[2K[23A[2K[24A[2K[23A[2K[24A[2K[23A[2K[24A[2K[23A[2K[24A[2K[23A[2K[24A[2K[24A

In [None]:
# Step back up one directory, undoing the earlier %cd into the container-artifacts folder.
%cd ..

### Custom model pipeline

In [26]:
# Command-line arguments handed to the training container (used as the
# container spec's "args" in WORKER_POOL_SPECS).
TRAINER_ARGS = ["--config-name=pretrain_32.2M_mae_HP_r512_e128_p16"]

In [27]:
# Training machine: the TPU VM type is used as-is (TPU VMs do not require a
# vCPU suffix).
TRAIN_COMPUTE = MACHINE_TYPE = "ct5lp-hightpu-4t"
print("Train machine type", TRAIN_COMPUTE)

# Serving machine: machine family joined with the vCPU count.
MACHINE_TYPE, VCPU = "n1-standard", "4"
DEPLOY_COMPUTE = f"{MACHINE_TYPE}-{VCPU}"
print("Deploy machine type", DEPLOY_COMPUTE)

Train machine type ct5lp-hightpu-4t
Deploy machine type n1-standard-4


In [28]:
# Display name given to the uploaded and deployed model.
MODEL_DISPLAY_NAME = "tpu_train_deploy"

# Model directory that is passed into the job spec.
WORKING_DIR = PIPELINE_ROOT + "/model"

print(*(TRAINER_ARGS, WORKING_DIR, MODEL_DISPLAY_NAME))

['--config-name=pretrain_32.2M_mae_HP_r512_e128_p16'] gs://sdofm-vertexai/pipeline_root/tpu_sdofm_pipeline/model tpu_train_deploy


In [29]:
# Worker pool definition handed to the custom training job: a single pool that
# runs TRAIN_IMAGE with TRAINER_ARGS on the TPU machine type.
# NOTE(review): keys mix camelCase ("containerSpec") and snake_case
# ("nfs_mounts", "mount_point") — confirm the service accepts both spellings.
WORKER_POOL_SPECS = [
    {
        "containerSpec": {
            "args": TRAINER_ARGS,
            # Tells the trainer where to write model artifacts.
            "env": [{"name": "AIP_MODEL_DIR", "value": WORKING_DIR}],
            "imageUri": TRAIN_IMAGE,
        },
        "replicaCount": "1",
        "machineSpec": {
            "machineType": TRAIN_COMPUTE,
            # Accelerator type/count are left unset; only the topology is supplied.
            # "accelerator_type": TRAIN_TPU,
            # "accelerator_count": TRAIN_NTPU,
            "tpuTopology": TRAIN_TPU_TOPOLOGY,
        },
        # NFS share to attach to the training workers.
        # NOTE(review): Vertex AI documents `mount_point` as a path placed under
        # /mnt/nfs/ — confirm that an absolute value like this is intended.
        "nfs_mounts": [
            {"path": "/sdoml", "server": "10.14.32.2", "mount_point": "/mnt/sdoml"}
        ],
    }
]
# Echo the resolved spec for inspection.
WORKER_POOL_SPECS

[{'containerSpec': {'args': ['--config-name=pretrain_32.2M_mae_HP_r512_e128_p16'],
   'env': [{'name': 'AIP_MODEL_DIR',
     'value': 'gs://sdofm-vertexai/pipeline_root/tpu_sdofm_pipeline/model'}],
   'imageUri': 'us-central1-docker.pkg.dev/sdo-fm-2024/tpu-training-repository/tpu-train:latest'},
  'replicaCount': '1',
  'machineSpec': {'machineType': 'ct5lp-hightpu-4t', 'tpuTopology': '4x8'},
  'nfs_mounts': [{'path': '/sdoml',
    'server': '10.14.32.2',
    'mount_point': '/mnt/sdoml'}]}]

#### Define pipeline

In [30]:
@kfp.dsl.pipeline(name="train-endpoint-deploy")
def pipeline(
    project: str = PROJECT_ID,
    model_display_name: str = MODEL_DISPLAY_NAME,
    serving_container_image_uri: str = DEPLOY_IMAGE,
):
    """Train with a custom job, then upload the model and deploy it to a new endpoint.

    Steps, in graph order:
      1. Run a custom training job using WORKER_POOL_SPECS.
      2. Import WORKING_DIR as an unmanaged container model (after training).
      3. Upload that model.
      4. Create an endpoint.
      5. Deploy the uploaded model to the endpoint.

    Args:
        project: Project passed to the model-upload and endpoint-create steps.
        model_display_name: Display name for the uploaded and deployed model.
        serving_container_image_uri: Container image recorded in the imported
            model's container spec.
    """

    # Training step. `project` is not forwarded here, so the component's own
    # default applies. The network is given as a fixed resource path.
    custom_job_task = CustomTrainingJobOp(
        display_name="tpu model training",
        worker_pool_specs=WORKER_POOL_SPECS,
        network="projects/371403503716/global/networks/default",
    )

    # Wrap the model directory as an artifact. The importer has no data
    # dependency on the training step, so ordering is forced with .after().
    import_unmanaged_model_task = importer_node.importer(
        artifact_uri=WORKING_DIR,
        artifact_class=artifact_types.UnmanagedContainerModel,
        metadata={
            "containerSpec": {
                "imageUri": serving_container_image_uri  # "us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-9:latest",
            },
        },
    ).after(custom_job_task)

    # Register the imported artifact as a model.
    model_upload_op = ModelUploadOp(
        project=project,
        display_name=model_display_name,
        unmanaged_container_model=import_unmanaged_model_task.outputs["artifact"],
    )

    # Create the endpoint that will host the model.
    endpoint_create_op = EndpointCreateOp(
        project=project,
        display_name="tpu-pipeline-created-endpoint",
    )

    # Deploy on exactly one replica of DEPLOY_COMPUTE with the DEPLOY_GPU accelerator.
    _ = ModelDeployOp(
        endpoint=endpoint_create_op.outputs["endpoint"],
        model=model_upload_op.outputs["model"],
        deployed_model_display_name=model_display_name,
        dedicated_resources_machine_type=DEPLOY_COMPUTE,
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=1,
        dedicated_resources_accelerator_type=DEPLOY_GPU.name,
        dedicated_resources_accelerator_count=DEPLOY_NGPU,
    )

In [31]:
# Compile the pipeline function into a local spec file; the same file name is
# used as the PipelineJob's template_path when the job is submitted.
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="tpu_train_sdofm_pipeline.json",
)

In [32]:
# Display name shown for this pipeline run.
DISPLAY_NAME = "tpu_sdofm_training_pretrain_32.2M_mae_HP_r512_e128_p16"

# Build the job from the compiled spec; artifacts go under PIPELINE_ROOT.
job = aip.PipelineJob(
    display_name=DISPLAY_NAME,
    template_path="tpu_train_sdofm_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
)

# Submit and wait: the saved output shows repeated state polling and a
# RuntimeError raised from this call when the pipeline failed.
job.run()

Creating PipelineJob
PipelineJob created. Resource name: projects/371403503716/locations/us-central1/pipelineJobs/train-endpoint-deploy-20240719141431
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/371403503716/locations/us-central1/pipelineJobs/train-endpoint-deploy-20240719141431')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/train-endpoint-deploy-20240719141431?project=371403503716
PipelineJob projects/371403503716/locations/us-central1/pipelineJobs/train-endpoint-deploy-20240719141431 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/371403503716/locations/us-central1/pipelineJobs/train-endpoint-deploy-20240719141431 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/371403503716/locations/us-central1/pipelineJobs/train-endpoint-deploy-20240719141431 current state:
PipelineState.PIPELINE_STATE_RUNNING
PipelineJob projects/371403503716/l

RuntimeError: Job failed with:
code: 9
message: " The DAG failed because some tasks failed. The failed tasks are: [custom-training-job].; Job (project_id = sdo-fm-2024, job_id = 5016340657187323904) is failed due to the above error.; Failed to handle the job: {project_number = 371403503716, job_id = 5016340657187323904}"


In [86]:
# Echo the service account resolved earlier in the notebook.
SERVICE_ACCOUNT

'371403503716-compute@developer.gserviceaccount.com'

In [95]:
! gcloud projects add-iam-policy-binding 371403503716 \
 --member='serviceAccount:service-371403503716@compute-system.iam.gserviceaccount.com' \
 --role='roles/google.cloud.aiplatform.v1.PipelineService.CreatePipelineJob'

ERROR: Policy modification failed. For a binding with condition, run "gcloud alpha iam policies lint-condition" to identify issues in condition.
[1;31mERROR:[0m (gcloud.projects.add-iam-policy-binding) INVALID_ARGUMENT: Role roles/google.cloud.aiplatform.v1.PipelineService.CreatePipelineJob is not supported for this resource.


In [97]:
# NOTE(review): as written this command has no bucket URL argument, and the role
# string is an API method name rather than a Cloud Storage IAM role — it looks
# like an unfinished experiment. Compare with the commented-out
# `gsutil iam ch ... roles/storage.objectCreator $BUCKET_URI` cell earlier in the
# notebook and confirm the intended role and target before running.
! gsutil iam ch serviceAccount:{SERVICE_ACCOUNT}:roles/google.cloud.aiplatform.v1.PipelineService.CreatePipelineJob