In [16]:
# Basic imports
import asyncio
import json
import logging
import os
import random
import threading
import time
from datetime import date, datetime

# Library imports
import nbimporter  # lets `from datasources import ...` load datasources.ipynb
import numpy as np
import pandas as pd
import pyarrow as pa

# pyEX is an easy-to-use IEX API interface built for Python
import pyEX

# The main course
import perspective

# Configure the root logger once for the notebook: INFO level and above,
# each message prefixed with its timestamp.
logging.basicConfig(format="%(asctime)s %(message)s", level=logging.INFO)

# Streaming Data Sources

Inside `datasources.ipynb`, I've defined a few streaming datasources that will feed live data to Perspective. 

Each datasource runs in its own subprocess and subthread so that it does not block the main Jupyter thread; cells can still be added and evaluated as normal. In the background, the datasource fetches data, cleans it (if necessary), and updates the Perspective tables, and every widget in the notebook that is bound to those tables then displays the new results.

In [9]:
from datasources import IEXIntervalDataSource, IEXSSEDataSource, IEXStaticDataSource

Importing Jupyter notebook from datasources.ipynb


In [10]:
# Create a pyEX client against the IEX sandbox.
# Set the IEX_SANDBOX_TOKEN environment variable to use your own sandbox token
# without writing it into the notebook; when it is unset (or empty) the public
# example sandbox token below is used, exactly as before.
token = os.environ.get("IEX_SANDBOX_TOKEN") or "Tpk_ecc89ddf30a611e9958142010a80043c"
client = pyEX.Client(api_token=token, version="sandbox")

In [13]:
# Perspective table schemas: each maps a column name to the Python type
# Perspective should use for that column.

# Not referenced by the cells shown in this notebook section.
batch_schema = dict(
    symbol=str,
    companyName=str,
    open=float,
    openTime=datetime,
    close=float,
    closeTime=datetime,
    high=float,
    highTime=datetime,
    low=float,
    lowTime=datetime,
    latestPrice=float,
    latestUpdate=datetime,
    latestVolume=int,
    volume=int,
)

# Schema of the quotes table.
last_schema = dict(symbol=str, price=float, time=datetime, size=int)

# Not referenced by the cells shown in this notebook section.
tops_schema = dict(
    symbol=str,
    bidSize=int,
    bidPrice=float,
    askSize=int,
    askPrice=float,
    volume=int,
    lastSalePrice=float,
    lastSaleSize=int,
    lastSaleTime=datetime,
    lastUpdated=datetime,
    sector=str,
    securityType=str,
    seq=int,
)

# Schema shared by the holdings table and the holdings history table.
holdings_schema = dict(symbol=str, quantity=int, price=float, time=datetime)

# Schema of the historical charts table.
charts_schema = dict(
    date=date,
    open=float,
    high=float,
    low=float,
    close=float,
    volume=int,
    symbol=str,
    quantity=int,
)

In [14]:
# Tickers tracked by this notebook; passed to the datasources and used to seed
# the holdings table.
symbols = ["AAPL", "MSFT", "AMZN", "TSLA", "SPY", "SNAP", "ZM", "JPM"]

In [15]:
# Current positions, one row per ticker (the table is indexed on "symbol").
holdings_table = perspective.Table(holdings_schema, index="symbol")

# Number of shares held for each ticker.
holdings = dict(AAPL=20, MSFT=5, AMZN=2, TSLA=10, SPY=5, SNAP=20, ZM=10, JPM=5)

# Seed the positions table with quantities only; price and time are filled in
# by later updates.
seed_rows = {
    "symbol": symbols,
    "quantity": [holdings[ticker] for ticker in symbols],
}
holdings_table.update(seed_rows)

# Unindexed table that accumulates the row deltas emitted by the holdings view.
holdings_total_table = perspective.Table(holdings_schema)
holdings_view = holdings_table.view()


def update_total(port, delta):
    """View callback: forward the changed holdings rows into the history table."""
    holdings_total_table.update(delta)


# Registered after the seed update, so only subsequent changes are forwarded.
holdings_view.on_update(update_total, mode="row")

In [None]:
# View over the holdings history that adds a computed "value" column
# (quantity * price); this view is what gets serialized to disk.
save_holdings_view = holdings_total_table.view(
    columns=["symbol", "quantity", "value", "time"],
    computed_columns=[{
        "column": "value",
        "computed_function_name": "*",
        "inputs": ["quantity", "price"]
    }]
)

# TODO: clean this up
async def _save():
    """Dump `save_holdings_view` to a dated Arrow file once every 60 seconds.

    The file name is derived from today's date, and every pass rewrites that
    day's file with the full contents of the view. The loop never exits on its
    own.
    """
    while True:
        name = "portfolio_value_{0:%Y_%m_%d}.arrow".format(datetime.today())
        with open(name, "wb") as value_arrow:
            value_arrow.write(save_holdings_view.to_arrow())
        logging.info("Saved %d rows to %s", holdings_total_table.size(), name)
        await asyncio.sleep(60)

