파이썬 프로그램이 동시에 여러 일을 수행한다면, 각 작업을 잘 조욜해야한다

동시성 작업을 처리할 때 가장 유용한 방식은 함수 파이프라인이다.

파이프라인은 공장 조립 라인처럼 작동한다. 

순차적으로 실행되야 하는 여러 단계가 있고, 각 단계마다 실행할 구체적인 함수가 정해진다. 
파이프 라인 한쪽 끝에는 새로운 작업이 계속 추가된다.



In [1]:
# 디카에서 이미지 스트림을 계속 가져와 이미지 크기를 변경하고 온라인 포토 갤러리에 저장하려고 한다

# 1. 새 이미지를 얻고
# 2. 얻은 이미지는 두 번째 단계의 크기 변환 함수로 보내며
# 3. 크기가 조정된 이미지를 마지막 단계에 업로드 함수에 전달해 처리한다

def download(item):
    return item

def resize(item):
    return item

def upload(item):
    return item

In [2]:
# 파이프라인의 단계마다 작업을 전달할 방법이 무엇일까?

# 스레드 안전한 생산자-소비자를 사용해 이를 모델링 할 수 있따.

from collections import deque
from threading import Lock

class MyQueue:
    def __init__(self):
        self.items = deque()
        self.lock = Lock()
        
    def put(self, item): # 생산자인 디카는 미처리 작업을 표현하는 deque의 끝에 새로운 이미지를 추가한다
        with self.lock:
            self.items.append(item)
            
            
    def get(self): # 소비자는 미처리 작업을 표현하는 deque의 맨 앞에서 이미지를 제거한다
        with self.lock:
            return self.items.popleft()
        


In [3]:
from threading import Thread
import time

class Worker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.polled_count = 0
        self.work_done = 0

    def run(self):
        while True:
            self.polled_count += 1
            try:
                item = self.in_queue.get()
            except IndexError: # 입력 큐가 비어있는 경우
                time.sleep(0.01) # 할 일이 없음
            else:
                result = self.func(item)
                self.out_queue.put(result)
                self.work_done += 1
                
                
download_queue = MyQueue()
resize_queue = MyQueue()
upload_queue = MyQueue()

done_queue = MyQueue()
threads = [
    Worker(download, download_queue, resize_queue),
    Worker(resize, resize_queue, upload_queue),
    Worker(upload, upload_queue, done_queue),
]

for thread in threads:
    thread.start()

for _ in range(1000):
    download_queue.put(object())

while len(done_queue.items) < 1000:
    # 기다리는 동안 유용한 작업을 수행한다
    pass

processed = len(done_queue.items)
polled = sum(t.polled_count for t in threads)
print(f'{processed} 개의 아이템을 처리했습니다, '
      f'이때 폴링을 {polled} 번 했습니다.')


# 제대로 동작하지만, IndexError 예외를 잡아내는 부분이 상당히 많이 발생한다.

# 작업자 함수의 속도가 달라지만 앞에 있는 단계가 그보다 더 뒤에 있는 단계의 진행을 방해하면서 파이프 라인을 방해 할 수 있다.

1000 개의 아이템을 처리했습니다, 이때 폴링을 3007 번 했습니다.


In [5]:
# 문제 1. 모든 작업이 다 끝났는지 검사하기 위해 추가로 done_queue에 대해 바쁜 대기(busy waiting)을 수행해야 한다.

# 문제 2. Worker의 run 메서드가 루프를 무한히 반복한다. 루프를 중단 할 시점임을 알려줄 뚜렷한 방법이 없다

# 문제 3. 파이프라인 진행이 막히면 프로그램이 임의로 중단될 수 있다
# 첫번째 단계가 빠르게 진행되는데 두번째단계가 느리게 진행되면, 두 단계를 연결하는 큐의 크기가 계속 늘어난다
# 이 경우에 두번 재 단계가 작업을 계속 처리할 수 없게 된다.
# 시간과 입력 데이터가 충분히 많으면 언젠가는 메모리를 다 소모하고 프로그램이 죽어버릴 것이다.


## -> 굳이 직접하지말고 Queue를 사용하면 해결된다

from queue import Queue
from threading import Thread

my_queue = Queue()

