Commit

Made suggested fixes to user guide
philippjfr committed Oct 30, 2017
1 parent 88455f0 commit e00d4f8
Showing 2 changed files with 27 additions and 52 deletions.
51 changes: 13 additions & 38 deletions examples/user_guide/15-Streaming_Data.ipynb
@@ -96,7 +96,7 @@
"source": [
"## Buffer\n",
"\n",
"While ``Pipe`` provides a general solution for piping arbitrary data to ``DynamicMap`` callback, ``Buffer`` on the other hand provides a very powerful means of working with streaming tabular data, defined as pandas dataframes, arrays or dictionaries of columns (as well as ``StreamingDataFrame`` we will cover later). ``Buffer`` automatically accumulates the last ``N`` rows of the tabular data, where ``N`` is defined by the ``backlog``.\n",
"While ``Pipe`` provides a general solution for piping arbitrary data to ``DynamicMap`` callback, ``Buffer`` on the other hand provides a very powerful means of working with streaming tabular data, defined as pandas dataframes, arrays or dictionaries of columns (as well as StreamingDataFrame, which we will cover later). ``Buffer`` automatically accumulates the last ``N`` rows of the tabular data, where ``N`` is defined by the ``backlog``.\n",
"\n",
"The ability to accumulate data allows performing operations on a recent history of data, while plotting backends (such as bokeh) can optimize plot updates by sending just the latest patch."
]
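To make the ``Buffer`` pattern just described concrete, here is a minimal standalone sketch (not part of this commit); the column count, the ``backlog`` value and the data pushed via ``send`` are illustrative:

```python
import numpy as np
import holoviews as hv
from holoviews.streams import Buffer

# Accumulate at most the last 50 rows of a two-column (x, y) table
buffer = Buffer(np.zeros((0, 2)), backlog=50)

# The DynamicMap redraws a Curve from whatever the buffer currently holds
dmap = hv.DynamicMap(hv.Curve, streams=[buffer])

# Each send() appends rows; bokeh can patch in just the new data
for i in range(10):
    buffer.send(np.array([[i, np.sin(i / 2.0)]]))
```

Because the buffer only keeps the last ``backlog`` rows, any computation applied in the callback always sees a bounded window of recent history.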
@@ -274,7 +274,7 @@
"source": [
"#### Using StreamingDataFrame and StreamingSeries\n",
"\n",
"The streamz library provides StreamingDataFrame and StreamingSeries as a powerful way to easily work with streaming tabular data, this makes it perfectly suited to work with ``Buffer``. With the ``StreamingDataFrame`` we can easily stream data, apply computations such as cumulative and rolling statistics and then visualize the data with HoloViews.\n",
"The streamz library provides StreamingDataFrame and StreamingSeries as a powerful way to easily work with streaming tabular data. This makes it perfectly suited to work with ``Buffer``. With the ``StreamingDataFrame`` we can easily stream data, apply computations such as cumulative and rolling statistics and then visualize the data with HoloViews.\n",
"\n",
"The ``streamz.dataframe`` module provides a ``Random`` utility that generates a ``StreamingDataFrame`` emitting random data with a certain frequency at a specified interval. The ``example`` attribute lets us see the structure and dtypes of the data we can expect:"
]
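A hedged sketch of the pattern described above, assuming the ``streamz.dataframe`` API referenced in this guide; the ``freq``/``interval`` values and the cumulative-sum transform are illustrative:

```python
import holoviews as hv
from holoviews.streams import Buffer
from streamz.dataframe import Random

# Emit random rows every 10ms, releasing them in 100ms batches
source = Random(freq='10ms', interval='100ms')
print(source.example.dtypes)  # inspect the structure and dtypes of the data to expect

# Apply a cumulative sum and stream one column into a Buffer-backed DynamicMap
sdf = (source - 0.5).cumsum()
dmap = hv.DynamicMap(hv.Curve, streams=[Buffer(sdf.x)])
```

Since ``Buffer`` subscribes to the ``StreamingSeries`` directly, every emitted batch is appended automatically and trimmed to the ``backlog``.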
@@ -540,7 +540,7 @@
"from tornado.ioloop import IOLoop\n",
"from tornado import gen\n",
"\n",
"buffer = Buffer(np.zeros((0, 2)))\n",
"buffer = Buffer(np.zeros((0, 2)), backlog=50)\n",
"\n",
"@gen.coroutine\n",
"def f():\n",
@@ -558,41 +558,16 @@
"source": [
"<img class=\"gif\" src=\"http://assets.holoviews.org/gifs/guides/user_guide/Streaming_Data/streamz2.gif\"></img>\n",
"\n",
"Here the plot should update 100 times as before, but now via the Tornado IOLoop which will not block other interactions and work in the notebook."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Real examples"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Using the ``Pipe`` and ``Buffer`` streams we can create complex streaming plots very easily. In addition to the toy examples we presented in this guide it is worth looking at looking at some of the examples using real, live, streaming data."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"* The [streaming_psutil](http://holoviews.org/gallery/apps/bokeh/stream_psutil.html) bokeh app is one such example which display CPU and memory information using the ``psutil`` library (install with ``pip install psutil`` or ``conda install psutil``)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<img class=\"gif\" src=\"http://assets.holoviews.org/gifs/guides/user_guide/Streaming_Data/streamz9.gif\"></img>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Here the plot should update 100 times as before, but now via the Tornado IOLoop which will not block other interactions and work in the notebook.\n",
"\n",
"## Real examples\n",
"\n",
"Using the ``Pipe`` and ``Buffer`` streams we can create complex streaming plots very easily. In addition to the toy examples we presented in this guide it is worth looking at looking at some of the examples using real, live, streaming data.\n",
"\n",
"* The [streaming_psutil](http://holoviews.org/gallery/apps/bokeh/stream_psutil.html) bokeh app is one such example which display CPU and memory information using the ``psutil`` library (install with ``pip install psutil`` or ``conda install psutil``)\n",
"\n",
"<img class=\"gif\" src=\"http://assets.holoviews.org/gifs/guides/user_guide/Streaming_Data/streamz9.gif\"></img>\n",
"\n",
"As you can see, streaming data works like streams in HoloViews in general, flexibly handling changes over time under either explicit control or governed by some external data source."
]
}
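The body of the coroutine ``f`` in the Tornado code cell above is collapsed in this diff view; the following is a hedged reconstruction of that pattern based on the surrounding text (the 100 iterations and 0.1s sleep are assumptions consistent with "the plot should update 100 times"):

```python
import numpy as np
from tornado import gen
from tornado.ioloop import IOLoop
from holoviews.streams import Buffer

buffer = Buffer(np.zeros((0, 2)), backlog=50)

@gen.coroutine
def f():
    for i in range(100):
        yield gen.sleep(0.1)                             # hand control back to the IOLoop
        buffer.send(np.array([[i, np.random.rand()]]))   # push one new (x, y) row

# Schedule the coroutine on the notebook's IOLoop so the kernel stays responsive
IOLoop.current().add_callback(f)
```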
28 changes: 14 additions & 14 deletions holoviews/streams.py
@@ -394,12 +394,12 @@ def hashkey(self):

class Buffer(Pipe):
"""
Buffer provides a means stream and accumulate tabular datasets.
The data may be in the form of a pandas DataFrame, 2D arrays of
rows and columns or dictionaries of column arrays. Buffer will
accumulate the last N rows, where N is defined by the specified
``backlog``. The accumulated data is then made available via the
``data`` parameter.
Buffer allows streaming and accumulating incoming chunks of rows
from tabular datasets. The data may be in the form of a pandas
DataFrame, 2D arrays of rows and columns or dictionaries of column
arrays. Buffer will accumulate the last N rows, where N is defined
by the specified ``backlog``. The accumulated data is then made
available via the ``data`` parameter.
A Buffer may also be instantiated with a streamz.StreamingDataFrame
or a streamz.StreamingSeries, it will automatically subscribe to
@@ -416,7 +416,7 @@ def __init__(self, columns, backlog=1000, index=True, **params):
if columns.ndim != 2:
raise ValueError("Only 2D array data may be streamed by Buffer.")
example = columns
elif isinstance(column, dict):
elif isinstance(columns, dict):
if not all(isinstance(v, np.ndarray) for v in columns.values()):
raise ValueError("Columns in dictionary must be of array types.")
elif len(set(len(v) for v in columns.values())) > 1:
@@ -458,7 +458,7 @@ def verify(self, x):
elif x.shape[1] != self.data.shape[1]:
raise ValueError("Streamed array data expeced to have %d columns "
"got %d" % (self.data.shape[1], x.shape[1]))
elif pd and isinstance(x, pd.DataFrame) and list(x.columns) != list(self.data.columns):
elif util.pd and isinstance(x, util.pd.DataFrame) and list(x.columns) != list(self.data.columns):
raise IndexError("Input expected to have columns %s, got %s" %
(self.data.columns, x.columns))
elif isinstance(x, dict):
@@ -474,7 +474,7 @@ def clear(self):
"Clears the data in the stream"
if isinstance(self.data, np.ndarray):
data = self.data[:, :0]
elif pd and isinstance(self.data, pd.DataFrame):
elif util.pd and isinstance(self.data, util.pd.DataFrame):
data = self.data.iloc[:0]
elif isinstance(self.data, dict):
data = {k: v[:0] for k, v in self.data.items()}
@@ -495,12 +495,10 @@ def _concat(self, data):
data = np.concatenate([prev_chunk, data])
elif data_length > self.backlog:
data = data[-self.backlog:]
elif pd and isinstance(data, util.pd.DataFrame):
elif util.pd and isinstance(data, util.pd.DataFrame):
data_length = len(data)
if self._index:
data = data.reset_index()
if data_length < self.backlog:
prev_chunk = self.data.iloc[-(self.backlog-data_length)]
prev_chunk = self.data.iloc[-(self.backlog-data_length):]
data = util.pd.concat([prev_chunk, data])
elif data_length > self.backlog:
data = data.iloc[-self.backlog:]
@@ -509,7 +507,7 @@ def _concat(self, data):
new_data = {}
for k, v in data.items():
if data_length < self.backlog:
prev_chunk = self.data[k][-(self.backlog-data_length)]
prev_chunk = self.data[k][-(self.backlog-data_length):]
new_data[k] = np.concatenate([prev_chunk, v])
elif data_length > self.backlog:
new_data[k] = v[-self.backlog:]
@@ -524,6 +522,8 @@ def update(self, **kwargs):
"""
data = kwargs.get('data')
if data is not None:
if util.pd and isinstance(data, util.pd.DataFrame) and self._index:
data = data.reset_index()
self.verify(data)
kwargs['data'] = self._concat(data)
self._count += 1
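As a side note on the slicing fix to ``_concat`` visible above (a standalone pandas sketch, not part of the commit): with positional indexing, ``iloc[-k]`` selects a single row as a Series, whereas ``iloc[-k:]`` returns the trailing ``k`` rows, which is what the backlog logic needs before concatenating the incoming chunk:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({'x': np.arange(5), 'y': np.arange(5) * 2})

print(df.iloc[-2])    # a single row, returned as a Series (the old, buggy behaviour)
print(df.iloc[-2:])   # the last two rows, returned as a DataFrame (the fixed slice)
```

The same distinction applies to the NumPy column arrays in the dictionary branch, where ``v[-k]`` is a scalar but ``v[-k:]`` is the trailing slice.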
