In [None]:
#For downloading model to private storage. Use'll use cloud storage as model repository. Tuned model should have uploaded to cloud storage

from huggingface_hub import snapshot_download
import os
hf_token = ""
#hf_repo_id = "meta-llama/Llama-3.1-8B"
#hf_repo_id = "Qwen/Qwen3-32B"
#hf_repo_id = "google/gemma-3-1b-it"
hf_repo_id = "google/gemma-3-27b-it"
BASE_ARTIFACT_URI = "gs://jk-model-repo"
os.system(f"rm -rf /tmp/model")
print("Start downloading")
snapshot_download(repo_id=hf_repo_id, token=hf_token, local_dir=f"/tmp/model")
print("Uploading")
os.system(f"gcloud storage cp /tmp/model/*.* {BASE_ARTIFACT_URI}/{hf_repo_id}")
print("Done")

In [None]:
#Basic configuration
import os
import logging
import json
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logging.basicConfig(level=logging.INFO)
LOCATION = "asia-northeast3"
PROJECT_NUMBER = "1045259343465"
# 0.10.1 dev
vLLM_DOCKER_URI = "us-docker.pkg.dev/vertex-ai/vertex-vision-model-garden-dockers/pytorch-vllm-serve:20260130_0916_RC01"
# "20250601_0916_RC01" for ver 0.9.0
DEPLOY_TIMEOUT = 3600

In [2]:
SERVICE_ACCOUNT = "1045259343465-compute@developer.gserviceaccount.com"   #SA to access gcs
#Model configuration
MODEL_NAME_V1 = "Qwen2.5-VL-3B-Instruct GPU"
MODEL_PATH_V1 = "gs://jk-model-repo/Qwen/Qwen2.5-VL-3B-Instruct"
MODEL_ID = "/".join(MODEL_PATH_V1.split("/")[-2:])
TENSOR_PARALLEL_SIZE = 1
MAX_MODEL_LEN = 65536
MACHINE_TYPE = f"a2-highgpu-{TENSOR_PARALLEL_SIZE}g" #a2-highgpu-4g, g2-standard-48, a3-edgegpu-8g
TPU_TOPOLOGY = None
ACCELERATOR_TYPE = "NVIDIA_TESLA_A100" #NVIDIA_TESLA_A100, NVIDIA_L4, NVIDIA_H100_80GB
ACCELERATOR_COUNT = TENSOR_PARALLEL_SIZE

#MODEL_NAME_V1 = "Llama-3.1-8B"
#MODEL_PATH_V1 = "gs://jk-model-repo/meta-llama/Llama-3.1-8B"
#TENSOR_PARALLEL_SIZE = 1
#MAX_MODEL_LEN = 32768
#MACHINE_TYPE = "a2-highgpu-1g"
#ACCELERATOR_TYPE = "NVIDIA_TESLA_A100"

In [None]:
#Upload model pacakge to vertex model repository
from google.cloud import aiplatform
model_v1 = aiplatform.Model.upload(
    location=LOCATION,
    display_name = MODEL_NAME_V1,
    #local_model = local_model_v1,
    artifact_uri = MODEL_PATH_V1,
    #parent_model = prev_model.resource_name,
    #is_default_version=True,
    #serving_container_environment_variables={
    #    "VERTEX_CPR_MAX_WORKERS": "1",
    #    "PORT": "8080", #server runs on 5000, or 8080 by dafault
    #    "RUST_BACKTRACE": "full", #for stack trace printing,
    #},
    #serving_container_ports=[8080],
    #serving_container_args = ["--num-shard 1"]
    
    #For direct container upload rather then use LocalModel
    serving_container_image_uri=vLLM_DOCKER_URI,
    serving_container_invoke_route_prefix="/*",
    #serving_container_predict_route="/v1/chat/completions",
    serving_container_health_route="/health",
    serving_container_ports=[8080],
    serving_container_environment_variables={
        #"VLLM_ALLOW_LONG_MAX_MODEL_LEN": "1",
        #"VLLM_USE_V1": "1",
        #"HF_TOKEN": hf_token
        "MODEL_ID": MODEL_ID,
        "DEPLOY_SOURCE": "API_NATIVE_MODEL"
    },
    serving_container_args=["python",
                            "-m",
                            "vllm.entrypoints.api_server",
                            #f"--download-dir={BASE_PATH}",
                            #f"--model={MODEL_PATH_V1}",                                                
                            "--port=8080",                                               
                            f"--tensor-parallel-size={TENSOR_PARALLEL_SIZE}",
                            "--enable-prefix-caching",
                            "--enable-chunked-prefill",
                            f"--max-model-len={MAX_MODEL_LEN}",
                            "--served-model-name=openapi",
                            #"--enable-request-id-headers",   #enable to track id from response header
                            "--disable_chunked_mm_input",
                            "--limit-mm-per-prompt.image=0",
                            "--limit-mm-per-prompt.video=1",
                            "--max-num-seqs=1",
                            "--max-num-batched-tokens=64K",
                            "--allowed-local-media-path=/gcs/jk-mount-test",
                            f"--model={MODEL_ID}",
                            #"--gpu-memory-utilization=0.9",
                            #"--swap-space=16",
                            #"--max-num-batched-tokens=512",
                            #"--enforce-eager",  #Reduce memory but also slow
                            #"--max-num-seqs=128", #Temporal for testing
                            ],
    serving_container_shared_memory_size_mb=(16 * 1024),  # 16 GB
)

