# 🔧 Utilities

Some utilities for demos, used in other notebooks.

In [None]:
# Only when this notebook is executed directly (``__name__ == "__main__"``) and
# a module named ``pyodide`` is already loaded in ``sys.modules``: quietly
# install the packages listed in ``requirements.txt`` via the ``%pip`` magic.
if __name__ == "__main__" and "pyodide" in __import__("sys").modules:
    %pip install -q -r requirements.txt

In [None]:
import asyncio
import html
import json
import random
import traceback
from pathlib import Path
from typing import Any, Awaitable

import ipywidgets as W
import traitlets as T

import ipyforcegraph.behaviors as B
import ipyforcegraph.graphs as G
import ipyforcegraph.sources as S

In [None]:
def make_a_demo(source=None, dataset="datasets/miserables.json", GraphClass=None):
    """Build a force graph next to an (initially empty) side panel of controls.

    Parameters:
        source: the graph source to display. When ``None``, ``dataset`` is
            read as JSON and its top-level keys are passed as keyword
            arguments to ``S.DataFrameSource``.
        dataset: path of the JSON file used when ``source`` is ``None``.
        GraphClass: the graph widget class; defaults to ``G.ForceGraph``.

    Returns:
        A ``(fg, box)`` tuple. ``box`` is an ``HBox`` with these extra traits:

        - ``behaviors``: dict whose non-``None`` values become ``fg.behaviors``
        - ``graph_ui`` / ``node_ui`` / ``link_ui``: dicts whose values become
          the children of the matching side-panel section
        - ``description``: HTML text shown above the graph
    """
    GraphClass = GraphClass or G.ForceGraph
    if source is None:
        # JSON text is UTF-8; don't depend on the platform's locale encoding.
        data = json.loads(Path(dataset).read_text(encoding="utf-8"))
        source = S.DataFrameSource(**data)
    fg = GraphClass(source=source, layout=dict(min_height="500px", flex="1"))
    style = W.HTML(
        """<style>
        .jp-fg-demo{
            --jp-widgets-container-padding: 0.25em;
            --jp-widgets-inline-width: auto;
        } 
        .widget-box:empty{display:none;}
    </style>"""
    )
    description_ui = W.HTML(layout=dict(flex="0"))
    graph_ui = W.VBox()
    node_ui = W.VBox()
    link_ui = W.VBox()
    ui = W.VBox(
        [graph_ui, node_ui, link_ui], layout=dict(width="500px", overflow_y="scroll")
    )
    layout = dict(height="100%", max_height="100vh")
    fg_wrap = W.VBox([description_ui, fg], layout=dict(flex="1", **layout))
    box = W.HBox([style, fg_wrap, ui], layout=layout)
    box.add_class("jp-fg-demo")
    # Extra traits let other helpers register behaviors and controls on the box.
    box.add_traits(
        behaviors=T.Dict(),
        node_ui=T.Dict(),
        link_ui=T.Dict(),
        graph_ui=T.Dict(),
        description=T.Unicode(),
    )

    def on_box_behaviors(change=None):
        """Push every non-``None`` registered behavior onto the graph."""
        new_behaviors = [b for b in box.behaviors.values() if b is not None]
        fg.behaviors = tuple(new_behaviors)

    box.observe(on_box_behaviors, ["behaviors"])

    # Each side-panel section mirrors the values of the matching dict trait.
    T.dlink((box, "graph_ui"), (graph_ui, "children"), lambda d: tuple(d.values()))
    T.dlink((box, "node_ui"), (node_ui, "children"), lambda d: tuple(d.values()))
    T.dlink((box, "link_ui"), (link_ui, "children"), lambda d: tuple(d.values()))
    T.dlink((box, "description"), (description_ui, "value"), lambda d: d or "")
    return fg, box

