# APIM Backend Tester
I created this code to help test different approaches to circuit breakers and backends. It can also create and destroy resources for you so that you can test quickly.

1. Create an Azure API Management (APIM) resource
1. Create several Azure OpenAI Resources
    > TODO: This could be extended to also work with AI Services
1. Import the OpenAI policy into APIM
1. Make sure Managed Identity is set on the APIM resource

Included:
- Auto creation of deployments in AOAI resource
- Capacity updates 
    - Useful for when you want to test 429s and the impact of load balancing
- Creation of backends and circuit breakers
- Policy creation and update
- Load tester

In [None]:
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
import requests,json,os
import pandas as pd
import time
import html
from multiprocessing import Pool
import collections as collection
import random
import copy

# from dotenv import load_dotenv
# load_dotenv()

In [None]:
# --- Target API Management instance ------------------------------------------
# Replace these with your own values, or create a .env file and load them.
apimResourceGroupName = "rg-dai-use2"
apimServiceName = "apim-dai-use2"
apimApiName = "openai-api"

# Gateway base URL derived from the service name. The trailing slash matters:
# the load tester appends "openai/deployments/..." directly to this value.
apimEndpoint = f"https://{apimServiceName}.azure-api.net/"

# --- Naming conventions used for the APIM backends ----------------------------
backendPrefix = "backend-openai-"
loadBalancerPrefix = "lb-"

# SKU capacity requested for every deployment this notebook creates.
CAPACITY_MAX = 10

In [None]:
# Acquire an Azure Resource Manager token; every management call in this
# notebook sends it through the shared `headers` dict.
print(time.strftime("%Y-%m-%d %H:%M:%S"))

# Alternative when the default credential chain does not work for you:
# credential = InteractiveBrowserCredential(tenantId=tenantId)
credential = DefaultAzureCredential()
token = credential.get_token("https://management.azure.com/.default")
headers = {
    'Authorization': f'Bearer {token.token}',
    'Content-Type': 'application/json',
}

# Show the expiry in local time (yyyy-mm-dd hh:mm:ss) so it is obvious when
# this cell has to be re-run.
print("Expires on:", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(token.expires_on)))

# Warn if the token we were handed is already stale.
if token.expires_on < time.time():
    print("Token expired")

# Get the Subscription
Either run the next cell to use the first subscription returned, or hard-code your own subscription ID.

In [None]:
# Get the first subscription visible to the current credential.
url = "https://management.azure.com/subscriptions?api-version=2020-01-01"
response = requests.get(url, headers=headers)
response.raise_for_status()

# Fail with a readable message instead of an IndexError when the identity has
# no access to any subscription.
subscriptions = response.json().get('value', [])
if not subscriptions:
    raise ValueError("No subscriptions found for the current credential")

subscriptionId = subscriptions[0]['subscriptionId']
print(f"Subscription ID: {subscriptionId}")

In [None]:
# Get the apimKey from the apim service: list the secrets of the "master"
# subscription and keep its primary key for the gateway calls made later.
url = f"https://management.azure.com/subscriptions/{subscriptionId}/resourceGroups/{apimResourceGroupName}/providers/Microsoft.ApiManagement/service/{apimServiceName}/subscriptions/master/listSecrets?api-version=2022-08-01"
response = requests.post(url, headers=headers)
response.raise_for_status()

# .get() so that a missing key reaches the explicit error below instead of
# surfacing as a bare KeyError.
apimKey = response.json().get('primaryKey')
if not apimKey:
    raise ValueError("APIM Key not found")


In [None]:
# Load the Azure OpenAI accounts from the subscription, one resource group at
# a time. Each entry records where the account lives and its endpoint.
accounts=[]

# Check the listing call before reading it; otherwise an auth/HTTP failure
# shows up later as a confusing KeyError on 'value'.
response = requests.get(f"https://management.azure.com/subscriptions/{subscriptionId}/resourceGroups?api-version=2022-12-01",headers=headers)
response.raise_for_status()
rgs = response.json()

