In [1]:
import time
import threading
import random
from threading import Thread
from datetime import datetime, timedelta
from pprint import pprint

import requests

## The Producer / Consumer model

### The Problem
Let's pick up our crypto price example, in which we wanted to check the prices of several symbols on multiple exchanges over multiple days. We said:

> 10 exchanges, 3 symbols, for a total of 30 days.

In [2]:
10 * 3 * 30

900

A total of 900 requests. We said we couldn't just start 900 threads at the same time. Furthermore, writing synchronized code is difficult and error-prone: it is hard to debug, and small mistakes lead to deadlocks or corrupted data.


### Introduction to The Producer / Consumer Model

We are going to see a partial solution to many of the above problems called the *<span style="color:yellow">"producer/consumer" model, in which some threads _produce_ tasks and put them in a shared collection, while other threads _consume_ the tasks from that collection and do the work. The key point is that the shared collection is thread safe: it handles synchronization internally, so we don't need to write our own locks around it</span>*

We'll create a pool of consumer threads, let's say 10, which will constantly consume the pending tasks and check the prices. If we have 900 tasks, each thread will consume about 90 of them on average (the exact split depends on timing):

<center>
    <img src="img/producer-consumer-model.png"/>
</center>
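The model fits in a few lines of code. The following is a minimal, self-contained sketch, not the crypto example itself: the main thread acts as the producer, three consumer threads square numbers, and a `None` sentinel (an assumption of this sketch, one per consumer) tells each consumer to stop.

```python
import threading
from queue import Queue

NUM_CONSUMERS = 3
SENTINEL = None  # hypothetical "stop" marker, one per consumer

def producer(q, items):
    # Put every task in the shared queue...
    for item in items:
        q.put(item)
    # ...then one sentinel per consumer so each of them eventually stops
    for _ in range(NUM_CONSUMERS):
        q.put(SENTINEL)

def consumer(q, processed, lock):
    while True:
        task = q.get()  # blocks until a task is available
        if task is SENTINEL:
            return
        with lock:  # the results list is shared, so guard the write
            processed.append(task * task)

work_queue = Queue()
processed = []
processed_lock = threading.Lock()
consumers = [
    threading.Thread(target=consumer, args=(work_queue, processed, processed_lock))
    for _ in range(NUM_CONSUMERS)
]
for c in consumers:
    c.start()

producer(work_queue, range(10))  # the main thread plays the producer
for c in consumers:
    c.join()

print(sorted(processed))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Because the queue is FIFO, all real tasks come out before any sentinel, so no work is lost when the consumers stop.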

### Implementing Using `Queue`

We'll use the <span style="color:lightgreen">Python `Queue` class, from the `queue` module, as our shared collection; a **thread-safe** FIFO queue</span>. The `queue` module offers other queues too (`LifoQueue`, `PriorityQueue`, and `SimpleQueue`, an unbounded FIFO queue without task tracking), but we'll use `Queue`, which supports an optional maximum size and the `task_done()`/`join()` methods we'll need later.
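To see how the queue classes differ, this short sketch puts the same three items into a `Queue`, a `LifoQueue` and a `PriorityQueue` and drains each one. The `drain` helper is our own, written just for this comparison.

```python
import queue

def drain(q):
    """Get every item out of q, in the order the queue hands them back."""
    out = []
    while not q.empty():
        out.append(q.get())
    return out

fifo, lifo, prio = queue.Queue(), queue.LifoQueue(), queue.PriorityQueue()
for item in [3, 1, 2]:
    fifo.put(item)
    lifo.put(item)
    prio.put(item)

fifo_order, lifo_order, prio_order = drain(fifo), drain(lifo), drain(prio)
print(fifo_order)  # [3, 1, 2]  first in, first out
print(lifo_order)  # [2, 1, 3]  last in, first out
print(prio_order)  # [1, 2, 3]  lowest value first

# SimpleQueue is unbounded and has no task tracking
print(hasattr(queue.SimpleQueue(), "task_done"))  # False
```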

Here are the basic methods of a Queue:

In [5]:
import queue
from queue import Queue

In [6]:
q = Queue()

In [7]:
q.empty()

True

Let's put 'A', 'B' and 'C' in:

In [8]:
q.put('A')

In [9]:
q.put('B')

In [10]:
q.put('C')

In [11]:
q.empty()

False

In [12]:
q.qsize()

3

We will get 'A', 'B' and 'C' out in that order (first in, first out):

In [13]:
q.get()

'A'

In [14]:
q.get()

'B'

In [15]:
q.get()

'C'

In [16]:
q.empty()

True

_Queues are specifically designed to work in multithreaded applications following a producer/consumer model_.  
 - When a thread `get`s an object, that object is removed from the queue. Hence, no two threads will ever receive the same task.
 - If we call `get` on the queue now that it is empty, it will block, **waiting for more "work" to be added** (we'll have to interrupt it manually)

In [17]:
q.get()

KeyboardInterrupt: 
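The first point, that `get` removes the item so no two threads see the same task, can be checked empirically. In this sketch (names are illustrative), eight threads drain a queue of 1000 numbers; afterwards every number has been received exactly once.

```python
import queue
import threading

shared = queue.Queue()
for i in range(1000):
    shared.put(i)

received = []                # every item any thread got
received_lock = threading.Lock()

def take_until_empty():
    while True:
        try:
            item = shared.get(block=False)
        except queue.Empty:
            return
        with received_lock:
            received.append(item)

grabbers = [threading.Thread(target=take_until_empty) for _ in range(8)]
for t in grabbers:
    t.start()
for t in grabbers:
    t.join()

# 1000 items received and 1000 distinct ones: nothing was handed out twice
print(len(received), len(set(received)))  # 1000 1000
```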

The `Queue.get` method has an interface similar to `Lock.acquire`. With `block=False` it returns immediately, raising a `queue.Empty` exception if the queue is empty:

In [18]:
q.get(block=False)

Empty: 

Or it can wait at most `timeout` seconds, also raising `queue.Empty` if nothing arrives in time:

In [19]:
q.get(timeout=1)

Empty: 
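In a real program you'd catch `queue.Empty` rather than let it propagate. This sketch shows both variants: `get_nowait()`, which is equivalent to `get(block=False)`, and a timed `get` whose actual waiting time we measure.

```python
import queue
import time

empty_q = queue.Queue()

try:
    empty_q.get_nowait()  # equivalent to empty_q.get(block=False)
except queue.Empty:
    print("Empty right away")

waited = None
start = time.monotonic()
try:
    empty_q.get(timeout=0.5)  # wait at most half a second
except queue.Empty:
    waited = time.monotonic() - start
print(f"Gave up after about {waited:.1f}s")
```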

Queue limit: queues can also be used to "limit" the concurrency level of your program. You can set an upper limit on how many elements can be placed in the queue with `maxsize`. When the limit is reached, the queue is "full" and the `put` operation will block:

In [20]:
q = Queue(maxsize=1)

In [21]:
q.put('A')

In [22]:
q.qsize()

1

This will block:

In [23]:
q.put('B')

KeyboardInterrupt: 

Similarly to `get`, the `put` method accepts `block` and `timeout` parameters, raising `queue.Full` when the item can't be added:

In [24]:
q.put('B', block=False)

Full: 

In [25]:
q.put('B', timeout=1)

Full: 
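A blocking `put` is what makes a bounded queue useful as back-pressure: a fast producer is held to the pace of a slow consumer instead of piling up work. In this sketch (the 10 ms "work" and the `DONE` sentinel are assumptions for illustration), the producer loop would finish almost instantly on an unbounded queue, but with `maxsize=2` it has to wait for the consumer.

```python
import threading
import time
from queue import Queue

buffer = Queue(maxsize=2)  # at most 2 items may wait in the queue
DONE = object()            # sentinel marking the end of production

def slow_consumer():
    while True:
        item = buffer.get()
        if item is DONE:
            return
        time.sleep(0.01)   # simulate 10 ms of work per item

consumer_thread = threading.Thread(target=slow_consumer)
consumer_thread.start()

start = time.monotonic()
for i in range(10):
    buffer.put(i)          # blocks while the queue is full
buffer.put(DONE)
producer_time = time.monotonic() - start
consumer_thread.join()
print(f"Producer needed {producer_time:.2f}s to hand out 10 items")
```

Before it can put `DONE`, the producer must wait until the consumer has taken item 8, which happens only after the consumer has worked on items 0 to 7, so the loop takes at least about 0.08 s.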

#### Tracking work done

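Queues also include a useful `task_done()` method to track how many tasks have been completed: a consumer calls it once for each task it finishes, and `Queue.join()` blocks until every item that was put in the queue has been marked as done. A typical worker looks like this:

```python
import queue

def do_work(task):
    print(f"Processing {task}")  # placeholder for the real work

def worker(q):
    while True:
        try:
            task = q.get(block=False)  # can also use a timeout
        except queue.Empty:
            print("All work done. Exiting")
            return
        try:
            do_work(task)
        finally:
            q.task_done()  # notify the task is finished, even if do_work raised
```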
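Here is how `task_done()` and `join()` work together in a runnable sketch. The "work" is just multiplying by 10, a stand-in for a real task. `join()` returns only once `task_done()` has been called for all five items, and calling `task_done()` more times than items were put raises `ValueError`.

```python
import queue
import threading

job_queue = queue.Queue()
for n in range(5):
    job_queue.put(n)

finished = []

def worker_with_tracking():
    while True:
        try:
            task = job_queue.get(block=False)
        except queue.Empty:
            return
        try:
            finished.append(task * 10)  # stand-in for real work
        finally:
            job_queue.task_done()       # mark this task as finished

threading.Thread(target=worker_with_tracking).start()
job_queue.join()  # returns once task_done() was called for all 5 items
print(sorted(finished))  # [0, 10, 20, 30, 40]

too_many = False
try:
    job_queue.task_done()  # one call more than the number of items put
except ValueError:
    too_many = True
print(too_many)  # True
```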

### A Real example

We'll now use our knowledge of queues to check multiple prices from our crypto server using threads.
First, spin up the crypto server:
```bash
cd crypto-examples
python flask_app.py
```

The app runs on port `5000`. Let's start with the list of exchanges we want to use:

In [26]:
BASE_URL = "http://localhost:5000"

In [27]:
resp = requests.get(f"{BASE_URL}/exchanges")

In [28]:
resp

<Response [200]>

In [29]:
EXCHANGES = resp.json()
EXCHANGES

['bitfinex',
 'bitstamp',
 'bittrex',
 'cexio',
 'coinbase-pro',
 'hitbtc',
 'huobi',
 'kraken',
 'mexbt',
 'okex',
 'poloniex']

We'll use all the exchanges available on the server, and ask for 31 days, from March 1st to March 31st:

In [30]:
START_DATE = datetime(2020, 3, 1)

In [31]:
DATES = [(START_DATE + timedelta(days=i)).strftime('%Y-%m-%d') for i in range(31)]

In [32]:
DATES

['2020-03-01',
 '2020-03-02',
 '2020-03-03',
 '2020-03-04',
 '2020-03-05',
 '2020-03-06',
 '2020-03-07',
 '2020-03-08',
 '2020-03-09',
 '2020-03-10',
 '2020-03-11',
 '2020-03-12',
 '2020-03-13',
 '2020-03-14',
 '2020-03-15',
 '2020-03-16',
 '2020-03-17',
 '2020-03-18',
 '2020-03-19',
 '2020-03-20',
 '2020-03-21',
 '2020-03-22',
 '2020-03-23',
 '2020-03-24',
 '2020-03-25',
 '2020-03-26',
 '2020-03-27',
 '2020-03-28',
 '2020-03-29',
 '2020-03-30',
 '2020-03-31']
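Hard-coding `range(31)` works because March has 31 days. If you wanted a different month, a small helper can ask the standard library how many days it has. `month_dates` is a hypothetical helper, not part of the original code.

```python
import calendar
from datetime import date, timedelta

def month_dates(year, month):
    """Return every day of the given month as 'YYYY-MM-DD' strings."""
    _, num_days = calendar.monthrange(year, month)  # (weekday of the 1st, days in month)
    first = date(year, month, 1)
    return [(first + timedelta(days=i)).isoformat() for i in range(num_days)]

march = month_dates(2020, 3)
print(len(march), march[0], march[-1])  # 31 2020-03-01 2020-03-31
```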

And for all available symbols:

In [34]:
resp = requests.get(f"{BASE_URL}/symbols")

In [35]:
resp

<Response [200]>

In [36]:
SYMBOLS = resp.json()
SYMBOLS

['btc', 'eth', 'ltc']

In total, we'll check the following number of prices:

In [37]:
len(EXCHANGES) * len(SYMBOLS) * len(DATES)

1023

Let's first write the function that checks a single price:

In [38]:
def check_price(exchange, symbol, date, base_url=BASE_URL):
    resp = requests.get(f"{base_url}/price/{exchange}/{symbol}/{date}")
    return resp.json()

In [43]:
exchange, symbol, date = random.choice(EXCHANGES), random.choice(SYMBOLS), random.choice(DATES)
exchange, symbol, date

('kraken', 'eth', '2020-03-13')

In [45]:
check_price(exchange, symbol, date)

{'exchange': 'kraken',
 'symbol': 'eth',
 'open': 195.24,
 'high': 195.24,
 'low': 102,
 'close': 107.97,
 'volume': 7901.27262233,
 'day': '2020-03-13'}

We'll now create our queue:

In [46]:
tasks = Queue()

And we'll initialize it with all the "tasks" to finish:

In [47]:
for exchange in EXCHANGES:
    for date in DATES:
        for symbol in SYMBOLS:
            task = {
                'exchange': exchange,
                'symbol': symbol,
                'date': date,
            }
            tasks.put(task)

In [48]:
tasks.qsize()

1023

Each _task_ is a dictionary like this one (the last task we put in the queue), which will be consumed by our workers:

In [51]:
task

{'exchange': 'poloniex', 'symbol': 'ltc', 'date': '2020-03-31'}
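The triple nested loop that fills the queue can also be written with `itertools.product`, which varies the rightmost iterable fastest and therefore yields the same order as the nested loops. To keep the sketch runnable without the server, it uses hypothetical stand-in lists with the same sizes (11 exchanges, 31 dates, 3 symbols).

```python
import itertools
from queue import Queue

# Hypothetical stand-ins with the same sizes as the server's lists
exchanges = [f"exchange-{i}" for i in range(11)]
symbols = ['btc', 'eth', 'ltc']
dates = [f"2020-03-{d:02d}" for d in range(1, 32)]

task_queue = Queue()
for exchange, date, symbol in itertools.product(exchanges, dates, symbols):
    task_queue.put({'exchange': exchange, 'symbol': symbol, 'date': date})

print(task_queue.qsize())  # 1023
```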

We'll create a specialized class to store the results:

In [54]:
class PriceResults:
    def __init__(self):
        results = {}  # need to make sure that this is thread safe
        for exchange in EXCHANGES:
            results[exchange] = {}
            for date in DATES:
                results[exchange][date] = {}
                for symbol in SYMBOLS:
                    results[exchange][date][symbol] = None
        self._results = results
        
    def put_price(self, price, exchange, symbol, date):
        self._results[exchange][date][symbol] = price

    def get_price(self, exchange, symbol, date):
        return self._results[exchange][date][symbol]

<span style="color:orange"> **Warning!** We must be sure to use a thread-safe collection if multiple threads write to it at the same time. Here we're using a dictionary: in CPython, assigning a single key is atomic thanks to the GIL, and all the nested dictionaries are created in `__init__` before any thread starts, so the workers only assign values to keys that already exist.
Furthermore, in this case we don't have duplicated tasks, which means that only one thread will ever write to a given spot. If that weren't the case, we would probably need a thread-safe queue to store the results, so we would have two queues: one for tasks and one for results.</span>
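The two-queue alternative mentioned in the warning can be sketched as follows: workers never touch a shared dictionary, they put `(task, price)` pairs on a results queue, and only the main thread builds the final dictionary. `fake_check_price` is a hypothetical stand-in for `check_price` so the sketch runs without the server; the exchange, symbols and dates are just sample inputs.

```python
import queue
import threading

def fake_check_price(exchange, symbol, date):
    # Hypothetical stand-in for check_price so the sketch runs without the server
    return {'exchange': exchange, 'symbol': symbol, 'day': date}

def result_worker(task_queue, result_queue):
    while True:
        try:
            task = task_queue.get(block=False)
        except queue.Empty:
            return
        price = fake_check_price(task['exchange'], task['symbol'], task['date'])
        result_queue.put((task, price))  # send the result back instead of writing shared state
        task_queue.task_done()

task_queue, result_queue = queue.Queue(), queue.Queue()
for date in ['2020-03-01', '2020-03-02']:
    for symbol in ['btc', 'eth']:
        task_queue.put({'exchange': 'kraken', 'symbol': symbol, 'date': date})

workers = [threading.Thread(target=result_worker, args=(task_queue, result_queue))
           for _ in range(3)]
for w in workers:
    w.start()
task_queue.join()  # every task has been processed and its result queued

# Only the main thread builds the dict, so it needs no extra locking
collected = {}
while not result_queue.empty():
    task, price = result_queue.get()
    collected[(task['exchange'], task['symbol'], task['date'])] = price
print(len(collected))  # 4
```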

Now, let's define the worker function that will consume the queue and check the price:

In [53]:
def worker(task_queue, results):
    while True:
        try:
            task = task_queue.get(block=False)
        except queue.Empty:
            print('Queue is empty! My work here is done. Exiting.')
            return
        exchange, symbol, date = task['exchange'], task['symbol'], task['date']
        price = check_price(exchange, symbol, date)
        results.put_price(price, exchange, symbol, date)
        task_queue.task_done()

Now it's time to initialize our workers. How many should we use? It's very hard to know upfront where the performance limit of the current system lies. By default, `concurrent.futures.ThreadPoolExecutor` uses the formula `min(32, os.cpu_count() + 4)`, so that's **AT MOST** 32 threads. We can use that number to try things out, but at this point profiling is necessary to find the right value.
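To make the "at most 32" bound concrete, here is the default formula as a small function. The function name is ours; the `or 1` guards against `os.cpu_count()` returning `None`, as the standard library does. Recent Python versions (3.13+) compute the default from `os.process_cpu_count()` instead, with the same shape.

```python
import os

def default_thread_workers(cpu_count):
    # Same shape as ThreadPoolExecutor's default when max_workers is None
    return min(32, (cpu_count or 1) + 4)

print(default_thread_workers(4))               # 8
print(default_thread_workers(64))              # 32
print(default_thread_workers(None))            # 5
print(default_thread_workers(os.cpu_count()))  # depends on this machine, never above 32
```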

In [55]:
results = PriceResults()

In [56]:
MAX_WORKERS = 32

In [57]:
threads = [Thread(target=worker, args=(tasks, results)) for _ in range(MAX_WORKERS)]

And now we're ready! We can start the threads and wait until every task in the queue has been marked as done:

In [59]:
for t in threads:
    t.start()

In [60]:
tasks.join()

Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
...

In [61]:
tasks.qsize()

0

In [62]:
any([t.is_alive() for t in threads])

False
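Note that `tasks.join()` only guarantees that every task was marked as done; a worker may still be between its last `task_done()` and returning, and a single `is_alive()` check is just a snapshot. Calling `join()` on each thread guarantees they have all exited. In the notebook above that would be `for t in threads: t.join()`; this self-contained sketch uses a hypothetical `short_job` in place of the real worker.

```python
import threading
import time

def short_job():
    time.sleep(0.05)  # hypothetical stand-in for a worker's remaining work

pool = [threading.Thread(target=short_job) for _ in range(5)]
for t in pool:
    t.start()

for t in pool:
    t.join()  # blocks until this thread has finished

print(any(t.is_alive() for t in pool))  # False
```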

And that's it! Our workers have processed all the tasks available. Let's check a few samples:

In [63]:
for _ in range(5):
    exchange, symbol, date = random.choice(EXCHANGES), random.choice(SYMBOLS), random.choice(DATES)
    price = results.get_price(exchange, symbol, date)
    if price:
        print(f"{exchange.title():<20} price of {symbol.upper():^5} on {date:^10} was: ${round(price['close'], 4):>9}")
    else:
        print(f"No price of {symbol.upper()} for {exchange.title()} on {date}")

Bitstamp             price of  LTC  on 2020-03-25 was: $    40.72
Bittrex              price of  LTC  on 2020-03-21 was: $       38
Cexio                price of  ETH  on 2020-03-23 was: $   123.02
Bitfinex             price of  LTC  on 2020-03-30 was: $   37.224
Okex                 price of  BTC  on 2020-03-24 was: $   6464.7
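A random sample only spot-checks a few results. To make sure every task produced a price, one could scan the nested dictionary for entries that are still `None`. The `missing_prices` helper is hypothetical, and `sample` uses made-up placeholder values with the same nesting as `PriceResults._results` (exchange, then date, then symbol). With the real object you would pass `results._results`, which reaches into a private attribute.

```python
def missing_prices(nested):
    """Return (exchange, date, symbol) keys whose price is still None."""
    return [
        (exchange, date, symbol)
        for exchange, by_date in nested.items()
        for date, by_symbol in by_date.items()
        for symbol, price in by_symbol.items()
        if price is None
    ]

# Hypothetical sample with placeholder values and the same nesting as PriceResults
sample = {
    'kraken': {'2020-03-01': {'btc': {'close': 1.0}, 'eth': None}},
    'okex': {'2020-03-01': {'btc': None, 'eth': {'close': 2.0}}},
}
print(missing_prices(sample))
# [('kraken', '2020-03-01', 'eth'), ('okex', '2020-03-01', 'btc')]
```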
