In [3]:
def count1(max):
    count = 1
    while count <= max:
        yield count
        count +=1

counter = count1(5)

print(next(counter))
print(next(counter))
print(next(counter))
print(next(counter))


1
2
3
4


In [4]:
# 리스트 컴프리헨션 vs 제너레이터 표현식
squares_list = [x**2 for x in range(10)]

squares_gen = (x**2 for x in range(10))

print(squares_list)
print(squares_gen)

for square in squares_gen:
    print(square)

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
<generator object <genexpr> at 0x103cc70b0>
0
1
4
9
16
25
36
49
64
81


next함수와 같이 사용하면 연산되서 결과는 사람이 보기 편한 형태로 반환된다.


In [6]:
# 제너레이터 함수의 상태 관리 예시
def stateful_generator():
    print("Starting up")
    yield 1
    print("Working...")
    yield 2
    print("Cleaning up")
    yield 3     
gen = stateful_generator()
print(next(gen))
print(next(gen))
print(next(gen))


Starting up
1
Working...
2
Cleaning up
3


In [None]:
# 지연평가(lazy evaluation) 예시

# def count1(max):
#     count = 1
#     while count <= max:
#         yield count
#         count +=1   

In [8]:
import time
import random

def sensor_data_stream():
    while True:
        temp = random.uniform(20.0, 30.0)  # 20.0 ~ 30.0 사이의 랜덤 온도
        yield temp
        time.sleep(1)  # 1초 대기

data_stream = sensor_data_stream()
for _ in range(5):  # 5개의 데이터만 출력
    print(next(data_stream))

28.231693381685623
26.111389336989248
23.832390284718638
27.317009594631926
21.28398755951357


In [None]:
import time
import threading

def background_task():
    while True:
        print("Background task is running...")
        time.sleep(2)

thread = threading.Thread(target=background_task, daemon=True)
thread.start()

print("Main program is doing other work...")
time.sleep(10)
print("Main program is done.")




Background task is running...Main program is doing other work...

Background task is running...
Background task is running...
Background task is running...
Background task is running...
Main program is done.


Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background task is running...
Background

In [10]:
import time
import threading

data = None
condition = threading.Condition()

def wait_for_data():
    print("Waiting for data...")
    with condition:
        condition.wait()  # 데이터가 올 때까지 대기
        print(f"Received data: {data}")
        

In [11]:
def prepare_data():
    global data
    time.sleep(2)  # 데이터 준비에 시간 소요
    with condition:
        data = "Important Data"
        condition.notify()  # 대기 중인 스레드에 알림

thread1 = threading.Thread(target=wait_for_data)
thread2 = threading.Thread(target=prepare_data)

thread1.start()
thread2.start()

thread1.join()
thread2.join()

Waiting for data...
Received data: Important Data


In [12]:
# 공유 변수
counter = 0

def increment(count):
    global counter
    for _ in range(count):
        current = counter
        time.sleep(0.01)  # 쓰레드 전환이 일어날 수 있는 시간
        counter = current + 1


# 두 개의 스레드가 동시에 counter를 증가시킴
thread1 = threading.Thread(target=increment, args=(100,))
thread2 = threading.Thread(target=increment, args=(100,))

thread1.start()
thread2.start()

thread1.join()
thread2.join()

print(f"Final counter value: {counter}") # 200이 아닌 경우가 많음 (경쟁 상태 발생)


Final counter value: 100


In [13]:
counter = 0
counter_lock = threading.Lock() # 락 객체 생성

def increment_safe(count):
    global counter
    for _ in range(count):
        counter_lock.acquire()  # 락 획득 
        try:
            current = counter
            time.sleep(0.01)  # 쓰레드 전환이 일어날 수 있는 시간
            counter = current + 1

        finally:
            counter_lock.release()  # 락 해제