for rg in rgs['value']:
    resourceGroupName=rg['name']
    response = requests.get(f"https://management.azure.com/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.CognitiveServices/accounts?api-version=2023-05-01",headers=headers)
    response.raise_for_status()
    # raise_for_status() only rejects 4xx/5xx; anything other than 200 has no
    # account list to read, so move on to the next resource group.
    if response.status_code != 200:
        continue

    for account in response.json()['value']:
        # Only Azure OpenAI accounts are of interest here.
        if account['kind'] != 'OpenAI':
            continue
        print(f"Running {account['name']}")
        endpoint=account["properties"]["endpoint"]
        accounts.append({'subscriptionId':subscriptionId,'resourceGroupName':resourceGroupName,'accountName':account['name'],'location':account['location'],'endpoint':endpoint})
print(f"Accounts: {accounts}")

# Define Models to Use
Which models can actually be deployed depends on available capacity, but that is handled later. Use this list as a guideline for the models you want to test.

In [None]:
# Models to test. Each entry names the model, the version to deploy, the
# model format ('kind') and the SKU under which capacity is requested.
models = [
    {'model': 'gpt-35-turbo', 'version': '0125', 'kind': 'OpenAI', 'skuName': 'Standard'},
    {'model': 'gpt-4o-mini', 'version': '2024-07-18', 'kind': 'OpenAI', 'skuName': 'GlobalStandard'},
    {'model': 'gpt-4o', 'version': '2024-08-06', 'kind': 'OpenAI', 'skuName': 'GlobalStandard'},
]

In [None]:
# Load the capacities so we don't create resources that don't have quota.
# The result is one entry per (model, location) whose SKU matches the model's
# configured skuName and still reports available capacity.
capacities=[]
for model in models:
    uri=f"https://management.azure.com/subscriptions/{subscriptionId}/providers/Microsoft.CognitiveServices/modelCapacities?api-version=2024-04-01-preview&modelFormat={model['kind']}&modelName={model['model']}&modelVersion={model['version']}"

    # The listing is paged: keep following nextLink until it is absent.
    while uri:
        response=requests.get(uri, headers=headers,timeout=15)

        if response.status_code != 200:
            # Best effort: report the failure and carry on with the next model.
            print(f"Capacity lookup for {model['model']} failed: {response.status_code}")
            break

        page=response.json()
        for val in page['value']:
            # Keep only the SKU requested for this model, never GlobalBatch,
            # and only where some capacity is still available.
            if model['skuName'] == val['name'] and val['name'] != 'GlobalBatch' and val['properties']['availableCapacity'] > 0:
                capacities.append({
                    'id': val['id'],
                    'name': val['name'],
                    'location': val['location'],
                    'modelName': model['model'],
                    'modelVersion': model['version'],
                    'modelFormat': model['kind'],
                    'availableCapacity': val['properties']['availableCapacity'],
                })

        # A missing nextLink ends the loop.
        uri=page.get('nextLink')


# Create OpenAI Deployments

In [None]:
deployments=[]

# For each available capacity, deploy the model to every account located in
# the same region. The deployment name is the model name.
for capacity in capacities:
    for account in accounts:
        if account['location'] != capacity['location']:
            continue

        print(f"Deploying {capacity['modelName']} {capacity['modelVersion']} to {account['accountName']} in {account['location']}")
        url = f"https://management.azure.com/subscriptions/{account['subscriptionId']}/resourceGroups/{account['resourceGroupName']}/providers/Microsoft.CognitiveServices/accounts/{account['accountName']}/deployments/{capacity['modelName']}?api-version=2023-05-01"
        data = {
            "sku":{"name":capacity['name'],"capacity":CAPACITY_MAX},
            "properties": {
                "model": {
                    "name": capacity['modelName'],
                    "version": capacity['modelVersion'],
                    "format": capacity['modelFormat']
                }
            }
        }
        response = requests.put(url, headers=headers, json=data)
        print("Response:",response.status_code)
        # NOTE(review): the deployment is tracked even when the PUT failed, so
        # later cells may create a backend for an account without this model.
        deployments.append({'subscriptionId':account['subscriptionId'],'resourceGroupName':account['resourceGroupName'],'accountName':account['accountName'],'deploymentName':capacity['modelName'],'model':capacity['modelName'],'version':capacity['modelVersion'],'skuName':capacity['name'],'location':account['location'],'provisioningState':'Creating'})

# Shallow copy on purpose: it is the "still pending" work list, while the dicts
# inside are shared so provisioningState updates are visible in `deployments`.
deployments_copy = deployments.copy()

