## Logging latency evaluation (Postgres → Elasticsearch `alerts-index`)

This notebook:
- Connects to Postgres directly.
- Runs test queries.
- Records wall-clock timestamps around each query.
- After each query, polls Elasticsearch `alerts-index` until the newest alert `@timestamp` changes (or a timeout elapses), recording that timestamp and how long the wait took.

**Configuration**
- Postgres: `localhost:5432`, db `thesisdb`, user `myuser`, password `mypassword`
- Elasticsearch: `http://localhost:9200`



In [1]:
import json
import os
import time
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

import psycopg2
import psycopg2.extras
import requests


def utc_now_iso() -> str:
    """Return the current time in UTC as an ISO 8601 string (with a ``+00:00`` offset)."""
    moment = datetime.now(tz=timezone.utc)
    return moment.isoformat()


def parse_es_ts(ts: Optional[str]) -> Optional[datetime]:
    """Parse an Elasticsearch ``@timestamp`` string into a ``datetime``.

    Elasticsearch returns RFC 3339 / ISO 8601 strings such as
    ``2025-12-04T05:45:00.968013336Z`` (nanoseconds) or
    ``2025-12-18T04:11:57.464Z`` (milliseconds). Before Python 3.11,
    ``datetime.fromisoformat`` rejects a trailing ``Z``. It also rejects
    fractional seconds that are not exactly 3 or 6 digits. The string is
    therefore normalized first:

    - a trailing ``Z``/``z`` becomes ``+00:00``;
    - the fractional part is truncated, or right-padded with zeros, to exactly
      6 digits (microseconds). This applies whether the part that follows is a
      ``+HH:MM`` offset, a ``-HH:MM`` offset, or nothing.

    Args:
        ts: Timestamp string, or ``None``/empty.

    Returns:
        The parsed datetime, or ``None`` when ``ts`` is ``None`` or empty. The
        result is timezone-aware when the input carries an offset or ``Z``.

    Raises:
        ValueError: if the normalized string is still not valid ISO 8601.
    """
    if not ts:
        return None
    s = ts
    if s.endswith(("Z", "z")):
        s = s[:-1] + "+00:00"
    if "." in s:
        head, rest = s.split(".", 1)
        # The fraction is the leading run of digits. Whatever follows (an offset
        # of either sign, or nothing) is kept verbatim.
        n_digits = 0
        while n_digits < len(rest) and rest[n_digits] in "0123456789":
            n_digits += 1
        frac, tail = rest[:n_digits], rest[n_digits:]
        if frac:
            s = f"{head}.{frac[:6].ljust(6, '0')}{tail}"
    return datetime.fromisoformat(s)


# Connection settings.
# - Every value can be overridden through an environment variable, so credentials
#   never have to be edited into the notebook. Postgres uses the standard libpq
#   names; note that explicit connect() kwargs would otherwise shadow those vars.
# - The defaults are the local development values.
PG = {
    "host": os.environ.get("PGHOST", "localhost"),
    "port": int(os.environ.get("PGPORT", "5432")),
    "dbname": os.environ.get("PGDATABASE", "thesisdb"),
    "user": os.environ.get("PGUSER", "myuser"),
    "password": os.environ.get("PGPASSWORD", "mypassword"),
}

ES = {
    "base_url": os.environ.get("ES_BASE_URL", "http://localhost:9200"),
    "alerts_index": os.environ.get("ES_ALERTS_INDEX", "alerts-index"),
}

# Never echo the password into notebook output.
print("Configured PG:", {k: v for k, v in PG.items() if k != "password"})
print("Configured ES:", ES)



Configured PG: {'host': 'localhost', 'port': 5432, 'dbname': 'thesisdb', 'user': 'myuser'}
Configured ES: {'base_url': 'http://localhost:9200', 'alerts_index': 'alerts-index'}


In [2]:
def pg_connect():
    """Open a new psycopg2 connection from the ``PG`` settings, in autocommit mode.

    With autocommit on, each statement runs in its own transaction, so a failing
    test query does not leave the session in an aborted transaction. The caller
    owns the returned connection and must close it.
    """
    connection = psycopg2.connect(**PG)
    connection.autocommit = True
    return connection


