# Agent: Python asyncio

## 기본 개념

asyncio는 Python의 비동기 프로그래밍 라이브러리이다. LLM(대규모 언어 모델) 사용 시 여러 요청을 효율적으로 처리하기 위해 매우 유용하다. 비동기 프로그래밍은 I/O 집약적인 작업에서 성능을 크게 향상시킬 수 있다.

## 1. asyncio와 async/await를 이용한 비동기 프로그래밍

`async def`와 `asyncio.run`은 Python의 비동기 프로그래밍에서 중요한 요소이다.

## async def

`async def`는 비동기 함수(코루틴)를 정의하는 구문이다. 일반 함수와 달리 이 함수는 실행 중 `await` 키워드를 사용하여 다른 비동기 작업이 완료될 때까지 실행을 일시 중단한다.

In [1]:
import asyncio

async def fetch_data():
    print("데이터 가져오기 시작")
    # 비동기적으로 I/O 작업이 완료될 때까지 대기
    await asyncio.sleep(2)  # 네트워크 요청이나 파일 I/O 같은 작업을 시뮬레이션
    print("데이터 수신 완료")
    return "가져온 데이터"

## asyncio.run()

`asyncio.run()`은 최상위 수준에서 비동기 함수를 실행하기 위한 함수이다.:

1. 새로운 이벤트 루프를 생성하고
2. 지정된 코루틴을 실행하고
3. 완료되면 이벤트 루프를 닫는다.

In [2]:
import asyncio

# 위 셀에서 정의한 fetch_data 함수를 사용한다.
async def main():
    result = await fetch_data()
    print(f"결과: {result}")

# 프로그램의 진입점에서 asyncio.run()으로 비동기 코드 실행
# Jupyter/Colab 환경에서는 asyncio.run()이 이미 실행 중인 이벤트 루프와 충돌할 수 있다.
# 따라서 top-level await를 사용하거나 아래와 같이 실행하는 것이 좋다.
await main()

데이터 가져오기 시작
데이터 수신 완료
결과: 가져온 데이터


## 2. 실제 작동 방식

비동기 프로그래밍이 어떻게 작동하는지 보여주는 실용적인 예제이다.

In [3]:
import asyncio
import time

async def task(name, delay):
    print(f"작업 {name} 시작")
    await asyncio.sleep(delay)  # I/O 작업 시뮬레이션
    print(f"작업 {name} 완료")
    return f"{name}의 결과"

async def main():
    start = time.time()
    
    # 여러 작업을 동시에 실행
    results = await asyncio.gather(
        task("A", 3),
        task("B", 2),
        task("C", 1)
    )
    
    end = time.time()
    print(f"\n모든 작업 완료. 소요 시간: {end - start:.2f}초")
    print(f"결과: {results}")

# 프로그램 실행
await main()

작업 A 시작
작업 B 시작
작업 C 시작
작업 C 완료
작업 B 완료
작업 A 완료

모든 작업 완료. 소요 시간: 3.00초
결과: ['A의 결과', 'B의 결과', 'C의 결과']


이 예제에서는 3개의 작업이 동시에 실행되며, 가장 긴 작업(3초)만큼만 기다리면 모든 작업이 완료된다. 순차적으로 실행했다면 3~6초가 걸린다. 

`asyncio.run()`(또는 `await`)을 사용하면 이벤트 루프 관리를 간소화할 수 있어 비동기 코드를 쉽게 작성할 수 있다. 

## 3. LLM과 asyncio의 결합 이유

LLM API 호출은 대개 네트워크 지연 시간이 발생하는 I/O 작업이다. 동기식 코드에서는 응답을 기다리는 동안 프로그램이 아무 작업도 수행하지 못하지만, 비동기 코드에서는 대기 시간 동안 다른 작업을 수행한다.

### LLM 사용을 위한 asyncio 기본 패턴

In [6]:
# aiohttp는 비동기 HTTP 클라이언트/서버 라이브러리이다.
%pip install -q aiohttp

Note: you may need to restart the kernel to use updated packages.


