In [None]:
import requests
import concurrent.futures
import time

# Define the API endpoint you want to test
API_ENDPOINT_ASYNC_VER = "http://localhost:8000/async/student"  # 2.64s
API_ENDPOINT_SYNC_VER = "http://localhost:8000/sync/student"  # 2.30s

# Define the number of concurrent threads
# 쓰레드를 10000개로 설정하니까 내 노트북이 셧다운되네......?
CONCURRENT_THREADS = 10000

# Function to send a request to the API
def send_api_request():
    try:
        response = requests.get(API_ENDPOINT_SYNC_VER)
        print(f"Response Code: {response.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"Request failed: {e}")

# Main function to execute the stress test
def stress_test():
    start_time = time.time()
    
    # Use ThreadPoolExecutor to manage threads
    with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENT_THREADS) as executor:
        # Submit tasks to the executor
        futures = [executor.submit(send_api_request) for _ in range(CONCURRENT_THREADS)]
        
        # Wait for all tasks to complete
        for future in concurrent.futures.as_completed(futures):
            future.result()

    end_time = time.time()
    print(f"Stress test completed in {end_time - start_time} seconds")

if __name__ == "__main__":
    stress_test()

In [None]:
"""
The error RuntimeError: asyncio.run() cannot be called from a running event loop typically occurs when you attempt to use asyncio.run() in an environment where an event loop is already running. This is common in interactive environments like Jupyter Notebook or certain Python IDEs. Here are some solutions to handle this situation:
==>> jupyter notebook에서는 이미 event loop가 동작 중이라 기본적으로는 추가로 실행하는 게 안 되네...


"""


import asyncio
import aiohttp
import time
import nest_asyncio


# Apply nest_asyncio to allow nested event loops
# jupyter notebook이랑 같이 쓰려면 이렇게 처리해주면 된다.
nest_asyncio.apply()

# Define the API endpoint you want to test
API_ENDPOINT_ASYNC_VER = "http://localhost:8000/async/student"  # 10만 개 호출 -> 37.158초 -> 22.978초 -> 23.111초 -> 22.219초
API_ENDPOINT_SYNC_VER = "http://localhost:8000/sync/student"  # 10만 개 호출 -> 26.680초 -> 21.848초 -> 21.100초 -> 21.098초

# Define the number of concurrent requests
CONCURRENT_REQUESTS = 100000

# Function to send a single request
async def send_request(session, url):
    try:
        async with session.get(url) as response:
            status = response.status
            print(f"Response status: {status}")
            return await response.text()
    except Exception as e:
        print(f"Request failed: {e}")

# Main function to perform stress testing
async def stress_test():
    start_time = time.time()
    
    # Create an aiohttp session
    async with aiohttp.ClientSession() as session:
        # Create a list of tasks for sending requests
        tasks = [send_request(session, API_ENDPOINT_ASYNC_VER) for _ in range(CONCURRENT_REQUESTS)]
        
        # Run the tasks concurrently
        await asyncio.gather(*tasks)

    end_time = time.time()
    print(f"Stress test completed in {end_time - start_time} seconds")

# Run the stress test using asyncio
if __name__ == "__main__":
    asyncio.run(stress_test())