# 在Notebook中使用Bokeh Server

Bokeh提供了服务器模式，可以使用服务器中的Python函数响应浏览器Javascript产生的事件，并在事件响应函数中修改`Document`刷新浏览器的显示。通常为了使用Bokeh服务器，需要编写一个`py`文件，例如`demo.py`，然后调用`bokeh serve demo.py`运行服务器，最后在浏览器中打开`http://localhost:5006`连接服务器。

In [1]:
from bokeh.io import show, output_notebook
from bokeh.models import Slider, TextInput
from bokeh.layouts import row
output_notebook()

## 运行服务器

Notebook中调用`show()`时如果传递的是一个函数对象，Bokeh会为该函数创建服务器，并在Notebook中连接该服务器。

传递给`show()`的函数的参数为`Document`对象，在该函数之内创建控件和图表，并将包含它们的布局对象传递给`Document.add_root()`。下面的例子中创建一个`TextInput`和一个`Slider`控件，在`Slider`控件的`value`修改事件中设置`TextInput`控件的`value`属性。

当用户在界面中调节滑动条控件的值时，浏览器会向服务器发送值修改事件，在该事件的响应函数`callback()`中修改`text.value`的值，引起Python中的`Document`对象的改变，服务器会将改变发送给浏览器，更新浏览器中的`Document`对象。

In [3]:
def app(doc):
    text = TextInput()
    slider = Slider(title="value", start=0, end=100, value=0, step=1)
    def callback(attr, old, new):
        text.value = str(new)
    slider.on_change("value", callback)
    doc.add_root(row(slider, text))

每运行一次`show()`，都将创建一个新的服务器。每连接一次服务器将创建一个新的`Session`对象。然后创建一个新的`Document`对象，并调用`app()`更新它。每个`Session`中保存有自己的`Document`对象。

In [4]:
show(app)

下面查看当前的所有服务器：

In [8]:
from bokeh.io.state import curstate
curstate().uuid_to_server

{'ed68a542b64748688ee9ff58251d5253': <bokeh.server.server.Server at 0x1f205d84c18>}

下面查看第一个服务器中的所有`Session`对象：

In [14]:
server = list(curstate().uuid_to_server.values())[0]
server.get_sessions()

[<bokeh.server.session.ServerSession at 0x1f205f43cc0>]

下面显示`Document`对象中的所有模型，其中`Row`、`Slider`和`TextInput`是`app()`中产生的，而`WidgetBox`是为了包装两个控件自动创建的。

In [17]:
session = server.get_sessions()[0]
session.document._all_models

{'2fbaa6ff-f7b4-40e6-95df-ff8a741d2bf8': Row(id='2fbaa6ff-f7b4-40e6-95df-ff8a741d2bf8', ...),
 '5cda7547-40d8-4eaa-a237-4679a8b773d5': Slider(id='5cda7547-40d8-4eaa-a237-4679a8b773d5', ...),
 '6053213c-8295-4a6b-b7f8-ec836340e67d': TextInput(id='6053213c-8295-4a6b-b7f8-ec836340e67d', ...),
 '60af5b4e-a88e-43c6-9ff6-4180ffe83ccb': WidgetBox(id='60af5b4e-a88e-43c6-9ff6-4180ffe83ccb', ...),
 'abe50140-f542-424a-b25a-ad909bd7f9e2': WidgetBox(id='abe50140-f542-424a-b25a-ad909bd7f9e2', ...)}

为了让浏览器能直接连接服务器，需要配置Web Socket的来源。下面将`localhost:server.port`添加进`_websocket_origins`中。并显示服务器的访问地址：

In [19]:
from bokeh.io.state import curstate
server = list(curstate().uuid_to_server.values())[0]
server_link = "localhost:{}".format(server.port)
server._tornado._websocket_origins.add(server_link)
print(server_link)

localhost:52506




在浏览器中打开上面的地址，即可看到一组新的滚动条控件和文本输入框，它们和本Notebook中显示的控件属于不同的`Document`对象，因此是相互独立的。

再次查看`Session`对象中的`Document`对象:

In [22]:
[session.document for session in server.get_sessions()]

[<bokeh.document.document.Document at 0x1f205f43be0>,
 <bokeh.document.document.Document at 0x1f206108390>]

## 定期更新

### 使用周期回调函数实现定期更新

使用`Document.add_periodic_callback(callback, period_milliseconds)`添加定期回调函数。在该回调函数中修改`nonlocal`变量`count`的值，并在文本框中显示其值。

In [24]:
def period_update_app(doc):
    text = TextInput()
    count = 0
    def callback():
        nonlocal count
        text.value = str(count)
        count += 1
    doc.add_periodic_callback(callback, 100)
    doc.add_root(text)
    
show(period_update_app)

上面的例子使用回调函数实现定期更新，在回调函数中更新的变量需要`nonlocal`声明。还可以使用Python的`async`函数，在该函数的内部使用循环更新界面。

### 使用async函数实现定期更新

下面的例子中，使用`doc.add_next_tick_callback()`将`async`函数`task`添加进需要立即处理的队列中。`task`函数需要`@without_document_lock`修饰，否则服务器会等待该函数的运行结束。由于`task`运行时没有锁定文档，因此无法在其中直接更新文档，而需要通过`add_next_tick_callback()`将更新界面的函数添加进处理队列。

