# bokeh in the notebook: data streaming / live plots update 

## About this notebook

This notebook belongs to a series of small projects whose aim is to evaluate the [Jupyter](http://jupyter.org/) ecosystem for the control of science experiments. The main idea is to use the _Jupyter notebook_ as a convergence platform in order to offer a fully featured environment to scientists. 

## About bokeh

Experiment control requires both static and dynamic (i.e. live) data visualization. Since Jupyter doesn't provide any 'official' data visualization solution, we need to select one. Among the available solutions, [bokeh](http://bokeh.pydata.org/en/latest) presents the highest potential for our application.

Bokeh has been selected for its:
1. [built-in notebook integration](http://bokeh.pydata.org/en/latest/docs/user_guide/notebook.html)
2. built-in [data streaming](http://bokeh.pydata.org/en/latest/docs/reference/models/sources.html#bokeh.models.sources.ColumnDataSource.patch) [features](http://bokeh.pydata.org/en/latest/docs/reference/models/sources.html#bokeh.models.sources.ColumnDataSource.stream) for live plot updates 
3. ability to add [custom or specialized behaviors](http://bokeh.pydata.org/en/latest/docs/user_guide/interaction/callbacks.html) in response to property changes and other events
4. [graphics quality](http://bokeh.pydata.org/en/latest/docs/gallery.html#gallery)

Have a look at this [quickstart](http://bokeh.pydata.org/en/latest/docs/user_guide/quickstart.html) for an overview of bokeh.

## Topic of the day

Asynchronous (i.e. non-blocking) plot updates. We are going to use our tiny DataStream pattern, whose model is the following:
    
<img src="./tango_meeting_florence_0617/resources/images/DataStreamUML.png" width="650" height="500">

The idea is to implement a DataSource (see XDS, YDS, ...) and attach it to a Channel, which is itself attached to a DataStream, which is itself attached to a DataStreamer, which is itself attached to an optional DataStreamerController. Note that the ScalarChannel and SpectrumChannel are multi-DataSource channels, whereas the ImageChannel is a single-DataSource one.

In [None]:
import threading
import datetime
import time
import math
import random
import logging
import copy
from collections import deque
import numpy as np
from bokeh.models.ranges import Range1d
from bokeh.palettes import Plasma256, Viridis256, Inferno256, Greys256
from bokeh.layouts import column
from common.plots import *

In [None]:
# Per-logger verbosity: 'tornado' and 'bokeh.server' are raised to CRITICAL,
# while 'fs.client.jupyter' is lowered to DEBUG.
for _logger_name, _logger_level in (
    ('tornado', logging.CRITICAL),
    ('bokeh.server', logging.CRITICAL),
    ('fs.client.jupyter', logging.DEBUG),
):
    logging.getLogger(_logger_name).setLevel(_logger_level)
# Do not leave the loop temporaries behind in the notebook namespace.
del _logger_name, _logger_level

In [None]:
# Declare the front-end we are running in through the JUPYTER_CONTEXT
# environment variable: either NOTEBOOK or LAB.
import os
os.environ.update({"JUPYTER_CONTEXT": "LAB"})

In [None]:
#--------------------------------------------------------------------------
class XDS(DataSource):
    """Data source producing a randomly shifted x-axis of fixed length.

    Args:
        name: identifier forwarded to the DataSource constructor.
        num_points: number of samples in each generated buffer.
    """

    def __init__(self, name, num_points=128):
        super().__init__(name)
        self._l = num_points

    def pull_data(self):
        """Return a ChannelData whose buffer is a 2*pi wide linear ramp.

        The ramp starts at a random offset drawn uniformly from
        [-pi/2, pi/2] and holds ``num_points`` evenly spaced samples.
        """
        half_pi = math.pi / 2
        offset = random.uniform(-half_pi, half_pi)
        data = ChannelData(self.name)
        data.buffer = np.linspace(offset, 2 * math.pi + offset, self._l)
        return data

    def cleanup(self):
        """Delegate cleanup to the parent class."""
        super().cleanup()
        
#--------------------------------------------------------------------------
class YDS(DataSource):
    """Data source producing a randomly shifted, randomly scaled sine wave.

    Args:
        name: identifier forwarded to the DataSource constructor.
        channels: accepted for backward compatibility; not used here.
        num_points: number of samples in each generated buffer.
    """

    def __init__(self, name, channels=None, num_points=128):
        DataSource.__init__(self, name)
        self._l = num_points

    def pull_data(self):
        """Return a ChannelData whose buffer is ``amplitude * sin(x)``.

        ``x`` holds ``num_points`` evenly spaced samples over
        ``[p, 2 * (pi + p)]`` with ``p`` drawn uniformly from [-pi/2, pi/2];
        ``amplitude`` is drawn uniformly from [1.0, 4.0].
        """
        cd = ChannelData(self.name)
        p = random.uniform(-math.pi/2, math.pi/2)
        start = 0 + p
        end = 2 * (math.pi + p)
        x = np.linspace(start, end, self._l)
        # The sine wave is computed once only: a previous version also built
        # an identical-looking array into an unused local, wasting a random
        # draw and a full np.sin evaluation on every pull.
        cd.buffer = random.uniform(1.0, 4.0) * np.sin(x)
        return cd

    def cleanup(self):
        """Delegate cleanup to the parent class."""
        super(YDS, self).cleanup()
        
#--------------------------------------------------------------------------
class SCTest(SpectrumChannel):
    """SpectrumChannel that remembers the latest box-selection range.

    The range is stored as a dict with the keys 'x0', 'x1', 'y0', 'y1',
    'width' and 'height'; every write to it is done while holding a lock.
    """

    def __init__(self, name, data_sources=None, model_properties=None):
        super().__init__(name, data_sources, model_properties)
        self._lock = threading.Lock()
        self._selection_range = self.__initial_selection_range()

    def __initial_selection_range(self):
        """Return the default selection range: every field set to 0."""
        fields = ('x0', 'x1', 'y0', 'y1', 'width', 'height')
        return dict.fromkeys(fields, 0)

    def on_box_selection_change(self, selection_range):
        """Store the given selection range."""
        with self._lock:
            self._selection_range = selection_range

    def on_box_selection_reset(self):
        """Restore the default (all zero) selection range."""
        default_range = self.__initial_selection_range()
        with self._lock:
            self._selection_range = default_range

    def cleanup(self):
        """Delegate cleanup to the parent class."""
        super().cleanup()

In [None]:
#--------------------------------------------------------------------------
class XYDS(DataSource):
    """Image data source serving a growing, then shrinking, square crop.

    A 1000 x 1000 ``sin(x) * cos(y)`` image is computed once at construction;
    each pull returns its top-left corner, whose side length follows an index
    bouncing between 0 and the image height.
    """

    def __init__(self, name):
        super().__init__(name)
        self._inc = 1
        self._current_index = 0
        self._iw = 1000
        self._ih = 1000
        x_axis = np.linspace(0, 10, self._iw)
        y_axis = np.linspace(0, 10, self._ih)
        grid_x, grid_y = np.meshgrid(x_axis, y_axis)
        self._full_image = np.sin(grid_x) * np.cos(grid_y)

    def pull_data(self):
        """Return the current crop and move the index one step."""
        frame = ChannelData(self.name)
        edge = self._current_index + 1
        frame.buffer = self._full_image[:edge, :edge]
        upcoming = self._current_index + self._inc
        if 0 <= upcoming <= self._ih:
            self._current_index = upcoming
            return frame
        # Out of bounds: clamp to the bound that was crossed and reverse.
        self._current_index = self._ih if upcoming > self._ih else 0
        self._inc = -self._inc
        return frame

    def cleanup(self):
        """Delegate cleanup to the parent class."""
        super().cleanup()
    
#--------------------------------------------------------------------------
class ICTest(ImageChannel):
    """ImageChannel that tracks the latest box selection made on its image.

    Unless the caller supplies its own 'selection_manager', the model is set
    up with a BoxSelectionManager bound to this channel's callbacks.
    """

    def __init__(self, name, data_source=None, model_properties=None):
        super().__init__(name, data_source=data_source, model_properties=model_properties)
        # The image size must be known before the default range is built.
        self._iw = 1000
        self._ih = 1000
        self._selection_range = self.__initial_selection_range()
        self._bsm = None
        self._lock = threading.Lock()

    def __initial_selection_range(self):
        """Return the default selection range: the whole image."""
        return dict(
            x0=0,
            x1=self._iw,
            y0=0,
            y1=self._ih,
            width=self._iw,
            height=self._ih,
        )

    def setup_model(self, **kwargs):
        """Install a default selection manager when none is given, then
        defer to the parent implementation and return its result."""
        if 'selection_manager' in kwargs:
            return super().setup_model(**kwargs)
        self._bsm = BoxSelectionManager(
            selection_callback=self.on_box_selection_change,
            reset_callback=self.on_box_selection_reset,
        )
        kwargs['selection_manager'] = self._bsm
        return super().setup_model(**kwargs)

    def on_box_selection_change(self, selection_range):
        """Store the given selection range."""
        with self._lock:
            self._selection_range = selection_range

    def on_box_selection_reset(self):
        """Restore the default (whole image) selection range."""
        default_range = self.__initial_selection_range()
        with self._lock:
            self._selection_range = default_range

    def cleanup(self):
        """Delegate cleanup to the parent class."""
        super().cleanup()

In [None]:
def on_box_selection_change(selection_range):
    """Print the selection range received from a box-selection event."""
    message = "on_box_selection_change: {}".format(selection_range)
    print(message)

def on_box_selection_reset():
    """Print a trace line when a box selection is reset."""
    message = "on_box_selection_reset"
    print(message)

In [None]:
def layout_model_properties():
    """Build the model properties used for the image layout.

    Returns:
        A new dict on every call, holding the plot size, layout options, the
        Viridis256 palette, a BoxSelectionManager bound to the module-level
        selection callbacks, and the 'one-by-one' refresh mode.
    """
    return {
        'width': 475,
        'height': 250,
        'layout': 'grid',
        'merge_tools': True,
        'show_legend': True,
        'palette': Viridis256,
        'selection_manager': BoxSelectionManager(
            selection_callback=on_box_selection_change,
            reset_callback=on_box_selection_reset,
        ),
        'refresh_mode': 'one-by-one',
    }

def image_model_properties(s=0, e=1000):
    """Build the model properties of an image: one Scale per axis.

    Args:
        s: start value shared by both axes.
        e: end value shared by both axes.

    Returns:
        A dict with the keys 'x_scale' and 'y_scale'; both scales span
        [s, e] with ``abs(e - s)`` points and use 'mm' as unit.
    """
    span = abs(e - s)
    x_scale = Scale(start=s, end=e, num_points=span, label='x-axis', unit='mm')
    y_scale = Scale(start=s, end=e, num_points=span, label='y-axis', unit='mm')
    return {'x_scale': x_scale, 'y_scale': y_scale}

In [None]:
# x scale (shared by both spectrum channels) ---------------
shsp = {
    'label': 'angle',
    'unit': 'rad',
    'channel': 'x_scale',
    'start': -1,
    'end': 1,
}
x_scale = Scale(**shsp)
# y scale --------------------------------------------------
spsp = {'label': 'amplitude', 'unit': 'a.u.'}
y_scale = Scale(**spsp)
# SpectrumChannel 1: one x source followed by three y sources
s1 = [XDS('x_scale')] + [YDS(n) for n in ['y1.1', 'y1.2', 'y1.3']]
c1 = SCTest('c1', data_sources=s1)
# SpectrumChannel 2: same structure, different y source names
s2 = [XDS('x_scale')] + [YDS(n) for n in ['y2.1', 'y2.2', 'y2.3']]
c2 = SCTest('c2', data_sources=s2)
# layout parameters of the spectrum layout -----------------
lmp = {
    'width': 475,
    'height': 250,
    'layout': 'grid',
    'merge_tools': True,
    'show_legend': True,
    'x_scale': x_scale,
    'y_scale': y_scale,
}
# LayoutChannel grouping the two spectrum channels ---------
l1 = LayoutChannel('l1', channels=[c1, c2], model_properties=lmp)
# ImageChannels --------------------------------------------
ic0 = ICTest(
    "i0",
    data_source=XYDS("is0"),
    model_properties=image_model_properties(0, 1000),
)
ic1 = ICTest(
    "i1",
    data_source=XYDS("is1"),
    model_properties=image_model_properties(0, 1000),
)
# LayoutChannel grouping the two image channels ------------
l2 = LayoutChannel(
    'l2',
    channels=[ic0, ic1],
    model_properties=layout_model_properties(),
)
# DataStream -----------------------------------------------
# NOTE: from here on `s1` names the DataStream, no longer the source list.
s1 = DataStream('s1', channels=[l1, l2])
# DataStreamer ---------------------------------------------
m1 = DataStreamer('m1', data_streams=[s1], update_period=1.0)
# DataStreamerController -----------------------------------
# NOTE: from here on `c1` names the controller, no longer the first channel.
c1 = DataStreamerController('c1', m1)

Let's try a GenericChannel: its channel type remains unknown until the first call to pull_data returns a SCALAR, a SPECTRUM or an IMAGE.

In [None]:
class RingBuffer(np.ndarray):
    """
    A fixed-size, multidimensional ring buffer implemented as an ndarray
    subclass.

    The buffer keeps the shape of the array it is built from; ``append``
    drops the oldest entry along the first axis and stores the new one last.
    see https://gist.github.com/mtambos/aa435461084b5c0025d1
    """

    def __new__(cls, input_array):
        """Return ``input_array`` viewed as a RingBuffer.

        ``np.asarray`` does not copy an existing ndarray, so in that case the
        buffer shares its memory with ``input_array``.
        """
        return np.asarray(input_array).view(cls)

    def __array_finalize__(self, obj):
        """Subclassing hook; this class has no extra attribute to set up."""
        if obj is None:
            return

    def __array_wrap__(self, out_arr, context=None, *extra):
        """Delegate the wrapping of ufunc results to ndarray.

        ``*extra`` forwards the ``return_scalar`` argument that NumPy >= 2
        passes positionally; without it NumPy emits a DeprecationWarning on
        every ufunc call. Older NumPy versions pass nothing extra, so the
        call is unchanged there.
        """
        return np.ndarray.__array_wrap__(self, out_arr, context, *extra)

    def append(self, x):
        """Add element x to the ring buffer, discarding the oldest entry.

        Entries are shifted by one position towards index 0 along the first
        axis, then x is written at the last position.
        """
        self[:-1] = self[1:]
        self[-1] = x

In [None]:
class TSDS(DataSource):
    """Time-series test source: a 0..15 sawtooth with a simulated error window.

    Values and their timestamps (``time.time()`` in milliseconds) are kept in
    two ring buffers of ``num_points`` entries, initially filled with NaN.

    Args:
        name: identifier forwarded to the DataSource constructor.
        num_points: capacity of the value and timestamp buffers.
    """

    def __init__(self, name, num_points=128):
        DataSource.__init__(self, name)
        self._data_buffer = RingBuffer(np.full((num_points,), np.nan))
        self._time_buffer = RingBuffer(np.full((num_points,), np.nan))
        self._cnt = -1

    def pull_data(self):
        """Append one sample to the buffers and return them as ChannelData.

        While the counter is in [4, 8] a NaN is appended instead of the value
        and the returned data is flagged with a test error. An unexpected
        exception is printed and also flagged on the returned data.
        """
        # Created outside the try block so that `return val` can never hit an
        # unbound local when the body below fails.
        # NOTE(review): the other sources in this file pass self.name to
        # ChannelData - confirm that the missing name is intended here.
        val = ChannelData()
        try:
            self._cnt += 1
            y = self._cnt % 16
            self._time_buffer.append(time.time() * 1000.)
            if 4 <= self._cnt <= 8:
                self._data_buffer.append(math.nan)
                val.set_error(err='Testing error handling...', exc=None)
            else:
                self._data_buffer.append(y)
            val.set_data(self._data_buffer, self._time_buffer, ChannelData.Format.SCALAR)
            # Wrap the counter so the error window comes back every 16 pulls.
            self._cnt = y
        except Exception as e:
            print(e)
            # Report the failure through the returned data as well, using the
            # same set_error keywords as the simulated error above.
            val.set_error(err=str(e), exc=e)
        return val

    def cleanup(self):
        """Delegate cleanup to the parent class."""
        super(TSDS, self).cleanup()

In [None]:
# GenericChannel fed by the time-series test source --------
sch = GenericChannel(
    'sch',
    data_source=TSDS('tsds'),
)
# DataStream -----------------------------------------------
scd = DataStream('scd', channels=[sch])
# DataStreamer ---------------------------------------------
scm = DataStreamer('scm', data_streams=[scd], update_period=1.0)
# DataStreamerController -----------------------------------
scr = DataStreamerController('scr', scm)