In [None]:
import ibis
import gcsfs
import ibis.selectors as s
import plotly.express as px

# Plotly: use the dark template as the default for every figure.
px.defaults.template = "plotly_dark"

# Ibis: turn on interactive mode and size the interactive repr
# (no column limit, max_length of 22, up to 40 rows).
ibis.options.interactive = True
ibis.options.repr.interactive.max_columns = None
ibis.options.repr.interactive.max_length = 22
ibis.options.repr.interactive.max_rows = 40

In [None]:
# YOLO
import warnings

warnings.filterwarnings("ignore")

In [None]:
# Name of the GCS bucket that holds the benchmark logs.
BUCKET = "ibis-bench"

# GCS filesystem handle, created with gcsfs' default arguments.
fs = gcsfs.GCSFileSystem()

# Show the last five entries of the cached-logs directory listing.
fs.ls("/".join([BUCKET, "bench_logs_v2", "cache"]))[-5:]

In [None]:
# Open a DuckDB connection from the "duckdb://" URL and register the GCS
# filesystem handle `fs` on it (presumably so that gs:// paths can be read
# through this connection — confirm against the backend docs).
con = ibis.connect("duckdb://")
con.register_filesystem(fs)
# Display the connection object as the cell output.
con

In [None]:
# Load every cached benchmark-log Parquet file matching file_id=*.parquet.
t = con.read_parquet(f"gs://{BUCKET}/bench_logs_v2/cache/file_id=*.parquet")
# Cast the "timestamp" column to a timestamp type.
t = t.mutate(timestamp=ibis._["timestamp"].cast("timestamp"))
# Cache the result so that later cells work from the cached table.
t = t.cache()
t.head()

In [None]:
# Distinct scale factors ("sf") present in the logs, in ascending order.
sfs = t.distinct(on="sf")["sf"].to_pyarrow().to_pylist()
sfs.sort()
sfs

In [None]:
# Distinct values of the "system" column, in sorted order.
systems = t.distinct(on="system")["system"].to_pyarrow().to_pylist()
systems.sort()
systems

In [None]:
# Distinct instance types, ordered by the text before the first "-" and then
# by the integer after the last "-". Names without a "-" sort by the whole
# name with a size of 0.
instance_types = (
    t.distinct(on="instance_type")["instance_type"].to_pyarrow().to_pylist()
)
instance_types.sort(
    key=lambda name: (name.split("-")[0], int(name.split("-")[-1]))
    if "-" in name
    else (name, 0)
)
instance_types

In [None]:
# Distinct query numbers present in the logs, in ascending order.
query_numbers = t.distinct(on="query_number")["query_number"].to_pyarrow().to_pylist()
query_numbers.sort()
query_numbers

In [None]:
# For the lowest query number only, count the logged rows per
# (instance_type, system, sf) and show the first 12 groups in key order.
(
    t.filter(t["query_number"] == t["query_number"].min())
    .group_by("instance_type", "system", "sf")
    .agg(count=ibis._.count())
    .order_by("instance_type", "system", "sf")
    .head(12)
)

In [None]:
# Expected number of logged runs if every combination were present: one row
# per (scale factor, system, instance type, query number), times 3.
# NOTE(review): the factor 3 is presumably the number of repetitions per
# query — confirm against the benchmark runner.
total_runs_theoretical = (
    len(sfs) * len(systems) * len(instance_types) * len(query_numbers) * 3
)
# Label spelling corrected: "theoretical".
f"total runs (theoretical): {total_runs_theoretical:,}"

In [None]:
# Number of rows actually present in the logs, converted to a Python value.
total_runs_actual = t.count().to_pyarrow().as_py()
# Display it with thousands separators.
"total runs (actual): " + format(total_runs_actual, ",")

In [None]:
# Gap between the expected and the logged number of runs, with thousands separators.
"missing runs: " + format(total_runs_theoretical - total_runs_actual, ",")

In [None]:
# Count how many rows share each (instance_type, system, sf, n_partitions)
# combination and list the most frequent combinations first. The sort column
# is picked with a selector matching "count" in its name rather than by an
# exact column name.
t.select("instance_type", "system", "sf", "n_partitions").value_counts().order_by(
    s.across(s.contains("count"), ibis._.desc())
)