In [None]:
def make_an_rgba_picker(**colors):
    """Return a ``VBox`` of r/g/b/a sliders exposing a ``color`` trait.

    Keyword arguments ``r``, ``g``, ``b`` and ``a`` give the initial slider
    values (each defaults to 0). The added ``color`` trait holds the string
    ``rgba(r, g, b, a)`` and is refreshed whenever any slider value changes.
    """
    shared = {"min": 0, "layout": {"width": "100%"}}
    r = W.IntSlider(colors.get("r", 0), description="r", max=255, **shared)
    g = W.IntSlider(colors.get("g", 0), description="g", max=255, **shared)
    b = W.IntSlider(colors.get("b", 0), description="b", max=255, **shared)
    a = W.FloatSlider(colors.get("a", 0), description="a", max=1, **shared)
    channels = [r, g, b, a]

    box = W.VBox(channels, layout=dict(width="100%"))
    box.add_traits(color=T.Unicode())

    def update(*args):
        # Ignores the changed value: the string is rebuilt from all sliders.
        return f"rgba({r.value}, {g.value}, {b.value}, {a.value})"

    for channel in channels:
        T.dlink((channel, "value"), (box, "color"), update)

    return box

In [None]:
def make_a_collapsible_picker(title, children):
    """Return an ``Accordion`` that shows one of several groups of widgets.

    Parameters:
        title: the accordion title; with several groups, the selected key is
            appended in parentheses.
        children: a non-empty dict mapping an option name to a list of widgets.

    With more than one entry, a ``Dropdown`` of the keys is shown, followed by
    the widgets of the selected key. With exactly one entry, its widgets are
    shown directly and no dropdown is created.

    Returns:
        The accordion; its ``_select`` attribute is the dropdown, or ``None``
        when there is only one group.

    Raises:
        ValueError: if ``children`` is empty.
    """
    # Validate first, so no widgets are created for a call that must fail.
    if not children:
        raise ValueError(f"unexpected number of children {children}")

    ui = W.HBox(layout=dict(flex_wrap="wrap"))
    box = W.Accordion([ui], titles=[title])
    select = None
    if len(children) > 1:
        select = W.Dropdown(options=list(children), layout=dict(flex="0"))
        ui.children = [select]
        T.dlink((select, "value"), (ui, "children"), lambda x: [select, *children[x]])
        T.dlink((select, "value"), (box, "titles"), lambda x: (f"{title} ({x})",))
        box.titles = (f"{title} ({select.value})",)
    else:
        ui.children = tuple(list(children.values())[0])
    box._select = select
    return box

In [None]:
def make_random_color_series(fg, trait, column_name):
    """Fill a column of one of the source's tables with random hex colors.

    Parameters:
        fg: an object whose ``source`` holds the table and has ``send_state``.
        trait: name of the table attribute on ``fg.source`` (e.g. ``"links"``).
        column_name: the column to create or overwrite.

    One ``#rrggbb`` string is generated per ``len()`` of the table, then
    ``fg.source.send_state(trait)`` is called to publish the change.
    """
    df = getattr(fg.source, trait)
    # The full hexadecimal alphabet, so every color is reachable.
    digits = "0123456789abcdef"
    df[column_name] = [
        "#" + "".join(random.choices(digits, k=6)) for _ in range(len(df))
    ]
    fg.source.send_state(trait)

In [None]:
def make_link_dropdown_responsive(behavior, label, ui, box):
    """Enable or disable ``behavior`` on ``box`` from the picker's dropdown.

    ``box.behaviors[label]`` is ``None`` while the dropdown ``ui._select``
    reads ``"off"`` and ``behavior`` otherwise. The entry is registered as
    ``None`` first, then synchronized once, and again on any trait change of
    the dropdown.
    """
    box.behaviors = {**box.behaviors, label: None}

    def on_select(change: T.Bunch = None):
        active = None if ui._select.value == "off" else behavior
        # Assign a new dict so that observers of ``behaviors`` are notified.
        box.behaviors = {**box.behaviors, label: active}

    ui._select.observe(on_select, T.All)
    on_select()

