# BigQuery Anti-Pattern Recognition - Deployment with AI Rewrite

**Happy path deployment with independent cells for easy debugging**

Each cell is self-contained and can be run independently after the initial setup.

This version includes AI-powered SQL rewriting capabilities using Vertex AI.

## 1. Setup and Configuration

In [None]:
# Install dependencies
!pip install -r requirements.txt -q

In [None]:
import subprocess
import requests
import json
import tempfile
import os


def run_cmd(cmd, description=""):
    """Run shell command with simple logging"""
    if description:
        print(f"\n{description}")
    print(f"Command: {cmd}")
    print("-" * 50)

    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)

    if result.returncode == 0:
        print("Status: SUCCESS")
        if result.stdout.strip():
            print(f"Output: {result.stdout.strip()}")
    else:
        print(f"Status: FAILED (exit code {result.returncode})")
        if result.stdout.strip():
            print(f"Output: {result.stdout.strip()}")
        if result.stderr.strip():
            print(f"Error: {result.stderr.strip()}")

    return result


def run_sql(sql, description="", project_id=None):
    """Run BigQuery SQL with simple logging"""
    if description:
        print(f"\n{description}")
    print("SQL Query:")
    print(sql)
    print("-" * 50)

    if project_id is None:
        project_id = PROJECT_ID

    cmd = f'bq query --project_id={project_id} --use_legacy_sql=false --format=pretty "{sql}"'
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)

    if result.returncode == 0:
        print("Status: SUCCESS")
        print("Results:")
        print(result.stdout)
    else:
        print(f"Status: FAILED (exit code {result.returncode})")
        if result.stderr.strip():
            print(f"Error: {result.stderr.strip()}")

    return result


def test_api(url, payload=None, method="POST", description=""):
    """Test API endpoint with simple logging"""
    if description:
        print(f"\n{description}")
    print(f"API Call: {method} {url}")
    if payload:
        print(f"Payload: {json.dumps(payload, indent=2)}")
    print("-" * 50)

    try:
        if method == "POST":
            response = requests.post(url, json=payload, timeout=30)
        else:
            response = requests.get(url, timeout=30)

        print(f"Status: {response.status_code}")

        if response.status_code == 200:
            data = response.json()
            print("Response:")
            print(json.dumps(data, indent=2)[:500])  # First 500 chars

            # Special handling for antipattern responses
            if 'replies' in data and data['replies']:
                reply = data['replies'][0]
                if 'antipatterns' in reply:
                    print(
                        f"\nFound {len(reply['antipatterns'])} anti-patterns")
                    for i, ap in enumerate(reply['antipatterns'][:3]):
                        name = ap.get('name', 'Unknown')
                        description = ap.get('result', ap.get(
                            'description', 'No description'))
                        print(f"  {i+1}. {name}: {description[:100]}...")
                if 'optimized_sql' in reply:
                    print(f"\nOptimized SQL: {reply['optimized_sql']}")
        else:
            print(f"Response: {response.text[:200]}")

        return response

    except Exception as e:
        print(f"Status: FAILED")
        print(f"Error: {str(e)}")
        return None


print("Setup complete")

In [None]:
# Configuration - Edit these values for your project
PROJECT_ID = "your-project-id"  # CHANGE THIS
REGION = "us-central1"
ARTIFACT_REGISTRY = "antipattern-registry"
BQ_DATASET = "antipattern_demo"
BATCH_SERVICE_NAME = "antipattern-batch-job"
API_SERVICE_NAME = "antipattern-api-service"

# Deployment flags - Set to False to skip
DEPLOY_BATCH = True
DEPLOY_API = True
DEPLOY_UDF = True
ENABLE_AI_REWRITE = True  # Enable AI-powered SQL rewriting

# Generate derived values
ARTIFACT_REGISTRY_URL = f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{ARTIFACT_REGISTRY}"
BATCH_IMAGE = f"{ARTIFACT_REGISTRY_URL}/antipattern-batch:latest"
SERVICE_IMAGE = f"{ARTIFACT_REGISTRY_URL}/antipattern-service:latest"
OUTPUT_TABLE = f"{PROJECT_ID}.{BQ_DATASET}.antipattern_batch_results"
INFO_SCHEMA_TABLE = f"{PROJECT_ID}.region-us.INFORMATION_SCHEMA.JOBS"