In [None]:
# Step 1: per system, the sorted list of distinct query numbers that appear
# in the logs.
a = (
    t.group_by("system")
    .agg(present_queries=ibis._["query_number"].collect().unique().sort())
)
# Step 2: per system, the query numbers seen anywhere in the logs that are
# absent from that system's list, plus how many there are. Systems with the
# fewest such queries come first.
a = (
    a.mutate(
        failing_queries=(
            t.distinct(on="query_number")["query_number"]
            .collect()
            .filter(lambda query: ~a["present_queries"].contains(query))
        )
    )
    .mutate(num_failing_queries=ibis._["failing_queries"].length())
    .drop("present_queries")
    .order_by("num_failing_queries", "system")
)
a

In [None]:
# Per-run summary for instance types ending in "-2" at scale factor 128.
# Rows are numbered within each (instance_type, system, sf, query_number)
# group by ascending timestamp, then summarised per run number: how many
# execution_seconds values were logged, their sum, and which queries appear.
# To narrow to one engine, add e.g. t["system"].contains("duckdb") to the filter.
b = (
    t.filter(t["instance_type"].endswith("-2"), t["sf"] == 128)
    .mutate(
        run_number=ibis.row_number().over(
            ibis.window(
                group_by=["instance_type", "system", "sf", "query_number"],
                order_by=ibis.asc("timestamp"),
            )
        )
    )
    .group_by("instance_type", "system", "sf", "run_number")
    .agg(
        completed_queries=t["execution_seconds"].count(),
        total_time=t["execution_seconds"].sum(),
        present_queries=ibis._["query_number"].collect().unique().sort(),
    )
    .order_by(ibis.desc("completed_queries"), ibis.asc("total_time"))
)
# Replace the list of present queries with the list of query numbers that
# are missing from each run, and move that column next to the keys.
b = (
    b.mutate(
        failing_queries=(
            t.distinct(on="query_number")["query_number"]
            .collect()
            .filter(lambda query: ~b["present_queries"].contains(query))
        )
    )
    .drop("present_queries")
    .relocate("instance_type", "system", "sf", "run_number", "failing_queries")
)
b

In [None]:
# Mean execution time per (instance_type, system, sf, n_partitions,
# query_number) for scale factors >= 1. This table feeds the charts and
# summaries below.
# Optional narrowing while exploring: filter on t["system"].contains("duckdb")
# / t["system"].contains("datafusion"), or on t["query_number"] == 1.
agg = (
    t.filter(t["sf"] >= 1)
    .group_by("instance_type", "system", "sf", "n_partitions", "query_number")
    .agg(mean_execution_seconds=t["execution_seconds"].mean())
    .order_by(
        # Bare column names sort ascending; only the descending keys are wrapped.
        "instance_type",
        ibis.desc("sf"),
        "n_partitions",
        "query_number",
        ibis.desc("system"),
        "mean_execution_seconds",
    )
)
agg

In [None]:
# Fixed category orderings so every chart uses the same axis/legend order.
# query_number and system use plain sorting ...
category_orders = {
    column: sorted(agg.select(column).distinct().to_pandas()[column].tolist())
    for column in ("query_number", "system")
}
# ... while instance_type sorts by the text before the first "-" and then by
# the integer after the last "-" (names without a "-" get a size of 0).
category_orders["instance_type"] = sorted(
    agg.select("instance_type").distinct().to_pandas()["instance_type"].tolist(),
    key=lambda name: (name.split("-")[0], int(name.split("-")[-1]))
    if "-" in name
    else (name, 0),
)

# One grouped bar chart per scale factor, restricted to instance types whose
# name starts with "n2d": mean runtime per query, coloured by system and
# patterned by instance type.
for sf in sorted(sfs):
    c = px.bar(
        agg.filter(agg["sf"] == sf, agg["instance_type"].startswith("n2d")),
        title=f"scale factor: {sf} (~{sf} GB of data in memory; ~{sf*2//5}GB on disk in Parquet)",
        x="query_number",
        y="mean_execution_seconds",
        barmode="group",
        color="system",
        pattern_shape="instance_type",
        category_orders=category_orders,
    )
    c.show()