In [None]:
def make_link_behavior_with_ui(WidgetClass, label, column_name, is_color=False):
    """Return a function that adds a link behavior and its controls to a demo.

    Parameters:
        WidgetClass: behavior class, called as ``WidgetClass(column_name=...)``.
        label: key used in ``box.link_ui`` / ``box.behaviors`` and as the
            title of the picker.
        column_name: the default links column for the behavior.
        is_color: when true, a missing column is first filled with random
            colors.

    The returned ``add_behavior(fg, box, column_name=column_name)`` builds an
    "off" / "column" / "template" picker, wires it to a new behavior and
    returns ``(fg, box)``.
    """

    def add_behavior(fg, box, column_name=column_name):
        behavior = WidgetClass(column_name=column_name)
        if is_color and column_name not in fg.source.links:
            make_random_color_series(fg, "links", column_name)

        # "column" mode: choose which links column feeds the behavior.
        column_picker = W.Dropdown(
            options=list(fg.source.links.columns), value=column_name
        )
        subscribe_to_columns(fg, "links", column_picker, "options")

        # "template" mode: free text, forwarded only while the box is checked.
        template_text = W.Textarea()
        template_toggle = W.Checkbox(description="enabled?")

        T.link((column_picker, "value"), (behavior, "column_name"))
        T.dlink(
            (template_text, "value"),
            (behavior, "template"),
            lambda x: x if template_toggle.value else "",
        )

        modes = {
            "off": [],
            "column": [column_picker],
            "template": [W.VBox([template_toggle, template_text])],
        }
        picker = make_a_collapsible_picker(label, modes)

        box.link_ui = {**box.link_ui, label: picker}
        make_link_dropdown_responsive(behavior, label, picker, box)

        return fg, box

    return add_behavior

In [None]:
def subscribe_to_columns(fg, elements, ui, trait, tx=None):
    """Keep ``ui.<trait>`` in step with the columns of a source table.

    Whenever ``fg``'s ``source`` trait changes, the ``columns`` of
    ``fg.source.<elements>`` are passed through ``tx`` (``list`` when no
    ``tx`` is given) and the result is assigned to ``trait`` on ``ui``.
    Nothing is assigned at subscription time.
    """
    transform = tx if tx else list

    def on_source_change(change):
        columns = getattr(fg.source, elements).columns
        setattr(ui, trait, transform(columns))

    fg.observe(on_source_change, ["source"])

## Forces

In [None]:
def ensure_graph_forces(fg, box):
    """Return the ``"graph_forces"`` behavior of ``box``, creating it if needed.

    When the key is missing from ``box.behaviors``, a new ``B.GraphForces()``
    is registered under it first. ``fg`` is accepted but not used.
    """
    key = "graph_forces"
    if key in box.behaviors:
        return box.behaviors[key]

    box.behaviors = {**box.behaviors, key: B.GraphForces()}
    return box.behaviors[key]

In [None]:
def make_a_force_picker(title, children):
    """Return an ``Accordion`` that shows the widgets of one selected force.

    Parameters:
        title: the accordion title; with several entries, the label of the
            selected option is appended in parentheses.
        children: a non-empty dict mapping an option label to a list of
            widgets. The lists themselves are the dropdown's option values.

    With more than one entry, a ``Dropdown`` is shown, followed by the widgets
    of the selected option. With exactly one entry, its widgets are shown
    directly and no dropdown is created.

    Returns:
        The accordion; its ``_select`` attribute is the dropdown, or ``None``
        when there is only one entry.

    Raises:
        ValueError: if ``children`` is empty.
    """
    # Validate first, so no widgets are created for a call that must fail.
    if not children:
        raise ValueError(f"unexpected number of children {children}")

    ui = W.HBox(layout=dict(flex_wrap="wrap"))
    box = W.Accordion([ui], titles=[title])
    select = None
    if len(children) > 1:
        select = W.Dropdown(options=list(children.items()), layout=dict(flex="0"))
        ui.children = [select]

        def titles_for(value):
            """Build the accordion titles for the option holding ``value``."""
            # Option values are widget lists (several may be equal, e.g. empty),
            # so the matching label is found by identity, not equality.
            label = [n for n, v in select.options if v is value][0]
            return (f"{title} ({label})",)

        T.dlink((select, "value"), (ui, "children"), lambda x: [select, *x])
        T.dlink((select, "value"), (box, "titles"), titles_for)
        # Show the option's label initially, not the repr of its widget list.
        box.titles = titles_for(select.value)
    else:
        ui.children = tuple(list(children.values())[0])
    box._select = select
    return box