print(f"Project: {PROJECT_ID}")
print(f"Region: {REGION}")
print(f"Batch Image: {BATCH_IMAGE}")
print(f"Service Image: {SERVICE_IMAGE}")
print(f"AI Rewrite Enabled: {ENABLE_AI_REWRITE}")

## 2. Enable APIs

In [None]:
# Enable all required APIs in one command
apis = [
    "cloudbuild.googleapis.com",
    "run.googleapis.com",
    "artifactregistry.googleapis.com",
    "bigquery.googleapis.com",
    "bigqueryconnection.googleapis.com",
    "aiplatform.googleapis.com"
]

cmd = f"gcloud services enable {' '.join(apis)} --project={PROJECT_ID}"
run_cmd(cmd, "Enabling APIs")

## 3. Create Artifact Registry

In [None]:
# Create Artifact Registry repository
cmd = f"""gcloud artifacts repositories create {ARTIFACT_REGISTRY} \
    --repository-format=docker \
    --location={REGION} \
    --description="BigQuery Anti-Pattern Recognition" \
    --project={PROJECT_ID}"""

run_cmd(cmd, "Creating Artifact Registry")
print(f"\nRegistry URL: {ARTIFACT_REGISTRY_URL}")

## 4. Create BigQuery Datasets and Tables

In [None]:
# Create BigQuery dataset
cmd = f"""bq mk --dataset \
    --project_id={PROJECT_ID} \
    --location={REGION} \
    --description="Anti-Pattern Recognition Demo" \
    {BQ_DATASET}"""

run_cmd(cmd, "Creating BigQuery dataset")
print(f"\nDataset: {PROJECT_ID}.{BQ_DATASET}")

In [None]:
# Create output table for standard anti-pattern detection
standard_table_sql = f"""CREATE OR REPLACE TABLE {OUTPUT_TABLE} (
  job_id STRING,
  user_email STRING,
  query STRING,
  recommendation ARRAY<STRUCT<name STRING, description STRING>>,
  slot_hours FLOAT64,
  optimized_sql STRING,
  process_timestamp TIMESTAMP
)"""

run_sql(standard_table_sql, "Creating output table")

## 5. Build Batch Container

In [None]:
# Build batch processing container (if enabled)
if DEPLOY_BATCH:
    print("Building batch container... (5-10 minutes)")

    cmd = f"""cd .. && gcloud builds submit . \
        --project={PROJECT_ID} \
        --config=demo/cloudbuild-batch.yaml \
        --substitutions=_CONTAINER_IMAGE_NAME={BATCH_IMAGE} \
        --machine-type=e2-highcpu-8"""

    result = run_cmd(cmd, "Building batch container")
    print(f"\nBatch Image: {BATCH_IMAGE}")
else:
    print("Skipping batch container build")

## 6. Build Service Container

In [None]:
# Build service container (if API or UDF enabled)
if DEPLOY_API or DEPLOY_UDF:
    print("Building service container... (5-10 minutes)")

    cmd = f"""cd .. && gcloud builds submit . \
        --project={PROJECT_ID} \
        --config=demo/cloudbuild-service.yaml \
        --substitutions=_CONTAINER_IMAGE_NAME={SERVICE_IMAGE} \
        --machine-type=e2-highcpu-8"""

    result = run_cmd(cmd, "Building service container")
    print(f"\nService Image: {SERVICE_IMAGE}")
else:
    print("Skipping service container build")

## 7. Deploy Cloud Run Job (Standard)

In [None]:
# Deploy Cloud Run Job for standard batch processing
if DEPLOY_BATCH:
    # Delete existing job if it exists
    delete_cmd = f"""gcloud run jobs delete {BATCH_SERVICE_NAME} \
        --region={REGION} --project={PROJECT_ID} --quiet"""
    subprocess.run(delete_cmd, shell=True, capture_output=True)

    # Create new job
    cmd = f"""gcloud run jobs create {BATCH_SERVICE_NAME} \
        --image={BATCH_IMAGE} \
        --max-retries=3 --task-timeout=15m --memory=2Gi --cpu=2 \
        --args="--read_from_info_schema" \
        --args="--read_from_info_schema_days" --args="180" \
        --args="--info_schema_table_name" --args="{INFO_SCHEMA_TABLE}" \
        --args="--processing_project_id" --args="{PROJECT_ID}" \
        --args="--output_table" --args="{OUTPUT_TABLE}" \
        --region={REGION} --project={PROJECT_ID}"""

    result = run_cmd(cmd, "Creating standard batch job")

    if result.returncode == 0:
        print(f"\nStandard batch job created successfully!")
        print(f"Batch Job: {BATCH_SERVICE_NAME}")
        print(f"Output Table: {OUTPUT_TABLE}")
    else:
        print(f"\nJob creation failed with return code: {result.returncode}")

