In [None]:
%load_ext watermark


In [None]:
import functools
import itertools as it
import os
import typing
import warnings

from downstream import dstream
from IPython.display import display
import matplotlib as mpl
import more_itertools as mit
import numpy as np
import pandas as pd
import polars as pl
import seaborn as sns
from teeplot import teeplot as tp
from tqdm.auto import tqdm


In [None]:
%watermark -diwmuv -iv


In [None]:
# Disable teeplot's .pgf output format.
tp.save[".pgf"] = False

# Subdirectory for saved figures; the NOTEBOOK_NAME env var overrides the default.
teeplot_subdir = os.getenv("NOTEBOOK_NAME", "2025-07-19-mem-perf-tilted")
teeplot_subdir


## Calculate memory-matched comparator capacities


In [None]:
# Raw benchmark `algo_name` -> display name. Only uncommented entries are
# kept by the filter below.
algo_names = {
    # "dstream.circular_algo": "simple ringbuf",
    # "control_throwaway_algo": "no-operation",
    "dstream.tilted_algo": "extended ringbuf",
    # "dstream_tilted_algo": "extended ringbuf LUT",
    # "doubling_tilted_algo": "naive doubling",
    # "zhao_tilted_algo": "pyrimidal bucket",
    "zhao_tilted_full_algo": "saturating bucket",
}


