In [2]:
import time
import requests

def sync_example():
    print("작업 1 시작")
    response1 = requests.get("https://httpbin.org/delay/2")  # 2초 대기
    print("작업 1 완료")

    print("작업 2 시작")
    response2 = requests.get("https://httpbin.org/delay/2")  # 2초 대기
    print("작업 2 완료")

    return response1, response2

# 실행하면 총 4초가 걸림 (2초 + 2초)
start = time.time()
result = sync_example()
print(f"총 소요시간: {time.time() - start:.2f}초")

작업 1 시작
작업 1 완료
작업 2 시작
작업 2 완료
총 소요시간: 6.17초


In [9]:
import threading
import time
import requests

def demonstrate_gil_release():
    """GIL이 I/O 중에 해제되는 것을 보여주는 예제"""

    def io_task(task_id):
        print(f"태스크 {task_id}: 시작 (스레드 {threading.current_thread().name})\n")

        # 이 순간 GIL이 해제됩니다!
        response = requests.get("https://httpbin.org/delay/2")

        print(f"태스크 {task_id}: 완료 (스레드 {threading.current_thread().name})")
        return response.status_code

    # 멀티스레드로 실행
    start_time = time.time()
    threads = []

    for i in range(3):
        thread = threading.Thread(target=io_task, args=(i,))
        threads.append(thread)
        thread.start()

    for thread in threads:
        thread.join()

    print(f"멀티스레드 총 시간: {time.time() - start_time:.2f}초")

# 실행하면 약 2초 정도 걸림 (순차 실행이면 6초였을 것)
demonstrate_gil_release()

태스크 0: 시작 (스레드 Thread-8 (io_task))

태스크 1: 시작 (스레드 Thread-9 (io_task))

태스크 2: 시작 (스레드 Thread-10 (io_task))

태스크 0: 완료 (스레드 Thread-8 (io_task))
태스크 2: 완료 (스레드 Thread-10 (io_task))
태스크 1: 완료 (스레드 Thread-9 (io_task))
멀티스레드 총 시간: 4.26초


In [11]:
import threading
import asyncio
import multiprocessing
import os

def demonstrate_what_runs_where():
    """각각이 어디서 실행되는지 확인"""

    print("=== 현재 상황 확인 ===")
    print(f"프로세스 ID: {os.getpid()}")
    print(f"메인 스레드: {threading.current_thread().name}")
    print(f"활성 스레드 수: {threading.active_count()}")

    # 1. 멀티스레드 생성
    def thread_worker(name):
        print(f"스레드 {name} - 프로세스: {os.getpid()}, 스레드: {threading.current_thread().name}")
        time.sleep(1)

    print("\n=== 멀티스레드 실행 ===")
    threads = []
    for i in range(3):
        t = threading.Thread(target=thread_worker, args=(f"T{i}",))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    print(f"스레드 작업 후 활성 스레드 수: {threading.active_count()}")

    # 2. 비동기 실행
    async def async_worker(name):
        print(f"비동기 {name} - 프로세스: {os.getpid()}, 스레드: {threading.current_thread().name}")
        await asyncio.sleep(1)

    async def run_async_tasks():
        print("\n=== 비동기 실행 ===")
        await asyncio.gather(
            async_worker("A1"),
            async_worker("A2"),
            async_worker("A3")
        )
        print(f"비동기 작업 후 활성 스레드 수: {threading.active_count()}")

    asyncio.run(run_async_tasks())

demonstrate_what_runs_where()

=== 현재 상황 확인 ===
프로세스 ID: 93783
메인 스레드: MainThread
활성 스레드 수: 9

=== 멀티스레드 실행 ===
스레드 T0 - 프로세스: 93783, 스레드: Thread-11 (thread_worker)
스레드 T1 - 프로세스: 93783, 스레드: Thread-12 (thread_worker)
스레드 T2 - 프로세스: 93783, 스레드: Thread-13 (thread_worker)
스레드 작업 후 활성 스레드 수: 9


RuntimeError: asyncio.run() cannot be called from a running event loop

In [12]:
import multiprocessing
print(multiprocessing.cpu_count())

14