In [None]:
import asyncio
import aiohttp

async def call_llm_api(session, prompt, model="gpt-3.5-turbo"):
    """LLM API에 비동기 요청을 보내는 함수입니다."""
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    json_data = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}]
    }
    
    async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json=json_data) as response:
        result = await response.json()
        if "error" in result:
            return f"오류: {result['error']['message']}"
        return result["choices"][0]["message"]["content"]

async def process_multiple_prompts(prompts):
    """여러 프롬프트를 병렬로 처리하는 함수입니다."""
    async with aiohttp.ClientSession() as session: # ClientSession을 한 번만 생성하여 재사용합니다.
        tasks = [call_llm_api(session, prompt) for prompt in prompts]
        results = await asyncio.gather(*tasks)
        return results

# 실행 코드
prompts = [
    "파이썬의 장점을 설명해주세요.",
    "머신러닝이란 무엇인가요?",
    "비동기 프로그래밍의 이점은 무엇인가요?"
]

async def main():
    if API_KEY == "YOUR_OPENAI_API_KEY":
        print("API_KEY를 설정해주세요.")
        return
    
    results = await process_multiple_prompts(prompts)
    for prompt, result in zip(prompts, results):
        print(f"프롬프트: {prompt}\n응답: {result}\n")

# 프로그램 실행
await main()

## 4. 고급 패턴: 속도 제한 및 재시도

API에는 보통 분당 요청 제한(Rate Limit)이 있습니다. 이를 준수하고, 일시적인 네트워크 오류에 대응하기 위해 재시도 로직을 추가하면 더 안정적인 코드를 만들 수 있습니다. `tenacity` 라이브러리를 사용하면 재시도 로직을 쉽게 구현할 수 있습니다.

In [None]:
!pip install tenacity

In [None]:
import asyncio
import aiohttp
from tenacity import retry, stop_after_attempt, wait_exponential

class AsyncLLMClient:
    """LLM API 호출을 관리하는 비동기 클라이언트 클래스입니다."""
    
    def __init__(self, api_key, rate_limit=10):
        self.api_key = api_key
        self.semaphore = asyncio.Semaphore(rate_limit)  # 동시 요청 제한
    
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    async def call_api(self, session, prompt, model="gpt-3.5-turbo"):
        """재시도 로직이 있는 API 호출 함수입니다."""
        async with self.semaphore:  # 속도 제한 적용
            print(f"'{prompt[:10]}...' 요청 시작")
            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"
            }
            json_data = {
                "model": model,
                "messages": [{"role": "user", "content": prompt}]
            }
            
            async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json=json_data) as response:
                # 오류 처리
                response.raise_for_status() # 200번대 응답이 아니면 예외 발생
                
                result = await response.json()
                print(f"'{prompt[:10]}...' 요청 완료")
                return result["choices"][0]["message"]["content"]
    
    async def process_batch(self, prompts):
        """여러 프롬프트를 효율적으로 처리하는 함수입니다."""
        async with aiohttp.ClientSession() as session:
            tasks = [self.call_api(session, prompt) for prompt in prompts]
            # return_exceptions=True로 설정하면 개별 작업의 예외를 모아서 반환합니다.
            return await asyncio.gather(*tasks, return_exceptions=True)

#### `AsyncLLMClient` 사용 예제

In [None]:
async def main_advanced():
    if API_KEY == "YOUR_OPENAI_API_KEY":
        print("API_KEY를 설정해주세요.")
        return
        
    client = AsyncLLMClient(api_key=API_KEY, rate_limit=5)
    results = await client.process_batch(prompts)
    
    for prompt, result in zip(prompts, results):
        if isinstance(result, Exception):
            print(f"프롬프트: {prompt}\n오류: {result}\n")
        else:
            print(f"프롬프트: {prompt}\n응답: {result}\n")

await main_advanced()

## 5. 스트리밍 응답 처리

LLM이 생성하는 텍스트를 실시간으로 받아 처리하고 싶을 때 스트리밍을 사용합니다. 사용자에게 더 빠른 피드백을 줄 수 있습니다.