def es_latest_alert_ts(timeout_s: float = 10.0) -> Optional[str]:
    """Return the newest ``@timestamp`` (as a string) in the alerts index, or ``None``.

    Issues a size-1 ``_search`` sorted by ``@timestamp`` descending.

    Returns ``None`` when:
    - the index does not exist (HTTP 404);
    - the index has no documents;
    - ``@timestamp`` is not mapped yet. ``unmapped_type`` lets Elasticsearch
      sort such an index instead of failing the request with HTTP 400.

    Args:
        timeout_s: HTTP timeout in seconds for the search request.

    Raises:
        requests.HTTPError: for any other non-2xx response.
        requests.RequestException: on connection errors or timeouts.
    """
    url = f"{ES['base_url'].rstrip('/')}/{ES['alerts_index']}/_search"
    body = {
        "size": 1,
        # Without unmapped_type, sorting on a field the index has no mapping for
        # (e.g. a freshly created, still-empty index) is a 400 error.
        "sort": [{"@timestamp": {"order": "desc", "unmapped_type": "date"}}],
        "_source": ["@timestamp", "rule_id", "message", "alert_id"],
        "query": {"match_all": {}},
    }
    r = requests.get(url, json=body, timeout=timeout_s)
    if r.status_code == 404:
        return None
    r.raise_for_status()
    hits = r.json().get("hits", {}).get("hits", [])
    if not hits:
        return None
    return hits[0].get("_source", {}).get("@timestamp")


def es_wait_new_alert(after_ts: Optional[str], timeout_s: float = 30.0, poll_s: float = 1.0) -> Dict[str, Any]:
    """Poll until the newest alerts-index ``@timestamp`` differs from ``after_ts``, or time out.

    Args:
        after_ts: Newest alert ``@timestamp`` seen before the measured event
            (``None`` if there were no alerts). Any non-empty value that
            differs from it counts as a new alert.
        timeout_s: Stop polling once roughly this many seconds have elapsed.
        poll_s: Delay between polls, in seconds.

    Returns:
        A dict ``{"status": "updated" | "timeout", "newest_alert_ts": str | None,
        "wait_s": float}``, where ``wait_s`` is the number of seconds since
        polling started, rounded to 3 decimals.

    Note:
        Detection is by timestamp inequality only, so any new alert ends the
        wait, not just one caused by the measured query.
    """
    start = time.monotonic()  # elapsed time unaffected by system clock adjustments
    while True:
        cur = es_latest_alert_ts()
        elapsed = time.monotonic() - start
        if cur and cur != after_ts:
            return {"status": "updated", "newest_alert_ts": cur, "wait_s": round(elapsed, 3)}
        if elapsed > timeout_s:
            return {"status": "timeout", "newest_alert_ts": cur, "wait_s": round(elapsed, 3)}
        # Don't sleep past the deadline; one last poll happens right after it.
        time.sleep(max(0.0, min(poll_s, timeout_s - elapsed)))


In [3]:
# Test queries to run. Each entry needs a "name" (used in logs and the CSV) and the "sql" to execute.
# Tip: SET application_name lets your trigger/audit pipeline attribute these actions to this notebook.
# NOTE: If your audit trigger only logs DML, SELECT won't create audit rows.

TEST_QUERIES: List[Dict[str, Any]] = [
    {
        "name": "select_count",
        "sql": "SET application_name = 'eval_logging'; SELECT count(*) FROM test1123_f;",
    },
]

# Optional DML probe. WARNING: it deletes a row.
# Adjust the predicate/PK to your schema, then set INCLUDE_DML_QUERY = True to include it.
INCLUDE_DML_QUERY = False
DML_QUERY: Dict[str, Any] = {
    "name": "delete_one_row",
    "sql": "SET application_name = 'eval_logging'; DELETE FROM test1123_f WHERE row_number = 9999;",
}
if INCLUDE_DML_QUERY:
    TEST_QUERIES.append(DML_QUERY)

