Infrastructure (infrastructure/main.tf)

In [None]:
# infrastructure/main.tf

# Provider requirements: the Google provider, any 4.x release.
terraform {
  required_providers {
    google = {
      version = ">= 4.0, < 5.0"
      source  = "hashicorp/google"
    }
  }
}

# Default project and region inherited by every google_* resource below
# unless a resource overrides them.
provider "google" {
  region  = var.region
  project = var.project_id
}

# Enable required APIs.
# compute.googleapis.com backs the VPC, subnet and all load-balancer resources
# in this file, and dns.googleapis.com backs the Cloud DNS record set; without
# them those resources fail on a project where the APIs are not already on.
resource "google_project_service" "apis" {
  for_each = toset([
    "run.googleapis.com",
    "redis.googleapis.com",
    "aiplatform.googleapis.com",
    "pubsub.googleapis.com",
    "compute.googleapis.com",
    "dns.googleapis.com",
  ])
  service = each.key
}

# Custom-mode VPC: subnets are declared explicitly rather than auto-created.
# Memorystore attaches to it through authorized_network.
resource "google_compute_network" "vpc_network" {
  auto_create_subnetworks = false
  name                    = "chatbot-vpc"

  # Created only after the project APIs have been enabled.
  depends_on = [google_project_service.apis]
}

# Regional /16 subnet in var.region.
# NOTE(review): Memorystore in PRIVATE_SERVICE_ACCESS mode takes its IP from a
# peered service range, not from this subnet; nothing visible here uses it.
resource "google_compute_subnetwork" "subnet" {
  network       = google_compute_network.vpc_network.id
  region        = var.region
  name          = "chatbot-subnet"
  ip_cidr_range = "10.0.0.0/16"
}

# Create Redis Instance (Cloud Memorystore)
#
# connect_mode = "PRIVATE_SERVICE_ACCESS" requires the VPC to already be
# peered with Google's service-producer network via Service Networking;
# without that connection instance creation fails. The resources below
# enable the API, reserve an internal range for the peering and create it.
resource "google_project_service" "servicenetworking" {
  service = "servicenetworking.googleapis.com"
}

# Internal range handed to Service Networking; the address is auto-allocated
# from unused space in the VPC (no explicit "address" is given).
resource "google_compute_global_address" "redis_psa_range" {
  name          = "chatbot-redis-psa-range"
  purpose       = "VPC_PEERING"
  address_type  = "INTERNAL"
  prefix_length = 16
  network       = google_compute_network.vpc_network.id
}

resource "google_service_networking_connection" "redis_psa" {
  network                 = google_compute_network.vpc_network.id
  service                 = "servicenetworking.googleapis.com"
  reserved_peering_ranges = [google_compute_global_address.redis_psa_range.name]

  depends_on = [google_project_service.servicenetworking]
}

resource "google_redis_instance" "chatbot_cache" {
  name               = "chatbot-redis-cache"
  tier               = "STANDARD_HA"
  memory_size_gb     = 5
  region             = var.region
  authorized_network = google_compute_network.vpc_network.id
  connect_mode       = "PRIVATE_SERVICE_ACCESS"

  # The peering must exist before a PSA-mode instance can be created.
  depends_on = [
    google_project_service.apis,
    google_service_networking_connection.redis_psa,
  ]
}

# Create Pub/Sub Topic for async embedding requests.
# pubsub.googleapis.com is enabled by google_project_service.apis; without an
# explicit edge Terraform may create the topic before the API is enabled.
resource "google_pubsub_topic" "embedding_requests" {
  name = "query-embedding-requests"

  depends_on = [google_project_service.apis]
}

# Deploy Cloud Run Service
#
# Memorystore is reachable only on a private IP inside the VPC, and Cloud Run
# has no VPC egress unless it is given one, so REDIS_HOST was unreachable from
# the container. A Serverless VPC Access connector, wired in through the
# revision-template annotations below, provides that egress.
resource "google_project_service" "vpcaccess" {
  service = "vpcaccess.googleapis.com"
}

