In [1]:
import json
import logging
import time
import asyncio
import multiprocessing
import threading

### Streaming Datasources

The main goal of these datasources is to ensure that Perspective is updated with new data without blocking the main thread. `BaseDataSource` implements the core of a data source that runs in a subprocess, placing returned data in a queue from which it is fetched by a table updater running in a separate thread and applied to the Perspective table. This allows us to spawn multiple data sources and run them all at the same time without preventing the notebook from running new cells.

The IEX-based datasources we use will inherit from `BaseDataSource`, and implement datasource-specific logic. This base class handles management of subprocesses/threads, starting/stopping the datasource, and how data flows from the datasource into Perspective.

In [2]:
class BaseDataSource(object):
    """Base class for a datasource that feeds data into a Perspective table.

    A subprocess runs `get_data`, which places data on `self.queue`. A
    separate thread runs `table_updater`, which takes items off the queue,
    optionally cleans them, and passes them to `self.table.update()`.
    """

    def __init__(self, table, data_cleaner=None):
        """Create the queue, the data-getter subprocess and the updater thread.

        Nothing is started here; call `start()` to begin fetching data.

        Subclasses must implement `get_data`, which retrieves data and calls
        `self.queue.put(data)` for Perspective to retrieve. This prevents the
        main thread/notebook from being blocked.

        Args:
            table (perspective.Table) a Perspective table instance that will
                be updated with new data.

        Keyword Args:
            data_cleaner (func) A function that receives data input and
                returns cleaned data before the Perspective table is updated.
        """
        self.table = table  # An already-created Perspective table
        self.queue = multiprocessing.Queue()
        self._process = multiprocessing.Process(target=self.get_data)
        self._table_updater = threading.Thread(target=self.table_updater)
        self._data_cleaner = data_cleaner

    def start(self):
        """Start both the data getter subprocess and the table updater sub thread."""
        self._process.start()
        self._table_updater.start()
        logging.info("[DataSource] Started")

    def stop(self):
        """Stop fetching data and updating Perspective.

        Safe to call before `start()` and safe to call more than once: each
        half of the shutdown only runs if the corresponding worker was
        actually started / is still running.
        """
        # `pid` stays None until the process has been started; terminating or
        # joining an unstarted process raises.
        if self._process.pid is not None:
            # NOTE: the multiprocessing docs warn that terminating a process
            # which is using a queue may corrupt that queue. This is kept
            # because `get_data` implementations have no cooperative stop hook.
            self._process.terminate()
            # Reap the child so it does not linger as a zombie.
            self._process.join()

        # Joining a thread that was never started raises RuntimeError, and
        # there is nobody to consume the sentinel if the thread is not running.
        if self._table_updater.is_alive():
            self.queue.put_nowait("STOP")
            self._table_updater.join()

        logging.info("[DataSource] Stopped")

    def get_data(self):
        """A method that gets data and submits it to `self.queue` - must be implemented
        by the subclass.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError("Not implemented!")

    def table_updater(self):
        """Update the Perspective Table in a subthread, using the event loop to manage
        the call to `update()`.

        Creates a new event loop, installs it as the current loop for the
        calling thread, runs `_update()` to completion and closes the loop.
        """
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            loop.run_until_complete(self._update())
        finally:
            # Release the loop's resources even if `_update` raised.
            loop.close()

    async def _update(self):
        """Update the Perspective table with data, cleaning it if necessary.

        Blocks on `self.queue.get()` for each item. The string "STOP" ends
        the loop; falsy items are skipped; everything else is passed through
        the data cleaner (when one was given) and then to `table.update()`.

        Returns:
            True once the "STOP" sentinel has been received.
        """
        while True:
            tick = self.queue.get()
            if tick == "STOP":
                logging.info("[DataSource] Stopping update thread")
                return True
            elif tick:
                if self._data_cleaner:
                    tick = self._data_cleaner(tick)
                self.table.update(tick)

### Using the IEX API

For this tutorial, we'll be using the [IEX Cloud API](https://iexcloud.io) to provide us with streaming market data. The API provides a whole gamut of finance-related data, including live quotes, trade books, news stories, and market research, as well as foreign exchange and cryptocurrency data.

In `jupytercon.ipynb`, I've set up a fictional stock portfolio of tickers and quantities, and we will use the IEX Cloud API to fetch live quotes for each symbol, as well as charting data so we can see the performance of our fictional portfolio over time. This allows us to jump into one of the main use cases for streaming data, and examine the capabilities of Perspective within a wide sandbox.

To interact with the IEX Cloud API, we'll be using [pyEX](https://pyex.readthedocs.io/en/latest/), a Python interface to the IEX Cloud API that implements easy-to-use methods for retrieving data. We'll be passing those methods into our datasources, calling on them to get data (either through Server-Sent Events (SSE), polling the server, or one-off requests), and passing the data into Perspective for analysis.

In [3]:
class IEXSSEDataSource(BaseDataSource):

    def __init__(self, table, iex_source, **kwargs):
        """A datasource that consumes data from IEX's SSE (Server-Sent Events)
        API, such as `pyEX.topsSSE` or `pyEX.cryptoQuotesSSE`.

        Args:
            table (perspective.Table) a Perspective table instance that will
                be updated with new data.
            iex_source (func) a function from pyEX that returns an SSE
                data source, which will be called with **kwargs.

        Keyword Args:
            data_cleaner (func) optional; a function that receives data and
                returns cleaned data before the table is updated. It is
                removed from `kwargs` and not forwarded to `iex_source`.
            kwargs (dict) keyword arguments which will be applied when calling
                the IEX data source. See the pyEX documentation for more details.
                Any `on_data` entry is overwritten (see below).
        """
        # `data_cleaner` is optional in the base class, so default to None
        # instead of raising KeyError when the caller omits it.
        data_cleaner = kwargs.pop("data_cleaner", None)
        super(IEXSSEDataSource, self).__init__(table, data_cleaner=data_cleaner)
        self._iex_source = iex_source
        self._iex_source_kwargs = kwargs

        # Replace `on_data` kwarg with Queue.put_nowait, which
        # will not block the process and place received data in
        # the queue.
        self._iex_source_kwargs["on_data"] = self.queue.put_nowait

    def get_data(self):
        """Call the SSE datasource with the provided kwargs, placing data
        into `self.queue`."""
        logging.info("[IEXSSEDataSource] Started")
        self._iex_source(**self._iex_source_kwargs)

In [4]:
from zlib import crc32

def bytes_to_float(b):
    """Map a bytes value to a float in [0, 1) derived from its CRC-32 checksum."""
    checksum = crc32(b) & 0xFFFFFFFF  # keep only the low 32 bits
    return float(checksum) / float(1 << 32)

def str_to_float(s, encoding="utf-8"):
    """Encode `s` using `encoding` and return the result of `bytes_to_float` on it."""
    encoded = s.encode(encoding)
    return bytes_to_float(encoded)

class IEXIntervalDataSource(BaseDataSource):

    def __init__(self, table, iex_source, interval=1, should_hash=False, **kwargs):
        """A datasource that consumes data from IEX once per `interval` seconds.

        Because IEX does not provide streaming APIs for simple quotes, OHLC,
        etc, use this datasource to query the API for updates. If `should_hash` is
        True, the datasource will hash incoming ticks and discard duplicates.

        Args:
            table (perspective.Table) a Perspective table instance that will
                be updated with new data.
            iex_source (func) a function from pyEX that returns a single piece of
                data, such as `quote` or `batch`.

        Keyword Args:
            interval (number) seconds to sleep between calls to `iex_source`.
            should_hash (bool) when True, a result is only enqueued if its
                hash differs from that of the previously enqueued result.
            data_cleaner (func) optional; a function that receives data and
                returns cleaned data before the table is updated. It is
                removed from `kwargs` and not forwarded to `iex_source`.
            kwargs (dict) keyword arguments which will be applied when calling
                the IEX data source. See the pyEX documentation for more details.
        """
        # `data_cleaner` is optional in the base class, so default to None
        # instead of raising KeyError when the caller omits it.
        data_cleaner = kwargs.pop("data_cleaner", None)
        # The base initializer already creates the subprocess targeting
        # `self.get_data`, so it is not re-created here.
        super(IEXIntervalDataSource, self).__init__(table, data_cleaner=data_cleaner)
        self._iex_source = iex_source
        self._iex_source_kwargs = kwargs
        self._interval = interval

        # Hash the dataset so it does not repeatedly enqueue identical datasets.
        self._should_hash = should_hash
        self._last_hash = None

    def get_data(self):
        """Retrieve data every `interval` seconds, hashing and discarding duplicates
        if necessary.

        Runs until the process is terminated. Falsy results from the source
        are skipped.
        """
        # `%s` rather than `%d` so fractional intervals are not truncated.
        logging.info("[IEXIntervalDataSource] started: fetching every %s seconds", self._interval)

        # A plain loop: polling by self-recursion would eventually exceed the
        # interpreter's recursion limit and kill the fetcher.
        while True:
            data = self._iex_source(**self._iex_source_kwargs)
            if data:
                if self._should_hash:
                    # `default=str` keeps hashing from failing on values that
                    # JSON cannot encode natively (e.g. datetimes).
                    serialized = json.dumps(data, sort_keys=True, default=str)
                    hashed = str_to_float(serialized)
                    if hashed != self._last_hash:
                        # Only enqueue new data, not data we've already gotten.
                        self.queue.put_nowait(data)
                        self._last_hash = hashed
                else:
                    self.queue.put_nowait(data)
            time.sleep(self._interval)

In [5]:
class IEXStaticDataSource(BaseDataSource):

    def __init__(self, table, iex_source, **kwargs):
        """A data source for static data - calls the source once, and then stops.

        Good for non-streaming data such as charts and fundamentals.

        Args:
            table (perspective.Table) a Perspective table instance that will
                be updated with new data.
            iex_source (func) a function that returns the data, which will
                be called with **kwargs.

        Keyword Args:
            data_cleaner (func) optional; a function that receives data and
                returns cleaned data before the table is updated. It is
                removed from `kwargs` and not forwarded to `iex_source`.
            kwargs (dict) keyword arguments which will be applied when calling
                `iex_source`.
        """
        # `data_cleaner` is optional in the base class, so default to None
        # instead of raising KeyError when the caller omits it.
        data_cleaner = kwargs.pop("data_cleaner", None)
        super(IEXStaticDataSource, self).__init__(table, data_cleaner=data_cleaner)
        self._iex_source = iex_source
        self._iex_source_kwargs = kwargs

    def get_data(self):
        """Call the source once with the stored kwargs and enqueue the result."""
        data = self._iex_source(**self._iex_source_kwargs)
        self.queue.put_nowait(data)