In [14]:
import json
import os
import aiohttp
from dotenv import load_dotenv

# .env 파일에서 환경변수 로드
load_dotenv()
API_KEY = os.getenv("OPENAI_API_KEY")

def parse_stream_chunk(line):
    """스트리밍 응답 라인을 파싱하는 도우미 함수"""
    if line.startswith(b'data: '):
        line_data = line[len(b'data: '):].strip()
        if line_data == b'[DONE]':
            return None # 스트림 종료
        try:
            chunk = json.loads(line_data)
            return chunk['choices'][0]['delta']
        except json.JSONDecodeError:
            return None # 잘못된 JSON 데이터
    return None

async def stream_llm_response(prompt, model="gpt-4o-mini"):
    """LLM의 스트리밍 응답을 처리하는 함수입니다."""
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    json_data = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "stream": True
    }
    
    async with aiohttp.ClientSession() as session:
        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json=json_data) as response:
            print(f"프롬프트: {prompt}\n응답: ", end="")
            async for line in response.content:
                if line.strip():
                    # 스트림 데이터 파싱 및 처리
                    chunk = parse_stream_chunk(line)
                    if chunk and 'content' in chunk:
                        print(chunk['content'], end='', flush=True)
            print() # 마지막에 줄바꿈

# 실행 예시
await stream_llm_response("양자 컴퓨팅에 대해 시처럼 설명해줘.")

프롬프트: 양자 컴퓨팅에 대해 시처럼 설명해줘.
응답: 양자 세상, 신비의 바다,  
입자는 춤추고, 파동은 노래해.  
0과 1의 경계 허물고,  
양자 비트는 둘이 아냐, 세상이야.

중첩의 힘, 동시에 존재,  
한 순간에 여러 길을 걷는 그대.  
측정의 순간, 진실이 드러나,  
우선순위 없이 우주를 어루만져.

엉킴 속의 연결, 뗄 수 없는 선,  
멀리 있어도 하나의 마음으로.  
양자 얽힘, 영원한 동행,  
우리는 서로에게 빛을 전해.

계산의 속도, 상상을 초월,  
복잡한 문제를 한순간에 풀어.  
미래의 열쇠, 새로운 시대,  
양자 컴퓨터, 혁신의 씨앗을 뿌려.

신비로운 법칙 속에 숨은 진실,  
우리가 꿈꾸는 기술의 경계.  
양자 세계, 무한한 가능성,  
우리의 연구는 계속, 끝없는 여정.


## 6. 대규모 처리를 위한 작업자 풀

처리해야 할 프롬프트가 수백, 수천 개에 달할 경우, `asyncio.Queue`와 작업자(worker) 패턴을 사용하면 시스템 리소스를 더 효율적으로 관리하고 안정적으로 작업을 처리한다.

In [17]:
import asyncio
import aiohttp
import json
import os
from dotenv import load_dotenv

# .env 파일에서 환경변수 로드
load_dotenv()
API_KEY = os.getenv("OPENAI_API_KEY")

class AsyncLLMClient:
    """비동기 LLM API 클라이언트"""
    def __init__(self, api_key, rate_limit=5):
        self.api_key = api_key
        self.rate_limit = rate_limit
        
    async def call_api(self, session, prompt, model="gpt-4o-mini"):
        """OpenAI API 호출"""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        
        json_data = {
            "model": model,
            "messages": [{"role": "user", "content": prompt}],
            "stream": False
        }
        
        async with session.post("https://api.openai.com/v1/chat/completions", 
                               headers=headers, json=json_data) as response:
            data = await response.json()
            return data['choices'][0]['message']['content']

async def worker(queue, results, client, session):
    """작업 큐에서 프롬프트를 처리하는 작업자 함수입니다."""
    while True:
        idx, prompt = await queue.get() # 큐에서 아이템 가져오기
        if idx is None:  # 종료 신호
            break
            
        response = await client.call_api(session, prompt)
        results[idx] = response
        queue.task_done()

