The goal of this notebook is to set up a custom Vertex AI Pipeline that uploads Mistral models and deploys them to Vertex AI endpoints.
By using Vertex Pipelines and Kubeflow Pipelines (KFP), we can follow MLOps best practices: automate, monitor, and govern OSS model deployment. The current approach runs these steps from a Colab notebook, which does not provide audit logs. This blocks many customers with strict security protocols from using that workflow in production systems.

Note: the following is strictly demo code, meant as an example of how to use custom Vertex Pipelines to deploy Mistral-7B-Instruct-v0.2.
Author: @lukasgeiger

In [2]:
!pip3 install google-cloud-aiplatform "shapely<2" --upgrade
!pip3 install kfp google-cloud-pipeline-components

Collecting google-cloud-aiplatform
  Downloading google_cloud_aiplatform-1.57.0-py2.py3-none-any.whl.metadata (31 kB)
Downloading google_cloud_aiplatform-1.57.0-py2.py3-none-any.whl (5.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.1/5.1 MB[0m [31m18.8 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: google-cloud-aiplatform
  Attempting uninstall: google-cloud-aiplatform
    Found existing installation: google-cloud-aiplatform 1.56.0
    Uninstalling google-cloud-aiplatform-1.56.0:
      Successfully uninstalled google-cloud-aiplatform-1.56.0
Successfully installed google-cloud-aiplatform-1.57.0


In [3]:
import os

# Restart the kernel after the installs above so the upgraded packages are what gets imported
# from here on. Skipped when IS_TESTING is set to a non-empty value so automated runs continue.
if not os.getenv("IS_TESTING"):
    import IPython

    app = IPython.Application.instance()
    kernel = app.kernel
    kernel.do_shutdown(True)

In [1]:
# Report the versions visible to *this* kernel. `!python3 -c ...` starts a separate interpreter,
# which is not guaranteed to be the environment the notebook actually imports from.
import kfp
import google_cloud_pipeline_components
from google.cloud import aiplatform

print("KFP SDK version: {}".format(kfp.__version__))
print("google_cloud_pipeline_components version: {}".format(google_cloud_pipeline_components.__version__))
print("AIPlatform SDK version: {}".format(aiplatform.__version__))

KFP SDK version: 2.4.0
google_cloud_pipeline_components version: 2.8.0
AIPlatform SDK version: 1.57.0


In [2]:
import kfp

from kfp import compiler, dsl
from kfp.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import v1
from google_cloud_pipeline_components.v1.endpoint import (EndpointCreateOp)

In [3]:
import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Project ID:  vertexpipelines-426320


In [4]:
# Fall back to a placeholder when gcloud did not report a project; replace it with your own ID.
if PROJECT_ID in ("", None):
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

In [5]:
BUCKET_NAME = f"gs://{PROJECT_ID}-mistraldeploy-bucket"
!gsutil mb -l us-central1 {BUCKET_NAME}

Creating gs://vertexpipelines-426320-mistraldeploy-bucket/...
ServiceException: 409 A Cloud Storage bucket named 'vertexpipelines-426320-mistraldeploy-bucket' already exists. Try another name. Bucket names must be globally unique across all Google Cloud projects, including those outside of your organization.


In [6]:
SERVICE_ACCOUNT = None
shell_output = ! gcloud projects describe $PROJECT_ID
project_number = shell_output[-1].split(":")[1].strip().replace("'", "")
SERVICE_ACCOUNT = f"{project_number}-compute@developer.gserviceaccount.com"
print("Using this default Service Account:", SERVICE_ACCOUNT)

Using this default Service Account: 336668109844-compute@developer.gserviceaccount.com


In [7]:
# GCS prefix under which Vertex Pipelines stores run artifacts for this pipeline.
PIPELINE_ROOT = "{}/pipeline_root/".format(BUCKET_NAME)
PIPELINE_ROOT

'gs://vertexpipelines-426320-mistraldeploy-bucket/pipeline_root/'

In [8]:
@component(
    base_image="python:3.10",
    # Each component runs in its own container, so its dependencies are declared here instead
    # of being pip-installed from inside the function body at runtime.
    packages_to_install=["google-cloud-aiplatform==1.56.0"],
)
def upload_model_vllm(model_display_name: str,
                      machine_type: str,
                      accelerator_type: str,
                      accelerator_count: int,
                      max_model_len: int,
                      gpu_memory_utilization: float,
                      project: str = "vertexpipelines-426320",
                      location: str = "us-central1",
                      staging_bucket: str = "gs://vertexpipelines-426320-mistraldeploy-bucket/",
                      model_id: str = "mistralai/Mistral-7B-Instruct-v0.2",
                      model_uri: str = "gs://vertex-model-garden-public-us/mistralai/Mistral-7B-Instruct-v0.2",
                     ) -> str:
    """Register a Mistral model in Vertex AI Model Registry, served by a vLLM container.

    Args:
        model_display_name: Display name of the uploaded Vertex AI Model.
        machine_type: Accepted for symmetry with the pipeline parameters; not used by this step.
        accelerator_type: Accepted for symmetry with the pipeline parameters; not used by this step.
        accelerator_count: Passed to vLLM as --tensor-parallel-size.
        max_model_len: Passed to vLLM as --max-model-len.
        gpu_memory_utilization: Passed to vLLM as --gpu-memory-utilization.
        project: Google Cloud project for aiplatform.init.
        location: Vertex AI region for aiplatform.init.
        staging_bucket: GCS staging bucket for aiplatform.init.
        model_id: Value exported to the serving container as MODEL_ID.
        model_uri: Weights location passed to vLLM as --model.

    Returns:
        The ``name`` of the uploaded aiplatform.Model (its model ID), for the deploy step.
    """
    # Imported inside the function because the component body executes in its own container.
    from google.cloud import aiplatform

    aiplatform.init(project=project, location=location, staging_bucket=staging_bucket)

    VLLM_DOCKER_URI = "us-docker.pkg.dev/vertex-ai/vertex-vision-model-garden-dockers/pytorch-vllm-serve:20240313_0916_RC00"
    # vLLM listens on this port; Vertex must route traffic to the same port.
    serving_port = 7080

    vllm_args = [
        "--host=0.0.0.0",
        f"--port={serving_port}",
        f"--model={model_uri}",
        f"--tensor-parallel-size={accelerator_count}",
        "--swap-space=16",
        "--dtype=bfloat16",
        f"--gpu-memory-utilization={gpu_memory_utilization}",
        f"--max-model-len={max_model_len}",
        "--disable-log-stats",
    ]

    serving_env = {
        "MODEL_ID": model_id,
        "DEPLOY_SOURCE": "notebook",
    }

    model = aiplatform.Model.upload(
        display_name=model_display_name,
        serving_container_image_uri=VLLM_DOCKER_URI,
        serving_container_command=["python", "-m", "vllm.entrypoints.api_server"],
        serving_container_args=vllm_args,
        serving_container_ports=[serving_port],
        serving_container_predict_route="/generate",
        serving_container_health_route="/health",
        serving_container_environment_variables=serving_env,
    )

    # Only the model's name (a plain string) crosses the component boundary.
    return model.name

In [9]:
@component(
    base_image="python:3.10",
    # Each component runs in its own container, so its dependencies are declared here instead
    # of being pip-installed from inside the function body at runtime.
    packages_to_install=["google-cloud-aiplatform==1.56.0"],
)
def deploy_vllm_model(model_name: str,
                      endpoint_display_name: str,
                      machine_type: str,
                      accelerator_type: str,
                      accelerator_count: int,
                      max_model_len: int,
                      gpu_memory_utilization: float,
                      service_account: str,
                      min_replica_count: int,
                      max_replica_count: int,
                      project: str = "vertexpipelines-426320",
                      location: str = "us-central1",
                      staging_bucket: str = "gs://vertexpipelines-426320-mistraldeploy-bucket/",
                      deploy_request_timeout: int = 1800,
                      reuse_existing_endpoint: bool = False,
                     ):
    """Deploy an already-uploaded Vertex AI Model to an endpoint.

    Args:
        model_name: Name/ID of the Vertex AI Model to deploy (e.g. the upload step's output).
        endpoint_display_name: Display name of the endpoint to deploy to.
        machine_type: Machine type for each serving replica.
        accelerator_type: Accelerator type attached to each replica.
        accelerator_count: Number of accelerators per replica.
        max_model_len: Accepted for symmetry with the pipeline parameters; not used by this step.
        gpu_memory_utilization: Accepted for symmetry with the pipeline parameters; not used by this step.
        service_account: Service account the deployed model runs as.
        min_replica_count: Minimum number of serving replicas.
        max_replica_count: Maximum number of serving replicas.
        project: Google Cloud project for aiplatform.init.
        location: Vertex AI region for aiplatform.init.
        staging_bucket: GCS staging bucket for aiplatform.init.
        deploy_request_timeout: Timeout in seconds for the deploy request.
        reuse_existing_endpoint: When True, deploy to the most recently created endpoint with
            ``endpoint_display_name`` if one exists. Default False creates a new endpoint on
            every run.
    """
    # Imported inside the function because the component body executes in its own container.
    from google.cloud import aiplatform

    aiplatform.init(project=project, location=location, staging_bucket=staging_bucket)

    model = aiplatform.Model(model_name=model_name)

    endpoint = None
    if reuse_existing_endpoint:
        matches = aiplatform.Endpoint.list(
            filter=f'display_name="{endpoint_display_name}"',
            order_by="create_time desc",
        )
        if matches:
            endpoint = matches[0]
    if endpoint is None:
        # NOTE: every run without reuse creates another endpoint; repeated runs accumulate them.
        endpoint = aiplatform.Endpoint.create(display_name=endpoint_display_name)

    model.deploy(
        endpoint=endpoint,
        machine_type=machine_type,
        accelerator_type=accelerator_type,
        accelerator_count=accelerator_count,
        deploy_request_timeout=deploy_request_timeout,
        service_account=service_account,
        min_replica_count=min_replica_count,
        max_replica_count=max_replica_count,
    )

In [10]:
@pipeline(name = "Deploy Mistral Vertex Endpoint",
          description = "Deploying Mistral to a Vertex Endpoint",
          pipeline_root = PIPELINE_ROOT)
def deploy_mistral_vertex_endpoint(model_display_name: str = "Mistral Model", # might want to rename this per customer/environment type
                                   endpoint_display_name: str = "mistral-v2-endpoint",
                                   machine_type: str = "g2-standard-8",
                                   accelerator_type: str = "NVIDIA_L4",
                                   accelerator_count: int = 1,
                                   max_model_len: int = 4096,
                                   gpu_memory_utilization: float = 0.9,
                                   # Python evaluates this default when the cell runs, so the current
                                   # value of SERVICE_ACCOUNT is what gets compiled into the pipeline.
                                   service_account: str = SERVICE_ACCOUNT,
                                   min_replica_count: int = 1,
                                   max_replica_count: int = 1,
                                  ):
    """Upload a vLLM-served Mistral model, then deploy it to a Vertex AI endpoint.

    Two stages: ``upload_model_vllm`` registers the model and returns its name, then
    ``deploy_vllm_model`` deploys that model. The work can be split into more or fewer
    components as needed.
    """
    upload_task = upload_model_vllm(
        model_display_name=model_display_name,
        machine_type=machine_type,
        accelerator_type=accelerator_type,
        accelerator_count=accelerator_count,
        max_model_len=max_model_len,
        gpu_memory_utilization=gpu_memory_utilization,
    )

    # Consuming upload_task.output creates a data dependency, so KFP only schedules the deploy
    # step after the upload step has finished; an explicit .after() would be redundant.
    deploy_vllm_model(
        endpoint_display_name=endpoint_display_name,
        model_name=upload_task.output,
        machine_type=machine_type,
        accelerator_type=accelerator_type,
        accelerator_count=accelerator_count,
        service_account=service_account,
        max_model_len=max_model_len,
        gpu_memory_utilization=gpu_memory_utilization,
        min_replica_count=min_replica_count,
        max_replica_count=max_replica_count,
    )

In [11]:
# Serialize the pipeline definition into a spec file that aiplatform.PipelineJob can submit.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=deploy_mistral_vertex_endpoint,
    package_path="deploy_mistral_vertex_endpoint.json",
)

In [12]:
from datetime import datetime

# Second-resolution local timestamp used to give each pipeline job a unique ID.
TIMESTAMP = f"{datetime.now():%Y%m%d%H%M%S}"

In [13]:
job = aiplatform.PipelineJob(
    display_name="deploy_mistral_vertex_endpoint",
    template_path="deploy_mistral_vertex_endpoint.json",
    job_id=f"deploy-mistral-vertex-endpoint-{TIMESTAMP}",
    # aiplatform.init() is never called in this notebook, so pin project and region explicitly.
    project=PROJECT_ID,
    location="us-central1",
    # Both steps have side effects (they create a Model and an Endpoint). With caching on, a
    # re-run with unchanged parameters can reuse cached task results and skip the upload/deploy
    # work, e.g. after the endpoint was deleted.
    enable_caching=False,
)  # the compiled spec could also be published as a reusable pipeline template to "save" the pipeline

In [14]:
# submit() returns once the job has been created (see the output below). The run identity is
# passed explicitly rather than left to the platform default, so it is visible in the notebook.
job.submit(service_account=SERVICE_ACCOUNT)

Creating PipelineJob
PipelineJob created. Resource name: projects/336668109844/locations/us-central1/pipelineJobs/deploy-mistral-vertex-endpoint-20240701202710
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/336668109844/locations/us-central1/pipelineJobs/deploy-mistral-vertex-endpoint-20240701202710')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/deploy-mistral-vertex-endpoint-20240701202710?project=336668109844
