In [1]:
import time
import json
import tiktoken
import threading
import pandas as pd
from openai import AzureOpenAI
from threading import Semaphore
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any
from tenacity import (
    retry,
    stop_after_attempt,
    wait_fixed,
    wait_random,
)

# Data Format Example

The input data, `message_sequences`, should be structured as a list of message sequences. Each sequence is itself a list of message dictionaries, and each message has a `role` (either "system" or "user") and `content`. Here’s a very simple example:

In [2]:
# One [system, user] message pair per integer from 1 to 100.
SYSTEM_PROMPT = "You are a helpful AI assistant.  Always answer in the following json format: {'content': '2'}."

message_sequences = [
    [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": f"What is {number} + {number}?"},
    ]
    for number in range(1, 101)
]

# Show the first three sequences as the cell output.
message_sequences[:3]

[[{'role': 'system',
   'content': "You are a helpful AI assistant.  Always answer in the following json format: {'content': '2'}."},
  {'role': 'user', 'content': 'What is 1 + 1?'}],
 [{'role': 'system',
   'content': "You are a helpful AI assistant.  Always answer in the following json format: {'content': '2'}."},
  {'role': 'user', 'content': 'What is 2 + 2?'}],
 [{'role': 'system',
   'content': "You are a helpful AI assistant.  Always answer in the following json format: {'content': '2'}."},
  {'role': 'user', 'content': 'What is 3 + 3?'}]]

## Custom TokenSemaphore for Token Rate Limiting

The Python standard Semaphore from the threading module starts with an internal counter, which you specify upon creation. This counter decrements each time acquire() is called and increments when release() is called. However, the standard Semaphore doesn't support acquiring or releasing more than one unit of the counter at a time, which means it can't directly manage multiple tokens per request out-of-the-box if those requests consume a **variable** number of tokens.


The following custom class allows you to specify how many tokens to acquire or release at a time, giving you the flexibility needed for handling variable token counts per API request.

In [3]:
class TokenSemaphore:
    """Semaphore-like counter that acquires and releases a variable number of tokens.

    The counter starts at ``max_tokens``. ``acquire(k)`` blocks until at least
    ``k`` tokens are available and then subtracts them; ``release(k)`` adds
    ``k`` tokens back and wakes every waiting thread so each can re-check
    whether its own requirement is now satisfied.
    """

    def __init__(self, max_tokens):
        """Create the semaphore with ``max_tokens`` tokens initially available."""
        # Remember the capacity so that requests that can never be satisfied
        # are rejected instead of blocking forever.
        self.max_tokens = max_tokens
        self.tokens = max_tokens
        self.lock = threading.Lock()
        self.condition = threading.Condition(self.lock)

    def acquire(self, required_tokens):
        """Block until ``required_tokens`` tokens are available, then take them.

        Raises:
            ValueError: if ``required_tokens`` is negative or larger than the
                capacity given at construction. Waiting in the latter case
                would never end, because releases only return tokens that
                were previously acquired.
        """
        if required_tokens < 0:
            raise ValueError(f"required_tokens must be non-negative, got {required_tokens}")
        if required_tokens > self.max_tokens:
            raise ValueError(
                f"Cannot acquire {required_tokens} tokens: capacity is only {self.max_tokens}"
            )
        # Entering the condition acquires the underlying ``self.lock``.
        with self.condition:
            while self.tokens < required_tokens:
                self.condition.wait()
            self.tokens -= required_tokens

    def release(self, released_tokens):
        """Return ``released_tokens`` tokens to the pool and wake all waiters.

        Raises:
            ValueError: if ``released_tokens`` is negative.
        """
        if released_tokens < 0:
            raise ValueError(f"released_tokens must be non-negative, got {released_tokens}")
        with self.condition:
            self.tokens += released_tokens
            # notify_all (not notify): waiters need different amounts, so each
            # must re-evaluate its own condition.
            self.condition.notify_all()