def save_to_arrow():
    """Thread target: run `_save` on an event loop private to the calling thread."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        task = loop.create_task(_save())
        loop.run_until_complete(task)
    finally:
        # Release the loop's resources if the saver ever stops (e.g. on error).
        loop.close()

# daemon=True: `_save` loops forever, so a non-daemon thread would keep the
# interpreter (and therefore the kernel) from shutting down cleanly.
save_thread = threading.Thread(target=save_to_arrow, daemon=True)

In [None]:
# Start the background saver. A Thread object can only be started once, so skip
# the call when the saver is already running; this keeps the cell safe to re-run.
if not save_thread.is_alive():
    save_thread.start()

In [None]:
# Positions grouped by ticker, with a computed "value" column
# (quantity * price), largest position first.
holdings_widget = perspective.PerspectiveWidget(
    holdings_table,
    row_pivots=["symbol"],
    columns=["price", "quantity", "value"],
    computed_columns=[
        dict(column="value", computed_function_name="*", inputs=["quantity", "price"]),
    ],
    aggregates=dict(value="sum", price="last"),
    sort=[["value", "desc"]],
)
holdings_widget

In [None]:
# Line chart of position value (quantity * price) over time, one series per ticker.
holdings_total_widget = perspective.PerspectiveWidget(
    holdings_total_table,
    plugin="y_line",
    row_pivots=["time"],
    column_pivots=["symbol"],
    columns=["value"],
    computed_columns=[
        dict(column="value", computed_function_name="*", inputs=["quantity", "price"]),
    ],
    aggregates=dict(quantity="last", price="last"),
)
holdings_total_widget

In [None]:
def update_holdings(port, delta):
    """View callback: apply the changed quote rows to the holdings table."""
    holdings_table.update(delta)


# Table that receives raw quotes, plus a view whose row deltas feed the holdings table.
quotes_table = perspective.Table(last_schema)
quotes_view = quotes_table.view()
quotes_view.on_update(update_holdings, mode="row")

In [None]:
# Latest price per ticker, highest price first.
quotes_widget = perspective.PerspectiveWidget(
    quotes_table,
    row_pivots=["symbol"],
    columns=["price"],
    aggregates={"price": "last"},
    sort=[["price", "desc"]],
)

In [None]:
# Render the quotes widget as this cell's output.
quotes_widget

In [None]:
def clean_quote(tick):
    """Stamp every quote in a batch with the local time the batch was received.

    Args:
        tick: iterable of mutable quote mappings (e.g. a list of dicts). The
            entries are modified in place.

    Returns:
        The same `tick` object, with a "time" key set on every entry.
    """
    # Take a single timestamp for the whole batch: quotes fetched together then
    # share one "time" value and fall into the same row when a view groups by
    # time, instead of being scattered over microsecond-distinct rows.
    received_at = datetime.now()
    for t in tick:
        t["time"] = received_at
    return tick

In [None]:
# Datasource wiring client.last to quotes_table for the tracked symbols;
# clean_quote is supplied as the data cleaner.
quotes = IEXIntervalDataSource(
    table=quotes_table,
    iex_source=client.last,
    data_cleaner=clean_quote,
    symbols=symbols,
)

In [None]:
# Start the quotes datasource. Per the notebook's description it works in the
# background, so this call is expected to return immediately (see datasources.ipynb).
quotes.start()

In [None]:
# Stop the quotes datasource.
# NOTE(review): under "Run All" this executes right after quotes.start(); it looks
# intended to be run manually when the feed is no longer needed - confirm.
quotes.stop()

In [None]:
# Table for historical chart rows (created without an index).
charts_table = perspective.Table(charts_schema)

In [None]:
# Widget configuration: OHLC chart by date, filtered to SPY.
ohlc_config = dict(
    plugin="d3_ohlc",
    row_pivots=["date"],
    columns=["open", "close", "high", "low"],
    aggregates=dict(quantity="last"),
    filters=[["symbol", "==", "SPY"]],
    computed_columns=[
        dict(column="value", computed_function_name="*", inputs=["quantity", "close"]),
    ],
)

# Widget configuration: line chart of position value (quantity * close) by date,
# one series per ticker.
value_config = dict(
    plugin="y_line",
    row_pivots=["date"],
    column_pivots=["symbol"],
    columns=["value"],
    aggregates=dict(quantity="last"),
    computed_columns=[
        dict(column="value", computed_function_name="*", inputs=["quantity", "close"]),
    ],
)

# Show the charts table with the OHLC configuration.
charts_widget = perspective.PerspectiveWidget(charts_table, **ohlc_config)
charts_widget

In [None]:
def clean_charts(tick):
    """Flatten a per-symbol chart payload into a single list of rows.

    Args:
        tick: mapping of symbol -> mapping holding that symbol's rows under
            the "chart" key.

    Returns:
        A list of the chart row dicts, in payload order. Each row is modified
        in place to carry its "symbol" and the held "quantity" looked up in
        the module-level `holdings` mapping.
    """
    rows = []
    for ticker, payload in tick.items():
        for bar in payload["chart"]:
            bar["symbol"] = ticker
            bar["quantity"] = holdings[ticker]
            rows.append(bar)
    return rows

In [None]:
# Datasource wiring client.batch to charts_table for the tracked symbols.
# range_: 1d, 1m, 1y, etc.
charts = IEXStaticDataSource(
    charts_table,
    iex_source=client.batch,
    data_cleaner=clean_charts,
    symbols=symbols,
    fields="chart",
    range_="1y",
)

In [None]:
# Start the charts datasource, which feeds charts_table (see datasources.ipynb).
charts.start()

In [None]:
# Stop the charts datasource.
# NOTE(review): under "Run All" this executes right after charts.start() - confirm
# that the data has been loaded by then, or run this cell manually.
charts.stop()

In [None]:
# Load today's saved portfolio snapshot back from disk and show it, newest rows first.
arrow_path = "portfolio_value_{0:%Y_%m_%d}.arrow".format(datetime.today())
with open(arrow_path, "rb") as arr:
    w = perspective.PerspectiveWidget(arr.read(), sort=[["time", "desc"]])
display(w)

In [17]:
# TODO: remember to mention that all this code can be modularized and run as a tornado server for perspective in the browser