# Agentic Platform: LLM Gateway

This lab introduces the concept of an LLM Gateway. LLM Gateways let you track and throttle requests in a multi-tenant environment. Your tenancy could be by department, workload, customer organization, or even individual users on your platform.

There are many options for this, from open source projects to private offerings to DIY. In this lab we'll explore a DIY approach. Because the gateway is a single point of failure for your entire agent platform, it's generally something customers want to own and control themselves. The three main patterns we see are:
* Completely DIY 
* Using an SDK like LiteLLM and building your own infrastructure around it
* Using a 3P offering

The 3P providers guard many of the most useful features (security, team management, etc.) behind enterprise licenses. The open source versions of their proxies do deploy resources in your account, but they're generally not suited for scale and lack the security features most enterprises need, such as OAuth support and easy integration with your existing identity services.

To get started, let's build a rate limiter using Redis. In the platform we use ElastiCache (Redis), but for this lab we'll use Redis in a docker container spun up from the provided docker compose file.

# Components of an LLM Gateway
An LLM Gateway generally does 3 things
1. Unifies the types across different APIs and model providers 
2. Provides usage metrics
3. Enables rate limiting based on your defined tenancy 

# Usage Keys
To experiment with Redis, we've provided a docker-compose file in the main project. Spin it up to get a local instance of Redis and a local DB instance. In the agent stack, Redis sits in a private subnet, which makes it harder to test against. You can port forward through the jump box if you'd like to hit it directly, but for this lab we'll use our local docker instance.

Once docker compose is up, create a .env file in this directory with the values below. The compose file also provides a local DynamoDB container that you can connect to for running and debugging locally.

```bash
REDIS_HOST=localhost
REDIS_PASSWORD=redis
REDIS_PORT=6379
DYNAMODB_USAGE_PLANS_TABLE=<yourDDBTable>
DYNAMODB_USAGE_LOGS_TABLE=<yourDDBTable>

# You need to set the env to local so that Redis doesn't expect SSL
ENVIRONMENT=local
```

In [None]:
from dotenv import load_dotenv

load_dotenv()


# Create our Usage Plan Keys
Usage plans are effectively rate limits. Compared to traditional rate limiting, LLM rate limiting has additional requirements. Limits can be scoped globally or per model, and within each scope they can be expressed as:
* Requests per minute (RPM)
* Tokens per minute (TPM), tracked separately for input and output tokens

For tenancy, limits can be set at the individual, team, organization, or service level (a service being a background process or an agent).

This is potentially a high throughput gateway so we need to consider scale as we launch more agentic systems. Additionally we need to optimize for flexibility. Because of these two requirements, using a NoSQL DB with a flexible identifier is key. We'll define our usage plan as follows:
* The partition key (pk) is <entity id>, where the entity is a user, team, organization, or service.
* The sort key (sk) is <entity type>.

We use the entity id as the partition key so that items are distributed more evenly. This helps prevent hot spotting, a common issue in NoSQL tables where similar key values are grouped together on one shard and the load is spread unevenly.

Secondly, we'll store the rate limits for input, output, rpm, and model specific limits as an attribute of the item in the NoSQL table. This allows us to rate limit based on an arbitrary lookup key and still maintain flexibility.

Find the key definition below


```python
class UsagePlanEntityType(str, Enum):
    USER = "USER"
    SERVICE = "SERVICE"
    API_KEY = "API_KEY"
    DEPARTMENT = "DEPARTMENT"
    PROJECT = "PROJECT"

    def __str__(self) -> str:
        return self.value

class RateLimits(BaseModel):
    """Rate limits configuration"""
    input_tpm: int = Field(default=40000, description="Input tokens per minute limit")
    output_tpm: int = Field(default=10000, description="Output tokens per minute limit")
    rpm: int = Field(default=60, description="Requests per minute limit")

class UsagePlan(BaseModel):
    """Usage plan with rate limits"""
    entity_id: str
    entity_type: UsagePlanEntityType
    tenant_id: str = 'SYSTEM' # By default, we assume no tenancy
    budget_id: Optional[str] = None # Placeholder for future use.
    model_permissions: List[str]
    active: bool = Field(default=True)
    default_limits: RateLimits = Field(default_factory=RateLimits)
    model_limits: Dict[str, RateLimits] = Field(default_factory=dict)
    metadata: Optional[Dict] = Field(default_factory=dict)
    created_at: int = Field(default_factory=lambda: int(time.time()))

    def get_limits_for_model(self, model_id: str) -> RateLimits:
        """Get limits for a specific model, falling back to defaults"""
        return self.model_limits.get(model_id, self.default_limits)
```
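
To illustrate how `default_limits` and `model_limits` interact, here is a minimal sketch of the fallback lookup. It mirrors the models above with standard-library dataclasses so that it runs without the platform package; `Limits` and `Plan` are hypothetical stand-ins, and the override numbers are made up for illustration.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass(frozen=True)
class Limits:
    """Stand-in for RateLimits, using the same default values."""
    input_tpm: int = 40000
    output_tpm: int = 10000
    rpm: int = 60


@dataclass
class Plan:
    """Stand-in for UsagePlan, reduced to the fields needed for the lookup."""
    default_limits: Limits = field(default_factory=Limits)
    model_limits: Dict[str, Limits] = field(default_factory=dict)

    def get_limits_for_model(self, model_id: str) -> Limits:
        # Same fallback rule as UsagePlan.get_limits_for_model above.
        return self.model_limits.get(model_id, self.default_limits)


# Give one model a larger allowance; every other model uses the defaults.
plan = Plan(model_limits={"nova-micro": Limits(input_tpm=100000, output_tpm=20000, rpm=120)})

print(plan.get_limits_for_model("nova-micro").rpm)        # per-model override
print(plan.get_limits_for_model("some-other-model").rpm)  # falls back to defaults
```

Keeping per-model overrides in a dictionary on the item means a new model can be limited without changing the table schema.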

In [None]:
# Import our key type from the platform.
from agentic_platform.service.llm_gateway.models.usage_types import UsagePlanEntityType,  UsagePlan, RateLimits

In [None]:
from typing import Tuple

# Create our usage plan for a user.
usage_plan: UsagePlan = UsagePlan(
    entity_id="123",
    entity_type=UsagePlanEntityType.USER,
    tenant_id="SYSTEM",
    model_permissions=["*"],
    active=True,
    default_limits=RateLimits(input_tpm=40000, output_tpm=10000, rpm=60),
    model_limits={}
)

print(usage_plan.model_dump_json(indent=2))


This creates a global limit of 40,000 input tokens and 10,000 output tokens per minute. It also limits all model usage to 60 requests per minute. The pk and sk will look like `123:USER`.

Now we need to upload it to DynamoDB. We can use the agentic platform APIs to do that.
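
The key layout described above can be sketched as two small helpers. This is only an illustration: the attribute names `entity_id` and `entity_type` are assumed to match the model fields, and your table schema may name them differently.

```python
from typing import Dict


def usage_plan_key(entity_id: str, entity_type: str) -> Dict[str, str]:
    """Build the composite key for a usage plan item.

    The attribute names are assumptions for illustration; use whatever
    names your table schema defines.
    """
    return {"entity_id": entity_id, "entity_type": entity_type}


def usage_plan_lookup_string(entity_id: str, entity_type: str) -> str:
    """Flatten the same pair into one string, e.g. for logging or a cache key."""
    return f"{entity_id}:{entity_type}"


print(usage_plan_key("123", "USER"))
print(usage_plan_lookup_string("123", "USER"))
```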

In [None]:
from agentic_platform.service.llm_gateway.api.create_usage_plan_controller import CreateUsagePlanController
from agentic_platform.service.llm_gateway.models.gateway_api_types import CreateUsagePlanRequest, CreateUsagePlanResponse

# Create our usage plan.
create_usage_plan_request: CreateUsagePlanRequest = CreateUsagePlanRequest(
    entity_type=UsagePlanEntityType.USER,
    entity_id="123",
    tenant_id="SYSTEM",
    model_permissions=["*"],
    default_limits=RateLimits(input_tpm=40000, output_tpm=10000, rpm=60),
    model_limits={},
    metadata={}
)

response: CreateUsagePlanResponse = CreateUsagePlanController.create(create_usage_plan_request)
print(response.plan.model_dump_json(indent=2))

# Use Rate Limiter
Now that we have our usage plan, let's build basic rate limiting logic in a rate limiter class that uses a sliding window. It's efficient, but it could be hardened further against edge cases such as a caller burning through the limit at the end of one minute and then again at the start of the next. We're going for the lowest-latency approach, so this implementation is fine for now.
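
To make the windowing idea concrete, here is a minimal in-memory sketch of a sliding-window log that sums usage over the trailing 60 seconds. It is an illustration only and not the platform's `RateLimiter`, which is backed by Redis; the class and method names are hypothetical, and timestamps are passed in explicitly so the behavior is easy to follow.

```python
from collections import deque
from typing import Deque, Tuple


class SlidingWindowCounter:
    """In-memory sliding-window log: sums usage recorded in the last `window` seconds."""

    def __init__(self, limit: int, window: float = 60.0) -> None:
        self.limit = limit
        self.window = window
        self._events: Deque[Tuple[float, int]] = deque()  # (timestamp, amount)

    def _evict(self, now: float) -> None:
        # Drop events that have aged out of the trailing window.
        while self._events and self._events[0][0] <= now - self.window:
            self._events.popleft()

    def allow(self, amount: int, now: float) -> bool:
        """Return True if `amount` more units still fit in the current window."""
        self._evict(now)
        used = sum(a for _, a in self._events)
        return used + amount <= self.limit

    def record(self, amount: int, now: float) -> None:
        """Record usage that actually happened at time `now`."""
        self._evict(now)
        self._events.append((now, amount))


limiter = SlidingWindowCounter(limit=60)
limiter.record(60, now=59.0)        # burst at the end of the first minute
print(limiter.allow(1, now=61.0))   # burst is still inside the trailing 60 s
print(limiter.allow(1, now=119.5))  # burst has aged out of the window
```

With a trailing window, a burst at second 59 still counts against a request at second 61. A counter that resets on minute boundaries would let both through, which is the edge case mentioned above.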

In [None]:
from agentic_platform.service.llm_gateway.client.cache_client import RateLimiter
from agentic_platform.service.llm_gateway.models.usage_types import RateLimitResult

# Our rate limiter is async, so we need to use nest_asyncio to run it in a synchronous context.
import nest_asyncio
import asyncio
nest_asyncio.apply()

In [None]:
response: RateLimitResult = await RateLimiter.check_limit(plan=usage_plan, model_id="nova-micro", est_input=100, est_output=100)
print(response.model_dump_json(indent=2))

# Test throttling
Let's call it a bunch of times to see if it throttles.
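
Before running the loop in the next cell, it can help to work out what to expect. The sketch below replays the same numbers against plain counters. It assumes a fresh window with no earlier usage, that all ten requests land in the same minute, and that a request is allowed only when its estimate still fits under every limit. The real `RateLimiter` may apply a different rule, so treat the result as a rough expectation.

```python
from typing import List


def simulate(n_requests: int = 10) -> List[bool]:
    """Replay the loop's numbers against simple in-memory counters."""
    input_tpm, output_tpm, rpm = 40000, 10000, 60  # default limits on the plan
    est_input, est_output = 5500, 500              # per-request token counts
    used_in = used_out = used_req = 0
    results = []
    for _ in range(n_requests):
        # Assumed rule: allow only if the estimate still fits under every limit.
        allowed = (
            used_in + est_input <= input_tpm
            and used_out + est_output <= output_tpm
            and used_req + 1 <= rpm
        )
        results.append(allowed)
        # Mirror the notebook loop, which records usage on every iteration.
        used_in += est_input
        used_out += est_output
        used_req += 1
    return results


for i, allowed in enumerate(simulate()):
    print(f"Request {i}: {'allowed' if allowed else 'throttled'}")
```

Under these assumptions the input token limit is the one that trips: seven requests fit (7 × 5,500 = 38,500), and the eighth would reach 44,000, which exceeds 40,000.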

In [None]:
for i in range(10):
    response = await RateLimiter.check_limit(plan=usage_plan, model_id="nova-micro", est_input=5500, est_output=500)
    print(f"Request {i}: {response}")

    # Update rate limits.
    await RateLimiter.record_usage(plan=usage_plan, model_id="nova-micro", input_tokens=5500, output_tokens=500)


# Create Our Gateway
There are two approaches to a gateway:
1) Make a passthrough API for the model provider you're using
2) Normalize all the model APIs to a common format. 

ChatCompletion is probably the most common format overall. In 2025, new protocols are emerging and it's unclear whether ChatCompletion will remain the standard long term. Different model providers have different modalities and features, and you miss out on some of them if you're limited to one provider's specific format. It's therefore important to provide passthrough endpoints as well, so users can access provider-specific features.

It's also still important to own your own types. Even though you're getting all the model results in ChatCompletion format, you still want to convert them to your own type object (even if that type is identical to ChatCompletion). This creates a two-way door if you decide to use a different gateway or return results in a different format down the road.
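
As a rough sketch of what owning your types can look like, the snippet below maps a ChatCompletion-style response dictionary onto a small platform-owned dataclass. `GatewayResult` and `from_chat_completion` are hypothetical names used only for illustration; the platform's actual types are the ones imported in the following cells.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass(frozen=True)
class GatewayResult:
    """Platform-owned response type (hypothetical; field names are illustrative)."""
    model: str
    text: str
    input_tokens: int
    output_tokens: int


def from_chat_completion(raw: Dict[str, Any]) -> GatewayResult:
    """Map a ChatCompletion-style dict onto the platform-owned type."""
    usage = raw.get("usage", {})
    return GatewayResult(
        model=raw["model"],
        text=raw["choices"][0]["message"]["content"],
        input_tokens=usage.get("prompt_tokens", 0),
        output_tokens=usage.get("completion_tokens", 0),
    )


sample = {
    "model": "example-model",
    "choices": [{"message": {"role": "assistant", "content": "Hi there!"}}],
    "usage": {"prompt_tokens": 12, "completion_tokens": 3},
}
print(from_chat_completion(sample))
```

Callers depend only on `GatewayResult`, so swapping the upstream format later means changing one converter rather than every call site.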

We've provided a sample FastAPI server below and will use the TestClient to execute http requests to it.

In [None]:
from fastapi import FastAPI, Request, HTTPException
import asyncio
from litellm import completion
import uvicorn
from threading import Thread
import boto3
import random
from multiprocessing import Process

from agentic_platform.service.llm_gateway.models.gateway_api_types import (
    ChatCompletionRequest, 
    ChatCompletionResponse,
    ConverseRequest, 
    ConverseResponse,
)

bedrock_client = boto3.client('bedrock-runtime')

app = FastAPI()

# Mimic what our rate limiter would look for
def rate_limit_allowed(request: Request) -> bool:
    identifier = request.headers.get('X-IDENTIFIER')
    # Stub: always allow. A real check would look up the usage plan for this identifier.
    print(f'Identifier {identifier} is allowed')
    return True

# Define your API endpoints
@app.post("/chat/completions")
async def chat_completions(request: Request) -> ChatCompletionResponse:

    # Useful for seeing our auth headers
    rate_limit_allowed(request)

    payload = await request.json()
    response = completion(**payload)
    return ChatCompletionResponse(**response.model_dump())

@app.post("/model/{model_id}/converse")
async def converse(model_id: str, request: Request) -> ConverseResponse:

    # Useful for seeing our auth headers
    rate_limit_allowed(request)

    request_body = await request.json()
    request_body["modelId"] = model_id
    response = bedrock_client.converse(**request_body)
    return ConverseResponse(**response)

# Lets create a test client for our API server.
from fastapi.testclient import TestClient

test_client = TestClient(app)


## Call our API
Now we can call our API. Using a gateway, we can pass in authentication headers just like a normal endpoint. We can then use those headers to identify the correct usage plan and rate limit based on it.

For lab purposes, we'll pass in an API key. In practice, this should use OAuth. API keys are essentially long-lived secrets that aren't rotated. We don't recommend authenticating this way, but we understand that many API providers use API keys. With a gateway approach, you can apply least-privilege access so that only the gateway can read the API key secret.
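
A minimal sketch of how a header could be mapped to a usage plan decision is shown here. The in-memory `ACTIVE_PLANS` dictionary and the `resolve_plan` function are hypothetical stand-ins for a real table lookup, and the status codes are one reasonable choice rather than what the platform returns.

```python
from typing import Dict, Mapping, Optional, Tuple

# Hypothetical stand-in for the usage plan table: identifier -> plan is active.
ACTIVE_PLANS: Dict[str, bool] = {"123": True, "456": False}


def resolve_plan(headers: Mapping[str, str]) -> Tuple[int, Optional[str]]:
    """Return an HTTP status code and the identifier whose plan should be metered."""
    # Note: a plain dict lookup is case-sensitive, unlike real HTTP header handling.
    identifier = headers.get("X-IDENTIFIER")
    if identifier is None:
        return 401, None  # no identity presented
    if not ACTIVE_PLANS.get(identifier, False):
        return 403, None  # unknown or deactivated plan
    return 200, identifier


print(resolve_plan({"X-IDENTIFIER": "123"}))
print(resolve_plan({"X-IDENTIFIER": "456"}))
print(resolve_plan({}))
```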

In [None]:
metering_identifier: str = 'whatever your identifier is'

async def test_chat_completions():
    completion_compatible_payload = {
        "model": "bedrock/anthropic.claude-3-sonnet-20240229-v1:0",
        "messages": [{ "content": "Hello, how are you?","role": "user"}]
    }

    return test_client.post(
        "/chat/completions", 
        json=completion_compatible_payload,
        headers={
            'X-IDENTIFIER': f'{metering_identifier}'
        }
    )

async def test_br_passthrough():
    model_id = "anthropic.claude-3-sonnet-20240229-v1:0"
    br_payload = {
        "messages": [{ "content": [{'text': 'hello how are you?'}], "role": "user"}]
    }

    return test_client.post(
        f"/model/{model_id}/converse", 
        json=br_payload,
        headers={
            'X-IDENTIFIER': f'{metering_identifier}'
        }
    )


result = asyncio.run(test_chat_completions())
print(f'Chat Completion compatible response: {result.json()}')

print('\n--------------------------------\n')

result = asyncio.run(test_br_passthrough())
print(f'Bedrock passthrough response: {result.json()}')

# Bring it all together


<div style="background-color: #FEEFB3; color: #9F6000; padding: 15px; border-radius: 5px; border-left: 6px solid #9F6000; margin-bottom: 15px;">
<strong>⚠️ WARNING:</strong> You will need the platform stack deployed for these next steps, along with the permissions needed to read secrets from AWS Secrets Manager.
</div>


Now that we understand what our LLM Gateway needs to look like, we can start calling it from our code. During the deployment of the platform, we configured a load balancer that points to our LLM gateway. The gateway is fronted by Cognito for authentication. To invoke our gateway, we'll grab the load balancer's URL and pull our secret to construct a request to it.

The first step is to grab the load balancer's DNS name. You can find it in the console or use boto3 below.

In [None]:
import boto3
from typing import List, Dict

# Initialize the client
elbv2 = boto3.client('elbv2')

# List all load balancers
load_balancers: List[Dict] = elbv2.describe_load_balancers()['LoadBalancers']

# Get the load balancer's DNS name. The load balancer name should be prefixed by k8s-platform
dns_name: str = [lb['DNSName'] for lb in load_balancers if 'k8s-platform' in lb['LoadBalancerName']][0]
dns_name

# Get our Secret for Auth
Now we need to get the secret containing our machine-to-machine (M2M) client credentials. The deployment script sets up two client applications in Cognito. The first is for users and the second is for machine-to-machine OAuth.

In [None]:
# Get our Secret for Auth
import json
# The name should be prefixed by whatever you named your stack prefix followed by -m2m-credentials
secret_name: str = 'agent-platform-123-m2m-credentials'
secret = boto3.client('secretsmanager').get_secret_value(SecretId=secret_name)
secret_value: str = secret['SecretString']

# Parse the secret value
secret_value_dict: Dict = json.loads(secret_value)


# Call API
There are two main ways to interface with the proxy: (1) direct HTTP requests and (2) the SDKs. Let's start with direct requests.

At this point, our gateway is just a normal HTTP endpoint. We can query it using httpx or the requests package. The API is authenticated using OAuth, so we need to pass in the Authorization header with the value 'Bearer <token>'.

Let's get started.

In [None]:
import requests

def get_token():
    client_id = secret_value_dict.get('client_id')
    client_secret = secret_value_dict.get('client_secret')
    token_url = secret_value_dict.get('token_url')
    scopes = secret_value_dict.get('scopes')

    data={
        'grant_type': 'client_credentials',
        'client_id': client_id,
        'client_secret': client_secret,
        'scope': scopes
    }

    response = requests.post(
        token_url,
        headers={'Content-Type': 'application/x-www-form-urlencoded'},
        data=data,
        timeout=5
    )
    # Fail fast if the token endpoint returns an error status
    response.raise_for_status()

    token_data = response.json()
    # Extract the access token
    token = token_data['access_token']
    return token

def construct_auth_header(token: str) -> str:
    return f'Bearer {token}'

m2m_token = get_token()
auth_header = construct_auth_header(m2m_token)
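
Access tokens from a client-credentials flow expire, so a long-running agent would usually cache the token and refresh it shortly before expiry rather than fetching one per request. The sketch below shows one way to do that. `TokenCache` is a hypothetical helper, and it assumes a `fetch` callable that returns the token endpoint's full JSON body (with `access_token` and `expires_in`), unlike `get_token` above, which returns only the token string.

```python
import time
from typing import Callable, Dict, Optional


class TokenCache:
    """Caches an access token and refreshes it shortly before it expires."""

    def __init__(
        self,
        fetch: Callable[[], Dict],
        skew_seconds: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch          # returns the token endpoint's JSON body
        self._skew = skew_seconds    # refresh this long before actual expiry
        self._clock = clock          # injectable so the logic can be tested
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._skew:
            data = self._fetch()
            self._token = data["access_token"]
            # Fall back to a short lifetime if the response omits expires_in.
            self._expires_at = now + float(data.get("expires_in", 60))
        return self._token


# Usage with a fake fetcher; in the lab this would wrap the POST to token_url.
cache = TokenCache(lambda: {"access_token": "example-token", "expires_in": 3600})
print(cache.get())
```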


In [None]:
# Make a request.
model_id = "anthropic.claude-3-sonnet-20240229-v1:0"
bedrock_payload = {
    "messages": [{ "content": [{'text': 'hello how are you?'}], "role": "user"}]
}

response = requests.post(
    f'http://{dns_name}/llm-gateway/model/{model_id}/converse',
    headers={ 'Authorization': auth_header },
    json=bedrock_payload,
    timeout=5
)

print(response.json())

Excellent! We just called our LLM Gateway! We can make this even better by wrapping the request in our own type so we can get back structured output using the converters from our agent platform.

In [None]:
from agentic_platform.core.converter.llm_request_converters import ConverseRequestConverter
from agentic_platform.core.converter.llm_response_converters import ConverseResponseConverter
from agentic_platform.core.models.llm_models import LLMRequest, LLMResponse, Message

def call_gateway(request: LLMRequest) -> LLMResponse:
    # Convert the request to the gateway format
    gateway_request = ConverseRequestConverter.convert_llm_request(request)

    # Drop the model_id because it's pulled from the request url.
    model_id = gateway_request.pop('modelId')

    # Call the gateway
    response = requests.post(
        f'http://{dns_name}/llm-gateway/model/{model_id}/converse',
        headers={'Authorization': auth_header},
        json=gateway_request,
        timeout=5
    )

    # Convert the response to our own type
    return ConverseResponseConverter.to_llm_response(response.json())

request: LLMRequest = LLMRequest(
    model_id="anthropic.claude-3-sonnet-20240229-v1:0",
    hyperparams={'temperature': 0.5},
    messages=[ Message(role="user", text="Hello, how are you?") ],
    system_prompt="You are a helpful assistant."
)

response: LLMResponse = call_gateway(request)

print(response.model_dump_json(indent=2))



# Call through the SDK
There are lots of reasons to call our gateway through the SDK. For one, many frameworks like LangChain and Pydantic AI accept a configured boto3 client as input. It can also be nice to just use the SDK. To get the boto3 client to work with our proxy, we need to make a couple of configuration changes to it.

## Configuration changes
**Signing Requests**: 

Botocore normally signs each request with your AWS credentials using SigV4. We want to authenticate with our OAuth credentials instead, so we'll configure the client not to sign requests. If signing is left on, the client won't respect the authentication headers we pass in.

**Configure endpoint**

Boto3 lets you override the service endpoint by passing in endpoint_url. We'll point it at our gateway endpoint.

**Register Event**

Boto3 allows you to register handlers for "events" that fire before requests get sent out. We can use this to add our auth token as an Authorization header before boto3 sends the request to our proxy. For this lab, we'll just use our bearer token. In practice, you'll want to use contextvars (a Python standard library module) to store users' OAuth tokens for retrieval. This way we can propagate the user's identity cleanly through our system and only access the token when we need it (like when calling our gateway).
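
The reason contextvars fits here is that each asyncio task gets its own copy of the context, so tokens set while handling one request don't leak into another. The following is a small standalone sketch of that behavior. The names `current_token`, `build_auth_headers`, and `handle_request` are hypothetical, and the module-level `asyncio.run` call assumes either a plain Python script or a notebook where `nest_asyncio.apply()` has already been called, as it was earlier in this lab.

```python
import asyncio
from contextvars import ContextVar
from typing import Dict, List

# Holds the caller's token for the duration of one request.
current_token: ContextVar[str] = ContextVar("current_token")


def build_auth_headers() -> Dict[str, str]:
    """Read the token only at the point where the outbound call is made."""
    return {"Authorization": f"Bearer {current_token.get()}"}


async def handle_request(token: str) -> Dict[str, str]:
    # A middleware would normally set this right after authentication.
    current_token.set(token)
    await asyncio.sleep(0)  # yield so the two tasks interleave
    return build_auth_headers()


async def main() -> List[Dict[str, str]]:
    # Each task runs in its own copy of the context, so the tokens don't mix.
    return await asyncio.gather(handle_request("token-a"), handle_request("token-b"))


results = asyncio.run(main())
print(results)
```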

In [None]:
from botocore.config import Config
import botocore
from contextvars import ContextVar
from functools import partial

# Set up our contextvar to store our token
token_var: ContextVar = ContextVar('token')

# This would normally be done by a middleware right after authentication.
token_var.set(m2m_token)

# Configure a function to add our auth token to the request.
def _add_headers(request, **kwargs):
    # Get the token from the contextvar
    token: str = token_var.get()
    request.headers['Authorization'] = f"Bearer {token}"

# Keep the request unsigned. Our agent doesn't have an IAM role to sign the request so this will fail without this.
config = Config(
    retries={'max_attempts': 1},
    signature_version=botocore.UNSIGNED
)

# Create our client and specify the endpoint url
endpoint_url = f'http://{dns_name}/llm-gateway'
client = boto3.client(
    'bedrock-runtime',
    endpoint_url=endpoint_url,
    config=config
)

# Add the Authorization header to Converse requests before they are sent
client.meta.events.register_first(
    'before-send.bedrock-runtime.Converse',
    _add_headers
)

In [None]:
# Now call our gateway through the configured client.
bedrock_payload = {
    "modelId": "anthropic.claude-3-sonnet-20240229-v1:0",
    "messages": [{ "content": [{'text': 'hello how are you?'}], "role": "user"}]
}

response = client.converse(**bedrock_payload)
print(response)

# We have a proxy!
Nice, we have a proxy that works with both regular requests and our boto3 client! Lastly, let's see how this works with frameworks.

In [None]:
from pydantic_ai import Agent
from pydantic_ai.providers.bedrock import BedrockProvider
from pydantic_ai.models.bedrock import BedrockConverseModel

import nest_asyncio
nest_asyncio.apply()


# Configure the provider to use our boto3 client.
model: BedrockConverseModel = BedrockConverseModel(
    model_name="anthropic.claude-3-sonnet-20240229-v1:0",
    provider=BedrockProvider(bedrock_client=client)
)

# model.request()

# Create an agent with the model.
agent = Agent(system_prompt='You are a helpful assistant.', model=model)

response = agent.run_sync("Hello, how are you?")

print(response.data)

# Use LangChain
Next we'll show you how to use our gateway with LangChain. It's as simple as passing in our prebuilt client and invoking the model.

In [None]:
from langchain_aws import ChatBedrockConverse
from langchain_core.messages import AIMessage, BaseMessage

llm = ChatBedrockConverse(
    model="anthropic.claude-3-sonnet-20240229-v1:0",
    temperature=0,
    client=client
)

# Invoke the llm
messages = [
    ("system", "You are a helpful assistant."),
    ("human", "Hello! How are you today?"),
]

response: AIMessage = llm.invoke(messages)
print(response)

# Conclusion
In this lab, we went through the concept of an LLM gateway, constructed one locally and then called our LLM gateway deployed in our agent platform! We successfully converted the gateway responses to our types and also showed how the gateway integrates into existing frameworks. 

In the next lab, we'll discuss distributed long-term memory implementations.