In [1]:
# Pipeline for a constant stream of images processed in sequential stages: download, resize, upload, etc.
from threading import Thread, Lock
from collections import deque
from random import randint
from time import sleep

def download(item):
    """Simulate the download stage: log it, pause 1-3 seconds, pass ``item`` through."""
    print('download..')
    delay = randint(1, 3)
    sleep(delay)
    return item

def resize(item):
    """Simulate the resize stage: log it, pause 1-3 seconds, pass ``item`` through."""
    print('resize..')
    delay = randint(1, 3)
    sleep(delay)
    return item

def upload(item):
    """Simulate the upload stage: log it, pause 1-3 seconds, pass ``item`` through."""
    print('upload...')
    delay = randint(1, 3)
    sleep(delay)
    return item

class MyQueue:
    """Minimal FIFO queue whose ``put``/``get`` are serialized by a lock.

    ``get`` does not block: it raises ``IndexError`` when the queue is empty,
    which the polling ``Worker`` relies on.
    """

    def __init__(self):
        self.items = deque()  # also read directly (len) by the driver loop
        self.lock = Lock()

    def put(self, item):
        """Append ``item`` at the tail."""
        with self.lock:
            self.items.append(item)

    def get(self):
        """Remove and return the head item; raise IndexError if empty."""
        with self.lock:
            head = self.items.popleft()
        return head
        
class Worker(Thread):
    """Polling pipeline stage: pull from ``in_queue``, apply ``func``, push to ``out_queue``.

    ``run`` loops forever and has no stop signal. A non-daemon thread running it
    would therefore keep the Python process alive indefinitely, so the thread is
    a daemon by default.

    Args:
        func: callable applied to each item taken from ``in_queue``.
        in_queue: object whose ``get()`` returns the next item or raises
            ``IndexError`` when nothing is available (e.g. ``MyQueue``).
        out_queue: object with a ``put(item)`` method that receives results.
        daemon: forwarded to ``threading.Thread``; defaults to True (see above).
    """

    def __init__(self, func, in_queue, out_queue, daemon=True):
        super().__init__(daemon=daemon)
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.polled_count = 0  # loop iterations, including empty polls
        self.work_done = 0     # items processed and forwarded

    def run(self):
        while True:
            self.polled_count += 1
            try:
                item = self.in_queue.get()
            except IndexError:
                sleep(0.01)  # nothing queued: back off briefly, then poll again
            else:
                result = self.func(item)
                self.out_queue.put(result)
                self.work_done += 1
                print('> work_done: %s' % self.work_done)

# Wire the pipeline together: each Worker polls one queue and feeds the next.
download_queue, resize_queue, upload_queue, done_queue = (
    MyQueue(), MyQueue(), MyQueue(), MyQueue(),
)
threads = [
    Worker(stage_func, source, sink)
    for stage_func, source, sink in (
        (download, download_queue, resize_queue),
        (resize, resize_queue, upload_queue),
        (upload, upload_queue, done_queue),
    )
]

count = 10

for thread in threads:
    thread.start()

for _ in range(count):
    download_queue.put(object())

# Busy-wait until every item reaches the final queue (reads the deque directly).
while len(done_queue.items) < count:
    print('> wait...')
    sleep(1)

processed = len(done_queue.items)
polled = sum(worker.polled_count for worker in threads)
print('Processed', processed, 'items after polling', polled, 'times')

> wait...
download..
> wait...
> wait...
> wait...
> work_done: 1
download..
resize..
> wait...
> work_done: 1
upload...
> wait...
> wait...
> work_done: 2
download..
resize..
> work_done: 1
> wait...
> work_done: 3
download..
> work_done: 2
resize..
upload...
> wait...
> wait...
> work_done: 2
> wait...
> work_done: 4
download..
> work_done: 3
resize..
upload...
> wait...
> work_done: 5
download..
> wait...
> work_done: 4
resize..
> wait...
> work_done: 3
upload...
> wait...
> work_done: 6
download..
> work_done: 5
resize..
> wait...
> work_done: 4
upload...
> wait...
> work_done: 7
download..
> wait...
> work_done: 5
> work_done: 6
resize..
upload...
> wait...
> work_done: 6
> wait...
> work_done: 8
download..
> wait...
> work_done: 9
download..
> work_done: 7
resize..
upload...
> wait...
> work_done: 10
> wait...
> work_done: 8
resize..
> wait...
> work_done: 7
upload...
> wait...
> work_done: 9
resize..
> wait...
> work_done: 10
> work_done: 8
upload...
> wait...
> work_done: 9
upl

