# Perspective Streaming Data Example
In this example, we use [csp](https://github.com/point72/csp) and [superstore](https://github.com/timkpaine/superstore) to build a streaming data graph and pump its data into `perspective` widgets.

`superstore` is a library of simulated data sources, and `csp` is a stream-processing library. The details of `csp` are not important for the purposes of this talk; here we use it to construct a streaming DAG that feeds `perspective` at different intervals.
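
To make "different intervals" concrete, here is a rough pure-Python sketch of the tick schedule that the graph later in this notebook sets up (usage every second, status and jobs every five seconds). It does not use `csp` at all; `tick_times` and `merged_schedule` are hypothetical helper names made up for this illustration, and a real `csp` graph is driven by its engine rather than by a precomputed list.

```python
from datetime import timedelta


def tick_times(interval: timedelta, horizon: timedelta) -> list[float]:
    """Offsets in seconds at which a source with this interval would fire."""
    step = interval.total_seconds()
    count = int(horizon.total_seconds() // step)
    return [step * i for i in range(1, count + 1)]


def merged_schedule(sources: dict[str, timedelta], horizon: timedelta) -> list[tuple[float, str]]:
    """Interleave the ticks of every source into one time-ordered list."""
    events = [
        (offset, name)
        for name, interval in sources.items()
        for offset in tick_times(interval, horizon)
    ]
    return sorted(events)


# Intervals taken from the graph defined later in this notebook.
sources = {
    "usage": timedelta(seconds=1),
    "status": timedelta(seconds=5),
    "jobs": timedelta(seconds=5),
}

schedule = merged_schedule(sources, timedelta(seconds=10))
for offset, name in schedule:
    print(f"t+{offset:>4.1f}s  {name}")
```

Over a ten second window the usage stream ticks ten times while status and jobs tick twice each, so the usage widget refreshes far more often than the other two.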

In [1]:
import csp
import pandas as pd
from datetime import timedelta
from ipywidgets import HBox, VBox
from perspective import PerspectiveWidget
from csp_nodes import *

In [2]:
m = machines()
pd.DataFrame(m)

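| | machine_id | kind | cores | region | zone |
|---|---|---|---|---|---|
| 0 | 16e51c20e5d9 | edge | 4 | eu | A |
| 1 | 86d9adf37ac0 | core | 32 | ap | A |
| 2 | b2150cf80c80 | core | 8 | eu | B |
| 3 | ebd69d51048f | core | 16 | eu | B |
| 4 | 6e9b01a7ff9d | worker | 64 | na | D |
| ... | ... | ... | ... | ... | ... |
| 95 | 7d0f6a8aea23 | edge | 4 | ap | D |
| 96 | 33799ea29b31 | core | 16 | ap | B |
| 97 | ec281a4e6e3d | worker | 32 | na | D |
| 98 | 1f47c9c99837 | worker | 64 | na | D |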


In [5]:
u = usage(m[4])
u = usage(u)
u

{'machine_id': '6e9b01a7ff9d',
 'kind': 'worker',
 'cores': 64,
 'region': 'na',
 'zone': 'D',
 'cpu': 82.7,
 'mem': 72.39,
 'free': 27.61,
 'network': 74.81,
 'disk': 89.81}

In [6]:
status(u)

{'machine_id': '6e9b01a7ff9d',
 'kind': 'worker',
 'cores': 64,
 'region': 'na',
 'zone': 'D',
 'cpu': 82.7,
 'mem': 72.39,
 'free': 27.61,
 'network': 74.81,
 'disk': 89.81,
 'last_update': datetime.datetime(2024, 4, 24, 9, 20, 41, 439707),
 'status': 'capacity'}

In [8]:
jobs(u)

{'machine_id': '6e9b01a7ff9d',
 'job_id': '8bfe7e7d5c30',
 'name': 'nocturnal-puma',
 'units': 1,
 'start_time': datetime.datetime(2024, 4, 24, 9, 20, 54, 701459),
 'end_time': datetime.datetime(2024, 4, 24, 9, 21, 24, 701460)}

In [9]:
@csp.graph
def main_graph(
    machines_widget: PerspectiveWidget,
    usage_widget: PerspectiveWidget,
    status_widget: PerspectiveWidget,
    jobs_widget: PerspectiveWidget,
):
    # A randomly generated list of "machines"
    all_machines = machines()

    # construct three ticking nodes for usage, status, and jobs
    usage = machine_usage(all_machines, timedelta(seconds=1))
    status = machine_status(usage, timedelta(seconds=5))
    jobs = machine_jobs(all_machines, timedelta(seconds=5))

    # push all of our data to 4 separate perspective tables
    push_to_perspective(csp.const(all_machines), machines_widget)
    push_to_perspective(usage, usage_widget)
    push_to_perspective(status, status_widget)
    push_to_perspective(jobs, jobs_widget)

In [10]:
# construct 4 separate perspective widgets. Each will have its own table internally
machines_widget = PerspectiveWidget(MACHINE_SCHEMA, index="machine_id", settings=False)
usage_widget = PerspectiveWidget(USAGE_SCHEMA, index="machine_id", settings=False)
status_widget = PerspectiveWidget(STATUS_SCHEMA, index="machine_id", sort=[["last_update", "desc"]], settings=False)
jobs_widget = PerspectiveWidget(JOBS_SCHEMA, sort=[["start_time", "desc"]], settings=False)
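
Three of the widgets above pass `index="machine_id"` and the jobs widget does not. As we understand Perspective, an indexed table treats an update for an existing key as a replacement of that row, while an unindexed table appends every update as a new row. The toy class below imitates that behavior in plain Python; `SketchTable` is a made-up stand-in and not the Perspective API, and the second `cpu` value and second `job_id` are invented for the example.

```python
class SketchTable:
    """Toy stand-in for a table; NOT the Perspective API."""

    def __init__(self, index=None):
        self.index = index
        self._keyed = {}      # used when an index column is set
        self._appended = []   # used when there is no index

    def update(self, records):
        for record in records:
            if self.index is None:
                # No index: every record becomes a new row.
                self._appended.append(dict(record))
            else:
                # Indexed: merge into the existing row with the same key.
                key = record[self.index]
                self._keyed[key] = {**self._keyed.get(key, {}), **record}

    def rows(self):
        if self.index is None:
            return list(self._appended)
        return list(self._keyed.values())


usage_table = SketchTable(index="machine_id")
usage_table.update([{"machine_id": "6e9b01a7ff9d", "cpu": 82.7}])
usage_table.update([{"machine_id": "6e9b01a7ff9d", "cpu": 50.0}])  # invented value

jobs_table = SketchTable()
jobs_table.update([{"machine_id": "6e9b01a7ff9d", "job_id": "8bfe7e7d5c30"}])
jobs_table.update([{"machine_id": "6e9b01a7ff9d", "job_id": "000000000000"}])  # invented id

print(len(usage_table.rows()), len(jobs_table.rows()))
```

This is why the usage and status widgets stay at one row per machine while the jobs widget grows as new jobs arrive.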

In [11]:
# a little bit of layout with ipywidgets
VBox(children=[
    HBox(children=[machines_widget, usage_widget]),
    HBox(children=[status_widget, jobs_widget]),
])

VBox(children=(HBox(children=(PerspectiveWidget(columns=['machine_id', 'kind', 'cores', 'region', 'zone'], set…

In [12]:
csp.run_on_thread(main_graph, machines_widget, usage_widget, status_widget, jobs_widget, realtime=True, daemon=True)

<csp.impl.wiring.threaded_runtime.ThreadRunner at 0x163479550>

In [13]:
status_widget.plugin = "X Bar"
status_widget.group_by = ["status"]
status_widget.columns = ["machine_id"]
status_widget.aggregates = {"status": "last"}
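
With `group_by = ["status"]` and `machine_id` as the only visible column, the bar chart should end up showing how many machines are in each status, since a string column is counted by default when grouped (our understanding of Perspective's defaults, not something this notebook verifies). The equivalent computation in plain Python looks roughly like this; the machine ids come from the table above, but every status other than the `'capacity'` value shown earlier is made up for the illustration.

```python
from collections import Counter

# Only the 'capacity' status for 6e9b01a7ff9d appears in the notebook output;
# the remaining statuses are hypothetical values for illustration.
status_rows = [
    {"machine_id": "6e9b01a7ff9d", "status": "capacity"},
    {"machine_id": "16e51c20e5d9", "status": "idle"},
    {"machine_id": "86d9adf37ac0", "status": "active"},
    {"machine_id": "b2150cf80c80", "status": "active"},
    {"machine_id": "ebd69d51048f", "status": "capacity"},
]

# One bar per status, with height equal to the number of machines in it.
machines_per_status = Counter(row["status"] for row in status_rows)

for status_name, count in sorted(machines_per_status.items()):
    print(f"{status_name:<10} {'#' * count} ({count})")
```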