# Poll until every deployment has reached a terminal state.
while deployments_copy:
    # Iterate over a snapshot of the pending list. The original looped over
    # `deployments`, so an entry already removed from the pending list was
    # removed again on the next pass and raised ValueError.
    for deployment in list(deployments_copy):
        url = f"https://management.azure.com/subscriptions/{deployment['subscriptionId']}/resourceGroups/{deployment['resourceGroupName']}/providers/Microsoft.CognitiveServices/accounts/{deployment['accountName']}/deployments/{deployment['deploymentName']}?api-version=2023-05-01"
        response = requests.get(url, headers=headers)
        print("Checking:",deployment['accountName'],"Model:",deployment['model'],"Response:",response.status_code)

        if response.status_code != 200:
            # The deployment cannot be read (e.g. it was never created): stop
            # polling it and leave its state at 'Creating'.
            deployments_copy.remove(deployment)
            continue

        deployment['provisioningState'] = response.json()['properties']['provisioningState']
        if deployment['provisioningState'] in ('Succeeded', 'Failed'):
            print(f"Deployment {deployment['deploymentName']} is {deployment['provisioningState']}")
            deployments_copy.remove(deployment)

    # Only wait when there is still something left to poll.
    if deployments_copy:
        time.sleep(1)

# Assign the Managed ID of APIM to the accounts

In [None]:
# Imported here so this cell stays self-contained; used for the role assignment name.
import uuid

# Read the APIM service to find the principal of its managed identity.
url=f"https://management.azure.com/subscriptions/{subscriptionId}/resourceGroups/{apimResourceGroupName}/providers/Microsoft.ApiManagement/service/{apimServiceName}?api-version=2023-03-01-preview"
response = requests.get(url, headers=headers)
print("Response:",response.status_code)
# Stop on a failed lookup rather than failing later on a missing key.
response.raise_for_status()
apimService = response.json()

# Get the identity.principalId; report a missing identity explicitly.
principalId = (apimService.get('identity') or {}).get('principalId')
if not principalId:
    raise ValueError("identity.principalId is missing on the APIM service; make sure Managed Identity is set on the APIM resource")
print(f"Principal ID: {principalId}")


# Grant that principal a role on every Azure OpenAI account.
for account in accounts:
    # The role assignment name is a GUID; uuid4 produces it in canonical
    # hyphenated form (os.urandom(16).hex() gave 32 hex characters, no hyphens).
    guid = str(uuid.uuid4())
    url=f"https://management.azure.com/subscriptions/{account['subscriptionId']}/resourceGroups/{account['resourceGroupName']}/providers/Microsoft.CognitiveServices/accounts/{account['accountName']}/providers/Microsoft.Authorization/roleAssignments/{guid}?api-version=2022-04-01"

    data = {
        "properties": {
            # NOTE(review): presumably the built-in "Cognitive Services OpenAI User" role — confirm.
            "roleDefinitionId": f"/subscriptions/{subscriptionId}/providers/Microsoft.Authorization/roleDefinitions/5e0bd9bd-7b93-4f28-af87-19fc36ad61bd",
            "principalId": principalId
        }
    }
    response = requests.put(url, headers=headers, json=data)
    print("Response:",response.status_code)
    # 201 is created
    # 409 is already exists




# Create the Backends By Model

In [None]:
# Create one APIM backend per deployment. The backend URL points at the
# account's /openai root, and a circuit-breaker rule is attached to it.
for backend in deployments:
    backendName = f'{backendPrefix}{backend["location"]}-{backend["model"]}-{backend["version"]}'

    # Find the account this deployment lives in (first match by name).
    account = next((candidate for candidate in accounts if candidate['accountName'] == backend['accountName']), None)
    if account is None:
        # Without the account there is no endpoint to point the backend at.
        print(f"Skipping {backendName}: account {backend['accountName']} not found")
        continue

    url = f'https://management.azure.com/subscriptions/{subscriptionId}/resourceGroups/{apimResourceGroupName}/providers/Microsoft.ApiManagement/service/{apimServiceName}/backends/{backendName}?api-version=2023-09-01-preview'
    data = {
        "properties": {
            "description": f"Backend for AOAI {backend['location']} {backend['model']} {backend['version']}",
            # Normalise the join so it works with or without a trailing slash
            # on the account endpoint.
            "url": f"{account['endpoint'].rstrip('/')}/openai",
            "protocol": "http",
            "circuitBreaker": {
                "rules":[
                    {
                        # Rule values as configured: 1 failure within PT1S,
                        # for statuses 429-500 or "Server errors", opens the
                        # breaker for PT30S.
                        "failureCondition":{
                            "count":1,
                            "errorReasons": [
                                "Server errors"
                            ],
                            "interval": "PT1S",
                            "statusCodeRanges":[
                                {
                                    "min":429,
                                    "max":500
                                }
                            ]
                        },
                        "name":"breakerRule",
                        "tripDuration":"PT30S",
                        "acceptRetryAfter": True
                    }
                ]
            }
        }
    }
    response = requests.put(url, headers=headers, json=data)
    response.raise_for_status()

    print(backendName,response.status_code)