# 두 개의 스레드가 동시에 counter를 증가시킴
thread1 = threading.Thread(target=increment_safe, args=(100,))
thread2 = threading.Thread(target=increment_safe, args=(100,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(f"Final counter value: {counter}")

Final counter value: 200


In [14]:
# wiht문을 사용한 락 관리(더 간결함.)
counter = 0
counter_lock = threading.Lock() # 락 객체 생성

def increment_safe(count):
    global counter
    for _ in range(count):
        with counter_lock:  # 락 획득
            current = counter
            time.sleep(0.01)  # 쓰레드 전환이 일어날 수 있는 시간
            counter = current + 1
        # 락 해제는 with 블록을 벗어나면 자동으로 이루어짐


# 두 개의 스레드가 동시에 counter를 증가시킴
thread1 = threading.Thread(target=increment_safe, args=(100,))
thread2 = threading.Thread(target=increment_safe, args=(100,))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
print(f"Final counter value: {counter}")


Final counter value: 200


스레드 간 통신: Queue 활용하기
- FIFO
- 작업자 - 소비자 패턴 구현에 이상적
- Queue 클래스는 내부적으로 Lock을 사용하여 여러 스레드에서 동시에 접근해도 안전하게 동작하여
스레드 간 데이터 교환 시 경쟁 조건(race condition)을 방지


In [23]:
import queue

# 작업 큐 생성
task_queue = queue.Queue()

# 결과 큐 생성
result_queue = queue.Queue()

# 작업자 스레드 함수
def creat_thread():
    print("Worker started")
    # 10개의 작업 생성
    for i in range(10):
        task = f"작업 -{i}"
        task_queue.put(task)
        print(f"추가된 작업 {task}")
        time.sleep(random.uniform(0.1,0.3))  # 약간의 간격을 두고 작업 생성

    for _ in range(3):
        task_queue.put(None)  # 작업 종료 신호
    print("Worker finished")


def worker_thread(id):
    while True:
        task = task_queue.get()
        if task is None:  # 종료 신호
            print(f"워커: {id} 종료 신호 수신")
            break


        print(f"스레드 {id}가 {task} 처리 중...")
        processing_time = random.uniform(0.5, 1.5)
        time.sleep(processing_time)  # 작업 처리 시간 시뮬레이션

        result = f"{task} 완료 by 스레드 {id}"
        result_queue.put(result)
        task_queue.task_done() # 전체가 끝난게 아니라 이 작업이 끝났다고 알림

    print(f"스레드 {id} 종료")


def result_collector():
    print("Result collector started")
    completed_tasks = []
    
    for _ in range(10):  # 총 10개의 작업이 완료될 때까지 대기
        id, result = result_queue.get()
        print(f"Collector received: {result}")
        completed_tasks.append(result)
        result_queue.task_done()  # 이 결과 처리가 끝났다고 알림
    
    print("All results collected:", completed_tasks)

# 스레드 생성 및 시작      
creator_threads = threading.Thread(target=creat_thread)
worker_threads = [threading.Thread(target=worker_thread, args=(i,)) for i in range(3)]
completed_threads = threading.Thread(target=result_collector)   

# 스레드 시작
creator_threads.start()
for wt in worker_threads:
    wt.start()
completed_threads.start()

# 스레드 종료 대기
completed_threads.join()
for wt in worker_threads:
    wt.join()
creator_threads.join()

print("All threads completed.")


Worker started
추가된 작업 작업 -0
스레드 0가 작업 -0 처리 중...
Result collector started
추가된 작업 작업 -1스레드 1가 작업 -1 처리 중...

추가된 작업 작업 -2스레드 2가 작업 -2 처리 중...



Exception in thread Thread-299:
Traceback (most recent call last):
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()
  File "/Users/iseungjun/Library/Python/3.9/lib/python/site-packages/ipykernel/ipkernel.py", line 772, in run_closure
    _threading_Thread_run(self)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
  File "/var/folders/b4/y3vm1z7n63g2lrchgq49_zn40000gn/T/ipykernel_57545/2475471975.py", line 48, in result_collector
ValueError: too many values to unpack (expected 2)


추가된 작업 작업 -3
스레드 0가 작업 -3 처리 중...
추가된 작업 작업 -4스레드 1가 작업 -4 처리 중...

추가된 작업 작업 -5
추가된 작업 작업 -6
스레드 0가 작업 -5 처리 중...
스레드 2가 작업 -6 처리 중...
추가된 작업 작업 -7
추가된 작업 작업 -8
추가된 작업 작업 -9
Worker finished
스레드 1가 작업 -7 처리 중...
스레드 0가 작업 -8 처리 중...
스레드 2가 작업 -9 처리 중...
워커: 1 종료 신호 수신
스레드 1 종료
워커: 0 종료 신호 수신
스레드 0 종료
워커: 2 종료 신호 수신
스레드 2 종료
All threads completed.


In [24]:
# submit과 as_completed로 개별 작업 제출 및 처리
import concurrent.futures

def task(name):
    print(f"Task {name} starting")
    time.sleep(random.uniform(0.5, 2.0))  # 작업 처리 시간 시뮬레이션
    
    result = f"Result of {name}"
    print(f"Task {name} completed")
    return result

params = [
    ("A", 2),
    ("B", 3),
    ("C", 1),
    ("D", 4),
    ("E", 2) 
]

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    results = list(executor.map(task, params))

    for result in results:
        print(result)

Task ('A', 2) starting
Task ('B', 3) starting
Task ('A', 2) completed
Task ('C', 1) starting
Task ('B', 3) completed
Task ('D', 4) starting
Task ('C', 1) completed
Task ('E', 2) starting
Task ('E', 2) completed
Task ('D', 4) completed
Result of ('A', 2)
Result of ('B', 3)
Result of ('C', 1)
Result of ('D', 4)
Result of ('E', 2)


In [29]:
# 작업 결과를 추적하는 딕셔너리 사용

import concurrent.futures


In [30]:
# 프로세스 간단 예제

def count_up(name, max_count):
    for i in range(1, max_count + 1):
        print(f"{name} counting: {i}")
        time.sleep(0.5)

if __name__ == "__main__":
    process1 = threading.Thread(target=count_up, args=("Process-1", 5))
    process2 = threading.Thread(target=count_up, args=("Process-2", 3))

    process1.start()
    process2.start()

    process1.join()
    process2.join()

    print("Both processes completed.")



Process-1 counting: 1
Process-2 counting: 1
Process-1 counting: 2
Process-2 counting: 2
Process-1 counting: 3Process-2 counting: 3

Process-1 counting: 4
Process-1 counting: 5
Both processes completed.


In [35]:
import multiprocessing

def add_to_shared_number(shared_value, lock, increment):
    for _ in range(5):
        with lock:  # 락 획득
            shared_value.value += increment
        time.sleep(0.1)  # 작업 지연 시뮬레이션
    print(f"Process {multiprocessing.current_process().name} done.")

if __name__ == "__main__":
    shared_number = multiprocessing.Value('i', 0)  # 정수형 공유 변수 초기화
    lock = multiprocessing.Lock()  # 락 객체 생성

    p1 = multiprocessing.Process(target=add_to_shared_number, args=(shared_number, lock, 1))
    p2 = multiprocessing.Process(target=add_to_shared_number, args=(shared_number, lock, 1))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print(f"Final shared number: {shared_number.value}")

# 아래 문구 나오는 이유 py에서 하면 에러 안난다.


Final shared number: 0


Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'add_to_shared_number' on <module '__main__' (built-in)>
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
   

In [36]:
# Queue 활용

def producer_process(queue):
    for i in range(5):
        item = f"Item-{i}"
        queue.put(item)
        print(f"Produced {item}")
        time.sleep(random.uniform(0.1, 0.5))  # 생산 속도 시뮬레이션
    queue.put(None)  # 종료 신호

def consumer_process(queue):
    while True:
        item = queue.get()
        if item is None:  # 종료 신호
            break
        print(f"Consumed {item}")
        time.sleep(random.uniform(0.2, 0.6))  # 소비 속도 시뮬레이션
    print("Consumer done.")

if __name__ == "__main__":
    q = multiprocessing.Queue()

    producer = multiprocessing.Process(target=producer_process, args=(q,))
    consumer = multiprocessing.Process(target=consumer_process, args=(q,))

    producer.start()
    consumer.start()

    producer.join()
    consumer.join()

    print("Both producer and consumer have finished.")  

Both producer and consumer have finished.


Traceback (most recent call last):
  File "<string>", line 1, in <module>
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
    exitcode = _main(fd, parent_sentinel)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'producer_process' on <module '__main__' (built-in)>    
sel

In [42]:
import multiprocessing
import time
import os

def process_task(task_id):
    process_id = os.getpid()
    print(f"Process {task_id} starting task {task_id}")
    result = 0
    for i in range(10000000):
        result += i
    print(f"Process {task_id} finished task {task_id} with result {result}")
    return task_id, result, process_id

if __name__ == "__main__":
    num_cores = multiprocessing.cpu_count()
    print(f"Number of CPU cores: {num_cores}")

    tasks = range(10)

    start_time = time.time()
    sequential_results = [process_task(i) for i in tasks]
    end_time = time.time()
    print(f"순차 처리 시간: {end_time - start_time:.2f} seconds")

    with multiprocessing.Pool(processes=num_cores) as pool:
        parllel_results = pool.map(process_task, tasks)

    end_time = time.time()
    print(f"병렬 처리 시간: {end_time - start_time:.2f} seconds")

    process_ids = set(result[2] for result in parllel_results)
    print(f"Processes used in parallel execution: {process_ids}")


Number of CPU cores: 10
Process 0 starting task 0
Process 0 finished task 0 with result 49999995000000
Process 1 starting task 1
Process 1 finished task 1 with result 49999995000000
Process 2 starting task 2
Process 2 finished task 2 with result 49999995000000
Process 3 starting task 3
Process 3 finished task 3 with result 49999995000000
Process 4 starting task 4
Process 4 finished task 4 with result 49999995000000
Process 5 starting task 5
Process 5 finished task 5 with result 49999995000000
Process 6 starting task 6
Process 6 finished task 6 with result 49999995000000
Process 7 starting task 7
Process 7 finished task 7 with result 49999995000000
Process 8 starting task 8
Process 8 finished task 8 with result 49999995000000
Process 9 starting task 9
Process 9 finished task 9 with result 49999995000000
순차 처리 시간: 1.98 seconds


Process SpawnPoolWorker-23:
Process SpawnPoolWorker-22:
Process SpawnPoolWorker-21:
Process SpawnPoolWorker-20:
Traceback (most recent call last):
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/pool.py", line 114, in worker
    task = get()
  File "/Library/Developer/CommandLineTools/Library/Frameworks/Python3.framework/Versions/3.9/lib/python3.9/multiprocessing/queues.py", line 368, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'process_task' on <module '__main__' (built-in)>
Traceback (most recent call la

KeyboardInterrupt: 

## 비동기 프로그래밍

코루틴(Coroutine)

In [43]:
# 코루틴(Coroutine)
# 코루틴은 함수 실행을 중간에 멈추고, 나중에 다시 시작할 수 있는 특별한 함수
# 일반 함수와 달리 호출자가 제어권을 넘겨받아 함수 실행을 일시 중지하고 다시 시작할 수 있음

In [45]:
import asyncio
async def say_hello(name, delay):
    await asyncio.sleep(delay)
    print(f"Hello, {name}! (delay: {delay}s)")
    return f"Greeted {name}"    

async def main():
    print("Starting main coroutine")

    results = await asyncio.gather(
        say_hello("Alice", 2),
        say_hello("Bob", 3),
        say_hello("Charlie", 1)
    )

    print(f"All greetings done: {results}")
    print("Main coroutine finished")

# if __name__ == "__main__":
#     asyncio.run(main())
    

In [46]:
# 과제 
# API URL에 GET 요청 보내기.