In [4]:
from google.cloud import aiplatform
endpoint = aiplatform.Endpoint.create(
    display_name=f"{MODEL_NAME_V1} public dedicated test endpoint",
    labels={"sample-key": "sample-value"},
    location=LOCATION,
    dedicated_endpoint_enabled=True,
)

In [5]:
#Public endpoint with dedicated network
for i in range(1):
    response = None
    try:
        response = endpoint.deploy(
            model = model_v1,
            machine_type=MACHINE_TYPE,
            tpu_topology=TPU_TOPOLOGY,    
            min_replica_count=1,
            max_replica_count=1,
            service_account=SERVICE_ACCOUNT,
            #traffic_percentage=50
            #traffic_split={'a':50, 'b':50}
            #Configs for GPU
            #accelerator_type="NVIDIA_L4",
            #machine_type="a2-highgpu-1g",
            accelerator_type=ACCELERATOR_TYPE,
            accelerator_count=ACCELERATOR_COUNT,
            deploy_request_timeout=DEPLOY_TIMEOUT
        )
    except:
        print(response)
        print(f"Retrying {i}")
        continue
    print("Created!")
    break

None
Retrying 0


In [None]:
#Sample test
from pydantic import BaseModel
from enum import Enum

class CarType(str, Enum):
    sedan = "sedan"
    suv = "SUV"
    truck = "Truck"
    coupe = "Coupe"

class CarDescription(BaseModel):
    brand: str
    model: str
    car_type: CarType

prediction_input = {
    "stream": False,
    "chat_template_kwargs": {"enable_thinking": False},
    "messages": [
        {
            "role": "user",
            "content": "Generate a JSON with the brand, model and car_type of the most iconic car from the 90's"
        }
    ],
    "guided_json": CarDescription.model_json_schema()
}

In [None]:
#Public and dedicated endpoint predict
from google.cloud import aiplatform
#ENDPOINT_ID = "4304072351789613056"
#endpoint = aiplatform.Endpoint(ENDPOINT_ID, location=LOCATION)
headers = {
    "Content-Type": "application/json",
    #"x-request-id": "ebb94475-1ca2-4e4b-baa3-8d039c0e616e", #works when --enable-request-id-headers option enabled
    "x-vertex-ai-timeout-ms": "60000"
}
response = endpoint.invoke(request_path="/v1/chat/completions", headers=headers, body=json.dumps(prediction_input, indent=2).encode('utf-8'))
print(response.headers)
print(response)
print(response.json()['choices'][0]['message']['content'])

In [None]:
payload = {
    "stream": False,
    "chat_template_kwargs": {"enable_thinking": False},
    "messages": [
        {
            "role": "user",
            "content": [
                {
                    "type": "text", "text": "Describe the video content in detail."
                },
                {
                    "type": "video_url",
                    "video_url": {"url": "/gcs/jk-mount-test/free-videos.mp4"}
                    #"video_url": {"url": "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ForBiggerFun.mp4"}
                },
            ]
        }
    ]
}
response = endpoint.invoke(request_path="/v1/chat/completions", headers=headers, body=json.dumps(payload, indent=2).encode('utf-8'))
print(response.headers)
print(response)
print(response.json()['choices'][0]['message']['content'])

