In [13]:
from queue import Queue
from threading import Thread
import time

In [14]:
class ClosableQueue(Queue):
    SENTINEL = object()

    def close(self):
        self.put(self.SENTINEL)

    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return
                yield item
            finally:
                self.task_done()


class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(result)

In [15]:
def download(item):
    time.sleep(0.2)
    print(f"donwload: {item}")
    return item


def resize(item):
    time.sleep(0.3)
    print(f"resize: {item}")
    return item


def upload(item):
    time.sleep(0.1)
    print(f"upload: {item}")
    return item


download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()

threads = [
    StoppableWorker(download, download_queue, resize_queue),
    StoppableWorker(resize, resize_queue, upload_queue),
    StoppableWorker(upload, upload_queue, done_queue),
]

for thread in threads:
    thread.start()
for i in range(10):
    time.sleep(0.2)
    download_queue.put(i)

download_queue.close()
download_queue.join()
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()
print(done_queue.qsize(), "items finished")

donwload: 0
donwload: 1
resize: 0
donwload: 2
upload: 0
donwload: 3
resize: 1
upload: 1
donwload: 4
resize: 2
donwload: 5
upload: 2
donwload: 6
resize: 3
upload: 3
donwload: 7
resize: 4
donwload: 8
upload: 4
donwload: 9
resize: 5
upload: 5
resize: 6
upload: 6
resize: 7
upload: 7
resize: 8
upload: 8
resize: 9
upload: 9
10 items finished
