# Benchmark Postgres with Q3C extension

In [None]:
# Move to the top of the repo
# The relative paths used below (docker-compose.postgres.yaml, data/,
# results/) are resolved from the working directory set here.
# NOTE(review): assumes the notebook lives one directory below the repo
# root — confirm
%cd ..

In [None]:
# Accumulates the benchmark measurements (insert counts/times, cone search
# time); written to results/postgres.json at the end of the notebook
results = {}

In [None]:
# First make sure our services are running
# `-d` starts the containers detached, so this command can return before
# the database is ready to accept connections
! docker compose -f docker-compose.postgres.yaml up -d

import time

# Fixed, best-effort pause; nothing here verifies that the server is ready.
# NOTE(review): may be too short on a slow machine or a first image pull
time.sleep(3)  # Wait for services to start

In [None]:
# Connection settings shared by the psycopg2.connect() calls below
# NOTE(review): presumably 5435 is the host port published for Postgres in
# docker-compose.postgres.yaml — confirm
PORT = 5435
DB_NAME = "boom_benchmarking"  # Dropped and recreated on every run

In [None]:
import psycopg2

# Drop the benchmark database if it exists and create it again, so every
# run starts from an empty database
sql_drop = f"DROP DATABASE IF EXISTS {DB_NAME};"
sql_create = f"CREATE DATABASE {DB_NAME};"

# Connect without naming the benchmark database (it may not exist yet, and
# it cannot be dropped from a session that is connected to it)
conn = psycopg2.connect(
    host="localhost", port=PORT, user="postgres", password="postgres"
)
try:
    # PostgreSQL refuses DROP/CREATE DATABASE inside a transaction block,
    # so the statements must run in autocommit mode
    conn.autocommit = True
    with conn.cursor() as cursor:
        cursor.execute(sql_drop)
        cursor.execute(sql_create)
finally:
    # Release the admin connection explicitly instead of leaking it; the
    # following cells open their own connections to the new database
    conn.close()
print("Database created successfully")

In [None]:
# Check that q3c is installed by enabling the extension in the benchmark
# database and asking it for its version
sql_q3c_version = """
        CREATE EXTENSION IF NOT EXISTS q3c;
        SELECT q3c_version()
    """

conn = psycopg2.connect(
    host="localhost",
    port=PORT,
    user="postgres",
    password="postgres",
    database=DB_NAME,
)
try:
    with conn.cursor() as cursor:
        # execute() returns None; the rows of the last statement are read
        # back through the cursor
        cursor.execute(sql_q3c_version)
        q3c_version = cursor.fetchall()
    # Persist CREATE EXTENSION instead of leaving an open, uncommitted
    # transaction behind on an abandoned connection
    conn.commit()
finally:
    conn.close()
# Last expression of the cell, so the notebook displays the version rows
q3c_version

In [None]:
import psycopg2

# Create the alert table and its Q3C spatial index. Every statement is
# guarded with IF NOT EXISTS so the cell can be re-run without errors.
sql = """
-- Enable the q3c extension for spatial indexing
CREATE EXTENSION IF NOT EXISTS q3c;

CREATE TABLE IF NOT EXISTS alert (
    id SERIAL PRIMARY KEY,
    survey_id INT NOT NULL,
    object_id VARCHAR(255) NOT NULL,
    cand_id VARCHAR(255) NOT NULL,
    candidate JSONB NOT NULL,
    ra DOUBLE PRECISION NOT NULL,
    dec DOUBLE PRECISION NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Create a Q3C index on the ra and dec columns for faster querying
CREATE INDEX IF NOT EXISTS idx_objects_q3c ON alert (q3c_ang2ipix(ra, dec));
"""

# Connect to the benchmark database. This connection is intentionally left
# open: the insert and cone search cells below reuse `conn`.
conn = psycopg2.connect(
    host="localhost",
    port=PORT,
    user="postgres",
    password="postgres",
    database=DB_NAME,
)

# `with conn` commits on success and rolls back on error (it does not close
# the connection), so a failed statement cannot leave the shared connection
# stuck in an aborted transaction
with conn, conn.cursor() as cursor:
    cursor.execute(sql)
print("Table created successfully.")

In [None]:
# Load the calkit IPython extension
# NOTE(review): presumably this is what provides the %%stage cell magic
# used in the next cell — confirm
%load_ext calkit.magics

In [None]:
%%stage --name ztf-avro-to-pg-rows \
    --dep-path "data/ztf_public_20250614" \
    --environment py \
    --verbose \
    --out-storage none \
    --out rows

