# Example InfluxDB Jupyter notebook - stream data

This example demonstrates how to query data from InfluxDB 2.0 using Flux and display results in real time.

Prerequisites:
1. Start InfluxDB: `./scripts/influxdb-restart.sh`
2. Start Telegraf: `telegraf  -config ./notebooks/telegraf.conf`
3. Install the following dependencies: `rx`, `pandas`, `streamz`, `hvplot`

In [None]:
# Import dev version of client

import os
import sys

# Prepend the parent directory to the module search path so the dev version
# of the client is found before any other copy on the path.
sys.path[:0] = [os.path.abspath('../')]

In [None]:
from datetime import timedelta

import hvplot.streamz
import pandas as pd
import rx
from rx import operators as ops

from streamz.dataframe import Random, DataFrame
from streamz import Stream
from influxdb_client import InfluxDBClient

In [None]:
def source_data(auto_refresh: int, sink: Stream):
    """
    Periodically query InfluxDB and push every returned record into ``sink``.

    Every ``auto_refresh`` seconds a Flux query covering the last ``auto_refresh``
    seconds of the "cpu" and "mem" measurements in "my-bucket" is executed, and each
    record of the result is emitted into ``sink``. Errors raised in the pipeline are printed.

    :param auto_refresh: polling period in seconds; also the width of the queried time range
    :param sink: stream that receives the records one at a time through ``emit``
    """
    # The query text depends only on ``auto_refresh``, so it is assembled once up front.
    flux = (f'from(bucket: "my-bucket") '
            f'|> range(start: -{auto_refresh}s, stop: now()) '
            f'|> filter(fn: (r) => (r._measurement == "cpu") or (r._measurement == "mem")) ')

    def run_query(_tick):
        # ``client`` is the notebook-level InfluxDBClient; it is looked up on every tick.
        return client.query_api().query_stream(flux)

    ticks = rx.interval(period=timedelta(seconds=auto_refresh))
    records = ticks.pipe(
        ops.map(run_query),
        # Flatten each query result into a sequence of individual records.
        ops.flat_map(lambda result: rx.from_iterable(result)),
    )
    records.subscribe(observer=lambda item: sink.emit(item), on_error=lambda failure: print(failure))

In [None]:
# Client for the InfluxDB instance at localhost:9999, using the example token and organisation.
client = InfluxDBClient(
    url="http://localhost:9999",
    token="my-token",
    org="my-org",
    debug=False,
)

# Root stream: source_data() emits every fetched record into it, polling every 5 seconds.
sink = Stream()
source_data(auto_refresh=5, sink=sink)

In [None]:
# Empty frame passed as ``example`` so the streaming DataFrame knows its columns.
cpu_example = pd.DataFrame({'value': []}, columns=['value'])

# Keep only the "usage_user" field of the "cpu" measurement and turn each record
# into a one-row frame indexed by the record's timestamp.
# The two comparisons are plain booleans, so they are combined with logical ``and``
# (short-circuiting) rather than the bitwise ``&`` operator.
cpu_sink = (
    sink
    .filter(lambda record: record["_measurement"] == "cpu" and record["_field"] == "usage_user")
    .map(lambda record: pd.DataFrame({'value': [record["_value"]]}, columns=['value'], index=[record["_time"]]))
)
cpu = DataFrame(cpu_sink, example=cpu_example)

In [None]:
# Empty frame passed as ``example`` so the streaming DataFrame knows its columns.
mem_example = pd.DataFrame({'field': [], 'value': []}, columns=['field', 'value'])

# Keep the selected fields of the "mem" measurement and turn each record into exactly
# one row indexed by its timestamp. The column values are wrapped in lists and the index
# has a single label: scalar values combined with a two-label index would be broadcast by
# pandas into two identical rows, so every record would be counted twice by aggregations
# such as groupby('field').sum().
mem_sink = (
    sink
    .filter(lambda record: record["_measurement"] == "mem")
    .filter(lambda record: record["_field"] in ["total", "used", "free", "available"])
    .map(lambda record: pd.DataFrame({'field': [record["_field"]], 'value': [record["_value"]]},
                                     columns=['field', 'value'], index=[record["_time"]]))
)
mem = DataFrame(mem_sink, example=mem_example)

In [None]:
from bokeh.models.formatters import DatetimeTickFormatter

# Tick formatter that renders the time axis as HH:MM:SS at every zoom level:
# each resolution of the formatter gets the same single-entry format list.
formatter = DatetimeTickFormatter(**{
    scale: ["%H:%M:%S"]
    for scale in (
        "microseconds", "milliseconds", "seconds", "minsec", "minutes",
        "hourmin", "hours", "days", "months", "years",
    )
})

# Live line chart of the streaming CPU values (kept as the cell's last expression so it is displayed).
cpu.hvplot(
    width=700,
    backlog=50,
    title='CPU % usage',
    xlabel='Time',
    ylabel='%',
    xformatter=formatter,
)

In [None]:
 # Bar chart of the summed streaming memory values, one bar per field.
 (
     mem
     .groupby('field')
     .sum()
     .hvplot
     .bar()
 )