else:
    print("Skipping batch job deployment")

## 8. Deploy Cloud Run Job (AI-Enhanced)

In [None]:
# Deploy Cloud Run Job with AI rewrite capabilities
if DEPLOY_BATCH and ENABLE_AI_REWRITE:
    AI_BATCH_SERVICE_NAME = f"{BATCH_SERVICE_NAME}-ai"

    # Delete existing AI job if it exists
    delete_cmd = f"""gcloud run jobs delete {AI_BATCH_SERVICE_NAME} \
        --region={REGION} --project={PROJECT_ID} --quiet"""
    subprocess.run(delete_cmd, shell=True, capture_output=True)

    # Create new AI-enhanced job
    cmd = f"""gcloud run jobs create {AI_BATCH_SERVICE_NAME} \
        --image={BATCH_IMAGE} \
        --max-retries=3 --task-timeout=30m --memory=4Gi --cpu=2 \
        --args="--read_from_info_schema" \
        --args="--read_from_info_schema_days" --args="180" \
        --args="--info_schema_table_name" --args="{INFO_SCHEMA_TABLE}" \
        --args="--processing_project_id" --args="{PROJECT_ID}" \
        --args="--output_table" --args="{OUTPUT_TABLE}" \
        --args="--rewrite_sql" \
        --region={REGION} --project={PROJECT_ID}"""

    result = run_cmd(cmd, "Creating AI-enhanced batch job")

    if result.returncode == 0:
        print(f"\nAI-enhanced batch job created successfully!")
        print(f"AI Batch Job: {AI_BATCH_SERVICE_NAME}")
        print(f"AI Output Table: {OUTPUT_TABLE}")
    else:
        print(
            f"\nAI job creation failed with return code: {result.returncode}")

else:
    print("Skipping AI-enhanced batch job deployment")

## 9. Deploy Cloud Run Service (API)

In [None]:
# Deploy Cloud Run Service for REST API
if DEPLOY_API:
    cmd = f"""gcloud run deploy {API_SERVICE_NAME} \
        --image={SERVICE_IMAGE} \
        --region={REGION} \
        --allow-unauthenticated \
        --memory=2Gi \
        --cpu=2 \
        --timeout=300 \
        --port=8080 \
        --set-env-vars=PROJECT_ID={PROJECT_ID} \
        --project={PROJECT_ID}"""

    result = run_cmd(cmd, "Deploying API service")

    # Extract service URL from stderr (where gcloud outputs it)
    SERVICE_URL = None
    if result.stderr and "Service URL:" in result.stderr:
        import re
        # Extract URL from stderr with ANSI color codes
        url_match = re.search(
            r'Service URL: \x1b\[1m(https://[^\x1b]+)\x1b\[m', result.stderr)
        if url_match:
            SERVICE_URL = url_match.group(1)
            print(f"\nExtracted Service URL: {SERVICE_URL}")
        else:
            print("\nCould not extract service URL from deployment output")
    else:
        print("\nNo service URL found in deployment output")

else:
    print("Skipping API service deployment")
    SERVICE_URL = None

## 10. Create BigQuery Connection and UDF