In [4]:
class APIRequester:
    """Azure OpenAI chat client that sends many requests from a thread pool.

    Each call to ``get_response`` acquires one slot from a request semaphore
    and an estimated number of tokens from a ``TokenSemaphore`` before calling
    the API, and releases both when the call finishes.

    NOTE(review): both semaphores are released as soon as a request completes,
    so they bound the number of in-flight requests/tokens rather than a
    per-minute total; the per-request sleep in ``get_response`` is what spaces
    requests over time. Confirm this matches the intended quota behaviour.
    """

    # Number of completion tokens reserved per request when estimating usage.
    # NOTE(review): the estimate does not use the configured ``max_tokens``;
    # confirm whether this fixed reserve is intentional.
    COMPLETION_TOKEN_RESERVE = 500

    def __init__(self, model_name, temperature=1.0, max_tokens=15, rate_limit=20, token_rate_limit=8000):
        """Configure the client and the throttling primitives.

        Args:
            model_name: deployment/model name passed to the chat completions call.
            temperature: sampling temperature passed to the API.
            max_tokens: ``max_tokens`` passed to the API for each completion.
            rate_limit: size of the request semaphore and of the thread pool;
                also determines the pause after each request (``60 / rate_limit`` s).
            token_rate_limit: initial size of the token semaphore.

        Raises:
            ValueError: if ``rate_limit`` is not positive (it is used as a
                divisor and as the semaphore/thread-pool size).
        """
        if rate_limit <= 0:
            raise ValueError(f"rate_limit must be positive, got {rate_limit}")
        self.client = AzureOpenAI(
                # https://learn.microsoft.com/en-us/azure/ai-services/openai/reference#rest-api-versioning
                api_version="2024-02-15-preview",
                # https://learn.microsoft.com/en-us/azure/cognitive-services/openai/how-to/create-resource?pivots=web-portal#create-a-resource
                azure_endpoint="https://chm-openai.openai.azure.com/",
            )
        self.model_name = model_name
        self.temperature = temperature
        self.max_tokens = max_tokens
        self.rate_limit = rate_limit  # max requests per minute, adjust according to your limit
        self.request_semaphore = Semaphore(self.rate_limit)
        self.token_rate_limit = token_rate_limit  # Token rate limit per minute
        self.token_semaphore = TokenSemaphore(self.token_rate_limit)  # Custom semaphore for token limit

    # Adapted from https://github.com/openai/openai-cookbook/blob/970d8261fbf6206718fe205e88e37f4745f9cf76/examples/api_request_parallel_processor.py#L339-L389
    def num_tokens_consumed_from_request(
            self,
            request_json: List,
    ):
        """Estimate the tokens one chat request will consume.

        Args:
            request_json: list of message dicts; every value of every message
                is tokenized with the ``cl100k_base`` encoding.

        Returns:
            Estimated prompt tokens plus the fixed completion reserve, or 0 if
            a ``KeyError`` is raised while counting.
        """
        # for gpt-4 / gpt-3.5-turbo, the encoding is "cl100k_base"
        encoding = tiktoken.get_encoding("cl100k_base")
        n = 1  # number of completions
        completion_tokens = n * self.COMPLETION_TOKEN_RESERVE
        # chat completions
        num_tokens = 0
        try:
            for message in request_json:
                num_tokens += 4  # every message follows <im_start>{role/name}\n{content}<im_end>\n
                for key, value in message.items():
                    num_tokens += len(encoding.encode(value))
                    if key == "name":  # if there's a name, the role is omitted
                        num_tokens -= 1  # role is always required and always 1 token
            num_tokens += 2  # every reply is primed with <im_start>assistant
            return num_tokens + completion_tokens
        except KeyError:
            print(f"Invalid request JSON: {request_json}")
            return 0

    def handle_last_retry_error(retry_state):
        """Tenacity ``retry_error_callback``: log the failed message and return None.

        This is deliberately a plain function (no ``self``): it is referenced by
        name in the ``@retry`` decorator below while the class body is executing.
        """
        # get_response is a method, so args[0] is the APIRequester instance;
        # the message sequence is the keyword argument or the second positional.
        if "system_user_message" in retry_state.kwargs:
            failed_message = retry_state.kwargs["system_user_message"]
        elif len(retry_state.args) > 1:
            failed_message = retry_state.args[1]
        else:
            failed_message = None
        print(f"All retry attempts failed for: {failed_message}\nReturning None for this request.")
        return None  # Custom behavior after all retries fail

    @retry(wait=wait_fixed(2) + wait_random(0, 2),
            stop=stop_after_attempt(2),
            before_sleep=lambda retry_state: print("Retrying..."),
            retry_error_callback=handle_last_retry_error)
    def get_response(self, system_user_message: List):
        """Send one chat request and return the reply as a JSON string.

        The reply is parsed with ``json.loads`` only to validate it; the raw
        string is what gets returned. Any exception (including invalid JSON) is
        re-raised so the ``@retry`` decorator can try again; when the attempts
        are exhausted, ``handle_last_retry_error`` makes the call return None.

        Args:
            system_user_message: list of message dicts sent as ``messages``.

        Returns:
            The reply content string, or None after all retry attempts fail.
        """
        estimated_tokens = self.num_tokens_consumed_from_request(system_user_message)
        # Acquire both semaphores to manage requests and tokens
        self.request_semaphore.acquire()
        try:
            self.token_semaphore.acquire(estimated_tokens)  # Acquire tokens
            try:
                response = self.client.chat.completions.create(
                            model=self.model_name,
                            messages=system_user_message,
                            temperature=self.temperature,
                            max_tokens=self.max_tokens,
                            response_format={"type": "json_object"},
                        )
                json_response = response.choices[0].message.content
                # The raw string (not the parsed object) is returned so that the
                # saved dataframe keeps the reply's original formatting.
                json.loads(json_response)  # Attempt to parse the JSON, will raise an error if not valid JSON
                return json_response
            except json.JSONDecodeError:
                print(f"Invalid JSON response for message {system_user_message}")
                raise  # This re-raises the last exception, triggering a retry
            except Exception as e:
                print(f"Error while processing: {str(e)}")
                raise
            finally:
                self.token_semaphore.release(estimated_tokens)  # Release tokens after the request
        finally:
            self.request_semaphore.release()  # Release request semaphore after handling tokens
            time.sleep(60 / self.rate_limit)  # Pause to respect the rate limit to evenly distribute requests over time

    def get_responses_parallel(self, messages_list):
        """Run ``get_response`` for every message sequence on a thread pool.

        Args:
            messages_list: iterable of message sequences (lists of message dicts).

        Returns:
            A list of ``{"input": ..., "content": ...}`` dicts in completion
            order (not input order). ``input`` is the content of the first
            message whose role is ``"user"``, or ``"Unknown query"`` if there
            is none; ``content`` is whatever ``get_response`` returned.
        """
        results = []
        started = time.time()
        with ThreadPoolExecutor(max_workers=self.rate_limit) as executor:
            future_to_message = {executor.submit(self.get_response, message): message for message in messages_list}
            for future in as_completed(future_to_message):
                message = future_to_message[future]
                try:
                    result = future.result()
                    # Extract the user's query from the message (response format is a JSON string like '{"content": "4"}')
                    user_query = next((msg['content'] for msg in message if msg['role'] == 'user'), "Unknown query")
                    results.append({"input": user_query, "content": result})
                except Exception as e:
                    results.append({"input": "Error", "content": f"Error processing message: {str(e)}"})
        print(f"Total time taken: {time.time() - started} seconds")
        return results
            