resource "google_vpc_access_connector" "chatbot_connector" {
  name    = "chatbot-connector"
  region  = var.region
  network = google_compute_network.vpc_network.name
  # Must be an unused /28 that does not overlap chatbot-subnet (10.0.0.0/16).
  # NOTE(review): also confirm it does not collide with any peered service range.
  ip_cidr_range = "10.8.0.0/28"

  depends_on = [google_project_service.vpcaccess]
}

resource "google_cloud_run_service" "chatbot_api" {
  name     = "chatbot-api"
  location = var.region

  template {
    metadata {
      annotations = {
        "run.googleapis.com/vpc-access-connector" = google_vpc_access_connector.chatbot_connector.name
        # Only private-range destinations (such as the Redis IP) go through
        # the connector; other outbound traffic keeps the default egress path.
        "run.googleapis.com/vpc-access-egress" = "private-ranges-only"
      }
    }

    spec {
      containers {
        # Image will be built and pushed during deployment.
        # NOTE(review): Terraform sees no diff when a new image is pushed under
        # the same :latest tag, so that alone does not roll out a new revision.
        image = "gcr.io/${var.project_id}/chatbot-api:latest"

        ports {
          container_port = 8080
        }

        # Environment variables for application configuration
        env {
          name  = "REDIS_HOST"
          value = google_redis_instance.chatbot_cache.host
        }
        env {
          name  = "REDIS_PORT"
          value = google_redis_instance.chatbot_cache.port
        }
        env {
          name  = "GOOGLE_CLOUD_PROJECT"
          value = var.project_id
        }
        env {
          name  = "EMBEDDING_MODEL_ID"
          value = "textembedding-gecko@003"
        }
        env {
          name  = "PREDICTION_ENDPOINT_ID"
          value = var.vertex_ai_endpoint_id # You would create this separately or via Terraform
        }
        env {
          name  = "EMBEDDING_TOPIC_ID"
          value = google_pubsub_topic.embedding_requests.name
        }
      }
    }
  }

  # Send all traffic to the newest revision.
  traffic {
    percent         = 100
    latest_revision = true
  }

  depends_on = [google_project_service.apis]
}

# Public invoker policy for the Cloud Run service (can be restricted later).
# google_cloud_run_service_iam_policy is authoritative: it replaces the
# service's entire IAM policy with exactly this document.
# NOTE(review): with allUsers as invoker, clients can also call the service's
# run.app URL directly, bypassing the load balancer defined below.
data "google_iam_policy" "noauth" {
  binding {
    members = ["allUsers"]
    role    = "roles/run.invoker"
  }
}

resource "google_cloud_run_service_iam_policy" "noauth" {
  location    = google_cloud_run_service.chatbot_api.location
  service     = google_cloud_run_service.chatbot_api.name
  policy_data = data.google_iam_policy.noauth.policy_data
}

# --- CLOUD LOAD BALANCER CONFIGURATION ---

# Global static IP for the HTTPS load balancer; the DNS A record targets it.
resource "google_compute_global_address" "chatbot_lb_ip" {
  name = "chatbot-lb-ip"
}

# Regional serverless network endpoint group (NEG) that lets the load
# balancer route to the Cloud Run service.
resource "google_compute_region_network_endpoint_group" "chatbot_neg" {
  region = var.region
  name   = "chatbot-neg"

  cloud_run {
    service = google_cloud_run_service.chatbot_api.name
  }
}

# Create a backend service for the load balancer, backed by the serverless NEG.
#
# Backend services with serverless NEG backends reject both health_checks and
# timeout_sec, so those settings (and the health check resource that existed
# only to be attached here) are removed. Cloud Run manages instance health
# itself, and request timeouts are set on the Cloud Run service instead.
# The app's /healthz route remains available for external uptime checks.
resource "google_compute_backend_service" "chatbot_backend" {
  name                  = "chatbot-backend"
  protocol              = "HTTP"
  port_name             = "http"
  load_balancing_scheme = "EXTERNAL_MANAGED"
  enable_cdn            = false

  backend {
    group = google_compute_region_network_endpoint_group.chatbot_neg.id
  }
}