# Polling settings for alerts-index (seconds)
ALERT_POLL_TIMEOUT_S = 120.0
ALERT_POLL_INTERVAL_S = 1.0

print("Loaded", len(TEST_QUERIES), "test queries")



Loaded 1 test queries


In [4]:
def _execute_timed(cur: Any, sql: str) -> tuple:
    """Execute ``sql`` on ``cur`` and time it.

    Returns:
        A tuple ``(first_row, error, start_iso, end_iso, duration_s)``.
        ``first_row`` is a plain dict or ``None``, and ``error`` is the
        ``repr`` of any exception or ``None``.
    """
    t_start_iso = utc_now_iso()
    t_start = time.perf_counter()
    row = None
    err = None
    try:
        cur.execute(sql)
        # For several ';'-separated statements, only the LAST statement's result
        # is available. Take its first row if it produced a result set.
        if cur.description is not None:
            fetched = cur.fetchone()
            # RealDictRow's repr differs between psycopg2 versions. A plain dict
            # keeps the CSV column consistent across runs.
            row = dict(fetched) if fetched is not None else None
    except Exception as e:  # best-effort: record the failure and keep evaluating
        row = None
        err = repr(e)
    duration_s = time.perf_counter() - t_start
    t_end_iso = utc_now_iso()
    return row, err, t_start_iso, t_end_iso, duration_s