In [None]:
# Per-system totals at scale factor 128 on n2d-standard-2: the sum of the
# mean runtimes in `agg` and the count of mean_execution_seconds values.
# Systems with the most completed queries come first, ties broken by the
# lowest total time.
(
    agg.filter(agg["sf"] == 128, agg["instance_type"] == "n2d-standard-2")
    .group_by("instance_type", "system")
    .agg(
        total_execution_seconds=ibis._["mean_execution_seconds"].sum(),
        completed_queries=ibis._["mean_execution_seconds"].count(),
    )
    .order_by(ibis.desc("completed_queries"), ibis.asc("total_execution_seconds"))
)

In [None]:
# Per-system totals at scale factor 128 on n2d-standard-32: the sum of the
# mean runtimes in `agg` and the count of mean_execution_seconds values.
# Systems with the most completed queries come first, ties broken by the
# lowest total time.
(
    agg.filter(agg["sf"] == 128, agg["instance_type"] == "n2d-standard-32")
    .group_by("instance_type", "system")
    .agg(
        total_execution_seconds=ibis._["mean_execution_seconds"].sum(),
        completed_queries=ibis._["mean_execution_seconds"].count(),
    )
    .order_by(ibis.desc("completed_queries"), ibis.asc("total_execution_seconds"))
)

In [None]:
# Same aggregation as the per-system totals for n2d-standard-32 at scale
# factor 128, but additionally keyed by query_number, so each row describes
# one (instance_type, system, query_number) combination.
d = (
    agg.filter(agg["sf"] == 128, agg["instance_type"] == "n2d-standard-32")
    .group_by("instance_type", "system", "query_number")
    .agg(
        total_execution_seconds=ibis._["mean_execution_seconds"].sum(),
        completed_queries=ibis._["mean_execution_seconds"].count(),
    )
)
d

In [None]:
# Tolerance used when comparing floating-point totals against the minimum.
delta = 0.000001  # floats!

# System column of the row(s) in `d` whose total is within `delta` of the
# smallest total. Each bound is a plain boolean expression (no trailing
# comma, which would make it a one-element tuple) so that `&` combines the
# two into a single filter predicate.
best_performance_system = d.filter(
    (d["total_execution_seconds"] >= d["total_execution_seconds"].min() - delta)
    & (d["total_execution_seconds"] <= d["total_execution_seconds"].min() + delta)
)["system"]

# Add two comparison columns to `d`:
#   relative_performance_slowdown: (total - min total) / min total
#   relative_queries: completed_queries - max completed_queries
# NOTE(review): min()/max() are taken on `d` with no grouping visible here,
# so the baseline looks global rather than per query_number — confirm that
# this is the intended reference for the per-query chart.
e = (
    d.mutate(
        relative_performance_slowdown=(
            (ibis._["total_execution_seconds"] - d["total_execution_seconds"].min())
            / d["total_execution_seconds"].min()
        ),
        relative_queries=(ibis._["completed_queries"] - d["completed_queries"].max()),
    )
).order_by(ibis.desc("completed_queries"), ibis.asc("total_execution_seconds"))
e

In [None]:
# First system name from the best-performance selection, as a Python value.
best_performance_system.to_pyarrow()[0].as_py()

In [None]:
# Grouped bar chart of the relative slowdown per query, one bar per system
# (system drives both the colour and the fill pattern).
c = px.bar(
    e,
    title="relative performance slowdown (lower is better)",
    x="query_number",
    y="relative_performance_slowdown",
    barmode="group",
    color="system",
    pattern_shape="system",
    category_orders=category_orders,
)
c

In [None]:
# Overall time spent on one instance type for scale factors >= 1: the sum of
# execution_seconds in seconds, minutes and hours, plus the number of logged
# rows next to a hard-coded expected count.
# NOTE(review): other cells filter on "n2d-standard-..." names; confirm that
# "n2-standard-2" is the intended instance type here.
# NOTE(review): 22 * 6 * 6 * 3 is a literal; its factors presumably stand for
# queries, systems, scale factors and repetitions — confirm against the data.
(
    t.filter(t["sf"] >= 1, t["instance_type"] == "n2-standard-2")
    .agg(
        total_execution_seconds=t["execution_seconds"].sum(),
        total_execution_minutes=t["execution_seconds"].sum() / 60,
        total_execution_hours=t["execution_seconds"].sum() / 3600,
        total_queries_executed=ibis._.count(),
        total_queries_theoretical=22 * 6 * 6 * 3,
    )
)