# URL map with no host or path rules: every request goes to the single
# backend service.
resource "google_compute_url_map" "chatbot_url_map" {
  default_service = google_compute_backend_service.chatbot_backend.id
  name            = "chatbot-url-map"
}

# Google-managed TLS certificate for chatbot.<domain_name>.
resource "google_compute_managed_ssl_certificate" "chatbot_ssl" {
  name = "chatbot-ssl-cert"

  managed {
    domains = ["chatbot.${var.domain_name}."]
  }
}

# HTTPS proxy: terminates TLS with the managed certificate, then consults the
# URL map.
resource "google_compute_target_https_proxy" "chatbot_https_proxy" {
  ssl_certificates = [google_compute_managed_ssl_certificate.chatbot_ssl.id]
  url_map          = google_compute_url_map.chatbot_url_map.id
  name             = "chatbot-https-proxy"
}

# Global forwarding rule: TCP/443 on the reserved IP -> HTTPS proxy.
# Only port 443 is exposed; there is no HTTP (:80) listener or redirect.
resource "google_compute_global_forwarding_rule" "chatbot_https_rule" {
  name                  = "chatbot-https-rule"
  load_balancing_scheme = "EXTERNAL_MANAGED"
  target                = google_compute_target_https_proxy.chatbot_https_proxy.id
  ip_address            = google_compute_global_address.chatbot_lb_ip.address
  ip_protocol           = "TCP"
  port_range            = "443"
}

# A record chatbot.<domain_name> -> load balancer IP with a 5-minute TTL.
# The managed zone (var.dns_zone_name) is not created in this file.
resource "google_dns_record_set" "chatbot_dns" {
  managed_zone = var.dns_zone_name
  name         = "chatbot.${var.domain_name}."
  type         = "A"
  ttl          = 300
  rrdatas      = [google_compute_global_address.chatbot_lb_ip.address]
}

 Application Code (application/main.py)
This is the complete, updated Flask application that runs on Cloud Run.

In [None]:
# application/main.py

import os
import json
import logging
import hashlib
import re
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from google.api_core.exceptions import ResourceExhausted, DeadlineExceeded

import redis
from flask import Flask, request, jsonify

# Import Google Cloud clients
from google.cloud import aiplatform
from google.cloud import pubsub_v1

# Flask application object that the routes below are registered on.
app = Flask(__name__)

# Root logging at INFO; this module writes through its own named logger.
logging.basicConfig(level="INFO")
logger = logging.getLogger(name=__name__)

# --- GCP Clients Initialization ---
# Initialize Cloud Memorystore (Redis) client.
# redis-py blocks indefinitely by default when the server is unreachable;
# explicit timeouts make a Redis outage surface as an error instead of a hang.
redis_client = redis.Redis(
    host=os.environ.get('REDIS_HOST', 'localhost'),
    port=int(os.environ.get('REDIS_PORT', 6379)),
    decode_responses=True,
    socket_connect_timeout=float(os.environ.get('REDIS_CONNECT_TIMEOUT_SEC', 2)),
    socket_timeout=float(os.environ.get('REDIS_SOCKET_TIMEOUT_SEC', 2)),
)

# Initialize Vertex AI
aiplatform.init(
    project=os.environ.get('GOOGLE_CLOUD_PROJECT'),
    location=os.environ.get('VERTEX_AI_LOCATION', 'us-central1')
)

# Text embedding model ID. This module never computes embeddings itself: cache
# misses are published to Pub/Sub (below) for asynchronous processing.
# The previous `aiplatform.models.Model(<model id>)` resolved the ID against
# the project's Model Registry, where a Google publisher model such as
# textembedding-gecko does not live, so constructing it at import time broke
# start-up. The object was never used, so the name is kept but left unset.
EMBEDDING_MODEL_ID = os.environ.get('EMBEDDING_MODEL_ID', 'textembedding-gecko@003')
embedding_model = None

