In [None]:
import datetime
from pandas import Timestamp

from ipywidgets import interact

from bokeh.models.sources import ColumnDataSource
from bokeh.plotting import figure
from bokeh.io import push_notebook, show, output_notebook

from confluent_kafka import Consumer, KafkaError

In [None]:
output_notebook()

In [None]:
consumer = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'test-group',
              'default.topic.config': {'auto.offset.reset': 'earliest'}})

consumer.subscribe(['prices'])

In [None]:
price_figure = figure(title='Prices and binned prices', plot_height=300, 
                      plot_width=600, y_range=(0, 140), x_axis_type='datetime')
price_figure.xaxis.axis_label = 'Price timestamp'
price_figure.yaxis.axis_label = 'Price'

price_data_A = ColumnDataSource(data=dict(x=[datetime.datetime(2017,1,1)], 
                                        y=[100]))
price_data_B = ColumnDataSource(data=dict(x=[datetime.datetime(2017,1,1)], 
                                        y=[100]))
line_AAA = price_figure.line(x="x", y="y", color="blue", source=price_data_A, legend='AAA')
line_BBB = price_figure.line(x="x", y="y", color="red", source=price_data_B, legend='BBB')
handle = show(price_figure, notebook_handle=True)

xA, yA = [], []
xB, yB = [], []
updated_data_A = dict(x=xA, y=yA)
updated_data_B = dict(x=xB, y=yB)

n_show = 100


In [None]:
running = True
while running:
    msg = consumer.poll()
    if not msg.error():
        #print(f'Received message: {msg.value().decode("utf-8")}')
        dt, sec, prc = msg.value().decode("utf-8").split(',')
        dt = Timestamp(dt).to_pydatetime()
        prc = float(prc)

        (price_data, updated_data, xUp, yUp) = (price_data_A, updated_data_A, xA, yA) \
                                    if sec == 'AAA' else \
                                    (price_data_B, updated_data_B, xB, yB)
        xUp.append(dt)
        yUp.append(prc)
        
        updated_data['x'] = xUp = xUp[-n_show:]
        updated_data['y'] = yUp= yUp[-n_show:]
        price_data.stream(updated_data, n_show)
        
        push_notebook(handle=handle)
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        print(msg.error())
        running = False
        
c.close()