In [None]:
!uv pip install polars
from pathlib import Path

import plotly.express as px
import polars as pl

In [None]:
# Replace provider names with "TopK" / "Provider A", "Provider B", ... in plots and exports.
ANONYMIZE = True
# Seed for the shuffle that assigns the anonymous "Provider X" letters.
ANONYMIZE_SEED = 1
# Drop the worst run of every configuration before aggregating (query plots).
DROP_THE_WORST = True
# Collapse repeated runs into mean ("value"), min ("value_min") and max ("value_max").
AGGREGATE_RUNS = True
# Write every plotted frame as JSON into EXPORT_PATH.
EXPORT_JSON = True
EXPORT_PATH = "./topk-bench-results"

# List of files (produced by `tb.write_metrics()`) for `mode=ingest`
INGEST_FILES = []
# List of files (produced by `tb.write_metrics()`) for `mode=qps`, `mode=filter`, `mode=rw`
QUERY_FILES = []

if not INGEST_FILES:
    # `pl.concat([])` fails with an opaque error; point the reader at the setting instead.
    raise ValueError(
        "INGEST_FILES is empty: list the parquet files written by `tb.write_metrics()` for `mode=ingest`"
    )

ingest_df = pl.concat([pl.read_parquet(file) for file in INGEST_FILES])
ingest_df

In [None]:
import random
import builtins

def fmt_int_filter(v):
    """Turn a raw `int_filter` value into a label such as "int 100%" (for 10000).

    Any falsy value (None, "", 0) means no int filter was applied and yields "".
    """
    percent = builtins.int(v) // 100 if v else None
    return "" if percent is None else f"int {percent}%"

def fmt_keyword_filter(v):
    """Turn a raw `keyword_filter` value into a label such as "keyword 100%" (for "0010000").

    Any falsy value (None, "") means no keyword filter was applied and yields "".
    """
    if not v:
        return ""
    # int() parses leading zeros in strings by itself; stripping them first turned an
    # all-zero value such as "0" into "" (a ValueError) and rejected integer inputs.
    return f"keyword {builtins.int(v) // 100}%"

def fmt_selectivity(row):
    """Merge a row's formatted filter labels into a single selectivity label.

    `row` maps "keyword_filter" and "int_filter" to the labels produced by
    `fmt_keyword_filter` / `fmt_int_filter`; an empty string means that filter kind
    is absent. Returns 'unfiltered', one of the labels, or "<int> <keyword>".
    """
    keyword_label = row["keyword_filter"]
    int_label = row["int_filter"]
    has_keyword = keyword_label != ''
    has_int = int_label != ''

    if has_keyword and has_int:
        return f"{int_label} {keyword_label}"
    if has_keyword:
        return keyword_label
    if has_int:
        return int_label
    return 'unfiltered'

def filter_by_type(filter_type, unfiltered=True):
    """Build a row predicate selecting queries that use a single kind of filter.

    Args:
        filter_type: "keyword_filter" or "int_filter"; keeps rows filtered by that
            column only (the other filter column must be empty).
        unfiltered: also keep rows with no filter at all, as a baseline.

    Returns:
        A boolean polars expression to pass to `DataFrame.filter`.

    Raises:
        ValueError: if `filter_type` is unknown and `unfiltered` is False, i.e. the
            predicate would select nothing.
    """
    no_keyword = pl.col("keyword_filter") == ""
    no_int = pl.col("int_filter") == ""

    conditions = []
    if unfiltered:
        conditions.append(no_keyword & no_int)
    # Require the other filter to be absent so combined int+keyword queries are not
    # attributed to a single filter type in the per-type plots.
    if filter_type == "keyword_filter":
        conditions.append(~no_keyword & no_int)
    elif filter_type == "int_filter":
        conditions.append(~no_int & no_keyword)

    if not conditions:
        raise ValueError(
            f"filter_by_type: unknown filter_type {filter_type!r} with unfiltered=False selects nothing"
        )

    # Seed with a real expression rather than None so no null literal enters the OR chain.
    cond = conditions[0]
    for extra in conditions[1:]:
        cond = cond | extra
    return cond