# Create the Load Balancers
This only assigns a simple weight and priority for testing; it is not a configuration intended for production.

In [None]:
# Create a load balancer (an APIM backend of type "Pool") per model. It pools
# every backend created for that model's deployments.
for model in models:
    deploymentName = model['model']
    endpoints = [d for d in deployments if d['deploymentName'] == deploymentName]
    loadBalancerName=f"{loadBalancerPrefix}{deploymentName}"

    # All members share priority 1; weights simply count up 1, 2, 3, ... in
    # deployment order, which is enough to observe weighted distribution.
    services=[]
    for weight, endpoint in enumerate(endpoints, start=1):
        backendName = f'{backendPrefix}{endpoint["location"]}-{endpoint["model"]}-{endpoint["version"]}'
        services.append({
            "id":f"/subscriptions/{subscriptionId}/resourceGroups/{apimResourceGroupName}/providers/Microsoft.ApiManagement/service/{apimServiceName}/backends/{backendName}",
            # Sent as a number like "weight"; the original sent the string "1".
            "priority":1,
            "weight":weight
        })

    data = {
        "properties": {
            "description": f"Load balancer for {deploymentName}",
            "type":"Pool",
            "pool": {
                "services": services
            }
        }
    }
    print(data)

    url = f'https://management.azure.com/subscriptions/{subscriptionId}/resourceGroups/{apimResourceGroupName}/providers/Microsoft.ApiManagement/service/{apimServiceName}/backends/{loadBalancerName}?api-version=2023-09-01-preview'
    response = requests.put(url, headers=headers, json=data)
    # Deliberately not raising: print the status and body so a rejected pool
    # can be inspected without stopping the remaining models.
    print(loadBalancerName,response.status_code)
    print(response.content)


# Create the APIM Policy
Update the base policy template as needed. It contains a `##CONTENT##` placeholder that the code below replaces, so the policy can be regenerated quickly.

In [None]:
# Read the root policy template; it carries a ##CONTENT## placeholder.
with open("apim_rootPolicy_Template.xml") as f:
    policy_template = f.read()

# Emit one <when> branch per model: requests whose path starts with
# /deployments/<model>/ get "backend-id" set to that model's load balancer.
content = []
for model in models:
    deploymentName = model['model']
    content.extend([
        f'<when condition="@(context.Variables.GetValueOrDefault<string>("path").ToLower().StartsWith("/deployments/{deploymentName}/"))">',
        f'<set-variable name="backend-id" value="{loadBalancerPrefix}{deploymentName}" />',
        '</when>',
    ])
content = '\n'.join(content)

# Drop the generated branches into the template in place of ##CONTENT##.
policy = policy_template.replace("##CONTENT##", content)

# Keep a copy of the final policy on disk next to the template.
with open("apim_rootPolicy.xml", "w") as f:
    f.write(policy)


In [None]:
# Replace the API-level policy with the one generated above.
url = f'https://management.azure.com/subscriptions/{subscriptionId}/resourceGroups/{apimResourceGroupName}/providers/Microsoft.ApiManagement/service/{apimServiceName}/apis/{apimApiName}/policies/policy?api-version=2022-08-01'

# Strip every line-feed and carriage-return so the XML travels as one line.
xmlPolicy = policy.translate(str.maketrans('', '', '\n\r'))

data = {"properties": {"format": "rawxml", "value": xmlPolicy}}

response = requests.put(url, headers=headers, json=data)
# No raise here: the status and body are printed for manual inspection.
print(response.status_code)
print(response.content)


# Test the Models

