In [1]:
import time
import threading
import random
from threading import Thread
from datetime import datetime, timedelta

import requests

## The Producer / Consumer model

Let's pick up from our crypto price example, in which we wanted to check prices from multiple exchanges over multiple days. We said:

> 10 exchanges, 3 symbols, for a total of 30 days.

In [2]:
10 * 3 * 30

900

A total of 900 requests. As we said, we can't just start 900 threads at the same time. The solution is the "producer/consumer" model, in which some threads _produce_ tasks and put them in a shared collection, while other threads _consume_ the tasks from that collection and do the work.

We'll create a pool of threads (let's say 10) that will constantly consume the pending tasks and check the prices:

<center>
    <img src="img/producer-consumer-model.png"/>
</center>
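The model above can be sketched with the standard library alone. In this minimal example, the names `producer`, `consumer`, `NUM_CONSUMERS` and the `None` sentinel are our own choices for illustration, not part of the original code. The main thread acts as the producer and enqueues numbers, a pool of consumer threads squares them, and one sentinel per consumer tells each of them to stop:

```python
import threading
from queue import Queue

NUM_CONSUMERS = 3  # hypothetical pool size for this sketch
SENTINEL = None    # special value telling a consumer to stop

def producer(q, items):
    # Put every task in the shared queue...
    for item in items:
        q.put(item)
    # ...then one sentinel per consumer so that each of them exits
    for _ in range(NUM_CONSUMERS):
        q.put(SENTINEL)

def consumer(q, results, lock):
    while True:
        item = q.get()  # blocks until something is available
        if item is SENTINEL:
            return
        with lock:  # protect the shared results list
            results.append(item * item)

q = Queue()
results = []
lock = threading.Lock()
consumers = [threading.Thread(target=consumer, args=(q, results, lock))
             for _ in range(NUM_CONSUMERS)]
for t in consumers:
    t.start()

producer(q, range(10))  # the main thread is the producer here
for t in consumers:
    t.join()
print(sorted(results))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Sentinels are one common way to stop consumers that use a blocking `get()`; the worker we write later takes a different approach and exits as soon as the queue is empty.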

We'll use Python's `Queue` class, from the `queue` module, as our shared collection: a **thread-safe** FIFO queue. The `queue` module offers other variants too (`LifoQueue`, `PriorityQueue` and `SimpleQueue`), but we'll stick with `Queue`: it can optionally be bounded with `maxsize`, and it supports the `task_done()`/`join()` methods we'll rely on later, which `SimpleQueue` lacks.
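For reference, here is a small sketch of how the retrieval order differs between the queue classes, and a check that `SimpleQueue` has no `task_done()` method. The `drain` helper is a hypothetical name used only for this example:

```python
import queue

fifo, lifo, prio = queue.Queue(), queue.LifoQueue(), queue.PriorityQueue()
for q in (fifo, lifo, prio):
    for item in (3, 1, 2):
        q.put(item)

def drain(q):
    # Get every item currently in the queue, in retrieval order
    return [q.get() for _ in range(q.qsize())]

fifo_order = drain(fifo)  # [3, 1, 2]: first in, first out
lifo_order = drain(lifo)  # [2, 1, 3]: last in, first out
prio_order = drain(prio)  # [1, 2, 3]: lowest value first
print(fifo_order, lifo_order, prio_order)

# SimpleQueue is unbounded and has no task tracking
simple = queue.SimpleQueue()
print(hasattr(simple, 'task_done'))  # False
```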

Here are the basic methods of a Queue:

In [3]:
import queue
from queue import Queue

In [12]:
q = Queue()

In [13]:
q.empty()

True

In [14]:
q.put('A')

In [15]:
q.put('B')

In [16]:
q.put('C')

In [17]:
q.empty()

False

In [18]:
q.qsize()

3

In [19]:
q.get()

'A'

In [20]:
q.get()

'B'

In [21]:
q.get()

'C'

In [22]:
q.empty()

True

Queues are designed specifically for multithreaded applications that follow the producer/consumer model. If we try to `get` from the queue now that it's empty, it'll block **waiting for more "work" to be added** (we'll have to interrupt it):

In [24]:
q.get()

KeyboardInterrupt: 
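In a real program the blocked `get` doesn't need to be interrupted: it returns as soon as another thread puts something in the queue. A minimal sketch, where `delayed_put` is a hypothetical producer that waits a bit before delivering its work:

```python
import threading
import time
from queue import Queue

q = Queue()

def delayed_put():
    time.sleep(0.2)  # simulate a producer that is slow to deliver work
    q.put('late task')

threading.Thread(target=delayed_put).start()

start = time.monotonic()
item = q.get()  # blocks here until delayed_put() adds an item
waited = time.monotonic() - start
print(item, f"(waited ~{waited:.1f}s)")
```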

The `Queue.get` method has an interface similar to `Lock.acquire`. With `block=False` it returns immediately, raising a `queue.Empty` exception if there's nothing to get:

In [29]:
q.get(block=False)

Empty: 

Or it can wait up to `timeout` seconds, raising the same exception if nothing arrives in time:

In [31]:
q.get(timeout=1)

Empty: 
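In practice, these options are usually combined with a `try`/`except queue.Empty` so the program can decide what to do next. The sketch below also uses `get_nowait()`, which the standard library provides as an equivalent of `get(block=False)`:

```python
import queue
import time

q = queue.Queue()

# get_nowait() is equivalent to get(block=False)
try:
    q.get_nowait()
except queue.Empty:
    print("Nothing to do right now")

# With a timeout, get() waits at most that many seconds before giving up
start = time.monotonic()
try:
    q.get(timeout=0.5)
except queue.Empty:
    elapsed = time.monotonic() - start
    print(f"Gave up after ~{elapsed:.1f}s")
```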

Queues can also be used to limit the concurrency level of your program. With `maxsize` you set the maximum number of elements the queue can hold; once that limit is reached, the queue is "full" and the `put` operation blocks:

In [32]:
q = Queue(maxsize=1)

In [33]:
q.put('A')

In [34]:
q.qsize()

1

This will block:

In [35]:
q.put('B')

KeyboardInterrupt: 

Similarly to `get`, the `put` method accepts `block` and `timeout` parameters, raising a `queue.Full` exception when it gives up:

In [36]:
q.put('B', block=False)

Full: 

In [37]:
q.put('B', timeout=1)

Full: 
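This blocking `put` is what gives a bounded queue its "back-pressure": a fast producer is automatically slowed down to the pace of its consumers. A minimal sketch, assuming a hypothetical `slow_consumer` that records the largest queue size it ever observes:

```python
import threading
import time
from queue import Queue

q = Queue(maxsize=2)  # at most 2 pending items at any time
max_seen = 0

def slow_consumer(n):
    global max_seen
    for _ in range(n):
        max_seen = max(max_seen, q.qsize())
        q.get()
        time.sleep(0.01)  # simulate slow work

consumer = threading.Thread(target=slow_consumer, args=(10,))
consumer.start()
for i in range(10):
    q.put(i)  # blocks whenever the queue already holds 2 items
consumer.join()
print(max_seen)  # never more than 2
```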

#### Tracking work done

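Queues also include a useful `task_done()` method to track how many tasks have been completed: a consumer calls it after finishing each item, and `Queue.join()` blocks until `task_done()` has been called for every item that was put in the queue. A typical worker looks like this (`do_work` stands for the actual job):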

```python
import queue

def worker(q):
    while True:
        try:
            task = q.get(block=False)
        except queue.Empty:
            print("All work done. Exiting")
            return  # stop the worker: there's nothing left to do
        do_work(task)
        q.task_done()  # Notify the task was successfully finished
```
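One caveat with this pattern: if `do_work` raises an exception, `task_done()` is never called for that task, and a later `q.join()` would block forever. The sketch below guards against that with `try`/`finally`. Here `do_work` is a hypothetical stand-in that fails on purpose for multiples of 7:

```python
import threading
from queue import Queue, Empty

q = Queue()
for n in range(20):
    q.put(n)

processed = []
lock = threading.Lock()

def do_work(n):
    # Hypothetical task: fail on multiples of 7 to show error handling
    if n % 7 == 0:
        raise ValueError(f"cannot process {n}")
    with lock:
        processed.append(n)

def worker(q):
    while True:
        try:
            task = q.get(block=False)
        except Empty:
            return
        try:
            do_work(task)
        except ValueError as exc:
            print("Task failed:", exc)
        finally:
            q.task_done()  # always mark the task as done, even if it failed

threads = [threading.Thread(target=worker, args=(q,)) for _ in range(4)]
for t in threads:
    t.start()
q.join()  # returns once task_done() has been called for all 20 items
print(len(processed))  # 17 (0, 7 and 14 failed)
```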

#### A real example

We'll now use our knowledge of queues to check multiple prices from our crypto server using threads. Let's start with the list of exchanges we want to use:

In [4]:
BASE_URL = "http://localhost:5000"

In [5]:
resp = requests.get(f"{BASE_URL}/exchanges")

In [6]:
resp

<Response [200]>

In [7]:
EXCHANGES = resp.json()
EXCHANGES

['bitfinex',
 'bitstamp',
 'bittrex',
 'cexio',
 'coinbase-pro',
 'hitbtc',
 'huobi',
 'kraken',
 'mexbt',
 'okex',
 'poloniex']

We'll use all the exchanges available on the server, and ask for 31 days, from March 1st to March 31st, 2020:

In [8]:
START_DATE = datetime(2020, 3, 1)

In [9]:
DATES = [(START_DATE + timedelta(days=i)).strftime('%Y-%m-%d') for i in range(31)]

In [10]:
DATES

['2020-03-01',
 '2020-03-02',
 '2020-03-03',
 '2020-03-04',
 '2020-03-05',
 '2020-03-06',
 '2020-03-07',
 '2020-03-08',
 '2020-03-09',
 '2020-03-10',
 '2020-03-11',
 '2020-03-12',
 '2020-03-13',
 '2020-03-14',
 '2020-03-15',
 '2020-03-16',
 '2020-03-17',
 '2020-03-18',
 '2020-03-19',
 '2020-03-20',
 '2020-03-21',
 '2020-03-22',
 '2020-03-23',
 '2020-03-24',
 '2020-03-25',
 '2020-03-26',
 '2020-03-27',
 '2020-03-28',
 '2020-03-29',
 '2020-03-30',
 '2020-03-31']
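Hard-coding `range(31)` works for March, but asking the `calendar` module for the length of the month avoids surprises with shorter months. A small sketch; `month_dates` is a hypothetical helper, not part of the original notebook:

```python
import calendar
from datetime import date, timedelta

def month_dates(year, month):
    # monthrange() returns (weekday of the 1st, number of days in the month)
    _, num_days = calendar.monthrange(year, month)
    first = date(year, month, 1)
    # isoformat() produces the same 'YYYY-MM-DD' strings as strftime('%Y-%m-%d')
    return [(first + timedelta(days=i)).isoformat() for i in range(num_days)]

march = month_dates(2020, 3)
print(len(march), march[0], march[-1])  # 31 2020-03-01 2020-03-31
```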

And for all available symbols:

In [11]:
resp = requests.get(f"{BASE_URL}/symbols")

In [12]:
resp

<Response [200]>

In [13]:
SYMBOLS = resp.json()
SYMBOLS

['btc', 'eth', 'ltc']

In total, we'll check the following number of prices:

In [14]:
len(EXCHANGES) * len(SYMBOLS) * len(DATES)

1023

Let's first write the function that checks a single price:

In [15]:
def check_price(exchange, symbol, date, base_url=BASE_URL):
    resp = requests.get(f"{base_url}/price/{exchange}/{symbol}/{date}")
    return resp.json()

In [16]:
exchange, symbol, date = random.choice(EXCHANGES), random.choice(SYMBOLS), random.choice(DATES)
exchange, symbol, date

('okex', 'btc', '2020-03-13')

In [17]:
check_price(exchange, symbol, date)

{'exchange': 'okex',
 'symbol': 'btc',
 'open': 7934.1,
 'high': 7961.8,
 'low': 4399.1,
 'close': 4797,
 'volume': 238390.15825014,
 'day': '2020-03-13'}

We'll now create our queue:

In [98]:
tasks = Queue()

And we'll initialize it with all the "tasks" to finish:

In [99]:
for exchange in EXCHANGES:
    for date in DATES:
        for symbol in SYMBOLS:
            task = {
                'exchange': exchange,
                'symbol': symbol,
                'date': date,
            }
            tasks.put(task)

In [100]:
tasks.qsize()

1023
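The three nested loops can also be written with `itertools.product`, which yields every combination in the same order. In this self-contained sketch the lists are copied from the outputs shown above, and lowercase names are used so they don't overwrite the notebook's `EXCHANGES`, `DATES`, `SYMBOLS` or `tasks`:

```python
from itertools import product
from queue import Queue

# Values copied from the outputs shown above
exchanges = ['bitfinex', 'bitstamp', 'bittrex', 'cexio', 'coinbase-pro',
             'hitbtc', 'huobi', 'kraken', 'mexbt', 'okex', 'poloniex']
symbols = ['btc', 'eth', 'ltc']
dates = [f"2020-03-{day:02d}" for day in range(1, 32)]

task_queue = Queue()
# product() yields every (exchange, date, symbol) combination,
# in the same order as the three nested loops
for exchange, date, symbol in product(exchanges, dates, symbols):
    task_queue.put({'exchange': exchange, 'symbol': symbol, 'date': date})

print(task_queue.qsize())  # 1023
```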

Each _task_ is a dictionary like the following one (the last task we created), which our workers will consume:

In [101]:
task

{'exchange': 'poloniex', 'symbol': 'ltc', 'date': '2020-03-31'}

We'll create a specialized class to store the results:

In [102]:
class PriceResults:
    def __init__(self):
        results = {}
        for exchange in EXCHANGES:
            results[exchange] = {}
            for date in DATES:
                results[exchange][date] = {}
                for symbol in SYMBOLS:
                    results[exchange][date][symbol] = None
        self._results = results
        
    def put_price(self, price, exchange, symbol, date):
        self._results[exchange][date][symbol] = price

    def get_price(self, exchange, symbol, date):
        return self._results[exchange][date][symbol]

**Warning!** If multiple threads write to a shared collection at the same time, that collection must be thread-safe. Here we have no duplicated tasks, so only one thread ever writes to a given spot; also, all the nested dictionaries are created up front in `__init__`, so the workers only assign values without changing the structure. If that weren't the case, we could protect the writes with a `Lock`, or use a thread-safe queue to store the results.
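If the results structure were built lazily instead, so that threads create nested dictionaries as they go, a lock makes each write safe. A minimal sketch; `LockedPriceResults` is a hypothetical variant of `PriceResults`, and the prices are fake values made up for the test:

```python
import threading

class LockedPriceResults:
    """Hypothetical variant of PriceResults that serializes access with a lock."""

    def __init__(self):
        self._results = {}
        self._lock = threading.Lock()

    def put_price(self, price, exchange, symbol, date):
        with self._lock:
            # setdefault() creates missing nested dicts on demand
            self._results.setdefault(exchange, {}).setdefault(date, {})[symbol] = price

    def get_price(self, exchange, symbol, date):
        with self._lock:
            return self._results.get(exchange, {}).get(date, {}).get(symbol)

locked_results = LockedPriceResults()

def writer(exchange):
    # Fake prices: the 'close' value is just the day of the month
    for day in range(1, 32):
        locked_results.put_price({'close': day}, exchange, 'btc', f"2020-03-{day:02d}")

threads = [threading.Thread(target=writer, args=(ex,)) for ex in ('okex', 'kraken', 'huobi')]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(locked_results.get_price('kraken', 'btc', '2020-03-15'))  # {'close': 15}
```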

Now, let's define the worker function that will consume the queue and check the price:

In [103]:
def worker(task_queue, results):
    while True:
        try:
            task = task_queue.get(block=False)
        except queue.Empty:
            print('Queue is empty! My work here is done. Exiting.')
            return
        exchange, symbol, date = task['exchange'], task['symbol'], task['date']
        price = check_price(exchange, symbol, date)
        results.put_price(price, exchange, symbol, date)
        task_queue.task_done()

Now it's time to initialize our workers. How many should we use? It's very hard to know upfront where the performance limit of the current system lies. `concurrent.futures.ThreadPoolExecutor` uses the following default formula: `min(32, os.cpu_count() + 4)`, which means **AT MOST** 32 threads. We can use that number to try things out, but this is the point where profiling becomes necessary.
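For comparison, `ThreadPoolExecutor` can manage the pool and the task queue for us. The sketch below computes the default worker count described above (the `or 1` guards against `os.cpu_count()` returning `None`; newer Python versions may count usable CPUs slightly differently) and uses `executor.map`, which returns results in input order. `fake_check_price` is a hypothetical stand-in that sleeps instead of calling the server:

```python
import os
import time
from concurrent.futures import ThreadPoolExecutor

# Mirrors ThreadPoolExecutor's documented default for max_workers (Python 3.8+)
default_workers = min(32, (os.cpu_count() or 1) + 4)
print(default_workers)

def fake_check_price(exchange, symbol, date):
    # Hypothetical stand-in for check_price(): sleeps instead of doing HTTP
    time.sleep(0.01)
    return {'exchange': exchange, 'symbol': symbol, 'day': date}

jobs = [('okex', 'btc', f"2020-03-{d:02d}") for d in range(1, 32)]
with ThreadPoolExecutor(max_workers=default_workers) as executor:
    prices = list(executor.map(lambda job: fake_check_price(*job), jobs))

print(len(prices), prices[0]['day'])  # 31 2020-03-01
```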

In [104]:
results = PriceResults()

In [105]:
MAX_WORKERS = 32

In [106]:
threads = [Thread(target=worker, args=(tasks, results)) for _ in range(MAX_WORKERS)]

And now we're ready! We can start the threads and wait until every task in the queue has been marked as done:

In [107]:
for t in threads:
    t.start()

In [108]:
tasks.join()

Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.Queue is empty! My work here is done. Exiting.Queue is empty! My work here is done. Exiting.Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.


Queue is empty! My work here is done. Exiting.Queue is empty! My work here is done. Exiting.

Queue is empty! My work here is done. Exiting.

Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.Queue is empty! My work here is done. Exiting.

Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.Queue is empty! My work here is done. Exiting.
Queue is empty! My work here is done. Exiting.
Queue is empty

In [111]:
tasks.qsize()

0

In [110]:
any(t.is_alive() for t in threads)

False
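Note that `tasks.join()` only waits until `task_done()` has been called for every item; a worker may still be finishing its loop (for example, printing its exit message) when `join()` returns. Joining each thread as well guarantees they have all exited. A minimal sketch with a hypothetical no-op worker, using new names so it doesn't overwrite `tasks` or `threads`:

```python
import threading
from queue import Queue, Empty

work_queue = Queue()
for n in range(100):
    work_queue.put(n)

def noop_worker():
    while True:
        try:
            work_queue.get(block=False)
        except Empty:
            return  # nothing left: the thread finishes
        work_queue.task_done()

workers = [threading.Thread(target=noop_worker) for _ in range(8)]
for t in workers:
    t.start()

work_queue.join()  # all 100 items were marked as done...
for t in workers:
    t.join()       # ...and now every worker has also exited
print(any(t.is_alive() for t in workers))  # False
```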

And that's it! Our workers have processed all the tasks available. Let's check a few samples:

In [141]:
for _ in range(5):
    exchange, symbol, date = random.choice(EXCHANGES), random.choice(SYMBOLS), random.choice(DATES)
    price = results.get_price(exchange, symbol, date)
    if price:
        print(f"{exchange.title():<20} price of {symbol.upper():^5} on {date:^10} was: ${round(price['close'], 4):>9}")
    else:
        print(f"No price of {symbol.upper()} for {exchange.title()} on {date}")

No price of BTC for Mexbt on 2020-03-23
Okex                 price of  LTC  on 2020-03-13 was: $    29.88
Poloniex             price of  LTC  on 2020-03-22 was: $     38.3
Coinbase-Pro         price of  LTC  on 2020-03-25 was: $     40.7
Okex                 price of  BTC  on 2020-03-18 was: $   5314.1
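The report relies on format-spec alignment: `<` pads on the right, `^` centers, and `>` pads on the left, each to the given width. A small sketch that rebuilds two of the lines from the values printed above:

```python
# Values taken from the sample output above
rows = [('okex', 'ltc', '2020-03-13', 29.88), ('okex', 'btc', '2020-03-18', 5314.1)]

lines = []
for exchange, symbol, date, close in rows:
    line = f"{exchange.title():<20} price of {symbol.upper():^5} on {date:^10} was: ${close:>9}"
    lines.append(line)
    print(line)
```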