def anonymize_providers(df):
    """Replace provider names with "TopK" and seeded "Provider A", "Provider B", ... labels.

    "topk" always becomes "TopK". The other providers are sorted, and a list of
    letters shuffled with `random.Random(ANONYMIZE_SEED)` is assigned to them. The
    same set of providers therefore gets the same labels on every run.

    NOTE(review): the mapping is derived from the providers present in `df`, so frames
    with different provider sets can receive different labels; confirm every plotted
    frame contains the same providers.
    """
    # A sorted list, not a set: string hashing is randomized per process, so iterating a
    # set made the seeded assignment differ between kernel restarts.
    providers = [p for p in df["provider"].unique().sort().to_list() if p != "topk"]

    anon_providers = [f"Provider {chr(i)}" for i in range(ord("A"), ord("A") + len(providers))]
    rng = random.Random(ANONYMIZE_SEED)
    rng.shuffle(anon_providers)

    mapping = {"topk": "TopK", **dict(zip(providers, anon_providers))}

    return df.with_columns(pl.col("provider").replace(mapping).alias("provider"))


def drop_worst(df, group_by, ascending=False):
    """Remove the worst run from every benchmark configuration.

    Rows are grouped by every column of `group_by` except "run_id". Within each group
    the row with the worst "value" is dropped. The worst row is the largest value when
    `ascending=True` (lower is better, e.g. latency). It is the smallest value when
    `ascending=False` (higher is better, e.g. QPS or recall).

    Groups holding a single run are kept unchanged. Dropping their only row would
    silently remove the whole configuration from the plots.
    """
    agg_group_by = [col for col in group_by if col != "run_id"]
    # After this sort the worst value of each group is its last row.
    df_sorted = df.sort(by=agg_group_by + ["value"], descending=not ascending)

    # Position within the group and group size via window expressions; unlike the
    # previous self-join on the keys, this also keeps groups whose keys contain nulls.
    position = pl.int_range(pl.len()).over(agg_group_by)
    group_size = pl.len().over(agg_group_by)

    return df_sorted.filter((group_size == 1) | (position < group_size - 1))


def aggregate_runs(df, group_by):
    """Collapse the repeated runs of each configuration into a single row.

    Groups by every column of `group_by` except "run_id" and summarizes "value" as
    its mean ("value"), minimum ("value_min") and maximum ("value_max").
    """
    keys = [col for col in group_by if col != "run_id"]
    value = pl.col("value")
    summaries = [
        value.mean().alias("value"),
        value.min().alias("value_min"),
        value.max().alias("value_max"),
    ]
    return df.group_by(keys).agg(summaries)

def process_df(df, group_by, asc):
    """Apply the notebook-wide post-processing to a per-run metric frame.

    Depending on the DROP_THE_WORST, AGGREGATE_RUNS and ANONYMIZE flags, it:
    - drops each configuration's worst run (`asc` is `drop_worst`'s `ascending`);
    - collapses the runs into mean/min/max;
    - anonymizes provider names.

    The result is sorted by provider.
    """
    processed = drop_worst(df, group_by, asc) if DROP_THE_WORST else df
    processed = aggregate_runs(processed, group_by) if AGGREGATE_RUNS else processed
    processed = anonymize_providers(processed) if ANONYMIZE else processed
    return processed.sort("provider")

def write_df(df, filename):
    """Write `df` as JSON to EXPORT_PATH/<filename>.

    "/", " " and ":" in `filename` are replaced with "_" so titles such as
    "read/write" yield a flat file name. EXPORT_PATH is created if it is missing.
    """
    safe_name = filename.replace("/", "_").replace(" ", "_").replace(":", "_")
    export_dir = Path(EXPORT_PATH)
    # Without this, the first export fails with FileNotFoundError on a fresh checkout.
    export_dir.mkdir(parents=True, exist_ok=True)
    df.write_json(export_dir / safe_name)