In [None]:
# Smoke-test every model through the APIM gateway with one chat completion.
# Headers and payload do not change between models, so build them once.
apimHeaders={
    "Content-Type": "application/json",
    "api-key": apimKey
}
data = {
    "messages":[{"role":"user","content":"Tell me a joke"}]
}

for model in models:
    # Call chat completion
    url=f"https://{apimServiceName}.azure-api.net/openai/deployments/{model['model']}/chat/completions?api-version=2024-06-01"
    print(url)

    startTime=time.time()
    response = requests.post(url, headers=apimHeaders, json=data)
    endTime=time.time()

    print(response.status_code)
    # The region header is only printed when the response carries it.
    if "x-ms-region" in response.headers:
        print("Region:",response.headers['x-ms-region'])

    print('x-ratelimit-remaining-requests',response.headers.get('x-ratelimit-remaining-requests',None))
    print('x-ratelimit-remaining-tokens',response.headers.get('x-ratelimit-remaining-tokens',None))

    # Time taken
    print("Time taken:",endTime-startTime)

    # An error response may not be JSON; fall back to the raw body so one bad
    # model does not abort the test of the remaining ones.
    try:
        print(response.json())
    except ValueError:
        print(response.content)


# Load Tester Run

In [None]:
def run_chat(deployments,data,timeout=15):
    """Send one chat-completion request to a randomly chosen deployment.

    The call is best effort: any failure while sending or reading the request
    is captured in the returned dict instead of being raised, so a load test
    can keep going and count failures afterwards.

    Args:
        deployments: Non-empty sequence of dicts providing ``endpoint``,
            ``deploymentName`` and ``key``. One entry is picked at random.
        data: JSON-serialisable request body for the chat-completions call.
        timeout: Timeout in seconds handed to ``requests.post``.

    Returns:
        A dict with the keys ``status_code`` (``None`` if no response was
        received), ``response`` (the first choice's message content on a 200,
        otherwise the error body or exception text), ``timeSpan`` (seconds
        spent in the request; stays 0 when the request raised), ``region``
        (the ``x-ms-region`` header or ``None``), ``endpoint``,
        ``deploymentName``, ``total_tokens`` and ``headers`` (the response
        headers, or ``{}`` when no response was received).

    Raises:
        IndexError: If ``deployments`` is empty.
    """
    deployment = random.choice(deployments)
    headers={"Content-Type":"application/json","api-key":deployment["key"]}
    url=f"{deployment['endpoint']}openai/deployments/{deployment['deploymentName']}/chat/completions?api-version=2024-06-01"

    # Defaults describe "no response received"; they are overwritten step by
    # step so a late failure keeps everything learned up to that point.
    status_code=None
    timeSpan=0
    detail=None
    region=None
    total_tokens=0
    resp_headers={}

    try:
        startTime=time.time()
        response=requests.post(url=url,json=data,headers=headers,timeout=timeout)
        timeSpan=time.time()-startTime

        status_code=response.status_code
        # Captured right away so the rate-limit headers survive a body that
        # cannot be parsed.
        resp_headers=response.headers

        # Parse the body once; a non-JSON body falls back to the raw bytes.
        try:
            body=response.json()
            is_json=True
        except ValueError:
            body=response.content
            is_json=False

        if status_code == 200 and is_json and isinstance(body, dict):
            # A missing region header must not discard a successful response.
            region=response.headers.get("x-ms-region")
            choices=body.get('choices') or []
            if choices:
                detail=(choices[0].get('message') or {}).get('content',None)
            total_tokens=(body.get('usage') or {}).get('total_tokens', 0)
        else:
            if status_code == 200:
                region=response.headers.get("x-ms-region")
            detail=body
    except Exception as e:
        # Deliberately broad: this is the boundary of a load-test worker and a
        # single failed request must be reported, not raised.
        if hasattr(e, 'status_code'):
            status_code=e.status_code
        detail=str(e)
        if hasattr(e, 'response') and hasattr(e.response, 'json') and callable(e.response.json):
            try:
                detail = e.response.json().get('error', {}).get('message')
            except ValueError:
                detail = e.response.content

    return {
            "status_code":status_code,
            "response":detail,
            "timeSpan":timeSpan,
            "region":region,
            "endpoint":deployment['endpoint'],
            "deploymentName":deployment['deploymentName'],
            "total_tokens": total_tokens,
            "headers":resp_headers
        }