In [None]:
# Create BigQuery connection and UDF
if DEPLOY_UDF and SERVICE_URL:
    CONNECTION_NAME = f"ext-{API_SERVICE_NAME}"

    # Create connection
    conn_cmd = f"""bq mk --connection \
        --display_name='Anti-Pattern Recognition Connection' \
        --connection_type=CLOUD_RESOURCE \
        --project_id={PROJECT_ID} \
        --location={REGION} \
        {CONNECTION_NAME}"""

    result = run_cmd(conn_cmd, "Creating BigQuery connection")

    # Get service account
    sa_cmd = f"""bq --project_id={PROJECT_ID} --format=json show \
        --connection {PROJECT_ID}.{REGION}.{CONNECTION_NAME}"""

    sa_result = subprocess.run(
        sa_cmd, shell=True, capture_output=True, text=True)
    if sa_result.returncode == 0:
        connection_info = json.loads(sa_result.stdout)
        service_account = connection_info['cloudResource']['serviceAccountId']
        print(f"Service Account: {service_account}")

        # Grant permissions
        perm_cmd = f"""gcloud projects add-iam-policy-binding {PROJECT_ID} \
            --member="serviceAccount:{service_account}" \
            --role='roles/run.invoker'"""

        run_cmd(perm_cmd, "Granting permissions")

    # Create UDF
    function_sql = f"""CREATE OR REPLACE FUNCTION {BQ_DATASET}.get_antipatterns(query STRING)
RETURNS JSON
REMOTE WITH CONNECTION `{PROJECT_ID}.{REGION}.{CONNECTION_NAME}`
OPTIONS (endpoint = '{SERVICE_URL}');"""

    run_sql(function_sql, "Creating UDF")

    print(f"\nUDF Name: {BQ_DATASET}.get_antipatterns")
    print(
        f"Example: SELECT {BQ_DATASET}.get_antipatterns('SELECT * FROM table') as antipatterns;")

elif DEPLOY_UDF:
    print("Cannot create UDF - API service not deployed")
else:
    print("Skipping UDF deployment")

## 11. Test Standard Batch Job

In [None]:
# Test standard batch job execution
if DEPLOY_BATCH:
    print("Executing standard batch job... (may take a few minutes)")

    cmd = f"""gcloud run jobs execute {BATCH_SERVICE_NAME} \
        --region={REGION} \
        --project={PROJECT_ID} \
        --wait"""

    result = run_cmd(cmd, "Executing standard batch job")

    if result.returncode == 0:
        print("\nStandard batch job executed successfully!")

        # Check results
        check_sql = f"SELECT COUNT(*) as result_count FROM {OUTPUT_TABLE}"
        run_sql(check_sql, "Checking standard results table")
    else:
        print(
            f"\nStandard batch job execution failed with return code: {result.returncode}")

else:
    print("Skipping standard batch job test")

## 12. Test AI-Enhanced Batch Job

In [None]:
# Test AI-enhanced batch job execution
if DEPLOY_BATCH and ENABLE_AI_REWRITE:
    print("Executing AI-enhanced batch job... (may take longer due to AI processing)")

    cmd = f"""gcloud run jobs execute {AI_BATCH_SERVICE_NAME} \
        --region={REGION} \
        --project={PROJECT_ID} \
        --wait"""

    result = run_cmd(cmd, "Executing AI-enhanced batch job")

    if result.returncode == 0:
        print("\nAI-enhanced batch job executed successfully!")

        # Check results with optimized SQL
        check_sql = f"""SELECT
            COUNT(*) as total_results,
            COUNTIF(optimized_sql IS NOT NULL) as queries_with_optimized_sql
        FROM {OUTPUT_TABLE}"""
        run_sql(check_sql, "Checking AI results table")
    else:
        print(
            f"\nAI batch job execution failed with return code: {result.returncode}")

else:
    print("Skipping AI-enhanced batch job test")

## 13. Test API Service (Standard)

In [None]:
# Test standard API service
if DEPLOY_API and SERVICE_URL:
    test_query = "SELECT * FROM `bigquery-public-data.samples.shakespeare` ORDER BY word_count DESC LIMIT 10"
    payload = {'calls': [[test_query]]}

    test_api(SERVICE_URL, payload, "POST", "Testing standard API service")
else:
    print("Skipping API test")

## 14. Test API Service (AI Rewrite)

In [None]:
# Test AI rewrite API endpoint
if DEPLOY_API and SERVICE_URL and ENABLE_AI_REWRITE:
    test_query = "SELECT * FROM dataset.table WHERE col2 LIKE '%test%' AND col1 = 'value' ORDER BY col3"
    payload = {'calls': [[test_query]]}
    rewrite_url = f"{SERVICE_URL}/rewrite"

    test_api(rewrite_url, payload, "POST", "Testing AI rewrite API endpoint")
else:
    print("Skipping AI rewrite API test")

## 15. Test UDF

In [None]:
# Test BigQuery UDF
if DEPLOY_UDF:
    # Create the test SQL query
    test_sql = f"""SELECT
        'SELECT * FROM dataset.table ORDER BY column' as test_query,
        {BQ_DATASET}.get_antipatterns('SELECT * FROM dataset.table ORDER BY column') as antipatterns"""

    run_sql(test_sql, "Testing UDF")
else:
    print("Skipping UDF test")