In [None]:
def plot_bars(df, **kwargs):
    """Build a bar chart with the notebook's shared labels and category orders.

    Keyword arguments are forwarded to `plotly.express.bar`:
    - `y` defaults to "value";
    - `barmode` defaults to "group";
    - `labels` and `category_orders` are merged over the defaults below, and caller
      entries win instead of raising a duplicate-keyword error.

    Providers are ordered alphabetically, with TopK first ("topk" raw, "TopK"
    anonymized). Returns the figure without showing it.
    """
    labels = kwargs.pop("labels", {})
    category_orders = kwargs.pop("category_orders", {})
    y = kwargs.pop("y", "value")
    kwargs.setdefault("barmode", "group")

    # Derive the provider order from the data instead of a fixed four-provider list,
    # so any number of raw or anonymized providers is ordered deterministically.
    topk_names = ("topk", "TopK")
    providers = df["provider"].unique().sort().to_list() if "provider" in df.columns else []
    provider_order = (
        [p for p in providers if p in topk_names]
        + [p for p in providers if p not in topk_names]
    )

    fig = px.bar(
        df,
        labels={
            "provider": "Provider",
            **labels,
        },
        category_orders={
            "size": ["100k", "1m", "10m"],
            "concurrency": [1, 2, 4, 8],
            "provider": provider_order,
            "keyword_filter": ["keyword 100%", "keyword 10%", "keyword 1%"],
            "int_filter": ["int 100%", "int 10%", "int 1%"],
            "selectivity": ["unfiltered", "int 100%", "int 10%", "int 1%", "keyword 100%", "keyword 10%", "keyword 1%"],
            "read_write": ["false", "true"],
            **category_orders,
        },
        y=y,
        **kwargs,
    )
    return fig

def plot_latency(df, x, color, title):
    """Plot p99 query latency as grouped bars, one facet per `size`.

    The p99 of the `bench.query.latency_ms` samples is computed per run for every
    (provider, x, color, size) combination. The runs are then post-processed with
    `process_df(asc=True)`: higher latency is worse, so the slowest run is the one
    dropped. With EXPORT_JSON set, the plotted frame is also written to disk.

    Returns the processed frame that was plotted.
    """
    # Ordered, de-duplicated keys (x or color may itself be "provider"); unlike a set,
    # a list keeps the output/exported column order stable between kernel restarts.
    group_by = list(dict.fromkeys(["provider", x, color, "size", "run_id"]))

    latency_df = (
        df
        .filter(pl.col("metric") == "bench.query.latency_ms")
        .group_by(group_by)
        .agg(pl.col("value").quantile(0.99).alias("value"))
    )
    latency_df = process_df(latency_df, group_by, asc=True)

    fig = plot_bars(
        latency_df,
        x=x,
        color=color,
        facet_col="size",
        title=f"{title}: p99 latency",
        labels={"value": "p99 latency (ms)"},
    )
    fig.show()

    if EXPORT_JSON:
        write_df(latency_df, f"{title}_latency.json")

    return latency_df

def plot_qps(df, x, color, title):
    """Plot query throughput (QPS) as grouped bars, one facet per `size`.

    Per run and (provider, x, color, size) combination, QPS is computed as the number
    of `bench.query.latency_ms` samples divided by the time between the first and last
    sample. The runs are then post-processed with `process_df(asc=False)`: lower QPS is
    worse. With EXPORT_JSON set, the plotted frame is also written to disk.

    Returns the processed frame that was plotted.
    """
    # Ordered, de-duplicated keys; a list keeps the exported column order stable.
    group_by = list(dict.fromkeys(["provider", x, color, "size", "run_id"]))

    # fractional=True keeps sub-second precision; the default truncates to whole
    # seconds, which shrinks the denominator and inflates QPS for short runs.
    duration_s = (pl.col("end") - pl.col("start")).dt.total_seconds(fractional=True)

    qps_df = (
        df
        .filter(pl.col("metric") == "bench.query.latency_ms")
        .group_by(group_by)
        .agg(
            pl.col("ts").min().alias("start"),
            pl.col("ts").max().alias("end"),
            pl.len().alias("n_requests"),
        )
        # A run whose samples share one timestamp has no measurable span: leave its QPS
        # null instead of dividing by zero.
        .with_columns(
            pl.when(duration_s > 0).then(pl.col("n_requests") / duration_s).alias("value")
        )
    )
    qps_df = process_df(qps_df, group_by, asc=False)

    fig = plot_bars(
        qps_df,
        x=x,
        color=color,
        facet_col="size",
        title=f"{title}: QPS",
        labels={"value": "QPS"},
    )
    fig.show()

    if EXPORT_JSON:
        write_df(qps_df, f"{title}_qps.json")

    return qps_df

