# Multiprocessing IPC with Multiple Queues

**Problem:**
- Process 1: Calculate the square of each number.
- Process 2: Calculate the square root of each result produced by Process 1.
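
As a point of reference, the same two steps can be written sequentially in a single process. This is only a baseline sketch for checking what the pipelines should produce; the helper name `square_then_sqrt` is hypothetical and not part of the original notebook.

```python
import math

def square_then_sqrt(numbers):
    """Apply both steps in order, without any processes or queues."""
    squares = [n * n for n in numbers]        # the step handled by Process 1
    return [math.sqrt(s) for s in squares]    # the step handled by Process 2

# Same input range as the notebook cells use
expected = square_then_sqrt(range(5, 20))
print(expected[:3])
```

Because the second step undoes the first, every pipeline in this notebook should return the inputs as floats, in the same order.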

**Architecting the Solution**
![IPC Multiple Queues](./IPCMultipleQueues.jpg)

**Resources:**
- [Multiprocessing Queue Docs](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue)

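# Solution 1: Delay the start of the second process
- Not recommended: the fixed delay only helps if the first process has already put something on `sqrt_q` by the time the second process checks `empty()`.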
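
The weakness comes from the `while not q.empty()` loops that the workers in this solution use: a consumer that looks at the queue before the producer has put anything on it exits immediately. The sketch below reproduces that in a single process so the outcome is deterministic. It uses the thread-level `queue.Queue` as a stand-in for `multiprocessing.Queue`, and the helper name `drain_while_not_empty` is hypothetical.

```python
import queue

def drain_while_not_empty(q):
    """Consume items the way Solution 1's workers do: stop once the queue looks empty."""
    seen = []
    while not q.empty():
        seen.append(q.get())
    return seen

pending = queue.Queue()

# Consumer runs before the producer has produced anything: it sees nothing and stops.
early = drain_while_not_empty(pending)

# Producer output arrives only afterwards.
for n in (25, 36, 49):
    pending.put(n)

# A consumer that starts late enough sees everything.
late = drain_while_not_empty(pending)
print(early, late)
```

The `time.sleep(0.1)` in this solution plays the role of "starting late enough", which is why it depends on timing rather than on any guarantee.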

In [1]:
from multiprocessing import Process, Queue

import math
import time

In [2]:
def calculate_square(sq_q, sqrt_q):
    """Square each number taken from sq_q and put the result on sqrt_q.
        @params: sq_q <Queue>, queue of numbers to square.
        @params: sqrt_q <Queue>, queue that receives the squares (input to the square-root step).
    """
    while not sq_q.empty():
        itm = sq_q.get()
        print(f"Calculating sq of: {itm}")
        square = itm * itm
        sqrt_q.put(square)

def calculate_sqroot(sqrt_q, result_q):
    """Take the square root of each number from sqrt_q and put it on result_q.
       @params: sqrt_q <Queue>, queue of numbers whose square root is to be calculated.
       @params: result_q <Queue>, queue that receives the final results.
    """
    while not sqrt_q.empty():
        itm = sqrt_q.get()
        print(f"Calculating sqrt of: {itm}")
        sqrt = math.sqrt(itm)
        result_q.put(sqrt)

In [3]:
sq_q = Queue()
sqrt_q = Queue()
result_q = Queue()

In [4]:
for i in range(5, 20):
    sq_q.put(i)

In [5]:
p_sq = Process(target=calculate_square, args=(sq_q, sqrt_q))
p_sqrt = Process(target=calculate_sqroot, args=(sqrt_q, result_q))

In [6]:
p_sq.start()
time.sleep(0.1) # give p_sq a 0.1 s head start so sqrt_q is not empty when p_sqrt first checks it
p_sqrt.start()

Calculating sq of: 5
Calculating sq of: 6
Calculating sq of: 7
Calculating sq of: 8
Calculating sq of: 9
Calculating sqrt of: 25Calculating sq of: 10

Calculating sq of: 11
Calculating sqrt of: 36Calculating sq of: 12

Calculating sqrt of: 49Calculating sq of: 13
Calculating sqrt of: 64
Calculating sq of: 14

Calculating sqrt of: 81Calculating sq of: 15

Calculating sq of: 16Calculating sqrt of: 100

Calculating sqrt of: 121Calculating sq of: 17

Calculating sq of: 18
Calculating sq of: 19Calculating sqrt of: 144

Calculating sqrt of: 169
Calculating sqrt of: 196
Calculating sqrt of: 225
Calculating sqrt of: 256
Calculating sqrt of: 289
Calculating sqrt of: 324
Calculating sqrt of: 361


In [7]:
p_sq.join()
p_sqrt.join()

In [8]:
while not result_q.empty():
    print(result_q.get())

5.0
6.0
7.0
8.0
9.0
10.0
11.0
12.0
13.0
14.0
15.0
16.0
17.0
18.0
19.0


# Solution 2: Wait until the result queue is full

**From the Docs:**
- [multiprocessing.Queue([maxsize])](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Queue)

- **Queue.full()**
> Return True if the queue is full, False otherwise. Because of multithreading/multiprocessing semantics, this is not reliable.


- **Queue.get()**
> Remove and return an item from the queue. If **optional args block** is True (the default) and **timeout is None (the default),** block if necessary until an item is available. If timeout is a positive number, it blocks at most timeout seconds and raises the queue.Empty exception if no item was available within that time. Otherwise (block is False), return an item if one is immediately available, else raise the queue.Empty exception (timeout is ignored in that case).
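
The two methods quoted above can be tried out in a single process before using them across processes. This is a minimal sketch; the function name `bounded_queue_demo` and the `maxsize=2` value are illustrative choices, not part of the original notebook.

```python
import queue
from multiprocessing import Queue

def bounded_queue_demo():
    q = Queue(maxsize=2)          # at most two items may be pending
    q.put("a")
    q.put("b")
    was_full = q.full()           # both slots are taken

    first = q.get()               # default block=True, timeout=None: waits for an item
    full_after_get = q.full()     # one slot has been freed again

    second = q.get()
    try:
        q.get(timeout=0.1)        # nothing left: waits 0.1 s, then raises queue.Empty
        timed_out = False
    except queue.Empty:
        timed_out = True
    return was_full, first, full_after_get, second, timed_out

print(bounded_queue_demo())
```

Note that `full()` is only meaningful when the queue was created with a `maxsize`, and that the `Empty` exception lives in the standard `queue` module rather than in `multiprocessing`.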

In [9]:
def calculate_square(sq_q, sqrt_q):
    while not sq_q.empty():
        itm = sq_q.get()
        print(f"Calculating sq of: {itm}")
        square = itm * itm
        sqrt_q.put(square)

def calculate_sqroot(sqrt_q, result_q):
    while not result_q.full(): # until the result queue is full
        itm = sqrt_q.get() # blocks until there is an item to consume
        print(f"Calculating sqrt of: {itm}")
        sqrt = math.sqrt(itm)
        result_q.put(sqrt)

In [10]:
items = list(range(5, 20))

In [11]:
sq_q = Queue()
sqrt_q = Queue()
result_q = Queue(maxsize=len(items)) # maxsize is required so that full() can report when every result has arrived

In [12]:
for i in items:
    sq_q.put(i)

In [13]:
p_sq = Process(target=calculate_square, args=(sq_q, sqrt_q))
p_sqrt = Process(target=calculate_sqroot, args=(sqrt_q, result_q))

In [14]:
p_sq.start()
p_sqrt.start()

Calculating sq of: 5
Calculating sq of: 6Calculating sqrt of: 25

Calculating sq of: 7Calculating sqrt of: 36

Calculating sqrt of: 49
Calculating sq of: 8
Calculating sq of: 9Calculating sqrt of: 64
Calculating sq of: 10
Calculating sq of: 11
Calculating sq of: 12
Calculating sq of: 13
Calculating sq of: 14

Calculating sq of: 15
Calculating sqrt of: 81Calculating sq of: 16

Calculating sqrt of: 100Calculating sq of: 17
Calculating sq of: 18

Calculating sqrt of: 121Calculating sq of: 19

Calculating sqrt of: 144
Calculating sqrt of: 169
Calculating sqrt of: 196
Calculating sqrt of: 225
Calculating sqrt of: 256
Calculating sqrt of: 289
Calculating sqrt of: 324
Calculating sqrt of: 361


In [15]:
p_sq.join()
p_sqrt.join()

In [16]:
while not result_q.empty():
    print(result_q.get())

5.0
6.0
7.0
8.0
9.0
10.0
11.0
12.0
13.0
14.0
15.0
16.0
17.0
18.0
19.0


# Solution 3: Using JoinableQueue
- [Joinable Queue Docs](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.JoinableQueue)
- JoinableQueue, a Queue subclass, is a queue which additionally has **task_done()** and **join()** methods.

## Pros from the API:

**i. task_done()**
> Indicate that a formerly enqueued task is complete. Used by queue consumers. For each ``get()`` used to fetch a task, a subsequent call to ``task_done()`` tells the queue that the processing on the task is complete.


> If a ``join()`` is currently blocking, it will resume when all items have been processed (meaning that a ``task_done()`` call was received for every item that had been ``put()`` into the queue).

> Raises a ``ValueError`` if called more times than there were items placed in the queue.

**ii. join()**
> Block until all items in the queue have been gotten and processed.

> The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls ``task_done()`` to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, ``join()`` unblocks.
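
The bookkeeping described in these quotes can be observed in a single process. The sketch below is only an illustration; the function name `joinable_queue_demo` is hypothetical.

```python
from multiprocessing import JoinableQueue

def joinable_queue_demo():
    q = JoinableQueue()
    for n in (1, 2, 3):
        q.put(n)                  # unfinished-task count goes up by one per put()

    for _ in range(3):
        q.get()
        q.task_done()             # one task_done() for each get()

    q.join()                      # does not block: every item has been marked done

    try:
        q.task_done()             # one call more than there were items
        return False
    except ValueError:
        return True               # the extra call is rejected

print(joinable_queue_demo())
```

If one of the `task_done()` calls in the loop were left out, `q.join()` would block indefinitely, which is the property Solution 3 relies on to wait for the workers.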

In [17]:
from multiprocessing import JoinableQueue

In [18]:
def calculate_square(sq_q, sqrt_q):
    while True:
        itm = sq_q.get()
        print(f"Calculating sq of: {itm}")
        square = itm * itm
        sqrt_q.put(square)
        sq_q.task_done()

def calculate_sqroot(sqrt_q, result_q):
    while True:
        itm = sqrt_q.get() # blocks until there is an item to consume
        print(f"Calculating sqrt of: {itm}")
        sqrt = math.sqrt(itm)
        result_q.put(sqrt)
        sqrt_q.task_done()

In [19]:
items = list(range(5, 20))

In [20]:
sq_q = JoinableQueue()
sqrt_q = JoinableQueue()
result_q = JoinableQueue()

In [21]:
for i in items:
    sq_q.put(i)

In [22]:
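# daemon=True: the workers loop forever, so let them be terminated when the main process exits
p_sq = Process(target=calculate_square, args=(sq_q, sqrt_q), daemon=True)
p_sqrt = Process(target=calculate_sqroot, args=(sqrt_q, result_q), daemon=True)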

In [23]:
p_sq.start()
p_sqrt.start()

Calculating sq of: 5
Calculating sq of: 6
Calculating sq of: 7Calculating sqrt of: 25

Calculating sqrt of: 36
Calculating sq of: 8Calculating sqrt of: 49

Calculating sq of: 9Calculating sqrt of: 64

Calculating sq of: 10Calculating sqrt of: 81

Calculating sqrt of: 100Calculating sq of: 11

Calculating sq of: 12
Calculating sqrt of: 121
Calculating sq of: 13
Calculating sq of: 14
Calculating sqrt of: 144Calculating sq of: 15

Calculating sq of: 16
Calculating sq of: 17Calculating sqrt of: 169
Calculating sq of: 18
Calculating sq of: 19

Calculating sqrt of: 196
Calculating sqrt of: 225
Calculating sqrt of: 256
Calculating sqrt of: 289
Calculating sqrt of: 324
Calculating sqrt of: 361


In [24]:
sq_q.join()
sqrt_q.join()
# result_q is not joined: nothing calls task_done() on it, so result_q.join() would block forever

In [25]:
while not result_q.empty():
    print(result_q.get())

5.0
6.0
7.0
8.0
9.0
10.0
11.0
12.0
13.0
14.0
15.0
16.0
17.0
18.0
19.0
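
Outside a notebook, Solution 3 could be packaged as a standalone script along the following lines. This is a sketch under a few assumptions that are not in the original cells: the workers are started as daemon processes because their `while True` loops never return, the function name `run_pipeline` is hypothetical, the progress prints are left out, `result_q` is a plain `Queue` since it is never joined, and the results are read with one blocking `get()` per input item instead of polling `empty()`.

```python
import math
from multiprocessing import JoinableQueue, Process, Queue

def calculate_square(sq_q, sqrt_q):
    while True:
        itm = sq_q.get()              # blocks until there is an item to consume
        sqrt_q.put(itm * itm)
        sq_q.task_done()

def calculate_sqroot(sqrt_q, result_q):
    while True:
        itm = sqrt_q.get()            # blocks until there is an item to consume
        result_q.put(math.sqrt(itm))
        sqrt_q.task_done()

def run_pipeline(items):
    sq_q, sqrt_q, result_q = JoinableQueue(), JoinableQueue(), Queue()
    workers = [
        Process(target=calculate_square, args=(sq_q, sqrt_q), daemon=True),
        Process(target=calculate_sqroot, args=(sqrt_q, result_q), daemon=True),
    ]
    for w in workers:
        w.start()                     # workers simply wait on get() until fed
    for i in items:
        sq_q.put(i)
    sq_q.join()                       # every input has been squared
    sqrt_q.join()                     # every square has been processed
    return [result_q.get() for _ in items]   # one result per input

if __name__ == "__main__":
    print(run_pipeline(list(range(5, 20))))
```

Reading a known number of results with blocking `get()` calls avoids depending on `empty()`, which the documentation describes as unreliable. The `if __name__ == "__main__":` guard keeps the script usable with start methods that re-import the main module in the child processes.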
