In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import ipyvuetify as v
import dataclasses
from typing import Any
from ipywidgets import Output
from pathlib import Path
from molino import prepare_transactions_db
from molino.transactions import ViewGuiTransaction, transactions_db, MultiTransaction, MultiToTransactions, Observation
from molino.ajax import fetch_transactions
from peewee import fn
import math
import traitlets
import numpy as np

In [3]:
# Output widget used as a sink for debug prints (entered with `with stdout:`).
stdout = Output()
# The row carries the "d-none" class initially; toggle_stdout removes/adds it.
stdout_container = v.Row(children=[stdout], class_="d-none")  # FIXME This is nasty

# Switch whose "change" event drives the visibility of the debug row.
dbg_toggle = v.Switch(v_model=False, label="Debug me senpai")


def toggle_stdout(widget, event, data):
    """Handle the debug switch: drop the "d-none" class when on, add it back when off.

    :param widget: widget that emitted the event (unused)
    :param event: event name (unused)
    :param data: new switch value; truthy means the debug output is shown
    """
    css_classes = stdout_container.class_list
    if not data:
        css_classes.add("d-none")
        return
    css_classes.remove("d-none")


# Flip the visibility of the debug row whenever the switch changes.
dbg_toggle.on_event("change", toggle_stdout)

# Debug panel: the switch on its own row, followed by the output row.
c = v.Container(children=[v.Row(children=[dbg_toggle]), stdout_container])
c

