Before you begin the fine-tuning job, make sure you have Vertex AI Custom Training GPU quota in your GCP region. For more information about Vertex AI Custom Training GPU quota, see https://cloud.google.com/vertex-ai/docs/quotas

We recommend running every step of this fine-tuning guide in a Vertex AI Workbench instance. Follow the guide at https://cloud.google.com/vertex-ai/docs/workbench/managed/create-managed-notebooks-instance-console-quickstart to create a Vertex AI Workbench instance.

1. Init script parameters

In [None]:
# Choose the training topology: True builds the multi-node worker pool spec,
# False builds the single-node spec.
MULTI_NODE = True # @param {type:"boolean"}

In [None]:
# Input your Google Cloud project ID
PROJECT_ID = ""  # @param {type:"string"}

In [None]:
# Input the GCP region for your resources (also used as the host prefix of IMAGE_URI)
REGION = ""  # @param {type: "string"}

In [None]:
# Input your Cloud Storage bucket URI, example: gs://<bucketname>
# (section 4 overwrites this value if you run it to create a new bucket)
BUCKET_URI = ""  # @param {type:"string"}

In [None]:
# Input your Artifact Registry repository name (section 6 creates it as a Docker repository)
REPO_NAME = "llmfinetune"  # @param {type:"string"}

In [None]:
# Custom training image URI, assembled as
# <REGION>-docker.pkg.dev/<PROJECT_ID>/<REPO_NAME>/<image name>
IMAGE_URI = f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{REPO_NAME}/11.8.0-runtime-ubuntu20.04"  # @param {type:"string"}

In [None]:
# Input the display name given to the Vertex AI TensorBoard instance created in section 5
TENSORBOARD_DISPLAY_NAME = "llama2_tensorboard" # @param {type:"string"}

In [None]:
# Input the service account email used to submit the training job. Create the service
# account before training (section 3 can create one and overwrite this value).
SVC_ACCOUNT = "" # @param {type:"string"}

In [None]:
# Set the job execution mode. When True, the job is built with an empty container command
# (the training script is not launched), so you can log in to the runtime and run the
# training job manually.
DEBUG = False # @param {type:"boolean"}

2. Enable the required GCP service APIs

In [None]:
import google.auth
import googleapiclient.discovery

def enable_gcp_service_apis(service_id: str):
    """Enable a Google Cloud service API on the project named by ``PROJECT_ID``.

    Args:
        service_id: Identifier of the service to enable, for example
            ``"aiplatform.googleapis.com"``.
    """
    # Local imports keep this cell runnable on its own.
    import google.auth
    import googleapiclient.discovery

    # Obtain Application Default Credentials through google-auth, the same way
    # initialize_service() does, instead of the deprecated oauth2client package.
    credentials, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )

    # Create a Service Usage client.
    service_usage_client = googleapiclient.discovery.build(
        "serviceusage", "v1", credentials=credentials
    )

    # Send the enable request for the service on the configured project.
    service_usage_client.services().enable(
        name=f"projects/{PROJECT_ID}/services/{service_id}"
    ).execute()

In [None]:
# Service APIs this notebook depends on.
required_service_apis = [
    "aiplatform.googleapis.com",
    "artifactregistry.googleapis.com",
    "storage-component.googleapis.com",
]

# Enable them one at a time, in list order.
for service_id in required_service_apis:
    enable_gcp_service_apis(service_id)

3. Create Service Account (skip this section if you have an existing service account)

In [None]:
# Account ID for the new service account; the same value is passed as its display name
service_account_name = "llama2-finetune"

In [None]:
import os

from google.oauth2 import service_account  # type: ignore
import googleapiclient.discovery  # type: ignore

def create_service_account(project_id: str, name: str, display_name: str) -> str:
    """Create an IAM service account in a project and return its email.

    Args:
        project_id: ID of the project that will own the account.
        name: Account ID for the new service account.
        display_name: Display name for the new service account.

    Returns:
        The ``email`` field of the created service account.
    """
    # No explicit credentials are passed to the client builder.
    iam_client = googleapiclient.discovery.build("iam", "v1")

    request_body = {
        "accountId": name,
        "serviceAccount": {"displayName": display_name},
    }
    create_request = iam_client.projects().serviceAccounts().create(
        name="projects/" + project_id,
        body=request_body,
    )
    created_account = create_request.execute()

    account_email = created_account["email"]
    print("Created service account: " + account_email)
    return account_email