def plot_recall(df, x, color, title):
    """Plot average recall as grouped bars, one facet per `size`.

    The mean of the `bench.query.recall` samples is computed per run for every
    (provider, x, color, size) combination. The runs are then post-processed with
    `process_df(asc=False)`: lower recall is worse. The y-axis is zoomed to the data
    range with ±0.01 padding. With EXPORT_JSON set, the plotted frame is also written
    to disk.

    Returns the processed frame that was plotted.
    """
    # Ordered, de-duplicated keys; a list keeps the exported column order stable.
    group_by = list(dict.fromkeys(["provider", x, color, "size", "run_id"]))

    recall_df = (
        df
        .filter(pl.col("metric") == "bench.query.recall")
        .group_by(group_by)
        .agg(pl.col("value").mean().alias("value"))
    )
    recall_df = process_df(recall_df, group_by, asc=False)

    fig = plot_bars(
        recall_df,
        x=x,
        color=color,
        facet_col="size",
        title=f"{title}: recall (avg)",
        labels={"value": "recall (avg)"},
    )

    # min()/max() return None for an empty or all-null column; skip the zoom then
    # instead of failing on `None - 0.01`.
    value_min = recall_df["value"].min()
    value_max = recall_df["value"].max()
    if value_min is not None and value_max is not None:
        fig.update_yaxes(range=[value_min - 0.01, value_max + 0.01])
    fig.show()

    if EXPORT_JSON:
        write_df(recall_df, f"{title}_recall.json")

    return recall_df


In [None]:
if not QUERY_FILES:
    # `pl.concat([])` fails with an opaque error; point the reader at the setting instead.
    raise ValueError(
        "QUERY_FILES is empty: list the parquet files written by `tb.write_metrics()` "
        "for `mode=qps`, `mode=filter`, `mode=rw`"
    )

# One chained expression, so re-running the cell always starts from the raw files.
df = (
    pl.concat([pl.read_parquet(path) for path in QUERY_FILES])
    .filter(pl.col('warmup') == 'false')  # exclude warmups
    # Turn raw filter values into the labels used by the plots' category orders.
    .with_columns(
        pl.col("int_filter").map_elements(fmt_int_filter, return_dtype=pl.String).alias("int_filter"),
        pl.col("keyword_filter").map_elements(fmt_keyword_filter, return_dtype=pl.String).alias("keyword_filter"),
    )
)

In [None]:
# Total ingest time per run: span between the first and last ingest-latency sample.
ingest_group_by = ["provider", "size", "run_id"]
latency_df = (
    ingest_df
    .filter(pl.col("metric") == "bench.ingest.latency_ms")
    .group_by(ingest_group_by)
    .agg(
        pl.col("ts").min().alias("start_ts"),
        pl.col("ts").max().alias("end_ts"),
    )
    .with_columns(
        (pl.col("end_ts") - pl.col("start_ts")).dt.total_seconds(fractional=True).alias("value")
    )
)
# px.bar stacks one segment per row, so unaggregated runs would be summed visually.
if AGGREGATE_RUNS:
    latency_df = aggregate_runs(latency_df, ingest_group_by)
# Honour the ANONYMIZE flag (this cell previously anonymized unconditionally).
if ANONYMIZE:
    latency_df = anonymize_providers(latency_df)

fig = px.bar(
    latency_df.sort("value"),
    x="value",
    y="provider",
    color="provider",
    facet_col="size",
    title="Total Ingest Latency (s)",
    labels={"value": "Total ingest latency (s)", "provider": "Provider"},
    category_orders={
        "size": ["100k", "1m", "10m"],
    },
)
# Independent x-axes per facet: the sizes range from 100k to 10m.
fig.update_xaxes(matches=None)
fig.show()

if EXPORT_JSON:
    write_df(latency_df, "ingest_latency.json")

In [None]:
# Ingest throughput per run: bytes upserted divided by the span between the first and
# last upsert sample.
ingest_group_by = ["provider", "size", "run_id"]
ingest_span_s = (pl.col("end_ts") - pl.col("start_ts")).dt.total_seconds(fractional=True)
throughput_df = (
    ingest_df
    .filter(pl.col("metric") == "bench.ingest.upserted_bytes")
    .group_by(ingest_group_by)
    .agg(
        pl.col("value").sum().alias("total_bytes"),
        pl.col("ts").min().alias("start_ts"),
        pl.col("ts").max().alias("end_ts"),
    )
    # A zero-length span (single sample) has no defined rate: null instead of inf.
    .with_columns(
        pl.when(ingest_span_s > 0).then(pl.col("total_bytes") / ingest_span_s).alias("value")
    )
)
# px.bar stacks one segment per row, so unaggregated runs would be summed visually.
if AGGREGATE_RUNS:
    throughput_df = aggregate_runs(throughput_df, ingest_group_by)
# Honour the ANONYMIZE flag (this cell previously anonymized unconditionally).
if ANONYMIZE:
    throughput_df = anonymize_providers(throughput_df)