In [None]:
def ensure_graph_force_ui(fg, box, children: dict):
    """Merge ``children`` into the "Forces" picker of ``box``.

    If ``box.graph_ui`` has no ``"forces"`` entry yet, a picker with empty
    ``link``, ``charge`` and ``center`` options is created and registered.
    The picker's dropdown options are then rebuilt with the entries of
    ``children`` added (replacing options that share a key).

    Returns:
        ``(ui, existing)``: the picker and the merged ``{label: widgets}``
        mapping.
    """
    if "forces" not in box.graph_ui:
        placeholder = make_a_force_picker(
            "Forces", {"link": [], "charge": [], "center": []}
        )
        box.graph_ui = {**box.graph_ui, "forces": placeholder}

    ui = box.graph_ui["forces"]

    # Current options first, so known labels keep their position.
    existing = dict(ui._select.options)
    existing.update(children)

    ui._select.options = list(existing.items())
    return ui, existing

## Tests

In [None]:
def wait_for_change(
    widget: "W.Widget", value: Any, timeout=10
) -> Awaitable[Any]:
    """Return an awaitable that yields the next new value of a widget trait.

    Initial pattern from
    https://ipywidgets.readthedocs.io/en/stable/examples/Widget%20Asynchronous.html

    Parameters:
        widget: the widget to observe.
        value: the trait name(s), passed as ``names`` to ``widget.observe``.
        timeout: seconds to wait, as passed to ``asyncio.wait_for``.

    Returns:
        The ``asyncio.wait_for`` awaitable; awaiting it gives ``change.new``
        of the first observed change, or raises on timeout.
    """

    future: asyncio.Future = asyncio.Future()

    def getvalue(change: "T.Bunch") -> None:
        """make the new value available"""
        # Done-callbacks run on a later loop iteration, so a second change may
        # arrive before ``unobserve``; only the first change sets the result.
        if not future.done():
            future.set_result(change.new)

    def unobserve(f: Any) -> None:
        """unobserves the `getvalue` callback"""
        widget.unobserve(getvalue, value)

    future.add_done_callback(unobserve)

    widget.observe(getvalue, value)

    return asyncio.wait_for(future, timeout)

In [None]:
async def run_tests(fg, box, tests: list, timeout=30):
    """Run async demo tests one after another, reporting on ``box.description``.

    Parameters:
        fg: passed as the first argument to each test.
        box: passed as the second argument; its ``description`` shows progress.
        tests: callables that return an awaitable when called as
            ``test(fg, box)``. The docstring (or, failing that, the name) of
            each one is used as its label.
        timeout: seconds allowed per test, as passed to ``asyncio.wait_for``.

    While running, the description shows the current label, then the label
    prefixed with a pass or fail mark. At the end it shows one mark per test
    followed by "Tests Completed" and, if any test failed, the collected
    messages and tracebacks in a ``<pre>`` element.
    """
    marks = []
    errors = []
    for test in tests:
        # A missing docstring is ``None``, which is not valid description text.
        label = test.__doc__ or getattr(test, "__name__", repr(test))
        box.description = label

        try:
            await asyncio.wait_for(test(fg, box), timeout)
        except Exception as err:
            # Deliberately broad: one failing test must not stop the others.
            box.description = f"💥 {label}"
            errors += [str(err), traceback.format_exc()]
            marks.append("💥")
        else:
            box.description = f"✅ {label}"
            marks.append("✅")

    summary = f"{''.join(marks)} Tests Completed"
    if errors:
        # Tracebacks contain text such as ``<module>``; escape it so it is
        # displayed instead of being parsed as HTML.
        report = html.escape("\n".join(errors))
        summary += f"<pre>{report}</pre>"
    box.description = summary