In [None]:
# Create the service account and store its email in SVC_ACCOUNT (replacing any value entered in section 1)
SVC_ACCOUNT = create_service_account(project_id=PROJECT_ID, name=service_account_name, display_name=service_account_name)

In [None]:
import google.auth
import googleapiclient.discovery

def initialize_service() -> dict:
    """Build a Cloud Resource Manager v1 API client.

    Application default credentials are requested with the cloud-platform
    scope and handed to the client builder.

    Returns:
        The client object produced by ``googleapiclient.discovery.build``.
    """
    # NOTE(review): the ``dict`` return annotation looks inaccurate; callers
    # invoke ``.projects()`` on the returned object.
    cloud_platform_scope = "https://www.googleapis.com/auth/cloud-platform"
    adc_credentials, _project = google.auth.default(scopes=[cloud_platform_scope])
    return googleapiclient.discovery.build(
        "cloudresourcemanager", "v1", credentials=adc_credentials
    )


def modify_policy_add_role(
    crm_service: str, project_id: str, role: str, member: str
) -> None:
    """Add ``member`` to the binding for ``role`` in a project's IAM policy.

    The current policy is fetched, updated in memory and written back. If the
    member is already bound to the role, the policy is left untouched and no
    write is issued, so re-running the call does not create duplicate entries.

    Args:
        crm_service: Cloud Resource Manager client (see ``initialize_service``).
        project_id: ID of the project whose policy is modified.
        role: Role to grant, for example ``"roles/owner"``.
        member: Member to bind, for example ``"serviceAccount:<email>"``.
    """
    policy = get_policy(crm_service, project_id)

    # A policy without any bindings may omit the key entirely.
    bindings = policy.setdefault("bindings", [])

    binding = next((b for b in bindings if b["role"] == role), None)
    if binding is None:
        bindings.append({"role": role, "members": [member]})
    else:
        members = binding.setdefault("members", [])
        if member in members:
            # Already granted: nothing to change, skip the write.
            return
        members.append(member)

    set_policy(crm_service, project_id, policy)


def modify_policy_remove_member(
    crm_service: str, project_id: str, role: str, member: str
) -> None:
    """Remove ``member`` from the binding for ``role`` in a project's IAM policy.

    If the policy has no binding for the role, or the member is not part of
    that binding, the call is a no-op and the policy is not written back.
    (Previously a missing role raised ``StopIteration``.)

    Args:
        crm_service: Cloud Resource Manager client (see ``initialize_service``).
        project_id: ID of the project whose policy is modified.
        role: Role whose binding is edited.
        member: Member to remove from the binding.
    """
    policy = get_policy(crm_service, project_id)

    # First binding for the role, or None when the role is not bound at all.
    binding = next(
        (b for b in policy.get("bindings", []) if b["role"] == role), None
    )
    if binding is None or member not in binding.get("members", []):
        return

    binding["members"].remove(member)
    set_policy(crm_service, project_id, policy)


def get_policy(crm_service: str, project_id: str, version: int = 3) -> dict:
    """Fetch the IAM policy of a project.

    Args:
        crm_service: Cloud Resource Manager client used to issue the request.
        project_id: ID of the project, passed as the request's ``resource``.
        version: Value sent as ``requestedPolicyVersion``.

    Returns:
        The result of executing the ``getIamPolicy`` request.
    """
    request_body = {"options": {"requestedPolicyVersion": version}}
    request = crm_service.projects().getIamPolicy(
        resource=project_id,
        body=request_body,
    )
    return request.execute()


def set_policy(crm_service: str, project_id: str, policy: str) -> dict:
    """Write an IAM policy to a project.

    Args:
        crm_service: Cloud Resource Manager client used to issue the request.
        project_id: ID of the project, passed as the request's ``resource``.
        policy: Policy to store; it is sent under the ``"policy"`` body key.

    Returns:
        The result of executing the ``setIamPolicy`` request.
    """
    request = crm_service.projects().setIamPolicy(
        resource=project_id,
        body={"policy": policy},
    )
    updated_policy = request.execute()
    return updated_policy


In [None]:
# Role to grant. For a quick start this notebook uses a coarse-grained access policy;
# in a production environment, prefer a fine-grained access control policy.
role = "roles/owner"

# Service account that receives the role
member = f"serviceAccount:{SVC_ACCOUNT}"

# Build the Cloud Resource Manager client.
crm_service = initialize_service()

# Grant the role to the member on the project.
modify_policy_add_role(crm_service, PROJECT_ID, role, member)

