In [1]:
import os
import time
import json
from openai import AzureOpenAI
from threading import Semaphore
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any
from tenacity import (
    retry,
    stop_after_attempt,
    wait_fixed,
    wait_random,
)

# Data Format Example

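The input data, `message_sequences`, is a list of conversations. Each conversation is itself a list of message dictionaries, and each message has a `role` (either `"system"` or `"user"`) and a `content` string. Each conversation is sent to the model as one request. Because the requests use JSON mode (`response_format={"type": "json_object"}`), the prompt must tell the model to answer in JSON. Here is a very simple example with 59 addition questions: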

In [2]:
# One [system, user] conversation per addition question, for n = 1..59.
message_sequences = [
    [
        {"role": "system", "content": "You are a helpful AI assistant.  Always answer in the following json format: {'content': '2'}."},
        {"role": "user", "content": f"What is {n} + {n}?"},
    ]
    for n in range(1, 60)
]
message_sequences[:3]

[[{'role': 'system',
   'content': "You are a helpful AI assistant.  Always answer in the following json format: {'content': '2'}."},
  {'role': 'user', 'content': 'What is 1 + 1?'}],
 [{'role': 'system',
   'content': "You are a helpful AI assistant.  Always answer in the following json format: {'content': '2'}."},
  {'role': 'user', 'content': 'What is 2 + 2?'}],
 [{'role': 'system',
   'content': "You are a helpful AI assistant.  Always answer in the following json format: {'content': '2'}."},
  {'role': 'user', 'content': 'What is 3 + 3?'}]]

In [3]:
def _report_exhausted_retries(retry_state):
    """tenacity ``retry_error_callback``: log the failed request and return a fallback reply.

    ``get_response`` is decorated at class-definition time, so tenacity records the
    call as ``(self, system_user_message)``. The message is therefore the *last*
    positional argument (or the keyword argument of that name). ``args[0]`` is the
    ``APIPlayer`` instance.

    Returns:
        The JSON string ``'{"content": "None"}'``, so callers can parse it like a normal reply.
    """
    args = retry_state.args
    message = retry_state.kwargs.get("system_user_message", args[-1] if args else None)
    print(f"All retry attempts failed for: {message}\nReturning None for this request.")
    return '{"content": "None"}'


class APIPlayer:
    """Send many chat-completion requests to an Azure OpenAI deployment concurrently.

    Every reply is requested in JSON mode and validated with ``json.loads``. A call
    that raises (including on invalid JSON) is retried once, so there are two attempts
    in total, after a 2-4 s pause. If both attempts fail, the reply becomes
    ``'{"content": "None"}'``.

    NOTE: ``rate_limit`` caps the number of in-flight requests (it sizes both the
    semaphore and the thread pool). Each *successful* call also keeps its slot for an
    extra ``60 / rate_limit`` seconds. This paces the workers, but it is NOT a strict
    requests-per-minute guarantee: with fast responses, many more than ``rate_limit``
    requests can finish within one minute. Check the observed throughput against
    your deployment's quota.
    """

    # Kept for backward compatibility. It is a staticmethod so that calling it through
    # an instance does not bind ``self`` as ``retry_state``.
    handle_retry_error = staticmethod(_report_exhausted_retries)

    def __init__(self, model_name, temperature=1.0, max_tokens=15, rate_limit=20,
                 azure_endpoint="https://chm-openai.openai.azure.com/",
                 api_version="2023-07-01-preview"):
        """Create the Azure OpenAI client and the concurrency limiter.

        Args:
            model_name: Deployment name, passed as ``model`` to the chat-completions call.
            temperature: Sampling temperature for every request.
            max_tokens: Maximum number of tokens generated per reply.
            rate_limit: Maximum number of concurrent requests. It also sets the
                per-request pause of ``60 / rate_limit`` seconds (see the class NOTE).
            azure_endpoint: Azure OpenAI resource endpoint.
            api_version: Azure OpenAI REST API version.

        Raises:
            ValueError: If ``rate_limit`` is not positive. Otherwise the pause would
                divide by zero and the thread pool could not be created.
        """
        if rate_limit <= 0:
            raise ValueError(f"rate_limit must be positive, got {rate_limit!r}")
        self.client = AzureOpenAI(
            # https://learn.microsoft.com/en-us/azure/ai-services/openai/reference#rest-api-versioning
            api_version=api_version,
            # https://learn.microsoft.com/en-us/azure/cognitive-services/openai/how-to/create-resource?pivots=web-portal#create-a-resource
            azure_endpoint=azure_endpoint,
        )
        self.model_name = model_name
        self.temperature = temperature
        self.max_tokens = max_tokens
        self.rate_limit = rate_limit  # concurrency cap; see the class NOTE about RPM
        self.semaphore = Semaphore(self.rate_limit)

    @retry(wait=wait_fixed(2) + wait_random(0, 2),
           stop=stop_after_attempt(2),
           before_sleep=lambda retry_state: print("Retrying..."),
           retry_error_callback=_report_exhausted_retries)
    def get_response(self, system_user_message: List):
        """Send one conversation and return the model's reply as a JSON string.

        Args:
            system_user_message: List of ``{"role": ..., "content": ...}`` message dicts.

        Returns:
            The raw reply text, already checked to be valid JSON. After the final
            failed attempt, tenacity returns ``'{"content": "None"}'`` instead.

        Raises:
            Any exception from the client or ``json.loads``. tenacity catches it to
            retry, and the caller never sees it.
        """
        with self.semaphore:
            try:
                response = self.client.chat.completions.create(
                    model=self.model_name,
                    messages=system_user_message,
                    temperature=self.temperature,
                    max_tokens=self.max_tokens,
                    response_format={"type": "json_object"},
                )
                json_response = response.choices[0].message.content
                json.loads(json_response)  # raises if the reply is not valid JSON
                # Keep the slot a little longer to spread requests out over time.
                time.sleep(60 / self.rate_limit)
                return json_response
            except json.JSONDecodeError:
                print(f"Invalid JSON response for message {system_user_message}")
                raise  # re-raise so tenacity retries
            except Exception as e:
                print(f"Error while processing: {str(e)}")
                raise

    def get_responses_parallel(self, messages_list):
        """Run ``get_response`` for every conversation on a thread pool.

        Args:
            messages_list: Iterable of conversations (each a list of message dicts).

        Returns:
            A list of ``{"input": <user question>, "content": <reply content>}`` dicts
            in *completion* order, not input order. If a reply cannot be turned into
            a result, ``"content"`` holds an ``"Error processing message: ..."``
            string and ``"input"`` still identifies the query. ``"input"`` is
            ``"Error"`` only when the conversation itself is malformed.
        """
        results = []
        started = time.perf_counter()  # monotonic clock for measuring durations
        with ThreadPoolExecutor(max_workers=self.rate_limit) as executor:
            future_to_message = {
                executor.submit(self.get_response, message): message
                for message in messages_list
            }
            for future in as_completed(future_to_message):
                message = future_to_message[future]
                user_query = "Error"  # kept only if the conversation itself is malformed
                try:
                    user_query = next(
                        (msg['content'] for msg in message if msg['role'] == 'user'),
                        "Unknown query",
                    )
                    # The reply is a JSON string such as '{"content": "4"}'.
                    result_content = json.loads(future.result())['content']
                    results.append({"input": user_query, "content": result_content})
                except Exception as e:
                    results.append({"input": user_query, "content": f"Error processing message: {str(e)}"})
        print(f"Total time taken: {time.perf_counter() - started} seconds")
        return results

Set the `AZURE_OPENAI_API_KEY` environment variable; the `AzureOpenAI` client reads the API key from it automatically.

In [4]:
# os.environ["AZURE_OPENAI_API_KEY"] = "your_token"

## or put it in a .env file (which must be loaded explicitly, e.g. with python-dotenv's load_dotenv()):
# AZURE_OPENAI_API_KEY=your_token

Make sure your requests stay within the rate limit of the selected model deployment.

In our Azure AI settings, the `gpt-35-turbo` deployment has a rate limit of 720 requests per minute (RPM).

In [17]:
# Parallel client: up to 100 requests in flight at once.
player = APIPlayer(
    model_name="gpt-35-turbo",
    temperature=1.0,
    max_tokens=20,
    rate_limit=100,
)

In [18]:
# Fire all conversations concurrently; the call prints the total elapsed time.
results_parallel = player.get_responses_parallel(messages_list=message_sequences)

Total time taken: 1.7316970825195312 seconds


In [19]:
# First five results, in completion order rather than input order.
results_parallel[0:5]

[{'input': 'What is 9 + 9?', 'content': '18'},
 {'input': 'What is 4 + 4?', 'content': '8'},
 {'input': 'What is 11 + 11?', 'content': '22'},
 {'input': 'What is 5 + 5?', 'content': '10'},
 {'input': 'What is 6 + 6?', 'content': '12'}]

As you can see, the responses are not in the same order as the queries. `as_completed` yields each result as soon as its request finishes, so the order depends on which requests return first.

### Sequential version (one request at a time) for comparison with the parallel method

In [9]:
class APIPlayer_simpler:
    """Sequential baseline: one blocking chat-completion request at a time, with no retries.

    Used to measure the speed-up of the parallel ``APIPlayer``.
    """

    def __init__(self, model_name, temperature=1.0, max_tokens=15,
                 azure_endpoint="https://chm-openai.openai.azure.com/",
                 api_version="2023-07-01-preview"):
        """Create the Azure OpenAI client and store the generation settings.

        Args:
            model_name: Deployment name, passed as ``model`` to the chat-completions call.
            temperature: Sampling temperature for every request.
            max_tokens: Maximum number of tokens generated per reply.
            azure_endpoint: Azure OpenAI resource endpoint.
            api_version: Azure OpenAI REST API version.
        """
        self.client = AzureOpenAI(
            # https://learn.microsoft.com/en-us/azure/ai-services/openai/reference#rest-api-versioning
            api_version=api_version,
            # https://learn.microsoft.com/en-us/azure/cognitive-services/openai/how-to/create-resource?pivots=web-portal#create-a-resource
            azure_endpoint=azure_endpoint,
        )
        self.model_name = model_name
        self.temperature = temperature
        self.max_tokens = max_tokens

    def get_response(self, system_user_message: List):
        """Send one conversation and return the model's reply as a JSON string.

        Args:
            system_user_message: List of ``{"role": ..., "content": ...}`` message dicts.

        Returns:
            The raw reply text if it is valid JSON. Otherwise, including when the
            reply has no content, it returns ``'{"content": "None"}'``.

        Raises:
            Any exception raised by the client call. It is printed, then re-raised.
        """
        try:
            response = self.client.chat.completions.create(
                model=self.model_name,
                messages=system_user_message,
                temperature=self.temperature,
                max_tokens=self.max_tokens,
                response_format={"type": "json_object"},
            )
            json_response = response.choices[0].message.content
        except Exception as e:
            print(f"Error while processing: {str(e)}")
            raise
        try:
            # JSONDecodeError: not valid JSON. TypeError: the reply content was None.
            json.loads(json_response)
        except (json.JSONDecodeError, TypeError):
            print(f"Invalid JSON response for message {system_user_message}\nReturning None for this request.")
            # Fallback reply; there is no retry in this sequential version.
            return '{"content": "None"}'
        return json_response
                 

In [10]:
# Sequential client with the same generation settings as `player`.
player2 = APIPlayer_simpler(
    model_name="gpt-35-turbo",
    temperature=1.0,
    max_tokens=20,
)

In [11]:
# Time the sequential baseline: one blocking request per conversation.
# perf_counter is monotonic, so it is the right clock for measuring elapsed time.
started = time.perf_counter()
results_2 = []
for sequence in message_sequences:
    # Append as we go, so partial results survive if a request raises.
    results_2.append(player2.get_response(sequence))
print(f"Time taken: {time.perf_counter() - started}")


Time taken: 14.707165002822876


In [12]:
# First five raw JSON replies, in the same order as the input questions.
results_2[0:5]

['{"content": "2"}',
 '{"content": "4"}',
 '{"content": "6"}',
 '{"content": "8"}',
 '{"content": "10"}']

#### As you can see, processing one request at a time took about 14.7 seconds, while the parallel method finished the same 59 requests in about 1.7 seconds, roughly 8.5 times faster (timings vary from run to run with API latency).