async def process_large_batch(prompts, worker_count=5):
    """대량의 프롬프트를 작업자 풀로 처리하는 함수입니다."""
    client = AsyncLLMClient(API_KEY, rate_limit=worker_count)
    queue = asyncio.Queue()
    results = [""] * len(prompts)
    
    # 큐에 작업 추가
    for i, prompt in enumerate(prompts):
        await queue.put((i, prompt))
    
    # 워커 종료 신호 추가
    for _ in range(worker_count):
        await queue.put((None, None))
    
    async with aiohttp.ClientSession() as session:
        # 작업자 시작
        workers = [asyncio.create_task(worker(queue, results, client, session)) 
                   for _ in range(worker_count)]
        
        # 모든 작업 완료 대기
        await asyncio.gather(*workers)
    
    return results

# 대규모 프롬프트 예시 (5개로 축소)
large_prompts = [
    "인공지능의 역사를 요약해줘.",
    "블록체인의 핵심 기술은 무엇이야?",
    "세계에서 가장 높은 산은?",
    "물의 화학식은?",
    "셰익스피어의 유명한 비극 3가지는?"
]

# 실행
final_results = await process_large_batch(large_prompts, worker_count=3)
print(final_results)

['인공지능(AI)의 역사는 여러 단계로 나눌 수 있으며, 다음과 같은 주요 사건과 발전이 포함됩니다.\n\n1. **1940년대 - 1950년대: 초기 개념과 기초 이론**  \n   - 1943년, 연구자들은 인공지능의 기초가 되는 신경망 모델을 제안했습니다.\n   - 1950년, 앨런 튜링은 "튜링 테스트"를 소개하여 기계가 인간처럼 사고할 수 있는지를 평가하는 기준을 제시했습니다.\n   - 1956년, 다트머스 회의에서 인공지능이라는 용어가 공식적으로 사용되며, AI 연구가 본격적으로 시작되었습니다.\n\n2. **1960년대 - 1970년대: 탐색과 초기 적용**  \n   - 이 시기에 초기 AI 프로그램들이 개발되었고, 문제 해결, 자연어 처리, 게임 플레이 등 다양한 분야에서 적용되었습니다.\n   - 엘리자(ELIZA)와 같은 초기 챗봇이 등장하였고, 신경망 연구가 이어졌습니다.\n\n3. **1980년대: 전문가 시스템의 발전**  \n   - 전문가 시스템이 부상하여 특정 분야에서의 의사결정을 지원하는 프로그램들이 개발되었습니다. 대표적인 예로는 MYCIN이 있습니다.\n   - 이 시기의 성장은 AI에 대한 관심을 높였으나, 지나친 기대와 한계로 인해 \'AI 겨울\'이라는 연구 자원의 고갈 시기가 다가왔습니다.\n\n4. **1990년대: 기계 학습과 데이터 기반 접근법의 부상**  \n   - 기계 학습과 통계적 방법론에 대한 연구가 활발해졌고, 대량의 데이터를 활용하는 방법론이 발전했습니다.\n   - IBM의 딥 블루가 체스에서 세계 챔피언 스펜서에게 승리하는 사건이 있었고, 이는 AI의 능력을 대중에게 각인시켰습니다.\n\n5. **2000년대 - 현재: 딥러닝과 AI의 재도약**  \n   - 2010년대 초반부터 딥러닝 기술이 발전하면서 이미지 인식, 자연어 처리 등 다양한 분야에서 혁신적인 성과를 이루었습니다.\n   - 구글의 알파고가 바둑에서 세계 챔피언을 이기는 성과는 AI의 잠재력을 다시 한번 크게 부각시켰습니다.

## 요약

LLM 애플리케이션에서 `asyncio`를 활용하면 동시에 여러 요청을 처리하고, 효율적으로 리소스를 사용하며, 응답 시간을 줄일 수 있다. 특히 여러 LLM 호출을 병렬로 처리해야 하는 대화형 애플리케이션이나 일괄 처리 시스템에서 큰 이점을 제공한다.