# Re-read the project's policy and print every member bound to the role.
policy = get_policy(crm_service, PROJECT_ID)
binding = next(b for b in policy["bindings"] if b["role"] == role)
print(f"Role: {binding['role']}")
print("Members: ")
for m in binding["members"]:
    print(f"[{m}]")

4. Create Cloud Bucket (skip this section if you have an existing Cloud Storage bucket)

In [None]:
# Name for the new bucket, suffixed with the project ID
bucket_name = f"llama2-finetune-bucket-{PROJECT_ID}"

In [None]:
from google.cloud import storage

def create_bucket_class_location(bucket_name):
    """Create a Cloud Storage bucket in ``REGION`` with the STANDARD storage class.

    Args:
        bucket_name: Name of the bucket to create.

    Returns:
        The name of the newly created bucket.
    """
    client = storage.Client()

    # Configure the bucket locally before asking the service to create it.
    pending_bucket = client.bucket(bucket_name)
    pending_bucket.storage_class = "STANDARD"
    created = client.create_bucket(pending_bucket, location=REGION)

    print(
        f"Created bucket {created.name} in {created.location} "
        f"with storage class {created.storage_class}"
    )
    return created.name

In [None]:
# Create the bucket and store its gs:// URI in BUCKET_URI (replacing any value entered in section 1)
BUCKET_URI = "gs://" + create_bucket_class_location(bucket_name=bucket_name)

5. Create Vertex AI tensorboard instance

In [None]:
# Create tensorboard instance
from google.cloud import aiplatform

def create_tensorboard_sample(
    project: str,
    display_name: str,
    location: str,
):
    """Create a Vertex AI TensorBoard instance and return its resource name.

    Args:
        project: Project in which the instance is created.
        display_name: Display name for the instance.
        location: Region in which the instance is created.

    Returns:
        The ``resource_name`` of the created TensorBoard instance, which is
        also printed.
    """
    aiplatform.init(project=project, location=location)

    creation_kwargs = {
        "display_name": display_name,
        "project": project,
        "location": location,
    }
    tensorboard_instance = aiplatform.Tensorboard.create(**creation_kwargs)

    resource_name = tensorboard_instance.resource_name
    print(resource_name)
    return resource_name

In [None]:
# Create the TensorBoard instance and keep its resource name, for example
# projects/<project_number>/locations/<region>/tensorboards/<tensorboard_instance_id>.
# To reuse an existing instance instead, assign its resource name here:
# TENSORBOARD= <TENSORBOARD_INSTANCE_ID> # @param {type:"string"}
TENSORBOARD = create_tensorboard_sample(project=PROJECT_ID, display_name=TENSORBOARD_DISPLAY_NAME, location=REGION)

6. Build custom training image

In [None]:
# create google cloud docker image artifact registry
from google.cloud import artifactregistry_v1
from google.cloud.artifactregistry_v1.types import Repository

def create_docker_image_repository(repo_name: str, project_id: str, location: str):
    """Create a Docker-format Artifact Registry repository and print the result.

    Args:
        repo_name: Repository ID to create.
        project_id: Project that will own the repository.
        location: Region in which the repository is created.
    """
    registry_client = artifactregistry_v1.ArtifactRegistryClient()

    # The repository's full resource name extends the parent path.
    parent_path = f"projects/{project_id}/locations/{location}"
    docker_repository = Repository(
        name=f"{parent_path}/repositories/{repo_name}",
        format_=Repository.Format.DOCKER,
    )
    create_request = artifactregistry_v1.CreateRepositoryRequest(
        parent=parent_path,
        repository_id=repo_name,
        repository=docker_repository,
    )

    pending_operation = registry_client.create_repository(request=create_request)
    print("Waiting for operation to complete...")

    # Fetch the operation's result and print the response.
    print(pending_operation.result())

In [None]:
# Create the Docker image repository REPO_NAME in project PROJECT_ID and region REGION
create_docker_image_repository(repo_name=REPO_NAME, project_id=PROJECT_ID, location=REGION)

In [None]:
# Configure Docker authentication for the regional Artifact Registry host, then build and push the training image.
# Execute the commands below in a terminal with ./custom_container as the working directory.
!echo "Y" | gcloud auth configure-docker $REGION-docker.pkg.dev 
!docker build -t $IMAGE_URI .
!docker push $IMAGE_URI

7. Create Vertex AI custom training job

In [None]:
# Cluster spec for the head node (worker pool 0): accelerator count per replica and replica count
HEAD_NODE_ACCELERATOR_COUNT = 1
HEAD_NODE_COUNT = 1