Container(children=[Row(children=[Switch(label='Debug me senpai', layout=None, v_model=False)], layout=None), …

In [4]:
# Text input receiving the path of the database file; starts empty.
database_field = v.TextField(
    v_model="",
    label="Path to database file",
    prepend_icon="mdi-database",
    clearable=True,
)

database_field

TextField(clearable=True, label='Path to database file', layout=None, prepend_icon='mdi-database', v_model='')

In [5]:
class TransactionTable(v.VuetifyTemplate):
    template_file = "transactions-template.vue"

    headers = traitlets.List(traitlets.Dict()).tag(sync=True)
    transactions = traitlets.List(traitlets.Dict(), default_value=[]).tag(sync=True)
    transactions_count = traitlets.Integer(default_value=0).tag(sync=True)
    options = traitlets.Dict(default_value={}).tag(sync=True)
    filters = traitlets.Dict(default_value={}).tag(sync=True)
    penalty_threshold = traitlets.Float(default_value=0.05).tag(sync=True)
    penalty_range = traitlets.Tuple().tag(sync=True)
    penalty_mode = traitlets.Int(default_value=0).tag(sync=True)
    watermark_range = traitlets.Tuple().tag(sync=True)
    size_range = traitlets.Tuple().tag(sync=True)

    #  Execution time histograms
    hist_dialog = traitlets.Bool(default_value=False).tag(sync=True)
    hist_item = traitlets.Dict(default_value={}).tag(sync=True)
    hist_iso = traitlets.List(default_value=[]).tag(sync=True)
    hist_itf = traitlets.List(default_value=[]).tag(sync=True)
    hist_labels = traitlets.List(default_value=[]).tag(sync=True)

    @traitlets.default("headers")
    def _default_headers(self):
        return [
            {"text": "Name", "value": "mtr"},
            {"text": "Victim", "value": "tr"},
            {"text": "Size", "value": "size", "align": "right"},
            {"text": "Classification (PML)", "value": "interfering"},
            {"text": "HWM", "value": "hwm", "align": "right"},
            {"text": "LWM", "value": "lwm", "align": "right"},
            {"text": "Isolation (HWM)", "value": "ref_hwm", "align": "right"},
            {"text": "Penalty (HWM)", "value": "penalty_hwm", "align": "right"},
            {"text": "Isolation (LWM)", "value": "ref_lwm", "align": "right"},
            {"text": "Penalty (LWM)", "value": "penalty_lwm", "align": "right"},
            {"text": "Actions", "value": "actions"},
        ]

    def is_database_ready(self) -> bool:
        """Check if database connection is ready."""
        return transactions_db.is_connection_usable()

    @traitlets.default("size_range")
    def _default_size_range(self):
        if not self.is_database_ready():
            return tuple()
        r = MultiToTransactions.select(fn.MIN(MultiToTransactions.arity),
                                       fn.MAX(MultiToTransactions.arity)).scalar(as_tuple=True)
        return int(math.floor(r[0])), int(math.ceil(r[1]))

    @traitlets.default("penalty_range")
    def _default_penalty_range(self):
        if not self.is_database_ready():
            return tuple()
        r = ViewGuiTransaction.select(fn.MIN(ViewGuiTransaction.penalty_hwm),
                                      fn.MAX(ViewGuiTransaction.penalty_hwm)).scalar(as_tuple=True)
        return int(math.floor(r[0])), int(math.ceil(r[1]))

    @traitlets.default("watermark_range")
    def _default_watermark_range(self):
        if not self.is_database_ready():
            return tuple()
        r = ViewGuiTransaction.select(fn.MIN(ViewGuiTransaction.hwm), fn.MAX(ViewGuiTransaction.lwm)).scalar(
            as_tuple=True)
        return int(math.floor(r[0])), int(math.ceil(r[1]))

    @traitlets.observe("options")
    def update_options(self, changes):
        data = changes["new"]
        self.options = data
        self.refresh_contents()

    def refresh_defaults(self):
        self.watermark_range = self._default_watermark_range()
        self.penalty_range = self._default_penalty_range()
        self.size_range = self._default_size_range()

    def refresh_contents(self):
        if not self.is_database_ready():
            return
        with stdout:
            # Cleanup classification values
            if "interfering" in self.filters:
                def _classif_to_filter(v: str):
                    match v:
                        case "True" | "1":
                            return "1"
                        case "False" | "0":
                            return "0"
                        case "None" | "":
                            return ""
                    return None

                self.filters["interfering"] = [_classif_to_filter(v) for v in self.filters["interfering"]]
            # Format filters
            filters = [(k, v) for (k, f) in self.filters.items() for v in f if len(f) > 0]
            if len(filters) == 0:
                filters = None
            #
            print("Options:", self.options)
            print("Filters:", self.filters, "Actual:", filters)
            tr_count, trs = fetch_transactions(
                page=self.options["page"],
                page_size=self.options["itemsPerPage"],
                sort_by=self.options["sortBy"][0] if len(self.options["sortBy"]) else "id",
                filters=filters,
                descending=self.options["sortDesc"][0] if len(self.options["sortDesc"]) else False,
                logger=None,
            )
            # Update visible transactions
            updated_transactions = [dataclasses.asdict(t) for t in trs]
            # FIX Names not in expected format
            for t in updated_transactions:
                t["mtr_name"] = t["mtr"]
                t["mtr"] = t["mtr"].split("||")
                t["penalty_hwm"] = (
                        (t["timing_isolation"]["hwm"]["value"] - t["timing"]["lwm"]["value"]) /
                        t["timing_isolation"]["hwm"]["value"]
                )
                t["penalty_lwm"] = (
                        (t["timing_isolation"]["lwm"]["value"] - t["timing"]["hwm"]["value"]) /
                        t["timing_isolation"]["lwm"]["value"]
                )
            self.transactions = updated_transactions
            self.transactions_count = tr_count

    def vue_update_filter(self, change: tuple[str, Any]):
        with stdout:
            field, val = change
            print("Update filter:", field, val)
            if val is None or len(val) == 0:
                updated_filters = dict(self.filters)
                del updated_filters[field]
                self.filters = updated_filters
            else:
                updated_filters = self.filters
                updated_filters[field] = val if isinstance(val, list) else [val]
                self.filters = updated_filters
            self.refresh_contents()

    def vue_update_timing_histogram(self, change: dict):
        with stdout:
            print("Update timing histogram:", change)
            self.hist_dialog = True
            self.hist_item = change
            # Retrieve multi-transaction id in isolation
            iso_mtri = MultiToTransactions.select(MultiToTransactions.multi_transaction).where(
                (MultiToTransactions.transaction == change["aid"]) &
                (MultiToTransactions.arity == 1)
            ).scalar()
            # Collect and format observations
            itf = list(Observation.select(Observation.value).where(
                (Observation.transaction == change["aid"]) & (
                            Observation.multi_transaction == change["mtri"])).tuples().execute())
            iso = list(Observation.select(Observation.value).where(
                (Observation.transaction == change["aid"]) & (Observation.multi_transaction == iso_mtri)).tuples().execute())
            itf = [i[0] for i in itf]
            iso = [i[0] for i in iso]
            #
            BINS_COUNT = 10
            RANGE = (min(itf + iso), max(itf + iso))
            # Move observations into bins
            hist_itf, edges = np.histogram(itf, bins=BINS_COUNT, range=RANGE, density=True)
            hist_iso, _ = np.histogram(iso, bins=BINS_COUNT, range=RANGE, density=True)
            # Cast to Python float (for ipyvuetify compatibility)
            self.hist_itf = [n.item() for n in hist_itf]
            self.hist_iso = [n.item() for n in hist_iso]
            self.hist_labels = [f"{round(edges[i].item(), 2)}" for i, _ in enumerate(edges[:-1])]
            # self.hist_labels = [f"[{round(edges[i].item(), 2)}:{round(edges[i + 1].item(), 2)})" for i, _ in
            #                     enumerate(edges[:-1])]


# Single table instance: refreshed by update_database and displayed below.
t = TransactionTable()


def update_database(widget, event, data):
    """Handle a change of the database path field and (re)load the table.

    :param widget: widget that emitted the event (unused)
    :param event: event name (unused)
    :param data: new content of the field; ``None`` or ``""`` when cleared

    A cleared field only resets the error state. A path that is not an existing
    file flags the field as erroneous. Otherwise the database is prepared and
    the table contents and range defaults are refreshed; the field shows its
    loading state meanwhile, and exceptions raised while loading propagate
    after that state is reset.
    """
    if not data:
        # Path(None) raises TypeError and Path("") is the current directory:
        # with no path there is simply nothing to load.
        database_field.error = False
        database_field.error_messages = []
        return
    database_path = Path(data)
    if not database_path.is_file():
        # Also rejects directories, which exist but are not database files
        database_field.error = True
        database_field.error_messages = "Database does not exists"
        return
    database_field.error = False
    database_field.error_messages = []
    database_field.loading = True
    try:
        prepare_transactions_db(database_path)
        t.refresh_contents()
        t.refresh_defaults()
    finally:
        database_field.loading = False


# Reload the table whenever the database path field changes.
database_field.on_event("change", update_database)

# Container holding the transaction table on a single row.
c = v.Container(children=[v.Row(children=[t])])
c



Container(children=[Row(children=[TransactionTable(events=['update_filter', 'update_timing_histogram'], header…