# Initialize the Vertex AI Text Generation Model endpoint.
# An Endpoint cannot be constructed from an empty ID, so only build it when one
# is configured. Otherwise the service still starts (keeping /healthz up), and
# LLM calls fail per request instead.
PREDICTION_ENDPOINT_ID = os.environ.get('PREDICTION_ENDPOINT_ID', '')
if PREDICTION_ENDPOINT_ID:
    prediction_endpoint = aiplatform.Endpoint(PREDICTION_ENDPOINT_ID)
else:
    logger.warning("PREDICTION_ENDPOINT_ID is not set; LLM calls will fail until it is configured.")
    prediction_endpoint = None

# Initialize Pub/Sub publisher for batch embedding processing
publisher = pubsub_v1.PublisherClient()
EMBEDDING_PUBSUB_TOPIC = publisher.topic_path(
    os.environ.get('GOOGLE_CLOUD_PROJECT'),
    os.environ.get('EMBEDDING_TOPIC_ID', 'query-embedding-requests')
)

# --- Helper Functions ---
# Function words dropped during normalization so that lightly rephrased
# questions ("what is the price" / "price") share one cache key.
_QUERY_STOPWORDS = frozenset(
    {'what', 'is', 'the', 'a', 'an', 'how', 'to', 'do', 'i', 'can', 'you'}
)


def normalize_query(query: str) -> str:
    """
    Normalize the user query to increase cache hit rate.

    Lower-cases and trims the text, removes every character that is neither a
    word character nor whitespace, collapses whitespace, and drops a small set
    of stopwords.

    If every remaining word is a stopword (e.g. "What can you do?"), the
    unfiltered words are returned instead. Otherwise all such queries would
    normalize to "" and collide on a single cache key.

    Returns "" only when the input contains no word characters at all.
    """
    cleaned = re.sub(r'[^\w\s]', '', query.lower().strip())
    words = cleaned.split()
    filtered_words = [word for word in words if word not in _QUERY_STOPWORDS]
    # Fall back to the unfiltered words when filtering would leave nothing.
    return " ".join(filtered_words or words)

def get_query_hash(normalized_query: str) -> str:
    """Return the SHA-256 hex digest (64 chars) of the UTF-8 encoded query, used as the Redis key suffix."""
    hasher = hashlib.sha256()
    hasher.update(normalized_query.encode("utf-8"))
    return hasher.hexdigest()

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    retry=retry_if_exception_type((ResourceExhausted, DeadlineExceeded)),
    reraise=True,
)
def call_vertex_ai_llm(context: str, question: str) -> str:
    """
    Ask the prediction endpoint to answer *question* using *context*.

    tenacity retries the call (3 attempts in total, exponential wait clamped to
    4-10 s) only when it raises ResourceExhausted or DeadlineExceeded. Because
    reraise=True, the final error propagates rather than a RetryError.

    Returns the 'content' field of the first prediction. Every failure is
    logged at ERROR level and re-raised to the caller.
    """
    generation_config = {
        "temperature": 0.2,
        "maxOutputTokens": 256,
        "topP": 0.8,
        "topK": 40,
    }
    payload = {"context": context, "message": question}
    try:
        result = prediction_endpoint.predict(
            instances=[payload],
            parameters=generation_config,
        )
        first_prediction = result.predictions[0]
        return first_prediction['content']
    except Exception as err:
        logger.error(f"Error calling Vertex AI LLM: {err}")
        raise

# --- API Routes ---
@app.route('/healthz', methods=['GET'])
def health_check():
    """Lightweight liveness endpoint: always answers 200 with {"status": "ok"}."""
    payload = {"status": "ok"}
    return jsonify(payload), 200

# How long generated answers stay in the response cache (24 hours).
_RESPONSE_CACHE_TTL_SECONDS = 86400
# Upper bound on how long a request waits for Pub/Sub to accept a publish.
_PUBLISH_TIMEOUT_SECONDS = 5


