# Working with Streaming Data

In [None]:
import numpy as np
import pandas as pd
import holoviews as hv
from holoviews.streams import DataStream, DataFrameStream

from streamz import Stream
from streamz.dataframe import StreamingDataFrame, Random

hv.extension('bokeh')

Streaming data is data that is continuously generated, often by many sources at once. It is common in financial time series, web server logs, scientific instruments, IoT telemetry, and more. There are many ways of processing streaming data; this user guide shows how to work with it using ``HoloViews`` and the ``streamz`` library. In particular, we will cover the ``DataStream`` and ``DataFrameStream`` HoloViews streams, first on their own and then in combination with the ``streamz`` library.

HoloViews also has the concept of a ``Stream``, which provides a way to push arbitrary data to a ``DynamicMap`` callback, which in turn drives your plot. By connecting a HoloViews ``Stream`` to a ``streamz.Stream`` we can combine the flow control features of the streamz library with the plotting capabilities of HoloViews. For an introduction to ``streamz`` see [this blog post](http://matthewrocklin.com/blog/work/2017/10/16/streaming-dataframes-1) by Matthew Rocklin, the author of the library.

## DataStream

In the [Responding to Events](./11-Responding_to_Events.ipynb) user guide we covered how to use HoloViews streams to push updates to a ``DynamicMap`` callback, letting us drive a visualization dynamically. The ``DataStream`` stream works in the same way, but instead of pushing a small amount of metadata it is meant to be used as a pipe that pushes actual data to a callback.
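
To make the idea of a pipe concrete, here is a minimal, library-independent sketch. ``MiniPipe`` and its methods are hypothetical names invented for illustration and are not the HoloViews implementation; the sketch only shows the general pattern of handing a whole data object to every subscribed callback.

```python
class MiniPipe:
    """Hypothetical stand-in for a data pipe (NOT the HoloViews DataStream)."""

    def __init__(self, data=None):
        self.data = data
        self._subscribers = []

    def add_subscriber(self, callback):
        # Register a callable that should run whenever new data arrives
        self._subscribers.append(callback)

    def send(self, data):
        # Store the latest data and hand it to every subscriber
        self.data = data
        for callback in self._subscribers:
            callback(data=data)


seen_lengths = []
pipe = MiniPipe(data=[])
pipe.add_subscriber(lambda data: seen_lengths.append(len(data)))
pipe.send([1, 2, 3])
pipe.send([4, 5])
```

In the real library the subscriber role is played by the ``DynamicMap`` callback, which receives the data and returns an element to display.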

Let's start with a fairly simple example: we will declare a ``streamz.Stream`` and a ``hv.streams.DataStream`` object and then define a pipeline we can push data into. In this example we declare a ``sliding_window`` of 10, which collects the 10 most recent stream updates (depending on the ``streamz`` version, shorter partial windows may be emitted before 10 updates have arrived), and then apply ``pd.concat`` to combine those updates into a single DataFrame. Finally we use the ``sink`` method on the ``Stream`` to ``send`` the data to the ``DataStream``.

Now we can declare a ``DynamicMap`` that takes the sliding window of concatenated DataFrames and displays it using a ``Scatter`` Element.

In [None]:
stream = Stream()
stream_data = hv.streams.DataStream(data=[])
# Combine the most recent 10 DataFrames and forward the result to the DataStream
stream.sliding_window(10).map(pd.concat).sink(stream_data.send)
hv.DynamicMap(hv.Scatter, streams=[stream_data]).redim.range(x=(-3, 3), y=(-3, 3))
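
As a rough illustration of what a sliding window does, the following sketch reproduces the idea with the standard library only. The helper ``sliding_windows`` is a hypothetical name, not part of ``streamz``, and it assumes the simplest behaviour in which nothing is produced until the window is full.

```python
from collections import deque


def sliding_windows(updates, n):
    """Yield a tuple of the n most recent updates each time a new one arrives.

    Illustrative helper (not part of streamz); nothing is yielded until
    n updates have been seen.
    """
    window = deque(maxlen=n)  # older updates fall off the left automatically
    for update in updates:
        window.append(update)
        if len(window) == n:
            yield tuple(window)


windows = list(sliding_windows(range(5), 3))
```

Each emitted tuple overlaps the previous one by ``n - 1`` updates, which is why the scatter plot above always reflects the latest chunks rather than the full history.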

Now that we have set up our pipeline we can start pushing data into it and watch our plot update. For that purpose we will use ``stream.emit`` to send small, randomly generated pandas DataFrames to our plot:

In [None]:
for i in range(100):
    stream.emit(pd.DataFrame(np.random.randn(100, 2), columns=['x', 'y']))

#### Asynchronous updates

Pushing updates to the ``Stream`` manually is usually not desirable; instead we want our object to push data asynchronously. Since both Jupyter and Bokeh server run on Tornado, we can use the Tornado ``IOLoop`` in both cases, defining a non-blocking coroutine that emits new data for our stream. In this case we use the ``rate_limit`` method to limit how quickly events are passed on and emit plain NumPy arrays, each of which is sent directly to the ``DataStream``.

In [None]:
%%opts Curve [width=600] {+framewise}
from tornado.ioloop import IOLoop
from tornado import gen

source = Stream(asynchronous=True)  # tell the stream we're working asynchronously
stream_data = DataStream(data=[])
source.rate_limit(0.1).sink(stream_data.send)

@gen.coroutine
def f():
    # Emit 100 arrays; yielding lets the IOLoop handle other events in between
    for x in range(100):
        yield source.emit(np.random.rand(10))

IOLoop.current().add_callback(f)
hv.DynamicMap(hv.Curve, streams=[stream_data]).redim.range(y=(0, 1))
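
Recent Tornado versions run on top of the ``asyncio`` event loop, so the same non-blocking pattern can be sketched with the standard library alone. The helper ``emit_periodically`` below is a hypothetical name, and a plain list stands in for the stream; it is meant only to show how a coroutine can emit values while yielding control between emissions.

```python
import asyncio
import random


async def emit_periodically(sink, n_events, interval):
    """Call sink(value) n_events times, pausing `interval` seconds after each call."""
    for _ in range(n_events):
        sink(random.random())
        # Awaiting hands control back to the event loop, so other tasks
        # (such as plot updates) can run between emissions.
        await asyncio.sleep(interval)


received = []
asyncio.run(emit_periodically(received.append, n_events=5, interval=0.01))
```

``asyncio.run`` is intended for plain scripts; inside a notebook an event loop is usually already running, in which case the coroutine would need to be awaited or scheduled on that loop instead.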

## StreamingDataFrame

While ``DataStream`` provides a general solution for piping arbitrary data to a ``DynamicMap`` callback, the ``streamz.dataframe.StreamingDataFrame`` and the corresponding ``hv.streams.DataFrameStream`` provide a very powerful means of working with streaming pandas dataframes.

#### A simple example

The ``streamz.dataframe`` module provides a ``Random`` utility which generates a ``StreamingDataFrame`` of random data. The ``freq`` argument sets the time between individual samples, while ``interval`` sets how often a new batch of samples is emitted. The ``example`` attribute lets us see the structure and dtypes of the data we can expect:

In [None]:
source = Random(freq='10ms', interval='100ms')
print(source.index)
source.example.dtypes

Since the ``StreamingDataFrame`` provides a pandas-like API, we can specify operations on the data directly. In this example we subtract a fixed offset and then compute the cumulative sum, giving us a randomly drifting timeseries. We can then pass the ``x`` column of this dataframe to the HoloViews ``DataFrameStream`` and supply ``hv.Curve`` as the ``DynamicMap`` callback, i.e. we stream the data into a HoloViews ``Curve``:

In [None]:
%%opts Curve [width=500 show_grid=True]
sdf = (source-0.5).cumsum()
hv.DynamicMap(hv.Curve, streams=[DataFrameStream(sdf.x)])
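
What makes a cumulative sum "streaming" is that the running total has to be carried over from one batch to the next. The sketch below illustrates this with plain Python lists; ``streaming_cumsum`` is a hypothetical helper and is not how ``streamz`` implements ``cumsum``.

```python
def streaming_cumsum(batches, offset=0.5):
    """Yield the cumulative sum of (value - offset) for each incoming batch.

    The running total persists across batches, so every batch continues
    where the previous one left off.
    """
    total = 0.0
    for batch in batches:
        result = []
        for value in batch:
            total += value - offset
            result.append(total)
        yield result


drift = list(streaming_cumsum([[1.0, 0.5], [0.0]]))
```

With values drawn uniformly from the unit interval, subtracting 0.5 centres the increments around zero, which is what produces a drifting rather than a steadily growing curve.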

The ``Random`` StreamingDataFrame will asynchronously emit events until it is stopped, which we can do by calling the ``stop`` method.

In [None]:
source.stop()

#### Making use of the StreamingDataFrame API

So far we have only computed the cumulative sum, but the ``StreamingDataFrame`` has a broad API that lets us easily apply streaming computations to our data. In this case we will apply a rolling mean to our x-values with a window of 500ms and overlay it on top of the 'raw' data:

In [None]:
%%opts Curve [width=500 show_grid=True]
source = Random(freq='5ms', interval='100ms')
sdf = (source-0.5).cumsum()
hv.DynamicMap(hv.Curve, streams=[DataFrameStream(sdf.x)]).relabel('raw') *\
hv.DynamicMap(hv.Curve, streams=[DataFrameStream(sdf.x.rolling('500ms').mean())]).relabel('smooth')

In [None]:
source.stop()
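
A time-based rolling mean such as the 500ms window used above can be sketched without any libraries. The helper ``rolling_time_mean`` is a hypothetical name; it assumes timestamps arrive in increasing order and that the window covers the half-open interval ``(t - window, t]``, which is an assumption about the windowing convention rather than a statement about the ``streamz`` internals.

```python
from collections import deque


def rolling_time_mean(samples, window):
    """Yield (t, mean) for each (t, value) sample, averaging over (t - window, t].

    `samples` must be sorted by timestamp; `window` uses the same units as t.
    """
    buffer = deque()
    total = 0.0
    for t, value in samples:
        buffer.append((t, value))
        total += value
        # Discard samples that are no longer inside the time window
        while buffer[0][0] <= t - window:
            _, old_value = buffer.popleft()
            total -= old_value
        yield t, total / len(buffer)


# Timestamps in milliseconds, window of 500 ms
smoothed = list(rolling_time_mean(
    [(0, 1.0), (100, 2.0), (200, 3.0), (600, 4.0)], window=500))
```

Because the window is defined in time rather than in number of samples, the number of values averaged can change from one sample to the next.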

#### Controlling the backlog

By default the ``DataFrameStream`` accumulates a ``backlog`` of 1000 samples. In many cases this is overkill and we only need a few samples. By specifying a shorter (or longer) ``backlog`` value we can control how much history we accumulate:

In [None]:
source = Random(freq='5ms', interval='100ms')
sdf = (source-0.5).cumsum()
hv.DynamicMap(hv.Table, streams=[DataFrameStream(sdf.x, backlog=10)]) +\
hv.DynamicMap(lambda data: hv.BoxWhisker(data, [], 'x'), streams=[DataFrameStream(sdf.x, backlog=100)])
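
Conceptually, a backlog is a fixed-size buffer that keeps only the most recent rows as new batches arrive. The class ``BacklogBuffer`` below is a hypothetical, library-independent sketch of that idea and not the ``DataFrameStream`` implementation.

```python
from collections import deque


class BacklogBuffer:
    """Keep at most `backlog` of the most recently received rows."""

    def __init__(self, backlog):
        self._rows = deque(maxlen=backlog)

    def extend(self, batch):
        # Append the new rows; the deque silently drops the oldest ones
        self._rows.extend(batch)
        return list(self._rows)


buffer = BacklogBuffer(backlog=5)
first = buffer.extend([1, 2, 3])
second = buffer.extend([4, 5, 6, 7])
```

A small backlog suits a table of the latest values, while a larger one gives summary plots such as the box-whisker plot more samples to work with.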

#### Updating multiple cells

Since a ``StreamingDataFrame`` will emit data until it is stopped, we can subscribe multiple plots across different cells to the same stream:

In [None]:
hv.DynamicMap(hv.Scatter, streams=[DataFrameStream(sdf.x)])

In [None]:
source.stop()

#### Applying operations

As we saw above, the ``DataFrameStream`` lets us define a ``backlog`` window that determines how many samples we accumulate. We can use this to our advantage and apply an operation over this backlog window. In this example we simply declare a ``Dataset`` and then apply the ``histogram`` operation to compute a ``Histogram`` over the specified ``backlog`` window:

In [None]:
source = Random(freq='5ms', interval='100ms')
sdf = (source-0.5).cumsum()
dmap = hv.DynamicMap(hv.Dataset, streams=[DataFrameStream(sdf.x, backlog=500)])
hv.operation.histogram(dmap, dimension='x')

In [None]:
source.stop()
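
The computation that is repeated on every update can be illustrated with a small, library-independent histogram function. ``histogram_counts`` is a hypothetical helper that uses equal-width bins and places a value equal to the upper bound in the last bin; the binning details of the HoloViews ``histogram`` operation may differ.

```python
def histogram_counts(values, num_bins, lower, upper):
    """Count how many values fall into each of num_bins equal-width bins."""
    width = (upper - lower) / num_bins
    counts = [0] * num_bins
    for value in values:
        if lower <= value <= upper:
            # Clamp so that value == upper lands in the last bin
            index = min(int((value - lower) / width), num_bins - 1)
            counts[index] += 1
    return counts


window = [0.0, 0.1, 0.5, 0.9, 1.0]  # stands in for the current backlog window
counts = histogram_counts(window, num_bins=2, lower=0.0, upper=1.0)
```

Each time the backlog window changes, the counts are recomputed from the window as a whole, so the histogram always describes only the retained samples.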

#### Datashading

The same approach also works for the ``datashade`` operation, letting us datashade the entire ``backlog`` window even if we make it very large:

In [None]:
%%opts RGB [width=600]
import datashader as ds
from holoviews.operation.datashader import datashade

source = Random(freq='1ms', interval='500ms')
sdf = (source-0.5).cumsum()
dmap = hv.DynamicMap(hv.Curve, streams=[DataFrameStream(sdf.x, backlog=50000)])
datashade(dmap, streams=[hv.streams.PlotSize], normalization='linear')

In [None]:
source.stop()
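
The reason a very large backlog remains manageable is that the data is aggregated onto a fixed-size grid before it is displayed. The sketch below counts points per grid cell using only the standard library; ``rasterize_counts`` is a hypothetical helper, and unlike datashading a ``Curve`` it bins individual points rather than line segments, so it should be read as a conceptual illustration only.

```python
import random


def rasterize_counts(points, width, height, x_range, y_range):
    """Aggregate (x, y) points into a height x width grid of counts."""
    (x0, x1), (y0, y1) = x_range, y_range
    grid = [[0] * width for _ in range(height)]
    for x, y in points:
        if x0 <= x <= x1 and y0 <= y <= y1:
            # Map each coordinate to a cell index, clamping the upper edge
            col = min(int((x - x0) / (x1 - x0) * width), width - 1)
            row = min(int((y - y0) / (y1 - y0) * height), height - 1)
            grid[row][col] += 1
    return grid


rng = random.Random(0)
points = [(rng.random(), rng.random()) for _ in range(50000)]
grid = rasterize_counts(points, width=60, height=40,
                        x_range=(0.0, 1.0), y_range=(0.0, 1.0))
```

However many points are in the window, the result is always a ``height`` by ``width`` grid, so the amount of data sent to the browser depends on the plot size rather than on the ``backlog``.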