In [81]:
from threading import Thread
from threading import Lock
from collections import deque
import time

In [82]:
def download(file):
    """Stub for the download stage: ignores ``file`` and returns None."""
    return None
def resize(image):
    """Stub for the resize stage: ignores ``image`` and returns None."""
    return None
def upload(image):
    """Stub for the upload stage: ignores ``image`` and returns None."""
    return None

In [83]:
class MyQueue:
    """Minimal FIFO queue: a deque whose accesses are guarded by a lock.

    It never blocks. ``get()`` raises ``IndexError`` when the queue is empty,
    and ``Worker.run`` catches that to detect "nothing to do yet".
    """

    def __init__(self):
        self.items = deque()
        self.lock = Lock()

    def put(self, item):
        """Append ``item`` to the tail of the queue."""
        with self.lock:
            self.items.append(item)

    def get(self):
        """Remove and return the item at the head of the queue.

        Raises:
            IndexError: if the queue is empty (propagated from ``deque.popleft``).
        """
        with self.lock:
            # Previously the popped value was discarded, so callers always got None.
            return self.items.popleft()

In [84]:
class Worker(Thread):
    """Pipeline-stage thread that busy-polls ``in_queue`` and forwards ``func(item)``.

    It expects ``in_queue.get()`` to raise ``IndexError`` when the queue is empty.
    In that case the worker sleeps 10 ms and polls again. ``run`` has no exit
    condition, so the thread loops forever unless ``func``, ``get`` or ``put``
    raises something else.
    """

    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue, self.out_queue = in_queue, out_queue
        # polled_count: number of get() attempts; work_done: number of items processed.
        self.polled_count = self.work_done = 0

    def run(self):
        while True:
            self.polled_count += 1
            try:
                item = self.in_queue.get()
            except IndexError:
                # Input is empty: back off briefly, then poll again.
                time.sleep(0.01)
                continue
            self.out_queue.put(self.func(item))
            self.work_done += 1

In [85]:
# One MyQueue between each pair of adjacent stages, plus a final sink for finished items.
download_queue, resize_queue, upload_queue, done_queue = (
    MyQueue() for _ in range(4)
)

In [86]:
# One Worker per stage. Each reads from its stage's input queue and writes to the next one.
threads = [
    Worker(stage_func, source, sink)
    for stage_func, source, sink in (
        (download, download_queue, resize_queue),
        (resize, resize_queue, upload_queue),
        (upload, upload_queue, done_queue),
    )
]

In [87]:
# Launch the pipeline. Worker.run never returns, so these threads keep polling indefinitely.
for stage_thread in threads:
    stage_thread.start()

In [88]:
# Feed 1000 placeholder work items (the `object` type itself stands in for a file).
for _item_number in range(1000):
    download_queue.put(object)

In [89]:
# Deliberate busy wait: spin until the last stage has produced 1000 results.
while True:
    if len(done_queue.items) >= 1000:
        break


In [90]:
# Snapshot the counters. The Worker threads never leave their polling loop,
# so polled_count keeps growing after this cell runs.
processed = len(done_queue.items)
polled = sum([worker.polled_count for worker in threads])

In [91]:
report = f'Processed {processed} item after polling {polled} times'
print(report)

Processed 1000 item after polling 3009 times


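The outcome is that worker threads waste CPU time doing nothing useful. Whenever a worker's input queue is empty, the worker keeps polling it in a loop: it catches `IndexError`, sleeps briefly, and tries again.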
The producer–consumer queue above has three problems:
1. Determining that all of the input work is complete requires yet another busy wait, this time on `done_queue`.
2. In `Worker`, the `run` method executes forever in its busy loop. There is no way to tell a worker thread to exit.
3. Suppose the first phase makes rapid progress but the second phase makes slow progress. Then the queue connecting the first phase to the second will constantly grow in size. Given enough time and input data, the program will eventually run out of memory and die.

In [92]:
from queue import Queue

In [93]:
my_queue = Queue()


def consumer():
    """Wait 0.1 s, then take two items from ``my_queue``, reporting each one.

    ``Queue.get()`` blocks until an item is available, so no busy polling is needed.
    """
    time.sleep(0.1)  # delay so the producer gets ahead of the consumer
    for message in ("Consumer got 1", "Consumer got 2"):
        my_queue.get()
        print(message)
    print("Consumer done")


thread = Thread(target=consumer)
thread.start()
for message in ("Producer put 1", "Producer put 2"):
    my_queue.put(object)
    print(message)
print("Producer done")
thread.join()

Producer put 1
Producer put 2
Producer done
Consumer got 1
Consumer got 2
Consumer done


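By controlling the buffer size we can keep the queue a manageable size. For example, with `Queue(1)` every `put` blocks while the queue is full, so a fast producer cannot run far ahead of a slow consumer (problem 3 above). The `ClosableQueue` below addresses problems 1 and 2: `close()` lets a worker's loop end, and `Queue.join()` replaces the busy wait for completion.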

In [94]:
class ClosableQueue(Queue):
    """Queue that can be closed by enqueueing a sentinel. Iterating it yields items until then.

    Iteration blocks on ``get()``. It calls ``task_done()`` for every item it takes,
    including the sentinel, but only once the consumer asks for the next item (or
    the iterator is closed). As a result, ``join()`` returns only after every
    yielded item has been handled. Each ``close()`` puts a single sentinel, so it
    stops a single consumer.
    """

    SENTINEL = object()  # unique marker, compared by identity

    def close(self):
        """Signal end of input by putting the sentinel on the queue."""
        self.put(self.SENTINEL)

    def __iter__(self):
        while True:
            current = self.get()
            if current is self.SENTINEL:
                # Account for the sentinel itself before stopping.
                self.task_done()
                return
            try:
                yield current
            finally:
                # Reached when the consumer resumes the generator or closes it.
                self.task_done()

In [95]:
class StoppableWorker(Thread):
    """Pipeline-stage thread that applies ``func`` to each item of ``in_queue``.

    The thread finishes when iteration over ``in_queue`` ends. For a
    ``ClosableQueue``, that happens when the sentinel put by ``close()`` arrives.
    Results go to ``out_queue.put``. ``out_queue`` is never closed here, so the
    caller must close it.
    """

    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue, self.out_queue = in_queue, out_queue

    def run(self):
        for work_item in self.in_queue:
            self.out_queue.put(self.func(work_item))

In [96]:
# Rebuild the pipeline's queues as closable, blocking queues.
download_queue, resize_queue, upload_queue, done_queue = (
    ClosableQueue() for _ in range(4)
)

In [97]:
# One StoppableWorker per stage, wired input-queue -> output-queue.
threads = [
    StoppableWorker(stage_func, source, sink)
    for stage_func, source, sink in (
        (download, download_queue, resize_queue),
        (resize, resize_queue, upload_queue),
        (upload, upload_queue, done_queue),
    )
]

In [98]:
# Start every stage. Each one blocks on its input queue until work arrives.
for stage_thread in threads:
    stage_thread.start()

# Feed 1000 placeholder work items (the `object` type itself stands in for a file).
for _item_number in range(1000):
    download_queue.put(object)
    



In [99]:
# Shut the pipeline down one stage at a time. close() enqueues the sentinel, and
# join() waits until that stage's worker has marked every item (sentinel included)
# as done.
for stage_queue in (download_queue, resize_queue, upload_queue):
    stage_queue.close()
    stage_queue.join()

print(done_queue.qsize(), 'items finished')

1000 items finished


In [100]:
# Each StoppableWorker's input queue has been closed, so its run loop ends and join() can return.
for worker in threads:
    worker.join()