# Streamz Data Stream Component

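The `PnStreamz` component renders arbitrary objects emitted by a [Streamz](https://streamz.readthedocs.io/en/latest/) Stream object. This differs from the `DataFrame` component, which is specific to streamz DataFrame and Series objects and exposes various formatting options.

It is built on `panel.pane.Streamz` and accepts largely the same parameters. Reference documentation: https://panel.holoviz.org/reference/panes/Streamz.html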


In [1]:
##ignore
%load_ext vuepy
from panel_vuepy import vpanel


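## Basic Usage

> **Note**: If you are not already using the Streamz library, we recommend using the functionality in the Param and Panel ecosystem instead, such as [reactive expressions](https://param.holoviz.org/user_guide/Reactive_Expressions.html), [generator functions](https://param.holoviz.org/user_guide/Generators.html) and/or *periodic callbacks*. We have found these features to be more reliably supported.

The `PnStreamz` component uses Panel's default resolution to decide how to render the objects returned by the Stream. By default the component only watches the `Stream` while it is displayed; setting `always_watch=True` makes it start watching the stream as soon as it is created: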


In [2]:
%%vuepy_run --plugins vpanel --show-code --backend='panel'
<template>
  <PnStreamz :object="stream_map.value" :always_watch="True"/>
</template>
<script lang='py'>
from vuepy import ref, onMounted
from streamz import Stream

def increment(x):
    return x + 1

source = Stream()
stream_map = ref(source.map(increment))

# Note: to make sure a static render of the stream shows content,
# we set always_watch=True and emit an event before display
@onMounted
def emit():
    source.emit(1)
</script>

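Why it matters whether anything is watching at the moment an event is emitted can be illustrated without Streamz. The following is a minimal pure-Python sketch. `MiniStream` is a hypothetical stand-in rather than the Streamz implementation, and it assumes only that a push-based stream delivers each event to the subscribers attached at that moment and does not replay earlier events.

```python
class MiniStream:
    """Hypothetical stand-in for a push-based stream (not the Streamz API)."""

    def __init__(self):
        self._subscribers = []

    def map(self, func):
        # Create a downstream node that receives func(x) for every event.
        child = MiniStream()
        self._subscribers.append(lambda x: child.emit(func(x)))
        return child

    def sink(self, callback):
        # Attach a consumer; it only sees events emitted from now on.
        self._subscribers.append(callback)

    def emit(self, value):
        # Push the value to whoever is subscribed right now.
        for subscriber in list(self._subscribers):
            subscriber(value)


def increment(x):
    return x + 1


source = MiniStream()
mapped = source.map(increment)

seen = []
source.emit(1)            # nobody is watching `mapped` yet, so this is lost
mapped.sink(seen.append)  # start watching, like a pane that begins to watch
source.emit(2)            # delivered as increment(2)

print(seen)  # [3]
```

In this sketch the first event never reaches the consumer, which is the situation `always_watch=True` is meant to avoid when events are emitted before the component is displayed.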



Now we can define a periodic callback that emits an increasing count on the `Stream`:


In [3]:
%%vuepy_run --plugins vpanel --show-code
<template>
  <PnStreamz :object="stream_map" :always_watch="True" ref="streamz_pane" />
  <PnButton @click="start_emit()">Start emitting data</PnButton>
  <PnButton @click="stop_emit()">Stop emitting data</PnButton>
</template>
<script lang='py'>
from vuepy import ref, onMounted
from streamz import Stream
import panel as pn

def increment(x):
    return x + 1

source = Stream()
stream_map = source.map(increment)
streamz_pane = ref(None)

# Emit one event so that a static render of the stream shows content
@onMounted
def emit():
    source.emit(1)

count = 1
callback = None

def emit_count():
    nonlocal count
    count += 1
    source.emit(count)

def start_emit():
    nonlocal callback
    if callback is None or not callback.running:
        callback = pn.state.add_periodic_callback(emit_count, period=100)

def stop_emit():
    nonlocal callback
    if callback and callback.running:
        callback.stop()
</script>



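## Complex Data Streams

The `PnStreamz` component can stream any type of data. For example, we can create a streamz DataFrame, accumulate the data into a sliding window, and then map it to a `line_plot` function that builds an Altair chart: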


In [4]:
%%vuepy_run --plugins vpanel --show-code
<template>
  <PnStreamz 
    :object="altair_stream" 
    :height="350" 
    :sizing_mode="'stretch_width'" 
    :always_watch="True" 
    ref="altair_pane" />
  <PnButton @click="start_emit()">Start emitting data</PnButton>
  <PnButton @click="stop_emit()">Stop emitting data</PnButton>
</template>
<script lang='py'>
from vuepy import ref
import numpy as np
import altair as alt
import pandas as pd
from datetime import datetime
from streamz.dataframe import DataFrame as sDataFrame
import panel as pn

altair_pane = ref(None)

# Create a streamz DataFrame
df = sDataFrame(example=pd.DataFrame({'y': []}, index=pd.DatetimeIndex([])))

def line_plot(data):
    return alt.Chart(pd.concat(data).reset_index()).mark_line().encode(
        x='index',
        y='y',
    ).properties(width="container")

# Build a stream that takes the cumulative sum, applies a sliding window, and maps to the chart function
altair_stream = df.cumsum().stream.sliding_window(50).map(line_plot)

# Initial data
for i in range(20):
    df.emit(pd.DataFrame({'y': [np.random.randn()]}, index=pd.DatetimeIndex([datetime.now()])))

callback = None

def emit():
    df.emit(pd.DataFrame({'y': [np.random.randn()]}, index=pd.DatetimeIndex([datetime.now()])))

def start_emit():
    nonlocal callback
    if callback is None or not callback.running:
        callback = pn.state.add_periodic_callback(emit, period=500)

def stop_emit():
    nonlocal callback
    if callback and callback.running:
        callback.stop()
</script>

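The cumulative-sum-then-sliding-window pipeline above can be sketched in plain Python to show what the chart function receives on each event. This is only an illustration under stated assumptions: `sliding_windows` is a hypothetical helper, the sample values are made up, and plain floats stand in for the one-row DataFrames used in the example. The sketch also yields partial windows before the buffer is full; whether Streamz does the same depends on its `sliding_window` options.

```python
from collections import deque
from itertools import accumulate


def sliding_windows(values, n):
    """Yield a tuple of the most recent `n` items after each new item."""
    window = deque(maxlen=n)  # old items fall off the left automatically
    for value in values:
        window.append(value)
        yield tuple(window)


samples = [1.0, -2.0, 0.5, 3.0, -1.0]   # made-up inputs
running_total = accumulate(samples)      # cumulative sum, in the spirit of df.cumsum()
windows = list(sliding_windows(running_total, n=3))

for w in windows:
    print(w)
```

Each printed tuple corresponds to one call of the mapped function, which is why `line_plot` in the example concatenates its argument before plotting.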


## API

### Attributes

| Attribute        | Description                                         | Type                                                           | Default |
| ---------------- | --------------------------------------------------- | ---------------------------------------------------------------| ------- |
| object           | The streamz.Stream object to watch                  | ^[streamz.Stream]                                              | None |
| always_watch     | Whether to watch the stream even when not displayed | ^[bool]                                                        | False |
| rate_limit       | Minimum interval between events (seconds)           | ^[float]                                                       | 0.1 |
| sizing_mode      | Sizing mode                                         | ^[str]                                                         | 'fixed'  |
| width            | Width                                               | ^[int, str]                                                    | None    |
| height           | Height                                              | ^[int, str]                                                    | None    |
| min_width        | Minimum width                                       | ^[int]                                                         | None    |
| min_height       | Minimum height                                      | ^[int]                                                         | None    |
| max_width        | Maximum width                                       | ^[int]                                                         | None    |
| max_height       | Maximum height                                      | ^[int]                                                         | None    |
| margin           | Outer margin                                        | ^[int, tuple]                                                  | 5       |
| css_classes      | List of CSS class names                             | ^[list]                                                        | []      |
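To make the idea of a minimum interval between events concrete, here is a small pure-Python sketch. `Throttle` is a hypothetical helper and the arrival times are made up. It simply drops events that arrive too soon after the last forwarded one, whereas the actual component may handle such events differently (for example by delaying them), so treat it as an illustration of the interval only, not of the component's behavior.

```python
class Throttle:
    """Hypothetical helper: forward an event only if enough time has passed."""

    def __init__(self, min_interval):
        self.min_interval = min_interval  # seconds, analogous to rate_limit
        self._last = None                 # timestamp of the last forwarded event

    def accept(self, timestamp):
        # Forward the first event, and any event at least min_interval later.
        if self._last is None or timestamp - self._last >= self.min_interval:
            self._last = timestamp
            return True
        return False


throttle = Throttle(min_interval=0.1)
arrivals = [0.0, 0.05, 0.1, 0.15, 0.3]  # made-up arrival times in seconds
forwarded = [t for t in arrivals if throttle.accept(t)]

print(forwarded)  # [0.0, 0.1, 0.3]
```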

### Slots

| Slot name | Description            |
| ---       | ---                    |
| default   | Custom default content |


In [5]:
##controls
import panel as pn
import numpy as np
import pandas as pd
from datetime import datetime
from streamz import Stream

pn.extension('vega')

def increment(x):
    return x + 1

source = Stream()

streamz_pane = pn.pane.Streamz(source.map(increment), always_watch=True)

# Note: to make sure a static render of the stream shows content,
# we set always_watch=True and emit an event before display
source.emit(1)

pn.Row(streamz_pane.controls(), streamz_pane)