Skip to content

Commit

Permalink
Adjust and add notes for python-trio/trio#2258
Browse files Browse the repository at this point in the history
  • Loading branch information
goodboy committed Feb 28, 2022
1 parent 89a98c4 commit b1cce8f
Showing 1 changed file with 45 additions and 28 deletions.
73 changes: 45 additions & 28 deletions piker/data/feed.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)
# Copyright (C) Tyler Goodlet (in stewardship for pikers)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
Expand All @@ -25,7 +25,7 @@
from functools import partial
from types import ModuleType
from typing import (
Any, Sequence,
Any,
AsyncIterator, Optional,
Awaitable,
)
Expand Down Expand Up @@ -56,7 +56,9 @@
)
from ..ui import _search
from ._sampling import (
_shms,
# TODO: should probably group these in a compound type at this point XD
_ohlcv_shms,
_subscribers,
_incrementers,
increment_ohlc_buffer,
iter_ohlc_periods,
Expand Down Expand Up @@ -108,6 +110,7 @@ async def start_task(
self,
target: Awaitable,
*args,

) -> None:

async def start_with_cs(
Expand Down Expand Up @@ -159,30 +162,31 @@ def get_feed_bus(
@tractor.context
async def _setup_persistent_brokerd(
ctx: tractor.Context,
brokername: str
brokername: str,

) -> None:
'''
Allocate a actor-wide service nursery in ``brokerd``
such that feeds can be run in the background persistently by
the broker backend as needed.
'''
try:
async with trio.open_nursery() as service_nursery:
get_console_log(tractor.current_actor().loglevel)

# assign a nursery to the feeds bus for spawning
# background tasks from clients
bus = get_feed_bus(brokername, service_nursery)
global _bus
assert not _bus

# unblock caller
await ctx.started()
async with trio.open_nursery() as service_nursery:
# assign a nursery to the feeds bus for spawning
# background tasks from clients
get_feed_bus(brokername, service_nursery)

# we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
await trio.sleep_forever()
finally:
# TODO: this needs to be shielded?
bus.nursery.cancel_scope.cancel()
# unblock caller
await ctx.started()

# we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
await trio.sleep_forever()


async def manage_history(
Expand All @@ -194,6 +198,8 @@ async def manage_history(
some_data_ready: trio.Event,
feed_is_live: trio.Event,

task_status: TaskStatus = trio.TASK_STATUS_IGNORED,

) -> None:
'''
Load and manage historical data including the loading of any
Expand All @@ -202,13 +208,15 @@ async def manage_history(
buffer.
'''
# TODO:
# history retreival, see if we can pull from an existing
# TODO: history retreival, see if we can pull from an existing
# ``marketstored`` daemon
# log.info('Scanning for existing `marketstored`')
# from .marketstore import load_history
# arrays = await load_history(symbol)
log.info('Scanning for existing `marketstored`')
fqsn = mk_fqsn(mod.name, symbol)
# from .marketstore import manage_history
# arrays = await manage_history(symbol)

arrays = {}
task_status.started()

opened = we_opened_shm
# TODO: history validation
Expand All @@ -218,6 +226,8 @@ async def manage_history(

if opened:
if arrays:
await tractor.breakpoint()

# push to shm
# set data ready
# some_data_ready.set()
Expand Down Expand Up @@ -245,7 +255,7 @@ async def manage_history(
await feed_is_live.wait()

if opened:
_shms.setdefault(delay_s, []).append(shm)
_ohlcv_shms.setdefault(delay_s, []).append(shm)

# start shm incrementing for OHLC sampling at the current
# detected sampling period if one dne.
Expand Down Expand Up @@ -309,7 +319,13 @@ async def allocate_persistent_feed(
# - a history loader / maintainer
# - a real-time streamer which consumers and sends new data to any
# consumers as well as writes to storage backends (as configured).
bus.nursery.start_soon(

# XXX: neither of these will raise but will cause an inf hang due to:
# https://github.com/python-trio/trio/issues/2258
# bus.nursery.start_soon(
# await bus.start_task(

await bus.nursery.start(
manage_history,
mod,
shm,
Expand Down Expand Up @@ -345,7 +361,9 @@ async def allocate_persistent_feed(

# yield back control to starting nursery once we receive either
# some history or a real-time quote.
log.info(f'waiting on history to load: {fqsn}')
await some_data_ready.wait()

bus.feeds[symbol.lower()] = (init_msg, first_quotes)
task_status.started((init_msg, first_quotes))

Expand Down Expand Up @@ -518,8 +536,8 @@ async def open_sample_step_stream(
portal.open_stream_from,
iter_ohlc_periods,
),

kwargs={'delay_s': delay_s},

) as (cache_hit, istream):
if cache_hit:
# add a new broadcast subscription for the quote stream
Expand Down Expand Up @@ -623,9 +641,8 @@ async def search(text: str) -> dict[str, Any]:

@asynccontextmanager
async def open_feed(

brokername: str,
symbols: Sequence[str],
symbols: list[str],
loglevel: Optional[str] = None,

backpressure: bool = True,
Expand Down Expand Up @@ -725,7 +742,7 @@ async def open_feed(
async def maybe_open_feed(

brokername: str,
symbols: Sequence[str],
symbols: list[str],
loglevel: Optional[str] = None,

**kwargs,
Expand Down

0 comments on commit b1cce8f

Please sign in to comment.