In [2]:
%pip install supabase psycopg

Collecting supabase
  Downloading supabase-2.27.3-py3-none-any.whl.metadata (4.6 kB)
Collecting psycopg
  Downloading psycopg-3.3.2-py3-none-any.whl.metadata (4.3 kB)
Collecting realtime==2.27.3 (from supabase)
  Downloading realtime-2.27.3-py3-none-any.whl.metadata (7.0 kB)
Collecting supabase-functions==2.27.3 (from supabase)
  Downloading supabase_functions-2.27.3-py3-none-any.whl.metadata (2.4 kB)
Collecting storage3==2.27.3 (from supabase)
  Downloading storage3-2.27.3-py3-none-any.whl.metadata (2.1 kB)
Collecting supabase-auth==2.27.3 (from supabase)
  Downloading supabase_auth-2.27.3-py3-none-any.whl.metadata (6.4 kB)
Collecting postgrest==2.27.3 (from supabase)
  Downloading postgrest-2.27.3-py3-none-any.whl.metadata (3.4 kB)
Collecting httpx<0.29,>=0.26 (from supabase)
  Downloading httpx-0.28.1-py3-none-any.whl.metadata (7.1 kB)
Collecting yarl>=1.22.0 (from supabase)
  Downloading yarl-1.22.0-cp312-cp312-macosx_11_0_arm64.whl.metadata (75 kB)
