In [1]:
from collections import namedtuple
import time
from datetime import datetime
import ipywidgets as widgets
from bokeh.plotting import figure, ColumnDataSource
from bokeh.io import output_notebook, push_notebook, show

import psutil
import rx
import rxsci as rs
import rx.operators as ops

In [2]:
output_notebook()

In [3]:
CpuMeasure = namedtuple("CpuMeasure", ['timestamp', 'value'])

def create_cpu_observable(period=.1):
    return rx.timer(duetime=period, period=period).pipe(
        ops.map(lambda i: CpuMeasure(
            int(datetime.utcnow().timestamp()),
            psutil.cpu_percent()
        ))
    )

In [7]:
def update_histogram(graph_id, source, histogram):
    edges, values = zip(*histogram)
    source.data = {
        'edges': edges,
        'values': values,
    }
    push_notebook(handle=graph_id)

source_cpu_total = ColumnDataSource(data={'edges': [], 'values': []})
source_cpu_recent = ColumnDataSource(data={'edges': [], 'values': []})

p_cpu_total = figure(title="CPU usage total distribution",plot_width=500, plot_height=150)
p_cpu_total.vbar(x='edges', top='values', width=1.0, source=source_cpu_total)

p_cpu_recent = figure(title="CPU usage distribution on last 3 minutes", plot_width=500, plot_height=150)
p_cpu_recent.vbar(x='edges', top='values', width=1.0, source=source_cpu_recent)

outw = widgets.Output()
display(outw)

with outw:
    h_cpu_total = show(p_cpu_total, notebook_handle=True)
    h_cpu_recent = show(p_cpu_recent, notebook_handle=True)

Output()

In [5]:
update_histogram(
    h_cpu_total,
    source_cpu_total,
    [(5, 3), (7, 12), (12, 5), (23, 3), (50, 17)]
)

In [9]:
disposable = create_cpu_observable().pipe(
    rs.state.with_memory_store(pipeline=rx.pipe(
        rs.ops.map(lambda i: i.value),
        rs.math.dist.update(),
        rs.math.dist.histogram(bin_count=20),
        rs.ops.map(lambda i: (h_cpu_total, source_cpu_total, i))
    )),
).subscribe(
    on_next=lambda i: update_histogram(*i),
)

# run for 2 minutes, comment these lines to run indefinitely
time.sleep(120)
disposable.dispose()

In [10]:
def compute_histogram(graph_id, source):
    return rx.pipe(
        rs.ops.map(lambda i: i.value),
        rs.math.dist.update(),
        rs.math.dist.histogram(bin_count=20),
        rs.ops.map(lambda i: (graph_id, source, i))
    )

    
disposable = create_cpu_observable().pipe(
    rs.state.with_memory_store(pipeline=rx.pipe(
        rs.ops.tee_map(
            # unbounded distribution
            compute_histogram(h_cpu_total, source_cpu_total),
            # bounded distribution of 3 minutes
            rs.data.split(
                predicate=lambda i: i.timestamp - (i.timestamp % 180),
                pipeline=compute_histogram(h_cpu_recent, source_cpu_recent),
            ),
            join="merge",
        )
    )),
).subscribe(
    on_next=lambda i: update_histogram(*i),
)

# run for 10 minutes, comment these lines to run indefinitely
time.sleep(600)
disposable.dispose()

In [None]:
# uncomment if you want to stop monitoring on demand, by running this cell
#disposable.dispose()