Merged

28 commits
daa429f  Put fsp plotting into a couple tasks, startup speedups. (goodboy, Nov 13, 2020)
acf8aeb  Allocate space for 2d worth of 5s bars (goodboy, Nov 13, 2020)
bfcf517  Add commented ex. code for line price charts (goodboy, Nov 13, 2020)
9572280  Kill the tractor tree on window close. (goodboy, Nov 16, 2020)
c625dc9  Use new global var stack from tractor (goodboy, Nov 16, 2020)
f9e4e95  Tweak axis text offset and margins (goodboy, Nov 16, 2020)
8aede3c  Add field diffing on failed push (goodboy, Nov 18, 2020)
413c703  Draw bars using `QPainterPath` magic (goodboy, Nov 24, 2020)
f083f53  Get `QPainterPath` "append" working (goodboy, Nov 24, 2020)
949e9d6  Drop commented pixmap cruft (goodboy, Nov 24, 2020)
3e16840  Attempt to add numba typing and use `QGraphicsPathItem` (goodboy, Nov 25, 2020)
247b5fa  Tidy up doc string (goodboy, Nov 26, 2020)
1f8f2eb  Font size tweaks for low dpi (goodboy, Dec 4, 2020)
9710685  Left align yaxis label (goodboy, Dec 9, 2020)
02b7d6c  Add prepend support to shm system (goodboy, Dec 9, 2020)
2568a2d  First draft, make graphics work on shm primary index (goodboy, Dec 10, 2020)
6d50ad7  Ensure right bar x index is an int (goodboy, Dec 10, 2020)
fda9fcb  Add historical backfilling to ib backend (goodboy, Dec 10, 2020)
599b527  Port data apis to not touch primary index (goodboy, Dec 12, 2020)
9930430  Close app on last window exit (goodboy, Dec 13, 2020)
c8537d5  Port charting to new shm primary indexing (goodboy, Dec 14, 2020)
7ce2710  Fix axes for shm primary indexing (goodboy, Dec 14, 2020)
db6f774  Stick with slightly smaller fonts (goodboy, Dec 14, 2020)
618c2f8  More general salutation (goodboy, Dec 14, 2020)
df68675  Drop legacy "historical" QPicture cruft (goodboy, Dec 14, 2020)
b72d7d3  Drop profile calls on OHLC bars for now (goodboy, Dec 16, 2020)
f27d639  Port kraken to declare "wap" field (goodboy, Dec 12, 2020)
833a442  Disable vwap in conf, feature delay (goodboy, Dec 19, 2020)
186 changes: 160 additions & 26 deletions piker/brokers/ib.py
@@ -21,9 +21,10 @@
built on it) and thus actor aware API calls must be spawned with
``infected_aio==True``.
"""
from contextlib import asynccontextmanager, contextmanager
from contextlib import asynccontextmanager
from dataclasses import asdict
from functools import partial
from datetime import datetime
from typing import List, Dict, Any, Tuple, Optional, AsyncIterator, Callable
import asyncio
import logging
@@ -32,6 +33,7 @@
import time

from async_generator import aclosing
from ib_insync.wrapper import RequestError
from ib_insync.contract import Contract, ContractDetails
from ib_insync.ticker import Ticker
import ib_insync as ibis
@@ -45,7 +47,7 @@
maybe_spawn_brokerd,
iterticks,
attach_shm_array,
get_shm_token,
# get_shm_token,
subscribe_ohlc_for_increment,
)
from ..data._source import from_df
@@ -86,6 +88,8 @@
'Y': 'OneYear',
}

_show_wap_in_history = False


# overrides to sidestep pretty questionable design decisions in
# ``ib_insync``:
@@ -128,6 +132,8 @@ def __init__(self):
'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}),
}

_enters = 0


class Client:
"""IB wrapped for our broker backend API.
@@ -142,32 +148,54 @@ def __init__(
self.ib = ib
self.ib.RaiseRequestErrors = True

# NOTE: the ib.client here is "throttled" to 45 rps by default

async def bars(
self,
symbol: str,
# EST in ISO 8601 format is required... below is EPOCH
start_date: str = "1970-01-01T00:00:00.000000-05:00",
time_frame: str = '1m',
count: int = int(2e3), # <- max allowed per query
is_paid_feed: bool = False,
start_dt: str = "1970-01-01T00:00:00.000000-05:00",
end_dt: str = "",

sample_period_s: str = 1, # ohlc sample period
period_count: int = int(2e3), # <- max per 1s sample query

is_paid_feed: bool = False, # placeholder
) -> List[Dict[str, Any]]:
"""Retreive OHLCV bars for a symbol over a range to the present.
"""
bars_kwargs = {'whatToShow': 'TRADES'}

global _enters
print(f'ENTER BARS {_enters}')
_enters += 1

contract = await self.find_contract(symbol)
bars_kwargs.update(getattr(contract, 'bars_kwargs', {}))

# _min = min(2000*100, count)
bars = await self.ib.reqHistoricalDataAsync(
contract,
endDateTime='',
# durationStr='60 S',
# durationStr='1 D',
endDateTime=end_dt,

# time history length values format:
# ``durationStr=integer{SPACE}unit (S|D|W|M|Y)``

# OHLC sampling values:
# 1 secs, 5 secs, 10 secs, 15 secs, 30 secs, 1 min, 2 mins,
# 3 mins, 5 mins, 10 mins, 15 mins, 20 mins, 30 mins,
# 1 hour, 2 hours, 3 hours, 4 hours, 8 hours, 1 day, 1W, 1M
# barSizeSetting='1 secs',

# durationStr='{count} S'.format(count=15000 * 5),
# durationStr='{count} D'.format(count=1),
# barSizeSetting='5 secs',

durationStr='{count} S'.format(count=period_count),
barSizeSetting='1 secs',

# barSizeSetting='1 min',

# time length calcs
durationStr='{count} S'.format(count=5000 * 5),
barSizeSetting='5 secs',

# always use extended hours
useRTH=False,
@@ -181,9 +209,13 @@ async def bars(
# TODO: raise underlying error here
raise ValueError(f"No bars retreived for {symbol}?")

# TODO: rewrite this faster with ``numba``
# convert to pandas dataframe:
df = ibis.util.df(bars)
return from_df(df)
return bars, from_df(df)
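For orientation, a hypothetical call against the reworked `bars()` signature could look like the sketch below. The `ES.GLOBEX` symbol string and the backwards paging via `end_dt` are illustrative assumptions modeled on `fill_bars` further down in this diff, not code from this PR.

```python
# hypothetical usage sketch of ``Client.bars()`` as defined above
async def load_history(client: 'Client') -> None:
    # most recent ~2000 x 1s samples up to "now"
    bars, bars_array = await client.bars(symbol='ES.GLOBEX')

    # page further back by anchoring the next query's ``end_dt``
    # on the first (oldest) bar returned by the previous query
    older_bars, older_array = await client.bars(
        symbol='ES.GLOBEX',
        end_dt=bars[0].date,
    )
```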

def onError(self, reqId, errorCode, errorString, contract) -> None:
Contributor Author commented (see the note below this method):
Heh, this i don't even think works.

breakpoint()
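As the author suspects, a method named `onError` on this wrapper class is never seen by `ib_insync`; error handlers are normally attached to the `IB.errorEvent` emitter. A minimal, hypothetical wiring sketch (the handler body is an assumption):

```python
import ib_insync as ibis

ib = ibis.IB()


def on_error(reqId, errorCode, errorString, contract) -> None:
    # ``ib_insync`` emits ``errorEvent`` with exactly this signature
    print(f'ib error {errorCode} (req {reqId}): {errorString}')


# handlers only fire once connected to a live TWS/gateway session
ib.errorEvent += on_error
```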

async def search_stocks(
self,
Expand Down Expand Up @@ -237,6 +269,8 @@ async def get_cont_fute(
"""Get an unqualifed contract for the current "continous" future.
"""
contcon = ibis.ContFuture(symbol, exchange=exchange)

# it's the "front" contract returned here
frontcon = (await self.ib.qualifyContractsAsync(contcon))[0]
return ibis.Future(conId=frontcon.conId)

@@ -279,10 +313,10 @@ async def find_contract(

if exch in ('PURE', 'TSE'): # non-yankee
currency = 'CAD'
if exch in ('PURE',):
if exch in ('PURE', 'TSE'):
# stupid ib...
primaryExchange = exch
exch = 'SMART'
primaryExchange = 'PURE'

con = ibis.Stock(
symbol=sym,
@@ -293,10 +327,27 @@
try:
exch = 'SMART' if not exch else exch
contract = (await self.ib.qualifyContractsAsync(con))[0]

head = await self.get_head_time(contract)
print(head)
except IndexError:
raise ValueError(f"No contract could be found {con}")
return contract

async def get_head_time(
self,
contract: Contract,
) -> datetime:
"""Return the first datetime stamp for ``contract``.

"""
return await self.ib.reqHeadTimeStampAsync(
contract,
whatToShow='TRADES',
useRTH=False,
formatDate=2, # timezone aware UTC datetime
)

async def stream_ticker(
self,
symbol: str,
@@ -309,7 +360,13 @@ async def stream_ticker(
contract = await self.find_contract(symbol)
ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts))

# define a simple queue push routine that streams quote packets
# to trio over the ``to_trio`` memory channel.

def push(t):
"""Push quotes to trio task.

"""
# log.debug(t)
try:
to_trio.send_nowait(t)
@@ -346,9 +403,17 @@ async def _aio_get_client(
"""
# first check cache for existing client

# breakpoint()
try:
yield _client_cache[(host, port)]
except KeyError:
if port:
client = _client_cache[(host, port)]
Contributor Author commented (illustrated in the sketch at the end of this hunk):
This actually wasn't working before and makes it so IB api clients are actually reused by multiple consuming actors; avoids client throttling that would result.

else:
# grab first cached client
client = list(_client_cache.values())[0]

yield client

except (KeyError, IndexError):
# TODO: in case the arbiter has no record
# of existing brokerd we need to broadcast for one.
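The caching fix referenced in the comment above reduces to a lookup-with-fallback on the module-level `_client_cache` dict; a minimal sketch of just that pattern (the helper name is hypothetical and error handling is elided):

```python
from typing import Dict, Optional, Tuple

# minimal sketch of the lookup-with-fallback pattern above; the real code
# catches KeyError/IndexError and falls back to making a new connection
_client_cache: Dict[Tuple[str, int], 'Client'] = {}


def get_cached_client(host: str, port: Optional[int]) -> 'Client':
    if port:
        # an exact (host, port) entry is required when a port is given
        return _client_cache[(host, port)]
    # no port specified: reuse whichever client was cached first
    return list(_client_cache.values())[0]
```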

@@ -359,9 +424,11 @@ async def _aio_get_client(

ib = NonShittyIB()
ports = _try_ports if port is None else [port]

_err = None
for port in ports:
try:
log.info(f"Connecting to the EYEBEE on port {port}!")
await ib.connectAsync(host, port, clientId=client_id)
break
except ConnectionRefusedError as ce:
@@ -373,6 +440,7 @@
try:
client = Client(ib)
_client_cache[(host, port)] = client
log.debug(f"Caching client for {(host, port)}")
yield client
except BaseException:
ib.disconnect()
@@ -385,7 +453,6 @@ async def _aio_run_client_method(
from_trio=None,
**kwargs,
) -> None:
log.info("Connecting to the EYEEEEBEEEEE GATEWAYYYYYYY!")
async with _aio_get_client() as client:

async_meth = getattr(client, meth)
@@ -402,6 +469,9 @@ async def _trio_run_client_method(
method: str,
**kwargs,
) -> None:
"""Asyncio entry point to run tasks against the ``ib_insync`` api.

"""
ca = tractor.current_actor()
assert ca.is_infected_aio()

@@ -530,18 +600,60 @@ def normalize(
_local_buffer_writers = {}


@contextmanager
def activate_writer(key: str):
@asynccontextmanager
async def activate_writer(key: str) -> (bool, trio.Nursery):
try:
writer_already_exists = _local_buffer_writers.get(key, False)

if not writer_already_exists:
_local_buffer_writers[key] = True

yield writer_already_exists
async with trio.open_nursery() as n:
yield writer_already_exists, n
else:
yield writer_already_exists, None
finally:
_local_buffer_writers.pop(key, None)


async def fill_bars(
first_bars,
shm,
count: int = 21,
) -> None:
"""Fill historical bars into shared mem / storage afap.

TODO: avoid pacing constraints:
https://github.com/pikers/piker/issues/128

"""
next_dt = first_bars[0].date

i = 0
while i < count:

try:
bars, bars_array = await _trio_run_client_method(
method='bars',
symbol='.'.join(
(first_bars.contract.symbol, first_bars.contract.exchange)
),
end_dt=next_dt,

)
shm.push(bars_array, prepend=True)
i += 1
next_dt = bars[0].date

except RequestError as err:
# TODO: retrieve underlying ``ib_insync`` error~~
if err.code == 162:
log.exception(
"Data query rate reached: Press `ctrl-alt-f` in TWS")

await tractor.breakpoint()
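`fill_bars` leans on the `prepend=True` push support added to the shm system elsewhere in this PR (commit 02b7d6c); that code is not part of this file's diff, so the sketch below only illustrates the general idea of prepending into a preallocated array with a moving "first" index. All names and details here are assumptions, not piker's actual shm internals.

```python
import numpy as np

# illustrative-only sketch of ``prepend`` semantics on a preallocated
# buffer; no bounds checking, and piker's real shm array is shared-memory
# backed and considerably more involved
class PrependableBuffer:
    def __init__(self, size: int, dtype: np.dtype) -> None:
        self._array = np.zeros(size, dtype=dtype)
        # start writes in the middle so there is headroom both ways
        self._first = self._last = size // 2

    def push(self, data: np.ndarray, prepend: bool = False) -> None:
        n = len(data)
        if prepend:
            # older history is written just before the current first index
            self._array[self._first - n:self._first] = data
            self._first -= n
        else:
            # newer data is appended after the current last index
            self._array[self._last:self._last + n] = data
            self._last += n

    @property
    def array(self) -> np.ndarray:
        # the "live" view only spans what has actually been written
        return self._array[self._first:self._last]
```

Under this model, `fill_bars` repeatedly calling `shm.push(bars_array, prepend=True)` with progressively older batches just keeps walking the first index backwards.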


# TODO: figure out how to share quote feeds sanely despite
# the wacky ``ib_insync`` api.
# @tractor.msg.pub
@@ -575,7 +687,9 @@ async def stream_quotes(

# check if a writer already is alive in a streaming task,
# otherwise start one and mark it as now existing
with activate_writer(shm_token['shm_name']) as writer_already_exists:
async with activate_writer(
shm_token['shm_name']
) as (writer_already_exists, ln):

# maybe load historical ohlcv in to shared mem
# check if shm has already been created by previous
@@ -588,18 +702,29 @@
# we are the buffer writer
readonly=False,
)
bars = await _trio_run_client_method(

# async def retrieve_and_push():
start = time.time()

bars, bars_array = await _trio_run_client_method(
method='bars',
symbol=sym,

)

if bars is None:
log.info(f"bars_array request: {time.time() - start}")

if bars_array is None:
raise SymbolNotFound(sym)

# write historical data to buffer
shm.push(bars)
shm.push(bars_array)
shm_token = shm.token

# TODO: generalize this for other brokers
# start bar filler task in bg
ln.start_soon(fill_bars, bars, shm)

times = shm.array['time']
delay_s = times[-1] - times[times != times[-1]][-1]
subscribe_ohlc_for_increment(shm, delay_s)
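The `delay_s` expression above infers the bar period from the data itself: the newest timestamp minus the most recent timestamp that differs from it, which keeps working even if the trailing timestamp repeats. A tiny numpy example with made-up values:

```python
import numpy as np

# made-up sample: 5s bars where the newest timestamp is repeated while
# the in-progress bar keeps updating in place
times = np.array([100, 105, 110, 115, 115])

delay_s = times[-1] - times[times != times[-1]][-1]
print(delay_s)  # 5
```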
@@ -656,6 +781,7 @@ async def stream_quotes(

# real-time stream
async for ticker in stream:
# print(ticker.vwap)
quote = normalize(
ticker,
calc_price=calc_price
@@ -674,6 +800,8 @@
for tick in iterticks(quote, types=('trade', 'utrade',)):
last = tick['price']

# print(f"{quote['symbol']}: {tick}")

# update last entry
# benchmarked in the 4-5 us range
o, high, low, v = shm.array[-1][
@@ -687,7 +815,13 @@
# is also the close/last trade price
o = last

shm.array[['open', 'high', 'low', 'close', 'volume']][-1] = (
shm.array[[
'open',
'high',
'low',
'close',
'volume',
]][-1] = (
o,
max(high, last),
min(low, last),