def _cache_get(key: str):
    """Return the cached value for *key*, or None on a miss or a Redis error.

    The cache is an optimization only, so a Redis failure degrades to a cache
    miss instead of failing the request.
    """
    try:
        return redis_client.get(key)
    except redis.RedisError as exc:
        logger.warning("Redis GET failed for %s: %s", key, exc)
        return None


def _cache_set(key: str, value: str, ttl_seconds: int) -> None:
    """Store *value* under *key* with a TTL; Redis errors are logged, not raised."""
    try:
        redis_client.setex(key, ttl_seconds, value)
    except redis.RedisError as exc:
        logger.warning("Redis SETEX failed for %s: %s", key, exc)


def _publish_embedding_request(user_query: str, normalized_query: str, query_hash: str) -> None:
    """Publish the query to the async-embedding topic on a best-effort basis.

    A failed or slow publish only delays cache warm-up, so it is logged rather
    than failing the user's chat request.
    """
    data = {"query": user_query, "normalized_query": normalized_query, "hash": query_hash}
    try:
        future = publisher.publish(EMBEDDING_PUBSUB_TOPIC, json.dumps(data).encode("utf-8"))
        message_id = future.result(timeout=_PUBLISH_TIMEOUT_SECONDS)
    except Exception:  # best-effort boundary: any publish failure is non-fatal
        logger.exception("Failed to publish query for async embedding")
        return
    logger.info(f"Published query for async embedding: {message_id}")


@app.route('/api/chat', methods=['POST'])
def chat_handler():
    """
    Main endpoint for user chat queries.

    Expects a JSON object body with a non-empty string "query".
    1. Return the cached response for the normalized query, if any.
    2. Otherwise choose the LLM context:
       - if an embedding is cached, use the vector-DB context placeholder;
       - if not, use generic context and publish the query for async embedding.
    3. Call the LLM, cache the answer for 24 hours, and return it.

    Returns 400 for a non-object body or a missing/blank/non-string query, and
    503 if the LLM call fails.
    """
    request_data = request.get_json(silent=True)
    if not isinstance(request_data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400

    user_query = request_data.get('query', '')
    if not isinstance(user_query, str) or not user_query.strip():
        return jsonify({"error": "Query is required"}), 400

    logger.info(f"Received query: {user_query}")

    normalized_query = normalize_query(user_query)
    query_hash = get_query_hash(normalized_query)
    response_key = f"response:{query_hash}"
    # Input with no word characters (emoji, punctuation) normalizes to "", so
    # every such query would share one hash; never cache or enqueue those.
    cacheable = bool(normalized_query)

    if cacheable:
        cached_response = _cache_get(response_key)
        if cached_response:
            logger.info("Cache HIT for response")
            return jsonify({"response": cached_response, "source": "cache"})

    cached_embedding = _cache_get(f"embedding:{query_hash}") if cacheable else None
    if cached_embedding:
        logger.info("Cache HIT for embedding")
        relevant_context = "[Context retrieved from Vector DB using cached embedding]"
    else:
        logger.info("Cache MISS for embedding")
        if cacheable:
            _publish_embedding_request(user_query, normalized_query, query_hash)
        relevant_context = "[General context about products and regulations]"

    try:
        llm_response = call_vertex_ai_llm(relevant_context, user_query)
    except Exception:
        # call_vertex_ai_llm logs the underlying error before re-raising it.
        return jsonify({"error": "Our AI service is temporarily busy. Please try again."}), 503

    if cacheable:
        # NOTE(review): answers built from the generic fallback context are also
        # cached for the full TTL. Later requests then hit this entry and never
        # use the embedding once it exists; consider a shorter TTL for them.
        _cache_set(response_key, llm_response, _RESPONSE_CACHE_TTL_SECONDS)
    logger.info("Successfully generated and cached response.")
    return jsonify({"response": llm_response, "source": "vertex-ai"})

if __name__ == "__main__":
    # Local entry point: bind on all interfaces, using $PORT when set (8080 otherwise).
    listen_port = int(os.environ.get('PORT', 8080))
    app.run(debug=False, host='0.0.0.0', port=listen_port)