在`task`函数中使用`await gen.sleep(0.1)`使其等待0.1秒。在其等待的过程中Bokeh服务器仍然可以处理其它事务。

该例虽然比使用周期回掉函数的方法麻烦，但是所有的处理逻辑都在一个`async`函数内部，而界面处理则在另一个函数中。起到了逻辑分离的作用。而我们可以用`await`等待其它的事件，例如等待从网络获取新的数据。

In [29]:
from bokeh.document import without_document_lock
import itertools
from functools import partial
from tornado import gen

def period_coroutine_update_app(doc):
    text = TextInput()
    
    def update(count):
        text.value = str(count)
    
    @without_document_lock
    async def task():
        for i in itertools.count():
            doc.add_next_tick_callback(partial(update, i))
            await gen.sleep(0.1)
            
    doc.add_next_tick_callback(task)
    doc.add_root(text)
    
show(period_coroutine_update_app)

## 用zmq与其它进程通信

Bokeh中所有的服务器运行于同一个线程之中，如果有比较费时的运算，会导致所有的协程停止运行。因此通常这些计算都放在别的进程中运行，服务器和这些进程之间使用`zmq`进行通信。

下面的`ZmqSubProcessClient`类由Bokeh的应用程序创建。调用其`start_subprocess()`将创建一个新的进程运行指定的文件`pyfile`。`message_callback`是当从子进程接收到消息时的回调函数。而`send()`则可以用于向子进程发送消息。

In [9]:
%%file zmq_subprocess.py

import sys
import subprocess
import zmq
import zmq.asyncio
from bokeh.document import without_document_lock
from bokeh.application.handlers import Handler


def on_session_destroyed(self, session_context):
    if hasattr(session_context, "on_destroyed"):
        return session_context.on_destroyed(session_context)

Handler.on_session_destroyed = on_session_destroyed


class ZmqSubProcessClient:
    def __init__(self, doc, port=0):
        ctx = zmq.asyncio.Context.instance()
        self.socket = zmq.asyncio.Socket(ctx, zmq.PAIR) #ctx.socket(zmq.PAIR)
        if port == 0:
            port = self.socket.bind_to_random_port("tcp://127.0.0.1")
        else:
            addr = "tcp://127.0.0.1:{}".format(port)
            self.socket.bind(addr)
        self.port = port
        self.doc = doc
    
    def start_subprocess(self, pyfile, args, message_callback):
        self.process = subprocess.Popen(["python", pyfile] + [str(self.port)] + list(args))
        self.message_callback = message_callback
        self.doc.add_next_tick_callback(self.message_loop)
        self.doc.session_context.on_destroyed = self.destroy

    def destroy(self, session_context):
        self.process.kill()

    @without_document_lock
    async def message_loop(self):
        while True:
            message = await self.socket.recv_pyobj()
            self.message_callback(message)

    def send(self, message):
        @without_document_lock
        async def _send_message():
            await self.socket.send_pyobj(message)
        self.doc.add_next_tick_callback(_send_message)


class ZmqSubProcess:
    def __init__(self, port=None):
        if port is None:
            port = int(sys.argv[1])
        ctx = zmq.Context.instance()
        self.socket = ctx.socket(zmq.PAIR)
        self.socket.connect("tcp://127.0.0.1:{}".format(port))

    def send(self, obj):
        self.socket.send_pyobj(obj)

    def poll(self):
        return self.socket.poll(timeout=0)

    def recv(self):
        return self.socket.recv_pyobj()

Overwriting zmq_subprocess.py


下面的`zmq_subprocess_app()`创建一个按钮和一个`Div`节点。当接收到消息时在`Div`中显示消息的内容，当按钮按下时发送`reset`消息。

In [16]:
def zmq_subprocess_app(doc):
    global subproc
    from os import path
    from bokeh.models import Button, Div
    from bokeh.layouts import row
    from bokeh.document import without_document_lock
    from zmq_subprocess import ZmqSubProcessClient

    ok_button = Button(label="reset", width=80)
    div = Div()

    def ok_button_clicked():
        subproc.send("reset")

    ok_button.on_click(ok_button_clicked)

    def process_message(message, doc=doc):
        def show():
            div.text = str(message)
        doc.add_next_tick_callback(show)


    subproc = ZmqSubProcessClient(doc)
    subproc.start_subprocess("calc_process.py", (), process_message)

    doc.add_root(row(ok_button, div))

下面是子进程的程序，它持续发送计数器的值，当接收到`reset`消息时重置计数器。

In [17]:
%%file calc_process.py
import sys
import time
import zmq
from zmq_subprocess import ZmqSubProcess

zsp = ZmqSubProcess()
count = 0

while True:
    if zsp.poll() != 0:
        message = zsp.recv()
        if message == "reset":
            count = 0
    zsp.send(count)
    count += 1
    time.sleep(1.0)

Overwriting calc_process.py


启动服务器程序，将看到一个一秒钟更新一次的计数器，按reset按钮将计数器清零。

In [18]:
show(zmq_subprocess_app)