In [None]:
import asyncio
from ipykernel.eventloops import register_integration

@register_integration('asyncio')
def loop_asyncio(kernel):
    '''Start a kernel with asyncio event loop support.

    Schedules a self-rearming callback on the current asyncio event loop.
    Each time it fires it queues ``kernel.do_one_iteration`` and re-arms
    itself via ``loop.call_later`` with ``kernel._poll_interval`` as delay.

    If the loop is not running yet, this function runs it with
    ``run_forever`` (blocking until the loop is stopped), then shuts down
    async generators and closes the loop.

    If the loop is already running, the callbacks are only scheduled and
    the function returns immediately: the loop belongs to whoever started
    it, and calling ``run_until_complete``/``close`` on a running loop
    raises ``RuntimeError``.

    Parameters
    ----------
    kernel : object providing ``do_one_iteration()`` and ``_poll_interval``.
    '''
    loop = asyncio.get_event_loop()

    def kernel_handler():
        # Queue one kernel iteration now, then re-arm for the next poll.
        loop.call_soon(kernel.do_one_iteration)
        loop.call_later(kernel._poll_interval, kernel_handler)

    loop.call_soon(kernel_handler)

    if loop.is_running():
        # Not our loop to drive or tear down; the scheduled callbacks will
        # be serviced by the already-running loop.
        return

    try:
        loop.run_forever()
    finally:
        # Only reached for a loop this function started itself.
        loop.run_until_complete(loop.shutdown_asyncgens())
        loop.close()

In [None]:
# Select the 'asyncio' event loop integration (the name registered with
# register_integration('asyncio') in the first cell) for this kernel.
%gui asyncio

In [None]:
import attr
import pandas as pd
from numismatic.feeds import BitfinexFeed
from numismatic.events import PriceUpdate, Trade

In [None]:
# Feed object for Bitfinex, created with default arguments; used below to
# open subscriptions.
bfx = BitfinexFeed()

In [None]:
# Subscribe to the 'trades' channel for BTC / USD.
# NOTE(review): the two positional lists are presumably assets and quote
# currencies -- confirm against BitfinexFeed.subscribe.
subs = bfx.subscribe(['BTC'], ['USD'], channels='trades')

In [None]:
# Display the subscriptions container so its keys can be inspected.
subs

In [None]:
# Pick out the single BTC/USD trades subscription by its key.
sub = subs['Bitfinex~BTC~USD~TRADES']

In [None]:
# Keep only Trade events from the subscription's event stream.
trade_stream = sub.event_stream.filter(lambda event: isinstance(event, Trade))

# Same trades, each converted with its .json() method.
json_stream = trade_stream.map(lambda event: event.json())

# Same trades, each turned into a one-row DataFrame built from the trade's
# attrs fields and indexed by the local time at which it was processed.
df_stream = trade_stream.map(
    lambda event: pd.DataFrame(attr.asdict(event), index=[pd.Timestamp.now()])
)

In [None]:
# Optional debugging sinks: uncomment a line to print each item from that stream.
# sub.event_stream.sink(print)
# json_stream.sink(print)
# df_stream.sink(print)

In [None]:
# Empty frame with one column per attrs field of Trade.
trade_df = pd.DataFrame(
    {field.name: [] for field in attr.fields(Trade)},
    index=[],
)

In [None]:
from streamz.dataframe import StreamingDataFrame

In [None]:
# Wrap the stream of one-row trade frames as a streaming dataframe;
# trade_df is passed as the `example` frame.
sdf = StreamingDataFrame(df_stream, example=trade_df)

In [None]:
# Streaming frame pairing the trade price with its rolling(5) mean.
price = StreamingDataFrame({
    'price': sdf.price,
    'ma(5)': sdf.price.rolling(5).mean(),
})

In [None]:
# Plot the price / moving-average streaming frame.
# NOTE(review): presumably renders a live-updating plot -- confirm.
price.plot()

In [None]:
# Optional debugging sink: uncomment to print each item emitted by price.stream.
# price.stream.sink(print)