In [1]:
import random
import itertools
import threading
import queue
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
import pandas as pd
import speaker_buddy as buddy

### Exchanges

In [2]:
EXCHANGES= [
    'bitfinex',
    'coinbase-pro',
    'bitstamp',
    'kraken',
    'cexio',
    'okcoin',
    'bitmex',
    'mexbt',
    'huobi',
    'poloniex',
    'bittrex',
    'okex',
    'hitbtc'
]

### Symbols

In [3]:
SYMBOLS = ['btc', 'ltc', 'eth']

### Combinations

In [4]:
ALL_COMBINATIONS = list(itertools.product(EXCHANGES, SYMBOLS))

### Random Combinatios

In [5]:
combinations = [random.choice(ALL_COMBINATIONS) for _ in range(10)]

### Random Dates

In [6]:
ALL_DATES = [d.strftime('%Y-%m-%d') for d in pd.date_range('2019-01-01', '2019-07-20', freq='D')]

In [7]:
dates = [random.choice(ALL_DATES) for _ in range(5)]

### Summary

In [8]:
dates

['2019-01-11', '2019-03-07', '2019-03-31', '2019-04-04', '2019-02-25']

In [9]:
combinations

[('huobi', 'btc'),
 ('bitfinex', 'ltc'),
 ('bittrex', 'btc'),
 ('cexio', 'eth'),
 ('kraken', 'btc'),
 ('okex', 'ltc'),
 ('poloniex', 'btc'),
 ('hitbtc', 'ltc'),
 ('bittrex', 'eth'),
 ('okex', 'btc')]

In [10]:
data = list(itertools.product(combinations, dates))
data[:3]

[(('huobi', 'btc'), '2019-01-11'),
 (('huobi', 'btc'), '2019-03-07'),
 (('huobi', 'btc'), '2019-03-31')]

In [11]:
URL = 'http://localhost:5000/price/{exchange}/{symbol}/{date}'

In [12]:
price_urls = [
        URL.format(exchange=exchange, symbol=symbol, date=date)
    for (exchange, symbol), date in data]

In [13]:
price_urls[:3]

['http://localhost:5000/price/huobi/btc/2019-01-11',
 'http://localhost:5000/price/huobi/btc/2019-03-07',
 'http://localhost:5000/price/huobi/btc/2019-03-31']

In [14]:
len(price_urls)

50

### A simple test

In [15]:
resp = requests.get(price_urls[0])

In [16]:
resp.json()

{'exchange': 'huobi',
 'symbol': 'btc',
 'open': 3966.68,
 'high': 3994,
 'low': 3542.03,
 'close': 3585.36,
 'volume': 8606.622285621284,
 'day': '2019-01-11'}

We'll do the same thing for all of them.

---

# Multi threaded

In [17]:
THREAD_COUNT = 5

In [18]:
import queue

In [19]:
url_queue = queue.Queue()
results_queue = queue.Queue()
for url in price_urls:
    url_queue.put(url)