In [2]:
# Queue's get() method blocks until new data is available
from queue import Queue
queue = Queue(maxsize=0)  # unbounded: put() never blocks here

def consumer():
    """Announce, block on ``queue.get()`` until an item arrives, then announce again."""
    print('Consumer waiting')
    _ = queue.get()  # Runs after put() below
    print('Consumer done')

# Start the consumer first; it blocks inside queue.get().
thread = Thread(target=consumer)
thread.start()

# Producer side: one put() is enough to release the waiting consumer.
print('Producer putting')
queue.put(object())
print('Producer done')

Consumer waiting
Producer putting
Producer done
Consumer done


In [3]:
# To solve the pipeline backup issue, specify the maximum amount of pending work
queue = Queue(maxsize=1)  # one slot: a second put() waits for a get()

def consumer():
    """After a short head start for the producer, take two items from ``queue``."""
    sleep(0.1)
    for label in ('1', '2'):
        queue.get()
        print('Consumer got ' + label)
    
thread = Thread(target=consumer)
thread.start()

# The second put() blocks until the consumer frees the single slot.
for n in (1, 2):
    queue.put(object())
    print('Producer put %d' % n)
thread.join()
print('Producer done')

Producer put 1
Consumer got 1
Producer put 2
Consumer got 2
Producer done


In [4]:
# Track the progress of work with task_done() and join()
queue = Queue()

def consumer():
    """Take one item from ``queue``, simulate work on it, then mark it done.

    ``task_done()`` runs in a ``finally`` so that the producer's ``queue.join()``
    is released even if the work raises; otherwise join() would block forever.
    """
    print('Consumer waiting')
    queue.get()
    try:
        sleep(0.1)  # Doing some work
        print('Consumer done')
    finally:
        queue.task_done()
    
Thread(target=consumer).start()
queue.put(object())

# join() returns only once task_done() has been called for every put() item.
print('Producer waiting')
queue.join()
print('Producer done')

Consumer waiting
Producer waiting
Consumer done
Producer done


In [5]:
# A custom queue which has a close method
class ClosableQueue(Queue):
    """``queue.Queue`` that can be closed by enqueueing a shared sentinel.

    Iterating the queue yields items until the sentinel is dequeued.
    ``task_done()`` is called for every dequeued item, including the sentinel,
    so ``join()`` returns once iteration has consumed everything.
    """

    SENTINEL = object()

    def close(self):
        """Enqueue the sentinel; iterators stop when they reach it."""
        self.put(self.SENTINEL)

    def __iter__(self):
        done = False
        while not done:
            item = self.get()
            try:
                done = item is self.SENTINEL
                if not done:
                    yield item
            finally:
                # Runs after the consumer's loop body finishes (or the generator
                # is closed), marking this item processed.
                self.task_done()
                
class StoppableWorker(Thread):
    """Pipeline stage that exits once iterating ``in_queue`` ends.

    Applies ``func`` to every item produced by iterating ``in_queue`` (for a
    ``ClosableQueue`` that iteration stops at the close sentinel) and puts each
    result on ``out_queue``.
    """

    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func, self.in_queue, self.out_queue = func, in_queue, out_queue

    def run(self):
        for item in self.in_queue:
            self.out_queue.put(self.func(item))
            
download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()
threads = [
    StoppableWorker(download, download_queue, resize_queue),
    StoppableWorker(resize, resize_queue, upload_queue),
    StoppableWorker(upload, upload_queue, done_queue),
]

count = 10

for thread in threads:
    thread.start()

for _ in range(count):
    download_queue.put(object())

# Shut the pipeline down stage by stage. close() enqueues the sentinel, and
# join() returns once every item in that queue (sentinel included) has been
# task_done()'d, i.e. that stage has forwarded all results and stopped iterating.
for stage_queue in (download_queue, resize_queue, upload_queue):
    stage_queue.close()
    stage_queue.join()

# Wait for the worker threads themselves to exit so none outlive this cell.
for thread in threads:
    thread.join()

print(done_queue.qsize(), 'items finished')

download..
download..
resize..
download..
download..
resize..
upload...
download..
resize..
upload...
resize..
download..
upload...
resize..
upload...
download..
resize..
upload...
download..
resize..
upload...
download..
resize..
upload...
download..
resize..upload...

resize..
upload...
upload...
10 items finished
