In [2]:
# `uv init` fails harmlessly when pyproject.toml already exists.
!uv init
# asyncio ships with Python; the PyPI "asyncio" package is an obsolete backport, so it is
# not declared. List the third-party packages the notebook actually imports instead.
!uv add aiohttp nest-asyncio numpy google-auth requests

[1m[31merror[39m[0m: Project is already initialized in `[36m/content[39m` (`pyproject.toml` file exists)
[2mResolved [1m13 packages[0m [2min 12ms[0m[0m
[2mAudited [1m12 packages[0m [2min 0.60ms[0m[0m


In [3]:
# Completions endpoint of the Cloud Run service under test (replace with yours).
CR_URL = "https://tgi-cloud-run-630458277802.europe-west4.run.app/v1/completions"

# Request parameters for the load test.
LLM_MODEL = "tgi"               # value sent as the "model" field of each request
NUM_TOKENS = 128                # sent as "max_tokens" for each completion
TOTAL_CONCURRENT_REQUESTS = 30  # number of requests fired at once

In [None]:
import aiohttp
import asyncio
import time
import numpy as np
import random
import nest_asyncio
import google.auth
import requests

from google.auth.transport.requests import Request
from google.auth import impersonated_credentials

# Service account to impersonate when minting the Cloud Run token. You must use an
# impersonated service account here, otherwise the correct token cannot be obtained.
IMPERSONATE_SERVICE_ACCOUNT = (
    "project-level-cloud-run-invoke@"
    "rocketech-de-pgcp-sandbox.iam.gserviceaccount.com"
)

# Patch the notebook's already-running event loop so it can be re-entered.
nest_asyncio.apply()

# Question templates for the load test. Each `{name}` marker is replaced by
# generate_question() with a random value from `placeholders[name]`; templates
# without markers are sent verbatim.
question_templates = [
    "How would you design an Airflow DAG to handle a daily ETL pipeline that processes data from {source} to {destination}?",
    "Write an Airflow task to validate the schema of a {file_type} file stored in {storage} before processing.",
    "How would you optimize an Airflow DAG that is experiencing performance bottlenecks due to {issue}?",
    "Write a Python function to dynamically generate Airflow tasks based on a list of {input_parameter}.",
    "How would you handle task dependencies in an Airflow DAG where {task_A} must run only if {task_B} succeeds?",
    "Write an Airflow sensor to wait for a file named {file_name} to appear in {storage} before proceeding.",
    "How would you configure retries and retry delays for an Airflow task that interacts with {external_service}?",
    "Write a Cloud Composer DAG to orchestrate a data pipeline that processes data from {source} and loads it into {destination}.",
    "How would you monitor and alert for failed tasks in an Airflow DAG running on Cloud Composer?",
    "Write an Airflow task to backfill data from {start_date} to {end_date} for a specific {dataset}.",
    "How would you secure sensitive information (e.g., API keys) in an Airflow DAG running on Cloud Composer?",
    "Write an Airflow DAG to handle a scenario where {task} fails and needs to trigger a rollback process.",
    "How would you scale an Airflow DAG to process {large_dataset} efficiently in Cloud Composer?",
    "Write a Python script to automate the deployment of an Airflow DAG to a Cloud Composer environment.",
    "How would you debug an Airflow DAG that is stuck in a {state} state in Cloud Composer?",
]

# Candidate substitution values, keyed by the marker name used in
# question_templates (key "source" fills "{source}", and so on).
# NOTE(review): start_date and end_date are chosen independently, so a generated
# range can run backwards (e.g. 2023-03-15 -> 2022-12-31).
placeholders = {
    "source": ["BigQuery", "Google Cloud Storage", "Pub/Sub", "MySQL", "PostgreSQL"],
    "destination": ["BigQuery", "Google Cloud Storage", "Dataflow", "Snowflake", "Redshift"],
    "file_type": ["CSV", "JSON", "Parquet", "Avro"],
    "storage": ["Google Cloud Storage", "AWS S3", "Azure Blob Storage"],
    "issue": ["high task concurrency", "long-running tasks", "resource contention"],
    "input_parameter": ["table names", "file paths", "dates"],
    "task_A": ["data extraction", "data transformation", "data validation"],
    "task_B": ["data loading", "data cleaning", "data aggregation"],
    "external_service": ["BigQuery", "Cloud SQL", "REST API"],
    "file_name": ["data.csv", "input.json", "output.parquet"],
    "start_date": ["2023-01-01", "2022-12-01", "2023-03-15"],
    "end_date": ["2023-01-31", "2022-12-31", "2023-03-31"],
    "dataset": ["sales", "user_activity", "inventory"],
    "task": ["data extraction", "data transformation", "data loading"],
    "large_dataset": ["1TB of logs", "10 million rows", "100GB of images"],
    "state": ["running", "queued", "failed"],
}

def generate_question(template, placeholders):
    """Fill the ``{name}`` markers of *template* with random values.

    Keys of *placeholders* are visited in dict order. For each key whose marker
    is present in the text so far, one value is drawn with ``random.choice`` and
    substituted for every occurrence of that marker. Markers with no matching
    key are left untouched.
    """
    question = template
    for name, choices in placeholders.items():
        marker = "{" + name + "}"
        if marker not in question:
            continue  # no random draw for markers this template doesn't use
        question = question.replace(marker, random.choice(choices))
    return question

def generate(num_prompts, *, templates=None, values=None):
    """Yield ``num_prompts`` randomly generated questions.

    Each question uses a template chosen with ``random.choice``, filled by
    generate_question(). Templates are drawn with replacement, so the output
    may contain duplicates; uniqueness is NOT guaranteed.

    Args:
        num_prompts: Number of questions to yield (0 or negative yields none).
        templates: Template strings to draw from; defaults to the module-level
            ``question_templates``.
        values: Mapping of marker name -> candidate values; defaults to the
            module-level ``placeholders``.

    Raises:
        IndexError: if a question is requested and *templates* is empty.
    """
    if templates is None:
        templates = question_templates
    if values is None:
        values = placeholders
    for _ in range(num_prompts):
        yield generate_question(random.choice(templates), values)

async def get_access_token():
    """Return an OAuth2 access token from Application Default Credentials (ADC).

    ``google.auth.default()`` and ``Credentials.refresh()`` are synchronous and
    the refresh performs an HTTP token request. Both therefore run in the loop's
    default thread-pool executor, so awaiting this coroutine does not stall
    other in-flight requests on the event loop.
    """
    def _fetch():
        creds, _ = google.auth.default()
        creds.refresh(Request())  # ensure a fresh token is populated
        return creds.token

    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _fetch)

# Build every prompt up front; send_request() looks prompts up by position.
all_prompts = [*generate(TOTAL_CONCURRENT_REQUESTS)]

async def send_request(session, index, latencies, token):
    """POST one completion request to ``CR_URL`` and record its latency.

    Args:
        session: aiohttp client session used to send the request.
        index: Position in ``all_prompts`` of the prompt to send.
        latencies: List to which the end-to-end latency (seconds) of each
            successful (HTTP 200) request is appended.
        token: Google-signed ID token, sent as a Bearer credential.

    Returns:
        The response body on HTTP 200. Otherwise a string of the form
        ``"Error: <status> - <body>"`` or ``"Exception: <type>: <message>"``.
        Ordinary request errors are returned rather than raised, so one failed
        request does not abort the whole ``asyncio.gather`` batch.
    """
    payload = {
        "model": LLM_MODEL,
        "prompt": all_prompts[index],
        "max_tokens": NUM_TOKENS,
        "temperature": 0.90,
    }
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }

    try:
        start_time = time.monotonic()
        async with session.post(CR_URL, json=payload, headers=headers) as response:
            status = response.status
            body = await response.text()
        # Stop the clock only after the full body has been read, so the latency
        # covers the complete request rather than just time-to-headers.
        latency = time.monotonic() - start_time
    except Exception as e:
        # Include the exception type: some exceptions (e.g. asyncio.TimeoutError)
        # have an empty str() and would otherwise print as a bare "Exception: ".
        return f"Exception: {type(e).__name__}: {e}"

    if status != 200:
        # Error responses are excluded from `latencies` so they don't skew the
        # percentile computed over successful completions.
        return f"Error: {status} - {body}"

    latencies.append(latency)
    return body

def _fetch_cloud_run_id_token(service_url):
    """Mint an ID token for *service_url* by impersonating IMPERSONATE_SERVICE_ACCOUNT.

    Base credentials come from Application Default Credentials. They are used to
    impersonate the service account (cloud-platform scope, 3600 s lifetime),
    which then issues an ID token. The audience is the scheme + host of
    *service_url*: Cloud Run documents the service URL as the expected audience,
    so any request path (e.g. ``/v1/completions``) is stripped.
    """
    from urllib.parse import urlsplit

    base_credentials, _ = google.auth.default()
    impersonated_creds = impersonated_credentials.Credentials(
        base_credentials,
        target_principal=IMPERSONATE_SERVICE_ACCOUNT,
        target_scopes=["https://www.googleapis.com/auth/cloud-platform"],
        lifetime=3600,  # token lifetime in seconds
    )

    url_parts = urlsplit(service_url)
    audience = f"{url_parts.scheme}://{url_parts.netloc}"
    id_token_creds = impersonated_credentials.IDTokenCredentials(
        impersonated_creds, target_audience=audience
    )
    id_token_creds.refresh(Request())  # performs the token exchange
    return id_token_creds.token


async def main():
    """Fire TOTAL_CONCURRENT_REQUESTS concurrent completion requests at Cloud Run.

    Mints one ID token, sends all requests at once with ``asyncio.gather``,
    prints each result (response body or error string), and finally prints the
    99th percentile of the latencies that send_request() recorded.
    """
    latencies = []

    # Synchronous and runs before any request is scheduled, so blocking the
    # event loop here does not distort the measurements.
    id_token = _fetch_cloud_run_id_token(CR_URL)

    async with aiohttp.ClientSession() as session:
        tasks = [
            send_request(session, index, latencies, id_token)
            for index in range(TOTAL_CONCURRENT_REQUESTS)
        ]
        results = await asyncio.gather(*tasks)

    for result in results:
        print(result)

    if latencies:
        p99_latency = np.percentile(latencies, 99)
        print(f"P99 Latency: {p99_latency:.4f} seconds")
    else:
        print("No latencies recorded.")

# Run the load test. A bare top-level `await` only works in an IPython/Jupyter
# cell (IPython supports top-level await); in a plain script use asyncio.run(main()).
await main()