In [20]:
import time
class PriceProcessThread(threading.Thread):
    def __init__(self, url_queue, results_queue, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.url_queue = url_queue
        self.results_queue = results_queue
        
    def run(self):
        while True:
            url = self.url_queue.get()
            resp = requests.get(url)
            self.results_queue.put((url, resp.json()))
            self.url_queue.task_done()

In [21]:
for _ in range(THREAD_COUNT):
    worker = PriceProcessThread(url_queue, results_queue, daemon=True)
    worker.start()

In [22]:
url_queue.join()

In [23]:
url_queue.qsize()

0

In [24]:
results_queue.qsize()

50

In [25]:
EXCHANGES = {}

In [26]:
while not results_queue.empty():
    url, result = results_queue.get()
    (*_, exchange, symbol, date) = url.split('/')
    print(f"{date} - {exchange} - {symbol.upper()}: {(result or {}).get('close')}")

2019-03-07 - huobi - BTC: 3861.99
2019-01-11 - huobi - BTC: 3585.36
2019-02-25 - huobi - BTC: 3747.86
2019-03-31 - huobi - BTC: 4109.92
2019-04-04 - huobi - BTC: 4944
2019-03-07 - bitfinex - LTC: None
2019-02-25 - bitfinex - LTC: None
2019-01-11 - bitfinex - LTC: None
2019-04-04 - bitfinex - LTC: None
2019-03-31 - bitfinex - LTC: None
2019-02-25 - bittrex - BTC: 3728.322
2019-01-11 - bittrex - BTC: 3628
2019-03-07 - bittrex - BTC: 3852.815
2019-04-04 - bittrex - BTC: 4970.639
2019-03-31 - bittrex - BTC: 4094.4440000000004
2019-03-07 - cexio - ETH: 139.32
2019-01-11 - cexio - ETH: 129.76
2019-03-31 - cexio - ETH: 143.64
2019-02-25 - cexio - ETH: 132.7
2019-04-04 - cexio - ETH: 161.7
2019-01-11 - kraken - BTC: 3623.5
2019-03-31 - kraken - BTC: 4090.9
2019-04-04 - kraken - BTC: 4972.2
2019-03-07 - kraken - BTC: 3851.7
2019-02-25 - kraken - BTC: 3732.2
2019-01-11 - okex - LTC: 32.8011
2019-03-31 - okex - LTC: 60.44
2019-03-07 - okex - LTC: 55.3625
2019-04-04 - okex - LTC: 85.2
2019-02-25 -

## `concurrent.futures`

The code we have produced includes multiple of the issues we mentioned before. `concurrent.futures` is a library that includes a high level abstraction for concurrency, with many issues already solved. The library is very well built and its simplicity relies in 2 main abstractions:

* The `Executor`: in charge of scheduling and synchronizing jobs
* A `Future`: an object encapsulating the state of a concurrent computation

In [27]:
import time

In [28]:
from concurrent.futures import ThreadPoolExecutor

With the `Executor` we'll schedule multiple tasks, we'll define the task as a simple function:

In [29]:
def get_price(url):
    time.sleep(.25)
    resp = requests.get(url)
    return resp.json()

We want to run this for each URL:

In [30]:
get_price(price_urls[0])

{'exchange': 'huobi',
 'symbol': 'btc',
 'open': 3966.68,
 'high': 3994,
 'low': 3542.03,
 'close': 3585.36,
 'volume': 8606.622285621284,
 'day': '2019-01-11'}

Now the code. First we create an executor:

In [31]:
executor = ThreadPoolExecutor(max_workers=THREAD_COUNT)

We "schedule" all the jobs with the `submit()` method. Each `submit()` call returns a `Future` (that we'll use later).

In [32]:
futures = [executor.submit(get_price, url) for url in price_urls]

In [33]:
futures

[<Future at 0x115c4abd0 state=running>,
 <Future at 0x115c4a1d0 state=running>,
 <Future at 0x115c5ca10 state=running>,
 <Future at 0x11536fa10 state=running>,
 <Future at 0x115c4aa90 state=running>,
 <Future at 0x115c5f5d0 state=pending>,
 <Future at 0x115c5f890 state=pending>,
 <Future at 0x115c5f810 state=pending>,
 <Future at 0x115c5fa10 state=pending>,
 <Future at 0x115c4a050 state=pending>,
 <Future at 0x115c5fcd0 state=pending>,
 <Future at 0x115c5f850 state=pending>,
 <Future at 0x115c5f7d0 state=pending>,
 <Future at 0x115c5fe10 state=pending>,
 <Future at 0x115c5fad0 state=pending>,
 <Future at 0x115c5fdd0 state=pending>,
 <Future at 0x115c5f8d0 state=pending>,
 <Future at 0x115c5f710 state=pending>,
 <Future at 0x115c39150 state=pending>,
 <Future at 0x115c39d50 state=pending>,
 <Future at 0x115c39ed0 state=pending>,
 <Future at 0x115c39cd0 state=pending>,
 <Future at 0x115c39810 state=pending>,
 <Future at 0x115c39890 state=pending>,
 <Future at 0x115c39850 state=pending>,


Most of the `Future`s will be _pending_. They're still being executed by the `Executor`.

In [34]:
f = futures[-1]

In [35]:
f.done()

False

In [36]:
f.running()

False

In [37]:
f.result()  # will block until a result arrives

We can use the `as_completed` module-level function to produce start receiving results as they're completed:

In [38]:
for future in as_completed(futures):
    print(future.result())

{'exchange': 'poloniex', 'symbol': 'btc', 'open': 4102.89931454, 'high': 4140, 'low': 4052.57725231, 'close': 4105.3, 'volume': 205.32939073, 'day': '2019-03-31'}
{'exchange': 'poloniex', 'symbol': 'btc', 'open': 4117.60261769, 'high': 4193.17030086, 'low': 3708.3500136, 'close': 3750.94339887, 'volume': 1489.19861538, 'day': '2019-02-25'}
{'exchange': 'hitbtc', 'symbol': 'ltc', 'open': 38.2623, 'high': 38.7598, 'low': 31.9, 'close': 33.033, 'volume': 323009.575, 'day': '2019-01-11'}
{'exchange': 'cexio', 'symbol': 'eth', 'open': 144.32, 'high': 146.55, 'low': 141.73, 'close': 143.64, 'volume': 1026.975561, 'day': '2019-03-31'}
{'exchange': 'bittrex', 'symbol': 'btc', 'open': 4089.9990000000003, 'high': 4128.71, 'low': 4044.284, 'close': 4094.4440000000004, 'volume': 420.27000525, 'day': '2019-03-31'}
{'exchange': 'hitbtc', 'symbol': 'ltc', 'open': 76.792, 'high': 98.1019, 'low': 76.032, 'close': 85.1948, 'volume': 1013312.454, 'day': '2019-04-04'}
{'exchange': 'okex', 'symbol': 'ltc',

**Important!** We must shutdown the Executor once we're done using it. This way it'll free the resources:

In [39]:
executor.shutdown()

### A better approach

The `Executor` class can act as a Context Manager (`with`). That allows us to simplify the above code in:

In [40]:
with ThreadPoolExecutor() as executor:
    futures = [executor.submit(get_price, url) for url in price_urls]

    for future in as_completed(futures):
        print(future.result())

{'exchange': 'huobi', 'symbol': 'btc', 'open': 4117.69, 'high': 4200.02, 'low': 3681, 'close': 3747.86, 'volume': 8060.044829426483, 'day': '2019-02-25'}
{'exchange': 'huobi', 'symbol': 'btc', 'open': 3857.27, 'high': 3892.51, 'low': 3815.43, 'close': 3861.99, 'volume': 8936.483057356172, 'day': '2019-03-07'}
None
{'exchange': 'huobi', 'symbol': 'btc', 'open': 4103.96, 'high': 4138, 'low': 4054.86, 'close': 4109.92, 'volume': 9836.147282091302, 'day': '2019-03-31'}
{'exchange': 'huobi', 'symbol': 'btc', 'open': 3966.68, 'high': 3994, 'low': 3542.03, 'close': 3585.36, 'volume': 8606.622285621284, 'day': '2019-01-11'}
{'exchange': 'huobi', 'symbol': 'btc', 'open': 4857.79, 'high': 5276.91, 'low': 4805, 'close': 4944, 'volume': 4837.8328254302705, 'day': '2019-04-04'}
{'exchange': 'bittrex', 'symbol': 'btc', 'open': 4902.46, 'high': 5340.56, 'low': 4810, 'close': 4970.639, 'volume': 3625.01819298, 'day': '2019-04-04'}
None
{'exchange': 'cexio', 'symbol': 'eth', 'open': 165.32, 'high': 179