# Queues

Python has a [``queue``](https://docs.python.org/3/library/queue.html) library that we can use to put items onto a queue and take them off again. It implements multi-producer, multi-consumer queues that are safe to share between threads. That makes it a good fit for one particular use of queues: one or more worker threads produce results, and one or more other worker threads consume them.
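A minimal sketch of the basic ``queue.Queue`` calls used later in this section: ``put()`` adds to the back, ``get()`` takes from the front (first in, first out), and ``join()`` blocks until every item that was put has been marked finished with ``task_done()``. The items ``"a"``, ``"b"``, ``"c"`` are made up for illustration.

```python
import queue

q = queue.Queue()

# put() adds items at the back of the queue
for item in ["a", "b", "c"]:
    q.put(item)

# get() removes the item at the front (FIFO order)
first = q.get()
# every item taken with get() should be marked finished with task_done()
q.task_done()
print(first, q.qsize())  # a 2

# drain the remaining items, marking each one as finished
while not q.empty():
    q.get()
    q.task_done()

# join() returns once every item that was put has been marked done
q.join()
print("all tasks done")
```

In a single thread ``join()`` is not very interesting; it becomes useful when other threads are doing the ``get()``/``task_done()`` calls.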

Imagine you were processing Wikipedia in order to count up the occurrences of a particular phrase, ``citation needed``.

When you [download Wikipedia dumps](https://dumps.wikimedia.org/enwiki/latest/), they come split into 30 or so files. Each file has to be read from start to finish, because it is compressed, but you still want to count pages as fast as possible.

You might therefore create 30 "read and split" workers and a few counting workers to do this job.
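Each "read and split" worker would stream its own file from front to back. A minimal sketch, assuming the dump files are bz2-compressed text (``count_phrase`` is a hypothetical helper, not part of any library, and the sample text is made up): ``bz2.open`` decompresses lazily, so the file is processed in order without loading it all into memory.

```python
import bz2
import os
import tempfile


def count_phrase(path, phrase="citation needed"):
    """Hypothetical helper: count a phrase in one bz2-compressed text file.

    The file is decompressed and read line by line, front to back.
    The match is case-sensitive, and occurrences split across a line
    break are not counted.
    """
    count = 0
    with bz2.open(path, "rt", encoding="utf-8") as f:
        for line in f:
            count += line.count(phrase)
    return count


# usage: write a small made-up compressed file, then count the phrase in it
with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "sample.txt.bz2")
    with bz2.open(path, "wt", encoding="utf-8") as f:
        f.write("Foo.[citation needed]\n")
        f.write("Bar [citation needed] baz [citation needed]\n")
    n = count_phrase(path)

print(n)  # 3
```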

We're going to do a similar task with made-up data. First we generate the numbers from 0 up to (but not including) ``N*B``, split into ``B`` buckets of ``N`` consecutive numbers each.

```python
N = 1000
B = 30
dataset = [[(y + x*N) for y in range(N)] for x in range(B)]
```
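A quick sanity check of the layout described above (the checks are an illustration, not part of the original notebook): there are ``B`` buckets of ``N`` numbers, bucket ``x`` runs from ``x*N`` to ``x*N + N - 1``, and together the buckets are exactly ``0, 1, ..., N*B - 1``.

```python
N = 1000
B = 30
dataset = [[(y + x*N) for y in range(N)] for x in range(B)]

# B buckets, each holding N numbers
assert len(dataset) == B and all(len(bucket) == N for bucket in dataset)

# bucket 1 covers 1*N up to 1*N + N - 1
print(dataset[1][0], dataset[1][-1])  # 1000 1999

# concatenated in order, the buckets are exactly 0 .. N*B - 1
flat = [item for bucket in dataset for item in bucket]
assert flat == list(range(N * B))
```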

We're going to find all numbers divisible by both 13 and 17 in two passes: the first pass keeps only the numbers divisible by 17, and the final pass collects those that are also divisible by 13.

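```python
# producer() runs in its own thread and puts candidates on the shared queue `q`,
# which is created in the next cell (before any producer thread starts)
def producer(lst):
    for item in lst:
        # first pass: keep only multiples of 17
        if item % 17 == 0:
            q.put(item)

# the results we want are collected in this list
correct = []

def do_work(item):
    # final pass: keep the items that are also multiples of 13,
    # together with their quotient by 13*17 = 221
    if item % 13 == 0:
        # append() mutates the list in place, so no `global` statement is needed
        correct.append((item, item // (13 * 17)))
```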

The traditional, single-threaded way to solve this problem would be a list comprehension like the following (shown here over the first bucket only, ``range(1000)``):

```python
[x for x in range(1000) if x % 13 == 0 and x % 17 == 0]
```

```
[0, 221, 442, 663, 884]
```
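Because 13 and 17 have no common factor, a number is divisible by both exactly when it is divisible by their product, 13 × 17 = 221. A short sketch that uses this to compute the expected answer for the whole dataset in one pass, checked against the brute-force filter; it gives a reference to compare the threaded result against.

```python
import math

N, B = 1000, 30

# 13 and 17 are coprime, so "divisible by both" means "divisible by 13 * 17"
assert math.gcd(13, 17) == 1
step = 13 * 17  # 221

# step straight through the multiples of 221 below N*B
expected = [(m, m // step) for m in range(0, N * B, step)]

# same answer as testing both conditions on every number
brute = [(x, x // step) for x in range(N * B) if x % 13 == 0 and x % 17 == 0]
assert expected == brute

print(len(expected), expected[-1])  # 136 (29835, 135)
```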

But let's imagine we have a data science task that takes much longer, like the processing of Wikipedia described above.

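Then it is worth using the ``queue`` and ``threading`` libraries to overlap the work. One caveat: in the standard CPython build, the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time. Threads therefore speed things up mainly when the work spends time waiting, for example on reading files, and they will not spread pure-Python arithmetic like the example below across several cores; for that, the ``multiprocessing`` module is the usual tool.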
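A minimal sketch of the case where threads do help: work that spends most of its time waiting. Here ``time.sleep`` stands in for waiting on a disk or network read (an assumption for illustration; sleeping releases the GIL). Five 0.2 s waits run in separate threads overlap, so the total is close to 0.2 s rather than 1.0 s.

```python
import threading
import time


def slow_io(delay):
    # stand-in for waiting on I/O; the thread releases the GIL while sleeping
    time.sleep(delay)


start = time.perf_counter()
threads = [threading.Thread(target=slow_io, args=(0.2,)) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start

# the waits overlap, so this is roughly 0.2 s, not 5 * 0.2 = 1.0 s
print(f"{elapsed:.2f} s")
```

Replacing ``time.sleep`` with a pure-Python CPU loop would remove most of this speedup, because only one thread at a time can hold the GIL.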

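```python
import queue
import threading

# two consumer threads will pull items off the queue
num_worker_threads = 2

def worker():
    while True:
        item = q.get()
        # important: we stop when someone puts a None on the queue
        if item is None:
            break
        do_work(item)
        q.task_done()

q = queue.Queue()

# start the consumers
workers = []
for i in range(num_worker_threads):
    t = threading.Thread(target=worker)
    t.start()
    workers.append(t)

# start one producer thread per bucket
producers = []
for bucket in dataset:
    t = threading.Thread(target=producer, args=(bucket,))
    t.start()
    producers.append(t)

# wait until every producer has put all of its items on the queue;
# calling q.join() before this could return while producers are still running
for t in producers:
    t.join()

# block until every queued item has been processed
q.join()

# stop the workers by sending one None per worker, then wait for them to exit
for i in range(num_worker_threads):
    q.put(None)
for t in workers:
    t.join()
```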
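The cell above shares the global queue ``q`` and the global list ``correct`` between threads. As a variation, here is a sketch that keeps everything local and sends results back through a second queue instead of appending to a shared list. ``run_pipeline`` is a hypothetical helper; instead of ``q.join()``, it relies on the fact that a FIFO queue hands out the ``None`` sentinels only after every real item queued before them.

```python
import queue
import threading


def run_pipeline(dataset, num_workers=2):
    """Hypothetical helper: the producer/worker pipeline without global state."""
    tasks = queue.Queue()    # candidates (multiples of 17) flow through here
    results = queue.Queue()  # finished (item, quotient) pairs flow back here

    def producer(bucket):
        for item in bucket:
            if item % 17 == 0:  # first pass: multiples of 17
                tasks.put(item)

    def worker():
        while True:
            item = tasks.get()
            if item is None:  # sentinel: no more work for this worker
                break
            if item % 13 == 0:  # final pass: also multiples of 13
                results.put((item, item // (13 * 17)))

    workers = [threading.Thread(target=worker) for _ in range(num_workers)]
    producers = [threading.Thread(target=producer, args=(b,)) for b in dataset]
    for t in workers + producers:
        t.start()

    # every producer must finish before the sentinels go on the queue,
    # otherwise a worker could stop while real items are still arriving
    for t in producers:
        t.join()
    for _ in workers:
        tasks.put(None)
    for t in workers:
        t.join()

    # all threads are done, so the results queue can be drained safely;
    # threads finish in no fixed order, so sort before returning
    out = []
    while not results.empty():
        out.append(results.get())
    return sorted(out)


N, B = 1000, 30
dataset = [[(y + x*N) for y in range(N)] for x in range(B)]
result = run_pipeline(dataset)
print(len(result), result[:3])  # 136 [(0, 0), (221, 1), (442, 2)]
```

Sorting at the end matters: with several workers, the order in which results arrive depends on thread scheduling.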

Now we can look at the results:

```python
correct
```

[(0, 0),
 (221, 1),
 (442, 2),
 (663, 3),
 (884, 4),
 (1105, 5),
 (1326, 6),
 (1547, 7),
 (1768, 8),
 (1989, 9),
 (2210, 10),
 (2431, 11),
 (2652, 12),
 (2873, 13),
 (3094, 14),
 (3315, 15),
 (3536, 16),
 (3757, 17),
 (3978, 18),
 (4199, 19),
 (4420, 20),
 (4641, 21),
 (4862, 22),
 (5083, 23),
 (5304, 24),
 (5525, 25),
 (5746, 26),
 (5967, 27),
 (6188, 28),
 (6409, 29),
 (6630, 30),
 (6851, 31),
 (7072, 32),
 (7293, 33),
 (7514, 34),
 (7735, 35),
 (7956, 36),
 (8177, 37),
 (8398, 38),
 (8619, 39),
 (8840, 40),
 (9061, 41),
 (9282, 42),
 (9503, 43),
 (9724, 44),
 (9945, 45),
 (10166, 46),
 (10387, 47),
 (10608, 48),
 (10829, 49),
 (11050, 50),
 (11271, 51),
 (11492, 52),
 (11713, 53),
 (11934, 54),
 (12155, 55),
 (12376, 56),
 (12597, 57),
 (12818, 58),
 (13039, 59),
 (13260, 60),
 (13481, 61),
 (13702, 62),
 (13923, 63),
 (14144, 64),
 (14365, 65),
 (14586, 66),
 (14807, 67),
 (15028, 68),
 (15249, 69),
 (15470, 70),
 (15691, 71),
 (15912, 72),
 (16133, 73),
 (16354, 74),
 (16575, 75),