### Import Required Libraries
This cell imports the necessary Python libraries for asynchronous HTTP requests and performance measurement.

In [26]:
import requests
import time
import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor, as_completed

### Define Target URL and Sample Inputs
This cell sets the target inference endpoint URL and defines a list of input texts for sentiment analysis.

In [27]:
# API Endpoint URL (assuming Docker container is running and port 8000 is mapped)
API_URL_SINGLE = "http://localhost:8000/predict"
API_URL_BATCH = "http://localhost:8000/predict_batch"

### Define Asynchronous Request Function
This function sends a single asynchronous POST request to the inference API and returns the result.

In [28]:
# Sample texts for inference
sample_texts = [
    "This is a fantastic product! I'm really happy with it.",
    "I am incredibly disappointed with the service I received.",
    "The weather today is just okay, nothing special.",
    "What an amazing experience, I would recommend it to everyone!",
    "This is the worst movie I have ever seen.",
    "The food was decent, but the ambiance was lacking.",
    "HuggingFace simplifies NLP for developers.",
    "I'm feeling quite neutral about this situation.",
    "The quick brown fox jumps over the lazy dog.",
    "Pure excitement and joy after achieving the goal!"
] * 2 # Multiply to have more requests for parallel demo (20 requests)

print(f"Sending {len(sample_texts)} requests in total for each method.")


Sending 20 requests in total for each method.


### Define Main Function to Send All Requests in Parallel
This function builds and sends all the POST requests concurrently using asyncio and aiohttp.

In [29]:
# --- Synchronous Sequential Requests (for baseline) ---
print("\n--- Testing Synchronous Sequential Requests ---")
sequential_results = []
start_time = time.time()
for i, text in enumerate(sample_texts):
    payload = {"text": text}
    try:
        response = requests.post(API_URL_SINGLE, json=payload, timeout=10)
        response.raise_for_status() 
        result_json = response.json()
        sequential_results.append(result_json)
        # print(f"Req {i+1} | Text: \"{text[:30]}...\" | Resp: {result_json}")
    except requests.exceptions.RequestException as e:
        sequential_results.append({"error": str(e)})
        # print(f"Req {i+1} | Text: \"{text[:30]}...\" | Error: {e}")
end_time = time.time()
print(f"Synchronous sequential requests completed in {end_time - start_time:.4f} seconds.")
# print("Sequential Results:", sequential_results[:5]) # Print first 5 results


--- Testing Synchronous Sequential Requests ---
Synchronous sequential requests completed in 0.4753 seconds.


### Run the Parallel Request Demo
This block initiates the event loop and runs the asynchronous function, printing each model prediction along with total time taken.

In [30]:
# --- Parallel Requests using concurrent.futures.ThreadPoolExecutor ---
print("\n--- Testing Parallel Requests with ThreadPoolExecutor ---")
threadpool_results = []
def send_request_threadpool(text_payload, req_id):
    try:
        response = requests.post(API_URL_SINGLE, json=text_payload, timeout=15) 
        response.raise_for_status()
        # print(f"ThreadPool Req {req_id} Success: {response.json()}")
        return response.json()
    except requests.exceptions.Timeout:
        # print(f"ThreadPool Req {req_id} Error: Timeout")
        return {"error": "Timeout"}
    except requests.exceptions.RequestException as e:
        # print(f"ThreadPool Req {req_id} Error: {e}")
        return {"error": str(e)}
    


--- Testing Parallel Requests with ThreadPoolExecutor ---


In [31]:
start_time = time.time()
with ThreadPoolExecutor(max_workers=10) as executor: # Adjust max_workers as needed
    futures = [executor.submit(send_request_threadpool, {"text": text}, i+1) for i, text in enumerate(sample_texts)]
    for future in as_completed(futures):
        threadpool_results.append(future.result())
end_time = time.time()
print(f"ThreadPoolExecutor requests completed in {end_time - start_time:.4f} seconds.")
print("ThreadPool Results:", threadpool_results[:5]) # Print first 5 results

ThreadPoolExecutor requests completed in 6.3904 seconds.
ThreadPool Results: [{'label': 'POSITIVE', 'score': 0.9998810291290283}, {'label': 'NEGATIVE', 'score': 0.9997461438179016}, {'label': 'NEGATIVE', 'score': 0.9997548460960388}, {'label': 'NEGATIVE', 'score': 0.6382151246070862}, {'label': 'POSITIVE', 'score': 0.9998643398284912}]


In [32]:
# --- Parallel Requests using asyncio and aiohttp ---
print("\n--- Testing Parallel Requests with asyncio and aiohttp ---")
aiohttp_results = []
async def post_async(session, text_payload, req_id):
    try:
        async with session.post(API_URL_SINGLE, json=text_payload, timeout=aiohttp.ClientTimeout(total=15)) as response:
            response.raise_for_status()
            result = await response.json()
            # print(f"Asyncio Req {req_id} Success: {result}")
            return result
    except asyncio.TimeoutError:
        # print(f"Asyncio Req {req_id} Error: Timeout")
        return {"error": "Timeout"}
    except aiohttp.ClientResponseError as e:
        # print(f"Asyncio Req {req_id} Error: HTTP {e.status} - {e.message}")
        return {"error": f"HTTP {e.status} - {e.message}"}
    except aiohttp.ClientError as e:
        # print(f"Asyncio Req {req_id} Error: ClientError - {e}")
        return {"error": f"ClientError - {str(e)}"}
    except Exception as e:
        # print(f"Asyncio Req {req_id} Error: Unexpected {type(e).__name__} - {e}")
        return {"error": f"Unexpected {type(e).__name__} - {str(e)}"}



--- Testing Parallel Requests with asyncio and aiohttp ---


In [33]:
async def main_async_requests():
    start_time_async = time.time()
    # Using a TCPConnector to limit concurrent connections from aiohttp side if needed,
    # though server-side Gunicorn workers handle true parallelism.
    connector = aiohttp.TCPConnector(limit_per_host=10) # Limit concurrent connections from this client
    async with aiohttp.ClientSession(connector=connector) as session:
        tasks = [post_async(session, {"text": text}, i+1) for i, text in enumerate(sample_texts)]
        # Use return_exceptions=True to gather all results even if some tasks fail
        results_from_gather = await asyncio.gather(*tasks, return_exceptions=True) 
        aiohttp_results.extend(results_from_gather)
    end_time_async = time.time()
    print(f"asyncio/aiohttp requests completed in {end_time_async - start_time_async:.4f} seconds.")
    print("Asyncio/aiohttp Results:", aiohttp_results[:5]) # Print first 5 results


In [34]:
# --- Summary of Results ---
await main_async_requests()

asyncio/aiohttp requests completed in 0.7252 seconds.
Asyncio/aiohttp Results: [{'label': 'POSITIVE', 'score': 0.9998810291290283}, {'label': 'NEGATIVE', 'score': 0.9997461438179016}, {'label': 'NEGATIVE', 'score': 0.6382151246070862}, {'label': 'POSITIVE', 'score': 0.9998643398284912}, {'label': 'NEGATIVE', 'score': 0.9997548460960388}]