def consumer():
    print('소비자 대기')
    my_queue.get()  # 다음에 보여줄 put()이 실행된 다음에 시행된다
    print('소비자 완료')

thread = Thread(target=consumer)
thread.start()

print('생산자 데이터 추가')
my_queue.put(object())     # 앞에서 본 get()이 실행되기 전에 실행된다.
print('생산자 완료') # consumer -> put -> join -> get ... (block 됨)
thread.join()

소비자 대기생산자 데이터 추가
생산자 완료

소비자 완료


In [6]:
from queue import Queue
from threading import Thread
import time

my_queue = Queue(1)  # 버퍼 크기 1

def consumer():
    time.sleep(0.1)  # 대기
    my_queue.get()  # 두 번째로 실행됨
    print('소비자 1')
    my_queue.get()  # 네 번째로 실행됨
    print('소비자 2')
    print('소비자 완료')

thread = Thread(target=consumer)
thread.start()

my_queue.put(object()) # 첫 번째로 실행됨
print('생산자 1')
my_queue.put(object()) # 세 번째로 실행됨
print('생산자 2')
print('생산자 완료')
thread.join()

생산자 1
소비자 1
생산자 2
생산자 완료
소비자 2
소비자 완료


In [7]:
from queue import Queue
from threading import Thread
import time

in_queue = Queue()

def consumer():
    print('소비자 대기')
    work = in_queue.get()  # 두 번째로 실행됨
    print('소비자 작업중')
    # Doing work
    print('소비자 완료')
    in_queue.task_done()  # 세 번째로 실행됨 # 어떤 단계의 입력 큐가 다 소진 될때까지 기다릴 숭 씨고, 마지막 단계를 폴림 할 필요도 없어진다.

thread = Thread(target=consumer)
thread.start()

print('생산자 데이터 추가')
in_queue.put(object())    # 첫 번째로 실행됨
print('생산자 대기')
in_queue.join()           # 네 번째로 실행됨
print('생산자 완료')
thread.join()

소비자 대기
생산자 데이터 추가
생산자 대기
소비자 작업중
소비자 완료
생산자 완료


In [8]:
from queue import Queue
from threading import Thread
import time

def download(item):
    return item

def resize(item):
    return item

def upload(item):
    return item

class ClosableQueue(Queue):
    SENTINEL = object() # 다음 코드는 큐에 더이상 입력이 없음을 표시함

    def close(self):
        self.put(self.SENTINEL)

    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return   # 스레드를 종료시킨다
                yield item
            finally:
                self.task_done()


class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(result)

download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()
threads = [
    StoppableWorker(download, download_queue, resize_queue),
    StoppableWorker(resize, resize_queue, upload_queue),
    StoppableWorker(upload, upload_queue, done_queue),
]

for thread in threads:
    thread.start()

for _ in range(1000):
    download_queue.put(object())

download_queue.close()

download_queue.join()
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()
print(done_queue.qsize(), '개의 원소가 처리됨')

for thread in threads:
    thread.join()

1000 개의 원소가 처리됨


In [None]:
from queue import Queue
from threading import Thread
import time

def download(item):
    return item

def resize(item):
    return item

def upload(item):
    return item

class ClosableQueue(Queue):
    SENTINEL = object()

    def close(self):
        self.put(self.SENTINEL)

    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return   # 스레드를 종료시킨다
                yield item
            finally:
                self.task_done()


class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(result)


def start_threads(count, *args):
    threads = [StoppableWorker(*args) for _ in range(count)]
    for thread in threads:
        thread.start()
    return threads


def stop_threads(closable_queue, threads):
    for _ in threads:
        closable_queue.close()

    closable_queue.join()

    for thread in threads:
        thread.join()

download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()
download_threads = start_threads(
    3, download, download_queue, resize_queue)
resize_threads = start_threads(
    4, resize, resize_queue, upload_queue)
upload_threads = start_threads(
    5, upload, upload_queue, done_queue)

for _ in range(1000):
    download_queue.put(object())

stop_threads(download_queue, download_threads)
stop_threads(resize_queue, resize_threads)
stop_threads(upload_queue, upload_threads)

print(done_queue.qsize(), '개의 원소가 처리됨')