Ensure you stay within the request and token rate limits for the selected model.

In our Azure AI settings, gpt-35-turbo has a request rate limit of 720 requests per minute and a token rate limit of 120000 tokens per minute.

In [5]:
# Throttled, thread-pooled requester for the gpt-35-turbo deployment.
gpt35_turbo_api = APIRequester(
    model_name="gpt-35-turbo",
    temperature=1.0,
    max_tokens=20,
    rate_limit=100,
    token_rate_limit=10000,
)

In [18]:
# Send every message sequence through the thread pool; prints the total elapsed time.
results_parallel = gpt35_turbo_api.get_responses_parallel(
    messages_list=message_sequences,
)

Total time taken: 2.0825588703155518 seconds


As you can see, the order of the responses is not the same as the order of the queries, because the requests are processed in parallel and collected as they complete.

In [13]:
# Tabulate the parallel results, save them to CSV (without the index column),
# and show the frame as the cell output.
df_parallel = pd.DataFrame(data=results_parallel)
df_parallel.to_csv(
    "results_parallel.csv",
    index=False,
)
df_parallel

Unnamed: 0,input,content
0,What is 8 + 8?,"{""content"": ""16""}"
1,What is 3 + 3?,"{""content"": ""6""}"
2,What is 9 + 9?,"{""content"": ""18""}"
3,What is 10 + 10?,"{""content"": ""20""}"
4,What is 7 + 7?,"{""content"": ""14""}"
...,...,...
95,What is 85 + 85?,"{""content"": ""170""}"
96,What is 91 + 91?,"{""content"": ""182""}"
97,What is 95 + 95?,"{\n ""content"": ""190""\n}"
98,What is 100 + 100?,"{""content"": ""200""}"