fig = px.bar(
    throughput_df.sort("value", descending=True),
    y="provider",
    x="value",
    color="provider",
    facet_col="size",
    title="Ingest Throughput (bytes/sec)",
    labels={"value": "Avg Throughput (bytes/sec)"},
    category_orders={
        "size": ["100k", "1m", "10m"],
    },
)
fig.show()

if EXPORT_JSON:
    write_df(throughput_df, "ingest_throughput.json")

In [None]:
# Freshness latency quantiles per run, reshaped to one row per (run, quantile).
freshness_df = (
    ingest_df
    .filter(pl.col("metric") == "bench.ingest.freshness_latency_ms")
    .group_by(["provider", "size", "run_id"])
    .agg(
        pl.col("value").quantile(0.99).alias("p99"),
        pl.col("value").quantile(0.90).alias("p90"),
        pl.col("value").quantile(0.50).alias("p50"),
    )
    .unpivot(
        index=["provider", "size", "run_id"],
        on=["p99", "p90", "p50"],
        variable_name="quantile",
        value_name="value",
    )
)
# Several runs would otherwise draw overlapping bars at the same provider/quantile slot.
if AGGREGATE_RUNS:
    freshness_df = aggregate_runs(freshness_df, ["provider", "size", "quantile", "run_id"])
# Honour the ANONYMIZE flag (this cell previously anonymized unconditionally).
if ANONYMIZE:
    freshness_df = anonymize_providers(freshness_df)
# Plotly orders categories by first appearance: keep the legend as p99, p90, p50.
freshness_df = freshness_df.sort("quantile", descending=True)

fig = plot_bars(
    freshness_df,
    x="provider",
    color="quantile",
    facet_col="size",
    title="Freshness Quantiles (p99, p90, p50)",
    labels={"value": "Freshness latency (ms)"},
)
fig.show()

if EXPORT_JSON:
    write_df(freshness_df, "ingest_freshness.json")

In [None]:
concurrency_df = df.filter(pl.col("mode") == "qps")

# Both charts share the same axes: one bar per provider at each concurrency level.
concurrency_plot_args = {"x": "concurrency", "color": "provider", "title": "concurrency"}
concurrency_latency_df = plot_latency(concurrency_df, **concurrency_plot_args)
concurrency_qps_df = plot_qps(concurrency_df, **concurrency_plot_args)

In [None]:
# Baseline vs. the least selective filters (100%): the filters still match every
# document, so any slowdown is the cost of evaluating the filter itself.
is_unfiltered = (pl.col("keyword_filter") == "") & (pl.col("int_filter") == "")
is_keyword_100_only = (pl.col("keyword_filter") == "keyword 100%") & (pl.col("int_filter") == "")
is_int_100_only = (pl.col("keyword_filter") == "") & (pl.col("int_filter") == "int 100%")

overhead_df = (
    df
    .filter(pl.col("mode") == "filter")
    .filter(is_unfiltered | is_keyword_100_only | is_int_100_only)
    .with_columns(
        pl.struct([pl.col("keyword_filter"), pl.col("int_filter")])
        .map_elements(fmt_selectivity, return_dtype=pl.String)
        .alias("selectivity")
    )
)

for plot_fn in (plot_latency, plot_qps):
    plot_fn(overhead_df, x="provider", color="selectivity", title="filtering overhead")

In [None]:
# Filter-mode queries that are unfiltered or use an int filter, per int selectivity.
filter_mode_df = df.filter(pl.col("mode") == "filter")
base_df = filter_mode_df.filter(filter_by_type("int_filter"))

for plot_fn in (plot_latency, plot_qps, plot_recall):
    plot_fn(base_df, x="provider", color="int_filter", title="int_filter")

In [None]:
# Filter-mode queries that are unfiltered or use a keyword filter, per keyword selectivity.
filter_mode_df = df.filter(pl.col("mode") == "filter")
base_df = filter_mode_df.filter(filter_by_type("keyword_filter"))

for plot_fn in (plot_latency, plot_qps, plot_recall):
    plot_fn(base_df, x="provider", color="keyword_filter", title="keyword_filter")

In [None]:
# Mixed read/write benchmark, coloured by the "read_write" column.
rw_df = df.filter(pl.col("mode") == "rw")

for plot_fn in (plot_latency, plot_qps):
    plot_fn(rw_df, x="provider", color="read_write", title="read/write")