# For multi-node training, the same two settings for the additional worker pool
WORKER_NODE_ACCELERATOR_COUNT = 1
WORKER_NODE_COUNT = 1

In [None]:
# Set up Python package dependencies
from datetime import datetime
from google.cloud import aiplatform

# Initialize the Vertex AI SDK with the project, staging bucket, and region
aiplatform.init(project=PROJECT_ID, staging_bucket=BUCKET_URI, location=REGION)

In [None]:
# Build the single-node training job. This cell only defines `my_job` when
# MULTI_NODE is False; the job is submitted in the last cell.

# In debug mode the container command is left empty so the training script is
# not started; otherwise the container runs the launch script.
command = [] if DEBUG else ["python", "train_launch.py"]  # TODO: update the launch command

if not MULTI_NODE:
    worker_pool_specs = [
        # `WorkerPoolSpec` for worker pool 0, primary replica, required
        {
            "machine_spec": {
                # Alternatives: "a2-highgpu-1g", "g2-standard-12", "n1-standard-4"
                "machine_type": "a2-highgpu-1g",
                # Alternative: "NVIDIA_L4"
                "accelerator_type": "NVIDIA_TESLA_A100",
                "accelerator_count": HEAD_NODE_ACCELERATOR_COUNT,
            },
            "replica_count": HEAD_NODE_COUNT,
            "container_spec": {
                "image_uri": IMAGE_URI,
                "command": command,
                "args": [],
                "env": [],
            },
            "disk_spec": {
                "boot_disk_size_gb": 1000,
            },
        },
    ]

    # A timestamp suffix keeps the display name distinct between runs.
    TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
    JOB_NAME = "llama2finetunecustomjob " + TIMESTAMP

    my_job = aiplatform.CustomJob(
        display_name=JOB_NAME,
        worker_pool_specs=worker_pool_specs,
        base_output_dir=BUCKET_URI,
    )

In [None]:
# Build the multi-node training job. This cell only defines `my_job` when
# MULTI_NODE is True; the job is submitted in the last cell.

# In debug mode the container command is left empty so the training script is
# not started; otherwise the container runs the launch script.
command = [] if DEBUG else ["python", "train_launch.py"]  # TODO: update the launch command

if MULTI_NODE:
    worker_pool_specs = [
        # `WorkerPoolSpec` for worker pool 0, primary replica, required
        {
            "machine_spec": {
                # Alternatives: "a2-highgpu-1g", "g2-standard-12", "n1-standard-4"
                "machine_type": "a2-highgpu-1g",
                # Alternative: "NVIDIA_L4"
                "accelerator_type": "NVIDIA_TESLA_A100",
                "accelerator_count": HEAD_NODE_ACCELERATOR_COUNT,
            },
            "replica_count": HEAD_NODE_COUNT,
            "container_spec": {
                "image_uri": IMAGE_URI,
                "command": command,
                "args": [],
                "env": [],
            },
            # Same boot disk size as the worker pool below and as the head node
            # of the single-node spec; the head pool previously had no
            # disk_spec at all.
            "disk_spec": {
                "boot_disk_size_gb": 1000,
            },
        },
        # `WorkerPoolSpec` for worker pool 1, the additional worker nodes
        {
            "machine_spec": {
                # Alternatives: "a2-highgpu-1g", "g2-standard-12", "n1-standard-4"
                "machine_type": "a2-highgpu-1g",
                # Alternative: "NVIDIA_L4"
                "accelerator_type": "NVIDIA_TESLA_A100",
                "accelerator_count": WORKER_NODE_ACCELERATOR_COUNT,
            },
            "replica_count": WORKER_NODE_COUNT,
            "container_spec": {
                "image_uri": IMAGE_URI,
                "command": command,
                "args": [],
                "env": [],
            },
            "disk_spec": {
                "boot_disk_size_gb": 1000,
            },
        },
    ]

    # A timestamp suffix keeps the display name distinct between runs.
    TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
    JOB_NAME = "llama2finetunecustomjob " + TIMESTAMP

    my_job = aiplatform.CustomJob(
        display_name=JOB_NAME,
        worker_pool_specs=worker_pool_specs,
        base_output_dir=BUCKET_URI,
    )

In [None]:
# Submit the Vertex AI custom training job, passing the service account and
# the TensorBoard resource name set up earlier in the notebook
my_job.submit(    
    enable_web_access=True, # For debugging
    service_account=SVC_ACCOUNT,
    tensorboard=TENSORBOARD,
)