def run_eval(test_queries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Run each test query against Postgres and measure when a new alert appears in ES.

    For every query, this function:
    1. records the newest alert ``@timestamp``;
    2. executes the SQL, capturing the first row of its last result set or the error;
    3. polls the alerts index until the newest alert timestamp changes or
       ``ALERT_POLL_TIMEOUT_S`` elapses.

    Args:
        test_queries: Dicts with ``"name"`` and ``"sql"`` keys.

    Returns:
        One dict per query with these keys:
        - ``name``, ``sql``;
        - ``query_start_utc``, ``query_end_utc``, ``query_duration_s``;
        - ``pg_result_first_row``, ``error``, ``alerts_newest_ts_before``;
        - the keys returned by ``es_wait_new_alert``: ``status``,
          ``newest_alert_ts``, ``wait_s``.
    """
    results: List[Dict[str, Any]] = []

    baseline_alert_ts = es_latest_alert_ts()
    print("Baseline newest alert @timestamp:", baseline_alert_ts)

    # psycopg2's `with conn:` only ends the transaction; it does NOT close the
    # connection. Close it explicitly so repeated runs don't leak connections.
    conn = pg_connect()
    try:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            for q in test_queries:
                name = q["name"]
                sql = q["sql"]

                # Newest alert doc before this query; a change afterwards counts as "arrived".
                pre_alert_ts = es_latest_alert_ts()

                row, err, t_start_iso, t_end_iso, duration_s = _execute_timed(cur, sql)

                # Wait for a new alert to appear (best-effort)
                wait_result = es_wait_new_alert(pre_alert_ts, timeout_s=ALERT_POLL_TIMEOUT_S, poll_s=ALERT_POLL_INTERVAL_S)

                results.append({
                    "name": name,
                    "sql": sql,
                    "query_start_utc": t_start_iso,
                    "query_end_utc": t_end_iso,
                    "query_duration_s": round(duration_s, 6),
                    "pg_result_first_row": row,
                    "error": err,
                    "alerts_newest_ts_before": pre_alert_ts,
                    **wait_result,
                })

                print(f"[{name}] duration={results[-1]['query_duration_s']}s alert_status={wait_result['status']} wait={wait_result['wait_s']}s")
    finally:
        conn.close()

    return results


# Run the full evaluation. This blocks while Elasticsearch is polled after each query.
eval_results = run_eval(test_queries=TEST_QUERIES)
print("Done. Rows:", len(eval_results))



Baseline newest alert @timestamp: 2025-12-18T04:11:57.464Z
[select_count] duration=0.065739s alert_status=updated wait=61.87s
Done. Rows: 1


In [5]:
import os
import pandas as pd

# Append results to CSV (preserve old runs)
out_path = "./logging_latency_eval.csv"
new_df = pd.DataFrame(eval_results)

if os.path.exists(out_path) and os.path.getsize(out_path) > 0:
    old_df = pd.read_csv(out_path)
    # concat aligns on column names, so runs with differing columns still combine (missing -> NaN).
    combined = pd.concat([old_df, new_df], ignore_index=True)
else:
    combined = new_df

# The CSV is the only copy of earlier runs, and to_csv truncates its target first.
# Write to a temp file in the same directory, then atomically swap it in, so an
# interrupted write cannot wipe the history.
tmp_path = out_path + ".tmp"
combined.to_csv(tmp_path, index=False)
os.replace(tmp_path, out_path)

print(f"Wrote {out_path} (rows now: {len(combined)})")
# The table grows with every run; show only the most recent rows.
combined.tail(10)


Wrote ./logging_latency_eval.csv (rows now: 9)


Unnamed: 0,name,sql,query_start_utc,query_end_utc,query_duration_s,pg_result_first_row,error,alerts_newest_ts_before,status,newest_alert_ts,wait_s
0,select_count,SET application_name = 'eval_logging'; SELECT ...,2025-12-18T03:29:25.821124+00:00,2025-12-18T03:29:25.842123+00:00,0.020969,"RealDictRow([('count', 1)])",,2025-12-18T03:23:58.226Z,updated,2025-12-18T03:30:16.350Z,54.639
1,select_count,SET application_name = 'eval_logging'; SELECT ...,2025-12-18T03:36:20.111979+00:00,2025-12-18T03:36:20.125813+00:00,0.013808,"RealDictRow([('count', 1)])",,2025-12-18T03:30:16.350Z,updated,2025-12-18T03:36:24.343Z,7.285
2,select_count,SET application_name = 'eval_logging'; SELECT ...,2025-12-18T03:42:25.177831+00:00,2025-12-18T03:42:25.192965+00:00,0.01512,"RealDictRow([('count', 1)])",,2025-12-18T03:36:24.343Z,updated,2025-12-18T03:42:39.768Z,17.596
3,select_count,SET application_name = 'eval_logging'; SELECT ...,2025-12-18T03:48:10.109009+00:00,2025-12-18T03:48:10.159437+00:00,0.050395,"RealDictRow([('count', 12999)])",,2025-12-18T03:42:39.768Z,updated,2025-12-18T03:48:57.729Z,51.546
4,select_count,SET application_name = 'eval_logging'; SELECT ...,2025-12-18T03:53:22.792045+00:00,2025-12-18T03:53:22.816820+00:00,0.024743,"RealDictRow([('count', 12999)])",,2025-12-18T03:48:57.729Z,updated,2025-12-18T03:54:12.887Z,53.547
5,select_count,SET application_name = 'eval_logging'; SELECT ...,2025-12-18T03:59:21.513030+00:00,2025-12-18T03:59:21.640041+00:00,0.12696,"RealDictRow([('count', 12999)])",,2025-12-18T03:54:12.887Z,updated,2025-12-18T03:59:41.333Z,23.791
6,select_count,SET application_name = 'eval_logging'; SELECT ...,2025-12-18T04:06:15.540392+00:00,2025-12-18T04:06:15.569676+00:00,0.029253,"RealDictRow([('count', 12999)])",,2025-12-18T03:59:41.333Z,updated,2025-12-18T04:06:42.371Z,30.857
7,select_count,SET application_name = 'eval_logging'; SELECT ...,2025-12-18T04:11:24.116871+00:00,2025-12-18T04:11:24.141354+00:00,0.02445,"RealDictRow([('count', 12999)])",,2025-12-18T04:06:42.371Z,updated,2025-12-18T04:11:57.464Z,37.182
8,select_count,SET application_name = 'eval_logging'; SELECT ...,2025-12-18T04:17:09.608202+00:00,2025-12-18T04:17:09.673958+00:00,0.065739,{'count': 12999},,2025-12-18T04:11:57.464Z,updated,2025-12-18T04:18:08.206Z,61.87