In [None]:
#VLLM prometheus metric
import requests
payload = {
    "stream": False,
    "chat_template_kwargs": {"enable_thinking": False},
    "messages": [
        {
            "role": "user",
            "content": [
                {
                    "type": "text", 
                    "text": "Describe the video content in detail."
                },
                {
                    "type": "video_url",
                    "video_url": {"url": "/gcs/jk-mount-test/free-videos.mp4"}
                    #"video_url": {"url": "http://commondatastorage.googleapis.com/gtv-videos-bucket/sample/ForBiggerFun.mp4"}
                },
            ]
        }
    ]
}
headers = {
    "Content-Type": "application/json",
}
url = f"https://2000057830412910592.asia-northeast3-610420230010.prediction.vertexai.goog/v1/projects/1045259343465/locations/asia-northeast3/endpoints/2000057830412910592 //chat/completions"
response = requests.post(url, data=json.dumps(payload), headers=headers)
response.text

In [None]:
#VLLM prometheus metric
import google.auth
import requests
creds, project = google.auth.default()
auth_req = google.auth.transport.requests.Request()
creds.refresh(auth_req)
headers = {
    "Authorization": f"Bearer {creds.token}",
}
url = f"https://{ENDPOINT_ID}.{LOCATION}-{PROJECT_NUMBER}.prediction.vertexai.goog/v1/projects/{PROJECT_NUMBER}/locations/{LOCATION}/endpoints/{ENDPOINT_ID}/invoke/metrics"
response = requests.post(url, data='', headers=headers)
response.text

In [None]:
#For dedicated endpoint, endpoint.predict(instances=instances, use_dedicated_endpoint=True)
#Vertex AI predict (Need to modify payload)
import google.auth
import requests
creds, project = google.auth.default()
auth_req = google.auth.transport.requests.Request()
creds.refresh(auth_req)

url = f"https://{ENDPOINT_ID}.{LOCATION}-{PROJECT_NUMBER}.prediction.vertexai.goog/v1/projects/{PROJECT_NUMBER}/locations/{LOCATION}/endpoints/{ENDPOINT_ID}/invoke/v1/chat/completions"
headers = {
    "Authorization": f"Bearer {creds.token}",
    "Content-Type": "application/json",
    "x-vertex-ai-timeout-ms": "30000"
}
payload = {
    "stream": True,
    "chat_template_kwargs": {"enable_thinking": True},
    "messages": [
        {
            "role": "user",
            "content": "Hello"
        }
    ]
}
response = requests.post(url, json=payload, headers=headers)
print(response)
import time
def stream_parse(response):
    """
    스트리밍 Server-Sent Events (SSE) 응답을 올바르게 파싱합니다.

    Args:
        response: requests.Response 객체 (stream=True로 요청).
    """
    result = ""
    # response.iter_lines()를 사용하여 스트림을 한 줄씩 처리합니다.
    # 이 메서드가 네트워크 청크를 자동으로 합쳐 완전한 줄을 만들어줍니다.
    for line in response.iter_lines():
        # keep-alive를 위한 빈 줄은 건너뜁니다.
        if not line:
            continue

        # bytes 타입을 string으로 디코딩합니다.
        decoded_line = line.decode("utf-8")

        # SSE 데이터는 "data:"로 시작합니다.
        if decoded_line.startswith("data:"):
            # "data:" 접두사와 양쪽 공백을 제거합니다.
            data_str = decoded_line[5:].strip()

            # API가 보내는 스트림 종료 신호입니다.
            if data_str == "[DONE]":
                break

            # "data: " 뒤에 내용이 없는 경우를 대비합니다.
            if not data_str:
                continue
            
            try:
                # 문자열을 JSON 객체로 파싱합니다.
                chunk = json.loads(data_str)
                
                # get() 메서드를 사용하여 안전하게 데이터에 접근합니다.
                choices = chunk.get("choices", [])
                if choices:
                    delta = choices[0].get("delta", {})
                    content = delta.get("content", "")
                    if content:
                        result += content
                        # 콘솔에 수신되는 내용을 실시간으로 출력합니다.
                        print(content, end="", flush=True)
                        time.sleep(0.001)

            except json.JSONDecodeError:
                print(f"\n[오류] JSON 디코딩 실패: {data_str}")
    
    print()  # 스트림이 끝나면 줄바꿈을 해줍니다.
    return result
result = stream_parse(response)