# Insert all the ZTF alerts
import glob
import fastavro
from tqdm.auto import tqdm

ztf_avro_fpaths = glob.glob("data/ztf_public_20250614/*.avro")

print(f"Found {len(ztf_avro_fpaths)} ZTF alerts")

print("Converting to rows for insertion into PostgreSQL")
rows = []
for alert_avro_fpath in tqdm(ztf_avro_fpaths):
    with open(alert_avro_fpath, "rb") as f:
        reader = fastavro.reader(f)
        for alert in reader:
            alert_fmt = {
                "object_id": alert["objectId"],
                "survey_id": 0,
                "cand_id": alert["candid"],
                "candidate": alert["candidate"],
                "ra": alert["candidate"]["ra"],
                "dec": alert["candidate"]["dec"],
            }
            rows.append(alert_fmt)
# This line left blank so we don't try to print the line above

In [None]:
# Sanity check: number of alert rows produced by the stage above
print(len(rows))

## Insert ZTF alerts

In [None]:
# Convert rows into tuples for insertion
from psycopg2.extras import Json


def _row_to_tuple(row):
    """Return the row dict as a tuple ordered like the INSERT column list.

    The order is (survey_id, object_id, cand_id, candidate, ra, dec); the
    candidate dict is wrapped in Json so psycopg2 sends it as JSON.
    """
    return (
        row["survey_id"],
        row["object_id"],
        row["cand_id"],
        Json(row["candidate"]),
        row["ra"],
        row["dec"],
    )


rows_tuples = list(map(_row_to_tuple, rows))

In [None]:
import time

from psycopg2.extras import execute_values
from tqdm.auto import tqdm

# Insert all rows into the PostgreSQL database
# execute_values expands the single %s placeholder into a multi-row VALUES
# list
insert_sql = """
INSERT INTO alert (survey_id, object_id, cand_id, candidate, ra, dec)
VALUES %s;
"""

n_iterations = 5  # TODO: Should be a project parameter?
n_inserted = []
insert_times = []

with conn.cursor() as cursor:
    # Each iteration inserts the full row set again, so the table grows by
    # len(rows_tuples) rows per iteration
    for _ in tqdm(range(n_iterations)):
        # perf_counter is monotonic and high resolution; time.time() can
        # jump when the system clock is adjusted, corrupting the measurement
        t0 = time.perf_counter()
        execute_values(
            cursor, insert_sql, rows_tuples, page_size=1000
        )  # Use page_size for batch insertion
        # The commit is part of the timed region
        conn.commit()
        t1 = time.perf_counter()
        n_inserted.append(len(rows_tuples))
        insert_times.append(t1 - t0)

results["n_ztf_alerts_inserted"] = n_inserted
results["ztf_alerts_insert_time_s"] = insert_times
# Last expression of the cell, so the notebook displays the results so far
results

## Run a cone search query

In [None]:
# See how long it will take to filter some alerts based on their RA and DEC
# TODO: This should probably be done multiple times because there is some
# variance in the time it takes
# NOTE(review): the Q3C README recommends running ANALYZE on the table after
# loading data so the planner has up-to-date statistics for the index;
# consider doing that before timing — confirm
import time

# q3c_radial_query(ra, dec, center_ra, center_dec, radius): the trailing
# arguments select a cone of radius 1.0 centered on (180.0, 0.0)
sql = """
    SELECT *
    FROM alert
    WHERE q3c_radial_query(ra, dec, 180.0, 0.0, 1.0)
    LIMIT 20;
"""

# perf_counter is monotonic and high resolution, unlike time.time()
t0 = time.perf_counter()
with conn.cursor() as cursor:
    # execute() returns None; the rows come from fetchall()
    cursor.execute(sql)
    res = cursor.fetchall()
t1 = time.perf_counter()
results["cone_search_query_time_s"] = t1 - t0
print(
    f"Found {len(res)} rows matching the query in "
    f"{results['cone_search_query_time_s']} seconds."
)
# Preview the first four columns of the first match; an empty result set
# shows nothing instead of raising IndexError
res[0][:4] if res else None

In [None]:
# Persist the collected benchmark measurements as pretty-printed JSON
import json
import os

# Make sure the output directory is there; a no-op when it already exists
os.makedirs("results", exist_ok=True)

with open("results/postgres.json", "w") as results_file:
    results_file.write(json.dumps(results, indent=4))

In [None]:
# Shut down the services that were started from the same compose file at
# the top of the notebook
! docker compose -f docker-compose.postgres.yaml down