Collecting deprecation>=2.1.0 (f

In [3]:
from supabase import create_client

import os

SUPABASE_URL = "https://zzojrniuppueizhnmqfd.supabase.co"
SUPABASE_KEY = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJzdXBhYmFzZSIsInJlZiI6Inp6b2pybml1cHB1ZWl6aG5tcWZkIiwicm9sZSI6ImFub24iLCJpYXQiOjE3NjU1MTg3NDYsImV4cCI6MjA4MTA5NDc0Nn0.ctuOSKEpdyPF9OaTsU4OC49VjYRxG22mC7A03LcH6h0"

supabase = create_client(SUPABASE_URL, SUPABASE_KEY)

In [5]:
import time

sql = """
"""

for _ in range(5):
    st = time.time()
    result = supabase.rpc(
        "set_timeout_and_execute_raw_sql",
        {
            "sql_query": sql,
            "page_idx": 0,
            "limit_num": 50,
            "offset_num": 0,
        },
    ).execute()
    print(time.time() - st, " seconds")

APIError: {'message': 'syntax error at or near "LIMIT"', 'code': '42601', 'hint': None, 'details': None}

In [None]:
from __future__ import annotations

import json
import os
import statistics
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Tuple, Optional

import psycopg

# Adjust these for your workload
DATABASE_URL = "postgresql://postgres:harperdataset!@db.zzojrniuppueizhnmqfd.supabase.co:5432/postgres"
TSQUERY = "engineer | researcher | developder | software | scientist | system <-> engineer | computer"
LANG = os.environ.get("TSLANG", "english")

LIMIT = int(os.environ.get("LIMIT", "100"))
WARMUP = int(os.environ.get("WARMUP", "3"))
RUNS = int(os.environ.get("RUNS", "20"))

# Optional: make sure no disk spill during sort affects results unpredictably
WORK_MEM_MB = int(os.environ.get("WORK_MEM_MB", "128"))

# Optional: avoid long hangs
STATEMENT_TIMEOUT_MS = int(os.environ.get("STATEMENT_TIMEOUT_MS", "180000"))  # 3 min


# -----------------------------
# Queries (same WHERE)
# -----------------------------
# Using a CTE to ensure the tsquery is built once per query execution.
Q_ORDER_BY_ID = f"""
WITH q AS (
  SELECT to_tsquery(%s, %s) AS tsq
)
SELECT c.id
FROM candid c
CROSS JOIN q
WHERE c.fts @@ q.tsq
ORDER BY c.id
LIMIT {LIMIT};
"""

Q_ORDER_BY_RANK = f"""
WITH q AS (
  SELECT to_tsquery(%s, %s) AS tsq
)
SELECT c.id
FROM candid c
CROSS JOIN q
WHERE c.fts @@ q.tsq
ORDER BY ts_rank(c.fts, q.tsq) DESC
LIMIT {LIMIT};
"""

# EXPLAIN versions (FORMAT JSON is easiest to parse)
EXPLAIN_PREFIX = "EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) "
X_ORDER_BY_ID = EXPLAIN_PREFIX + Q_ORDER_BY_ID
X_ORDER_BY_RANK = EXPLAIN_PREFIX + Q_ORDER_BY_RANK


# -----------------------------
# Helpers
# -----------------------------
@dataclass
class RunResult:
    client_ms: float
    server_exec_ms: Optional[float]
    planning_ms: Optional[float]
    execution_ms: Optional[float]
    rows: Optional[int]
    sort_method: Optional[str]
    sort_space_kb: Optional[int]
    buffers_shared_hit: Optional[int]
    buffers_shared_read: Optional[int]


def pct(values: List[float], p: float) -> float:
    """Percentile with linear interpolation."""
    if not values:
        return float("nan")
    xs = sorted(values)
    if len(xs) == 1:
        return xs[0]
    k = (len(xs) - 1) * (p / 100.0)
    f = int(k)
    c = min(f + 1, len(xs) - 1)
    if f == c:
        return xs[f]
    return xs[f] + (xs[c] - xs[f]) * (k - f)


def safe_get(d: Dict[str, Any], path: List[str]) -> Optional[Any]:
    cur: Any = d
    for key in path:
        if not isinstance(cur, dict) or key not in cur:
            return None
        cur = cur[key]
    return cur


def find_first_plan_node(plan: Dict[str, Any], node_type: str) -> Optional[Dict[str, Any]]:
    """DFS to find first node whose 'Node Type' matches."""
    stack = [plan]
    while stack:
        n = stack.pop()
        if isinstance(n, dict) and n.get("Node Type") == node_type:
            return n
        if isinstance(n, dict) and "Plans" in n and isinstance(n["Plans"], list):
            stack.extend(n["Plans"])
    return None


def extract_explain_metrics(explain_json: Any) -> Dict[str, Any]:
    """
    explain_json comes as:
      [ { "Plan": {...}, "Planning Time": x, "Execution Time": y, ... } ]
    """
    top = explain_json[0] if isinstance(explain_json, list) and explain_json else {}
    plan = top.get("Plan", {}) if isinstance(top, dict) else {}

    planning_ms = top.get("Planning Time")
    execution_ms = top.get("Execution Time")
    server_exec_ms = (planning_ms + execution_ms) if (planning_ms is not None and execution_ms is not None) else None

    # Rows actually returned by the top plan node
    rows = plan.get("Actual Rows")

    # Sort info (if Sort node exists)
    sort_node = find_first_plan_node(plan, "Sort")
    sort_method = sort_node.get("Sort Method") if sort_node else None
    sort_space_kb = sort_node.get("Sort Space Used") if sort_node else None  # often in kB

    # Buffers: summarize shared hit/read at the top plan node (good enough for comparison)
    buf_hit = safe_get(plan, ["Shared Hit Blocks"])
    buf_read = safe_get(plan, ["Shared Read Blocks"])

    return {
        "planning_ms": planning_ms,
        "execution_ms": execution_ms,
        "server_exec_ms": server_exec_ms,
        "rows": rows,
        "sort_method": sort_method,
        "sort_space_kb": sort_space_kb,
        "buffers_shared_hit": buf_hit,
        "buffers_shared_read": buf_read,
        "plan_root": plan,
    }


def run_explain(conn: psycopg.Connection, sql: str, params: Tuple[Any, ...]) -> Tuple[RunResult, Dict[str, Any]]:
    """
    Executes EXPLAIN (ANALYZE...) and returns:
      - measured client latency
      - parsed EXPLAIN metrics
    """
    t0 = time.perf_counter()
    with conn.cursor() as cur:
        cur.execute(sql, params)
        row = cur.fetchone()
    t1 = time.perf_counter()

    # EXPLAIN FORMAT JSON returns one column containing JSON.
    explain_json = row[0]
    metrics = extract_explain_metrics(explain_json)

    rr = RunResult(
        client_ms=(t1 - t0) * 1000.0,
        server_exec_ms=metrics["server_exec_ms"],
        planning_ms=metrics["planning_ms"],
        execution_ms=metrics["execution_ms"],
        rows=metrics["rows"],
        sort_method=metrics["sort_method"],
        sort_space_kb=metrics["sort_space_kb"],
        buffers_shared_hit=metrics["buffers_shared_hit"],
        buffers_shared_read=metrics["buffers_shared_read"],
    )
    return rr, metrics


def summarize(name: str, results: List[RunResult]) -> None:
    client = [r.client_ms for r in results]
    server = [r.server_exec_ms for r in results if r.server_exec_ms is not None]

    def line(label: str, xs: List[float]) -> str:
        return (
            f"{label:>10}  "
            f"median={statistics.median(xs):8.2f} ms  "
            f"p95={pct(xs, 95):8.2f} ms  "
            f"mean={statistics.mean(xs):8.2f} ms  "
            f"min={min(xs):8.2f} ms  "
            f"max={max(xs):8.2f} ms"
        )

    print(f"\n=== {name} ===")
    print(line("client", client))
    if server:
        print(line("server", server))

    # Some helpful “what happened” signals
    rows = [r.rows for r in results if r.rows is not None]
    if rows:
        print(f"{'rows':>10}  median={statistics.median(rows)}  min={min(rows)}  max={max(rows)}")

    sorts = [r.sort_method for r in results if r.sort_method]
    if sorts:
        # show most common sort method
        common = max(set(sorts), key=sorts.count)
        print(f"{'sort':>10}  most_common={common}  samples={len(sorts)}/{len(results)}")

    reads = [r.buffers_shared_read for r in results if r.buffers_shared_read is not None]
    hits = [r.buffers_shared_hit for r in results if r.buffers_shared_hit is not None]
    if reads and hits:
        print(f"{'buffers':>10}  shared_read(median)={int(statistics.median(reads))}  shared_hit(median)={int(statistics.median(hits))}")


def main() -> None:
    params = (LANG, TSQUERY)

    # Note: autocommit helps keep timings cleaner (no implicit transaction overhead)
    with psycopg.connect(DATABASE_URL, autocommit=True) as conn:
        with conn.cursor() as cur:
            # Make the runs more comparable
            cur.execute(f"SET statement_timeout = {STATEMENT_TIMEOUT_MS};")
            cur.execute(f"SET work_mem = '{WORK_MEM_MB}MB';")
            # Keep these stable; you can comment them out if you prefer defaults
            cur.execute("SET jit = off;")  # jit can add variance for small queries
            cur.execute("SET enable_partitionwise_aggregate = on;")

        print("Connected.")
        print(f"TSLANG={LANG}")
        print(f"TSQUERY={TSQUERY}")
        print(f"LIMIT={LIMIT}  WARMUP={WARMUP}  RUNS={RUNS}")
        print(f"work_mem={WORK_MEM_MB}MB  statement_timeout={STATEMENT_TIMEOUT_MS}ms")

        # Warmup both variants (caches + plan)
        print("\nWarming up...")
        for _ in range(WARMUP):
            run_explain(conn, X_ORDER_BY_ID, params)
            run_explain(conn, X_ORDER_BY_RANK, params)

        # Actual benchmark (interleave to reduce drift effects)
        id_results: List[RunResult] = []
        rank_results: List[RunResult] = []
        last_id_plan: Optional[Dict[str, Any]] = None
        last_rank_plan: Optional[Dict[str, Any]] = None

        print("\nRunning benchmark...")
        for i in range(RUNS):
            r1, m1 = run_explain(conn, X_ORDER_BY_ID, params)
            id_results.append(r1)
            last_id_plan = m1

            r2, m2 = run_explain(conn, X_ORDER_BY_RANK, params)
            rank_results.append(r2)
            last_rank_plan = m2

            print(
                f"Run {i+1:02d}/{RUNS}: "
                f"id client={r1.client_ms:7.2f}ms, rank client={r2.client_ms:7.2f}ms"
            )

        summarize("ORDER BY id", id_results)
        summarize("ORDER BY ts_rank DESC", rank_results)

        # Ratio (median client latency)
        id_med = statistics.median([r.client_ms for r in id_results])
        rank_med = statistics.median([r.client_ms for r in rank_results])
        ratio = (rank_med / id_med) if id_med > 0 else float("inf")
        print(f"\nMedian latency ratio (rank/id): {ratio:.2f}x")

        # Print a compact plan hint (root node + whether Sort appears)
        def plan_hint(metrics: Dict[str, Any]) -> str:
            plan = metrics.get("plan_root", {})
            node = plan.get("Node Type")
            rows = plan.get("Actual Rows")
            # Find first Sort node (if any)
            sort_node = find_first_plan_node(plan, "Sort")
            sort_method = sort_node.get("Sort Method") if sort_node else None
            return f"root={node}, rows={rows}, sort={sort_method}"

        if last_id_plan and last_rank_plan:
            print("\nPlan hints (last run):")
            print(f"  id   : {plan_hint(last_id_plan)}")
            print(f"  rank : {plan_hint(last_rank_plan)}")

        print("\nDone.")

if __name__ == "__main__":
    main()