df = pl.read_csv("https://osf.io/cbqpx/download")
# list every algorithm present in the raw data (sorted for a stable listing)
print("\n".join(df["algo_name"].unique().sort().to_list()))
df = (
    df.cast(
        {
            "memory_bytes": pl.Int32,
            "num_items": pl.Int32,
            "num_sites": pl.Int32,
            "duration_s": pl.Float64,
            "replicate": pl.Int32,
        },
    )
    .filter(
        pl.col("algo_name").is_in(list(algo_names)),
        pl.col("num_items") == 1_000_000,
    )
    .with_columns(
        # vectorized lookup (all remaining algo_names are keys, per the filter)
        algorithm=pl.col("algo_name").replace_strict(
            algo_names,
            return_dtype=pl.String,
        ),
        # Number of items each algorithm actually retains. All algorithms not
        # special-cased here (dstream_tilted_algo, zhao_tilted_full_algo,
        # dstream.tilted_algo, ...) retain num_sites items.
        num_retained=(
            pl.when(pl.col("algo_name") == "doubling_tilted_algo")
            .then(pl.col("num_sites") * 0.75)
            # NOTE(review): fixed retention count for the pyramidal bucket
            # variant — confirm the constant 19
            .when(pl.col("algo_name") == "zhao_tilted_algo")
            .then(19)
            .otherwise(pl.col("num_sites"))
        ),
    )
    .with_columns(
        (pl.col("memory_bytes") / pl.col("num_retained")).alias(
            "bytes per item"
        ),
    )
    .with_columns(
        (8 * pl.col("memory_bytes") / pl.col("num_retained")).alias(
            "bits per item"
        ),
    )
    .with_columns(
        memory_bits=pl.col("memory_bytes") * 8,
        # width of one stored item; unknown data types map to null
        item_bits=pl.col("data_type").replace_strict(
            {"bit": 1, "byte": 8, "word": 16, "double word": 32},
            default=None,
            return_dtype=pl.Int32,
        ),
    )
    .with_columns(
        # memory beyond the payload of the retained items themselves
        overhead_bits=(
            pl.col("memory_bits")
            - pl.col("item_bits") * pl.col("num_retained")
        ),
    )
    .with_columns(overhead_bytes=pl.col("overhead_bits") // 8)
    .with_columns(
        (100 * pl.col("item_bits") / pl.col("bits per item")).alias(
            "Memory Efficiency (%)"
        ),
    )
    .with_columns(pl.col("data_type").alias("data type"))
    .filter(pl.col("replicate") == 0)
)

display(df.describe(), df.head(), df.tail())


In [None]:
# For each (data type, item width, capacity) configuration, shrink the
# saturating-bucket capacity so its memory use matches the extended ringbuf.
records = []
grouped = df.to_pandas().groupby(["data_type", "item_bits", "num_sites"])
for key, group in grouped:
    data_type, item_bits, num_sites = key
    assert len(group) == 2  # exactly one row per compared algorithm

    bits_by_algorithm = {
        algorithm: group.loc[
            group["algorithm"] == algorithm, "memory_bits"
        ].item()
        for algorithm in ("extended ringbuf", "saturating bucket")
    }
    tilted_memory_bits = bits_by_algorithm["extended ringbuf"]
    zhao_memory_bits = bits_by_algorithm["saturating bucket"]
    assert tilted_memory_bits < zhao_memory_bits

    # Drop one comparator slot per item_bits of surplus memory. Floor
    # division rounds the capacity up, so any rounding slack favors the
    # comparator (assuming its memory scales linearly with capacity).
    surplus_items = (zhao_memory_bits - tilted_memory_bits) // item_bits
    zhao_capacity = num_sites - surplus_items

    records.append(
        dict(
            data_type=data_type,
            num_sites=num_sites,
            item_bits=item_bits,
            tilted_memory_bits=tilted_memory_bits,
            ringbuf_capacity=num_sites,
            zhao_memory_bits=zhao_memory_bits,
            zhao_capacity=zhao_capacity,
        )
    )

sizes_df = pd.DataFrame.from_records(records)
sizes_df


## Define gap-size QoS metrics and buffer simulators


In [None]:
def calc_max_qos_from_segment_lengths(segment_lengths: list[int]) -> float:
    """Worst-case gap-size ratio over a segmented stream.

    The ``sum(segment_lengths)`` ranks are split into consecutive segments.
    A segment of length ``L`` ending (exclusively) at rank ``end`` scores
    ``(L - 1) / (total - end + 1)``. A trailing length-1 sentinel scores 0,
    so the result is 0.0 for empty input. Returns the largest score.
    """
    total = sum(segment_lengths)
    best = None
    start = 0
    for length in [*segment_lengths, 1]:
        denominator = total - start - length + 1
        if not denominator:  # only the sentinel reaches zero
            denominator = 1
        score = (length - 1) / denominator
        if best is None or score > best:  # keep first maximum, like max()
            best = score
        start += length
    return best


In [None]:
def calc_mean_qos_from_segment_lengths(segment_lengths: list[int]) -> float:
    """Mean gap-size ratio over every rank of a segmented stream.

    Each rank ``r`` in ``range(sum(segment_lengths))`` lies in exactly one
    segment and scores ``(segment_length - 1) / (total - r)``. A trailing
    fencepost score of 0 is included, so the mean is over ``total + 1``
    values (and is 0.0 for empty input).

    The per-rank scores are computed with numpy rather than a Python loop;
    each score and the final ``np.mean`` are the same as a per-rank loop.

    Raises AssertionError on negative segment lengths or if the maximum
    score disagrees with ``calc_max_qos_from_segment_lengths``.
    """
    lengths = np.asarray(segment_lengths, dtype=np.int64)
    assert (lengths >= 0).all(), "segment lengths must be non-negative"
    segment_total = int(lengths.sum())

    # numerator for each rank: (length - 1) of the segment containing it
    numerators = np.repeat(lengths - 1, lengths)
    # depth of each rank: distance to the end of the stream (always >= 1)
    depths = segment_total - np.arange(segment_total)
    values = np.append(numerators / depths, 0.0)  # fencepost

    assert len(values) == segment_total + 1
    assert values.max() == calc_max_qos_from_segment_lengths(segment_lengths)
    return np.mean(values)


In [None]:
def calc_max_gaps_dstream(
    buffer_size: int,
    num_items: int,
    calc_qos: typing.Callable,
) -> list[float]:
    """Per-timestep gap QoS for the extended ringbuf (``dstream.tilted_algo``).

    Parameters
    ----------
    buffer_size : int
        Number of buffer sites.
    num_items : int
        Number of timesteps to score.
    calc_qos : callable
        Maps an array of gap lengths between consecutive retained ingest
        times to a score.

    Returns
    -------
    list
        ``num_items`` entries. Entries before ``buffer_size`` are 0. Entry
        ``t >= buffer_size`` is ``calc_qos`` applied to the gaps between the
        sorted ingest times that ``lookup_ingest_times_batched`` reports for
        time ``t``.
    """
    if num_items <= buffer_size:
        # nothing to look up; keep the result length equal to num_items
        return [0] * num_items

    inventories = dstream.tilted_algo.lookup_ingest_times_batched(
        buffer_size,
        np.arange(buffer_size, num_items),
    ).astype(np.int64)
    gaps = np.diff(np.sort(inventories, axis=1), axis=1)
    return [0] * buffer_size + [calc_qos(gap_lengths) for gap_lengths in gaps]


In [None]:
def calc_max_gaps_zhao_tilted_full(
    buffer_size: int,
    num_items: int,
    calc_qos: typing.Callable,
) -> list[float]:
    """Per-timestep gap QoS for the saturating-bucket comparator.

    Simulates a buffer whose ``buffer_size`` slots start filled with ingest
    times ``0 .. buffer_size - 1``. Each later item ``k`` is written to the
    last slot after one earlier entry is dropped, as chosen by the bucket
    bookkeeping in ``bucket_sizes``.

    Returns
    -------
    list
        ``num_items`` entries. Entry 0 is 0. Entry ``t >= 1`` is ``calc_qos``
        applied to the gaps between consecutive retained ingest times after
        items ``0 .. t - 1`` have been processed.

    Raises
    ------
    ValueError
        If ``buffer_size < 2`` and eviction would be needed. Previously a
        1-slot buffer tripped the segment-length assertion, a 0-slot buffer
        raised IndexError, and a negative size never terminated.
    """
    if buffer_size < 2 and num_items > max(buffer_size, 0):
        raise ValueError(
            f"buffer_size must be at least 2 to ingest {num_items} items, "
            f"got {buffer_size}"
        )

    bucket_sizes = [buffer_size, 0]
    buffer = [*range(buffer_size)]
    max_gaps = [0]

    S = buffer_size
    w = bucket_sizes  # alias: w and bucket_sizes are the same list
    for k in tqdm(range(num_items), leave=False):
        if k < S:
            # fill phase: the buffer is pre-populated with 0 .. S - 1
            assert buffer[k] == k
        else:
            bucket_sizes[0] += 1
            i = S
            j = 0
            # walk buckets until one is larger than its successor, moving
            # the slot boundary i back by each skipped bucket's size and
            # growing the bucket list as needed
            while w[j] <= w[j + 1]:
                i -= w[j]
                j += 1
                if j == len(w) - 1:
                    w.append(0)

            assert 0 <= i <= S
            # two entries of bucket j become one entry of bucket j + 1;
            # shift the tail left over the dropped slot
            w[j] -= 2
            w[j + 1] += 1
            for n in range(i - w[j], S - 1):
                assert 0 < n < S - 1
                buffer[n] = buffer[n + 1]
            buffer[S - 1] = k  # newest item always lands in the last slot

        assert buffer == sorted(buffer)
        assert sum(bucket_sizes) == buffer_size

        segment_lengths = [b - a for a, b in mit.pairwise(sorted(buffer))]
        # retained entries always span 0 .. max(S - 1, k)
        assert (
            sum(segment_lengths)
            == buffer_size + max(k - buffer_size + 1, 0) - 1
        )
        max_gaps.append(
            calc_qos(segment_lengths),
        )

    # drop the score after the final item so the length equals num_items
    return max_gaps[:-1]


## Example Plot: Compute Gap-Size QoS Data


In [None]:
qos_num_items = 10_000


def make_df(buffer_size_ringbuf: int, buffer_size_zhao: int) -> pl.DataFrame:
    """Summarize gap-size QoS for both algorithms at the given capacities.

    Returns two rows (extended ringbuf, then saturating bucket) holding the
    mean over ``qos_num_items`` timesteps of the max-based ("Gap Size Cost")
    and mean-based ("Gap Size Cost Mean") gap size ratios. The saturating
    bucket is reported as ``inf`` when its capacity is 1 or less.
    """

    def summarize(algorithm, simulate, buffer_size, feasible=True):
        # one simulation per QoS metric: max-based first, then mean-based
        if feasible:
            max_cost = np.mean(
                simulate(
                    buffer_size,
                    qos_num_items,
                    calc_max_qos_from_segment_lengths,
                )
            )
            mean_cost = np.mean(
                simulate(
                    buffer_size,
                    qos_num_items,
                    calc_mean_qos_from_segment_lengths,
                )
            )
        else:
            max_cost = mean_cost = np.inf
        return pl.DataFrame(
            {
                "Algorithm": algorithm,
                "Gap Size Cost": max_cost,
                "Gap Size Cost Mean": mean_cost,
                "Num Items Ingested": qos_num_items,
            },
            strict=False,
        )

    ringbuf_summary = summarize(
        "extended ringbuf",
        calc_max_gaps_dstream,
        buffer_size_ringbuf,
    )
    zhao_summary = summarize(
        "saturating bucket",
        calc_max_gaps_zhao_tilted_full,
        buffer_size_zhao,
        feasible=buffer_size_zhao > 1,
    )
    return pl.concat([ringbuf_summary, zhao_summary])


In [None]:
def _row_to_df(row: dict) -> pl.DataFrame:
    """QoS summary for one ``sizes_df`` record, tagged with its configuration."""
    tagged_keys = (
        "data_type",
        "item_bits",
        "num_sites",
        "tilted_memory_bits",
        "zhao_memory_bits",
    )
    summary = make_df(
        buffer_size_ringbuf=row["ringbuf_capacity"],
        buffer_size_zhao=row["zhao_capacity"],
    )
    return summary.with_columns(
        **{key: pl.lit(row[key]) for key in tagged_keys},
    )


In [None]:
# run the (slow) QoS simulations once per capacity configuration
records = sizes_df.to_dict(orient="records")
dfs = [_row_to_df(record) for record in tqdm(records)]


In [None]:
# stack the per-configuration QoS summaries into one frame
qos_df = pl.concat(dfs, how="vertical")
qos_df


In [None]:
# Reshape to one row per (configuration, metric) with one column per
# algorithm, then add relative-difference columns comparing them.
qos_long_df = (
    qos_df.unpivot(
        index=[
            "data_type",
            "item_bits",
            "num_sites",
            "Algorithm",
            "tilted_memory_bits",
        ],
        on=["Gap Size Cost", "Gap Size Cost Mean"],
        variable_name="metric",
        value_name="value",
    )
    .pivot(
        values="value",
        index=[
            "data_type",
            "item_bits",
            "num_sites",
            "metric",
            "tilted_memory_bits",
        ],
        on="Algorithm",
    )
    .with_columns(
        tilted_memory_bytes=pl.col("tilted_memory_bits") // 8,
        # log-ratio, in percent
        sympct=(
            100
            * (
                pl.col("saturating bucket").log()
                - pl.col("extended ringbuf").log()
            )
        ).replace(np.nan, np.inf),
        # symmetric difference bounded to [-100, 100]; inf/nan map to 100
        symetricized=(
            100
            * (pl.col("saturating bucket") - pl.col("extended ringbuf"))
            / (pl.col("extended ringbuf") + pl.col("saturating bucket"))
        )
        .replace(np.inf, 100)
        .replace(np.nan, 100),
        # difference relative to the extended ringbuf, in percent
        percent=(
            100
            * (pl.col("saturating bucket") - pl.col("extended ringbuf"))
            / pl.col("extended ringbuf").abs()
        ).replace(np.nan, np.inf),
    )
)
qos_long_df


## Perf Data


In [None]:
@functools.cache
def _load_perf_benchmarks() -> pl.DataFrame:
    """Download and stack the per-platform benchmark CSVs, tagged by platform.

    Cached so repeated ``get_df_perf_long`` calls download the data only once
    per kernel (restart the kernel to refetch). Callers must not mutate it.
    """
    sources = {
        "ARM Cortex-M0+": "https://osf.io/cbqpx/download",
        "ARM Apple M1": "https://osf.io/ewmk9/download",
        "x86 Intel Core Ultra 7 155U": "https://osf.io/k3vbu/download",
    }
    return pl.concat(
        [
            pl.read_csv(url).with_columns(platform=pl.lit(platform))
            for platform, url in sources.items()
        ],
    )


def get_df_perf_long(extended_ringbuf: str) -> pl.DataFrame:
    """Per-platform runtime comparison of a ringbuf variant vs. saturating bucket.

    Parameters
    ----------
    extended_ringbuf : str
        Benchmark ``algo_name`` to report as "extended ringbuf" (e.g.
        ``"dstream.tilted_algo"`` or ``"dstream_tilted_algo"``).

    Returns
    -------
    pl.DataFrame
        One row per (data type, capacity, platform) holding the mean
        ``duration_s`` of each algorithm in its own column, plus ``metric``
        (``"duration_s (<platform>)"``), ``tilted_memory_bytes``, and the
        relative-difference columns ``sympct``, ``symetricized``, ``percent``.
    """
    # Other available algo_names include dstream.circular_algo,
    # control_throwaway_algo, doubling_tilted_algo, and zhao_tilted_algo.
    perf_algo_names = {
        extended_ringbuf: "extended ringbuf",
        "zhao_tilted_full_algo": "saturating bucket",
    }

    df_perf = (
        _load_perf_benchmarks()
        .cast(
            {
                "memory_bytes": pl.Int32,
                "num_items": pl.Int32,
                "num_sites": pl.Int32,
                "duration_s": pl.Float64,
                "replicate": pl.Int32,
            },
        )
        .with_columns(
            duration_per_item_ns=(
                pl.col("duration_s") * 1_000_000_000 / pl.col("num_items")
            )
        )
        .filter(
            pl.col("algo_name").is_in(list(perf_algo_names)),
            pl.col("num_items") == 1_000_000,
        )
        .with_columns(
            Algorithm=pl.col("algo_name").replace_strict(
                perf_algo_names,
                return_dtype=pl.String,
            ),
            item_bits=pl.col("data_type").replace_strict(
                {"bit": 1, "byte": 8, "word": 16, "double word": 32},
                default=None,
                return_dtype=pl.Int32,
            ),
            # smallest memory footprint within each configuration/platform;
            # NOTE(review): presumes that is the ringbuf's — confirm
            tilted_memory_bits=pl.col("memory_bytes")
            .min()
            .over(
                pl.col("num_sites"),
                pl.col("data_type"),
                pl.col("platform"),
            )
            * 8,
        )
        .with_columns(pl.col("data_type").alias("data type"))
    )

    return (
        df_perf.group_by(
            [
                "data_type",
                "item_bits",
                "num_sites",
                "Algorithm",
                "tilted_memory_bits",
                "platform",
            ],
        )
        .agg(
            [
                pl.col("duration_s").mean().alias("duration_s"),
                pl.col("duration_per_item_ns").mean().alias(
                    "duration_per_item_ns"
                ),
            ]
        )
        .unpivot(
            index=[
                "data_type",
                "item_bits",
                "num_sites",
                "Algorithm",
                "tilted_memory_bits",
                "platform",
            ],
            on=["duration_s"],
            variable_name="platformless metric",
            value_name="value",
        )
        .pivot(
            values="value",
            index=[
                "data_type",
                "item_bits",
                "num_sites",
                "platform",
                "platformless metric",
                "tilted_memory_bits",
            ],
            on="Algorithm",
        )
        .with_columns(
            metric=pl.format(
                "{} ({})",
                pl.col("platformless metric"),
                pl.col("platform"),
            ),
        )
        .with_columns(tilted_memory_bytes=pl.col("tilted_memory_bits") // 8)
        .with_columns(
            (
                100
                * (
                    pl.col("saturating bucket").log()
                    - pl.col("extended ringbuf").log()
                )
            ).alias("sympct"),
        )
        .with_columns(
            (
                100
                * (pl.col("saturating bucket") - pl.col("extended ringbuf"))
                / (pl.col("extended ringbuf") + pl.col("saturating bucket"))
            )
            .replace(np.inf, 100)
            .replace(np.nan, 100)
            .alias("symetricized"),
        )
        .with_columns(
            (
                100
                * (pl.col("saturating bucket") - pl.col("extended ringbuf"))
                / pl.col("extended ringbuf").abs()
            )
            .replace(np.nan, np.inf)
            .alias("percent"),
        )
    )


## Plotting


In [None]:
# Raw metric name -> display label. Insertion order fixes the hue order;
# "blank1"/"separator1" has no data, so (presumably) it only reserves an
# empty hue slot between the QoS bars and the performance bars.
metrics = {
    "Gap Size Cost": "Gap Size Ratio (max)",
    "Gap Size Cost Mean": "Gap Size Ratio (mean)",
    "blank1": "separator1",
    "duration_s (x86 Intel Core Ultra 7 155U)": "Performance (x86 workstation)",
    "duration_s (ARM Apple M1)": "Performance (ARM workstation)",
    "duration_s (ARM Cortex-M0+)": "Performance (ARM embedded device)",
}
# one color per hue level, in `metrics` order; dark for positive advantage
dark_palette = [
    # "#e31a1c",
    "#b15928",
    "#ff7f00",
    "#000000",
    "#0465e4",
    "#33a02c",
    "#66C2A6",
]
# lighter variants for the negative-advantage overlay bars
light_palette = [
    # "#fb9a99",
    "#c06e42",
    "#ffaf46",
    "#000000",
    "#76c0e7",
    "#94dd54",
    "#CCEBC7",
]

# The perf table depends only on the LUT choice, not on rc settings, so
# build each variant once instead of once per (rc, lut) combination.
perf_long_dfs = {
    lut: get_df_perf_long(["dstream.tilted_algo", "dstream_tilted_algo"][lut])
    for lut in (True, False)
}

for rc, lut in it.product([{}, {"font.family": "serif"}], [True, False]):
    with mpl.rc_context(
        {
            "hatch.color": "#FFF7F7",
            "hatch.linewidth": 2.0,
            **rc,
        },
    ):
        perf_long_df = perf_long_dfs[lut]
        long_df = pl.concat(
            [
                qos_long_df,
                perf_long_df,
            ],
            how="diagonal",
        ).with_columns(
            pl.col("metric").replace_strict(metrics),
            advantage=pl.col("symetricized") / 100,
        )
        col_order = ["bit", "byte", "word", "double word"]
        hue_order = [*metrics.values()]
        row_order = sorted(long_df["num_sites"].unique().to_list())
        kwargs = dict(
            x="advantage",
            y="tilted_memory_bytes",
            hue="metric",
            hue_order=hue_order,
            gap=0.1,
            orient="h",
        )

        # Base layer: positive advantages (negatives clipped to 0).
        with tp.teed(
            sns.catplot,
            data=long_df.with_columns(
                pl.col("advantage").clip(lower_bound=0),
            ).to_pandas(),
            col="data_type",
            row="num_sites",
            col_order=col_order,
            row_order=row_order,
            **kwargs,
            aspect=3.0,
            height=1.05,
            kind="bar",
            margin_titles=True,
            palette=dark_palette,
            sharex=True,
            sharey=False,
            teeplot_outattrs={
                "lut": lut,
                "qosnumitems": qos_num_items,
                **rc,
            },
            teeplot_subdir=teeplot_subdir,
        ) as g:
            for (row, col), ax in zip(
                it.product(row_order, col_order),
                g.axes.flat,
            ):
                ax.set_ylim(
                    ax.get_ylim()[0] * 1.25, ax.get_ylim()[1] * 0.6
                )
                # Label each bar; assumes ax.patches holds one bar per
                # non-separator hue, in hue order.
                for hue, bar in zip(
                    [h for h in hue_order if h != "separator1"],
                    ax.patches,
                ):
                    hue_df = long_df.filter(
                        pl.col("data_type") == col,
                        pl.col("num_sites") == row,
                        pl.col("metric") == hue,
                    )
                    if hue_df.is_empty():
                        continue
                    # relative difference vs. the extended ringbuf (1.0 == +100%)
                    label_val = hue_df["percent"].item() / 100

                    xpos = min(hue_df["advantage"].item(), 0.0)
                    ax.text(
                        x=xpos,
                        y=(
                            bar.get_y()
                            + bar.get_height() / 2
                        ),
                        # NOTE(review): values >= 1 print as "<rel. diff>×"
                        # (e.g. +150% -> "1.5×"), which readers may take as
                        # a ratio — confirm intended
                        s=(
                            f" {label_val} "
                            if not np.isfinite(label_val)
                            else fr"{label_val:.1f}× "
                            if label_val >= 1
                            else fr" {label_val * 100:.0f}% "
                        ),
                        ha="right",
                        va="center",
                        color=["indianred", "#131313"][label_val >= 0],
                        style=["italic", "normal"][
                            label_val >= 0
                            and bool(np.isfinite(label_val))
                        ],
                        weight=["light", "bold"][label_val >= 0],
                        fontsize=8,
                    )

                # Overlay: negative advantages (positives clipped to 0).
                sns.barplot(
                    data=long_df.with_columns(
                        pl.col("advantage").clip(upper_bound=0),
                    ).filter(
                        pl.col("data_type") == col,
                        pl.col("num_sites") == row,
                    ).to_pandas(),
                    ax=ax,
                    legend=False,
                    **kwargs,
                    palette=light_palette,
                )

            g.set(
                xlim=(-1, 1),
                xlabel="Ringbuf Advantage",
                ylabel="Mem Use",
            )
            g.set_titles(
                col_template="{col_name} dtype",
                row_template="Store Cap\n{row_name} items",
            )

            for ax in g.axes.flat:
                ax.axvline(
                    x=0.0,
                    color="black",
                    linestyle="--",
                    linewidth=1,
                )
                # white rule visually separating the QoS and perf bar groups
                ax.axhline(
                    y=np.average(ax.get_ylim(), weights=[2.23, 3]),
                    color="white",
                    linestyle="-",
                    linewidth=4,
                )
                # setting labels on fixed ticks emits a UserWarning; silence it
                with warnings.catch_warnings():
                    warnings.simplefilter("ignore", UserWarning)
                    ax.set_yticklabels(
                        [
                            f"{lbl.get_text()} bytes"
                            for lbl in ax.get_yticklabels()
                        ],
                    )
                for lbl in ax.get_yticklabels():
                    lbl.set_rotation(90)
                    lbl.set_ha("right")
                    lbl.set_va("center")

            for ax in g.axes[:, 1:].flat:
                ax.set_ylabel(None)

            # shade the ringbuf-advantage (right) and disadvantage (left) halves
            for ax in g.axes.flat:
                ax.spines["right"].set_visible(True)
                ax.spines["bottom"].set_visible(False)
                ax.fill_between(
                    x=[0, 1],
                    y1=ax.get_ylim()[0],
                    y2=ax.get_ylim()[1],
                    alpha=0.02,
                    facecolor="limegreen",
                    zorder=-10,
                )
                ax.fill_between(
                    x=[-1, 0],
                    y1=ax.get_ylim()[0],
                    y2=ax.get_ylim()[1],
                    alpha=0.02,
                    facecolor="darkred",
                    hatch=r"//",
                    zorder=-10,
                )

            g.tight_layout()
            # `figure` is the preferred spelling; `FacetGrid.fig` is deprecated
            g.figure.subplots_adjust(hspace=0.15, wspace=0.3)