# Defaults for the pooled run; both can be overridden at the prompts below.
threadCnt=5
testCnt=100

data = {
    "messages":[{"role":"user","content":"Tell me a joke and be wordy"}]
}

# Every model is tested through the APIM gateway with the APIM key.
enabled = [{'deploymentName':model['model'],'endpoint':apimEndpoint,'key':apimKey} for model in models]

runs = int(input("Enter times to run or 0 for threaded") or 0)
output=[]
if runs > 0:
    # Sequential mode: run the requested number of calls one after another.
    for i in range(runs):
        print(f"Test {i}")
        rt = run_chat(enabled,data)
        output.append(rt)
        print(rt)
else:
    # Ask the user for threadCnt / testCnt, or keep the defaults.
    # NOTE: Pool comes from multiprocessing, so the "threads" are processes.
    threadCnt = int(input(f"Enter the number of threads to run or leave blank for {threadCnt}:") or threadCnt)
    testCnt = int(input(f"Enter the number of runs to run or leave blank for {testCnt}:") or testCnt)

    # The context manager shuts the pool down when the run is over; without it
    # every execution of this cell left its workers behind. All results are
    # collected inside the block, before the pool is torn down.
    with Pool(threadCnt) as p:
        start = time.time()
        print("Started:",time.strftime("%Y-%m-%d %H:%M:%S"))
        results = [p.apply_async(run_chat,(enabled,data) ) for _ in range(testCnt)]
        output = [pending.get() for pending in results]
        end = time.time()
    print("Time taken (sec):",end-start)

    # Quick totals by status code, by region and by deployment.
    ct = collection.Counter([each_result['status_code'] for each_result in output])
    print(ct)
    ct = collection.Counter([each_result['region'] for each_result in output])
    print(ct)
    ct = collection.Counter([each_result['deploymentName'] for each_result in output])
    print(ct)



# Misc Testing

In [None]:
# Break the results down by (group, status code), first per deployment and
# then per region. Each row prints: status <TAB> count <TAB> group.
breakdowns = (("By Model", 'deploymentName'), ("By Region", 'region'))
for position, (title, field) in enumerate(breakdowns):
    if position:
        # Blank line between the two tables.
        print("")
    print(title)
    ct = collection.Counter((each_result[field], each_result['status_code']) for each_result in output)
    for (group, status), count in ct.items():
        print(f"{status}\t{count}\t{group}")

In [None]:
# Show the first ten results in detail.
for item in output[:10]:
    print("endpoint:",item['endpoint'])
    print("deploymentName:",item['deploymentName'])

    if item['status_code'] != 200:
        # Failures: just the status and whatever error detail was captured.
        print(item['status_code'])
        print(item['response'])
    else:
        # Successes: the answer plus its token, timing and region data.
        for field in ('response', 'total_tokens', 'timeSpan', 'region'):
            print(f"{field}:",item[field])

    # Rate-limit headers are only available when a response was received.
    if item['headers']:
        for name in ('x-ratelimit-remaining-requests', 'x-ratelimit-remaining-tokens'):
            print(name,item['headers'].get(name,None))
    print("")


In [None]:
# Keep every result whose status code is anything other than 200
# (this includes requests that never got a response, i.e. status None).
errors = list(filter(lambda item: item['status_code'] != 200, output))
print(f"Errors: {errors}")

# Update Capacity

In [None]:
# Re-PUT every deployment with a new SKU capacity (default 2, or whatever the
# user enters at the prompt).
capacity = 2
answer = input(f"Enter the capacity or {capacity}:")
if answer:
    capacity = int(answer)

for deployment in deployments:
    url = f"https://management.azure.com/subscriptions/{deployment['subscriptionId']}/resourceGroups/{deployment['resourceGroupName']}/providers/Microsoft.CognitiveServices/accounts/{deployment['accountName']}/deployments/{deployment['deploymentName']}?api-version=2023-05-01"

    # Same model definition as before; only the capacity in the SKU changes.
    sku = {"name": deployment['skuName'], "capacity": capacity}
    modelSpec = {
        "name": deployment['model'],
        "version": deployment['version'],
        "format": "OpenAI",
    }
    data = {"sku": sku, "properties": {"model": modelSpec}}

    response = requests.put(url, headers=headers, json=data)
    print("Response:",response.status_code)