### Now a one-by-one request method to compare with the parallel method

In [8]:
class APIRequesterSimpler:
    """Bare-bones Azure OpenAI chat client: one blocking request per call.

    There is no retry, throttling, or threading here; it serves as the
    sequential baseline for the timing comparison.
    """

    def __init__(self, model_name, temperature=1.0, max_tokens=15):
        """Store the sampling settings and create the Azure OpenAI client."""
        self.model_name = model_name
        self.temperature = temperature
        self.max_tokens = max_tokens
        self.client = AzureOpenAI(
            # https://learn.microsoft.com/en-us/azure/ai-services/openai/reference#rest-api-versioning
            api_version="2023-07-01-preview",
            # https://learn.microsoft.com/en-us/azure/cognitive-services/openai/how-to/create-resource?pivots=web-portal#create-a-resource
            azure_endpoint="https://chm-openai.openai.azure.com/",
        )

    def get_response(self, system_user_message: List):
        """Send one chat request and return the reply content as a string.

        The reply is parsed with ``json.loads`` purely as a validity check.
        If it is not valid JSON, a message is printed and the placeholder
        string ``'{"content": "None"}'`` is returned instead. Any other
        exception is printed and re-raised to the caller.
        """
        try:
            request_options = {
                "model": self.model_name,
                "messages": system_user_message,
                "temperature": self.temperature,
                "max_tokens": self.max_tokens,
                "response_format": {"type": "json_object"},
            }
            completion = self.client.chat.completions.create(**request_options)
            raw_reply = completion.choices[0].message.content
            json.loads(raw_reply)  # validity check only; the parsed value is discarded
        except json.JSONDecodeError:
            print(f"Invalid JSON response for message {system_user_message}\nReturning None for this request.")
            # No retry here: fall back to a fixed placeholder payload.
            return '{"content": "None"}'
        except Exception as e:
            print(f"Error while processing: {str(e)}")
            raise
        else:
            return raw_reply
                 

In [9]:
# Sequential baseline client with the same model and sampling settings.
gpt35_turbo_api2 = APIRequesterSimpler(
    model_name="gpt-35-turbo",
    temperature=1.0,
    max_tokens=20,
)

In [10]:
# Baseline: send the same requests one at a time and time the whole run.
started = time.time()
results_2 = []
# extend() consumes the map lazily, so replies are appended one request at a time.
results_2.extend(map(gpt35_turbo_api2.get_response, message_sequences))
elapsed_seconds = time.time() - started
print(f"Time taken: {elapsed_seconds}")


Time taken: 18.58565092086792


#### As you can see, processing these requests one by one took about 18.6 seconds, whereas the parallel processing method took approximately 2.1 seconds, making it roughly 9 times faster.

