In [19]:
import sqlite3
import pandas as pd

con = sqlite3.connect(":memory:")
con.execute("PRAGMA foreign_keys = ON;")

def q(sql, params=()):
    return pd.read_sql_query(sql, con, params=params)

con.executescript("""
CREATE TABLE ca_outbox (
        ca_id       TEXT NOT NULL,
        version     INTEGER NOT NULL,
        payload     TEXT,
        status      TEXT NOT NULL CHECK (status IN ('PENDING','SENT', 'DEAD_LETTER')),
        is_blocked  INTEGER NOT NULL CHECK (is_blocked IN (0,1)),
        created_at  TEXT NOT NULL DEFAULT (datetime('now')),
        updated_at  TEXT NOT NULL DEFAULT (datetime('now')),
        conflict    INTEGER,
        PRIMARY KEY (ca_id, version)
        );
""")


<sqlite3.Cursor at 0x7c533956e9c0>

In [None]:
upsert_sql = """
INSERT INTO ca_outbox (
  ca_id, version, payload, status, is_blocked, created_at, updated_at, conflict
)
VALUES (?, ?, ?, 'PENDING', ?, datetime('now'), datetime('now'), 0)
ON CONFLICT(ca_id, version) DO UPDATE SET
  payload = excluded.payload,
  status  = CASE WHEN ca_outbox.status = 'SENT'
                 THEN 'SENT'
                 ELSE 'PENDING'
            END,
  updated_at = datetime('now'),
  conflict = conflict + 1;
"""


In [28]:
rows = [
    ("CA-1", 5, '{"v":5}'),
    ("CA-1", 6, '{"v":6}'),
    ("CA-1", 1, '{"v":1}')
]

con.executemany(upsert_sql, rows)


<sqlite3.Cursor at 0x7c5339591940>

In [124]:
pd.read_sql_query("SELECT * FROM ca_outbox ORDER BY ca_id, version", con)

Unnamed: 0,ca_id,version,payload,status,is_blocked,created_at,updated_at,conflict
0,CA-1,1,"{""v"":1}",PENDING,0,2026-01-25 16:19:15,2026-01-25 16:19:15,0
1,CA-1,2,"{""v"":2}",PENDING,0,2026-01-25 16:19:15,2026-01-25 16:19:15,0
2,CA-1,9,"{""v"":2}",PENDING,1,2026-01-25 16:19:15,2026-01-25 16:19:15,0
3,CA-2,5,"{""v"":5}",PENDING,1,2026-01-25 16:19:15,2026-01-25 16:19:15,0
4,CA-2,9,"{""v"":9}",PENDING,1,2026-01-25 16:19:15,2026-01-25 16:19:15,0
5,CA-2,10,"{""v"":9}",PENDING,1,2026-01-25 16:19:15,2026-01-25 16:19:15,0
6,CA-2,11,"{""v"":9}",PENDING,1,2026-01-25 16:19:15,2026-01-25 16:19:15,0


In [None]:
rows = [
    ("CA-1", 5, '{"v":5}'),
    ("CA-1", 6, '{"v":6}'),
    ("CA-1", 1, '{"v":1}')
]

In [120]:
con.execute("DELETE FROM ca_outbox")

records = [
    ("CA-1", 1, '{"v":1}', 0),
    ("CA-1", 2, '{"v":2}', 0),
    ("CA-1", 9, '{"v":2}', 1),
    ("CA-2", 5, '{"v":5}', 1),
    ("CA-2", 9, '{"v":9}', 1),
    ("CA-2", 10, '{"v":9}', 1),
    ("CA-2", 11, '{"v":9}', 1),
]

# This sends all records in one go
con.executemany(upsert_sql, records)
con.commit()

con.commit()
pd.read_sql_query("SELECT * FROM ca_outbox ORDER BY ca_id, version", con)


Unnamed: 0,ca_id,version,payload,status,is_blocked,created_at,updated_at,conflict
0,CA-1,1,"{""v"":1}",PENDING,0,2026-01-25 16:19:15,2026-01-25 16:19:15,0
1,CA-1,2,"{""v"":2}",PENDING,0,2026-01-25 16:19:15,2026-01-25 16:19:15,0
2,CA-1,9,"{""v"":2}",PENDING,1,2026-01-25 16:19:15,2026-01-25 16:19:15,0
3,CA-2,5,"{""v"":5}",PENDING,1,2026-01-25 16:19:15,2026-01-25 16:19:15,0
4,CA-2,9,"{""v"":9}",PENDING,1,2026-01-25 16:19:15,2026-01-25 16:19:15,0
5,CA-2,10,"{""v"":9}",PENDING,1,2026-01-25 16:19:15,2026-01-25 16:19:15,0
6,CA-2,11,"{""v"":9}",PENDING,1,2026-01-25 16:19:15,2026-01-25 16:19:15,0


In [154]:
params = {"caId": "CA-1"}
pd.read_sql_query("""
            WITH max_seen AS (
              SELECT COALESCE(MAX(version), 0) AS m
              FROM ca_outbox
              WHERE ca_id = :caId
            ),
            missing AS (
              SELECT gs AS v
              FROM max_seen, generate_series(1, max_seen.m) gs
              WHERE NOT EXISTS (
                SELECT 1 FROM ca_outbox o WHERE o.ca_id = :caId AND o.version = gs
              )
            )
            SELECT
              CASE
                WHEN (SELECT m FROM max_seen) = 0 THEN 0
                WHEN EXISTS (SELECT 1 FROM missing) THEN (SELECT MIN(v) FROM missing) - 1
                ELSE (SELECT m FROM max_seen)
              END AS max_contig
            
""", con, params)
    #   CASE
    #             WHEN (SELECT m FROM max_seen) = 0 THEN 0
    #             WHEN EXISTS (SELECT 1 FROM missing) THEN (SELECT MIN(v) FROM missing) - 1
    #             ELSE (SELECT m FROM max_seen)

DatabaseError: Execution failed on sql '
            WITH max_seen AS (
              SELECT COALESCE(MAX(version), 0) AS m
              FROM ca_outbox
              WHERE ca_id = :caId
            ),
            missing AS (
              SELECT gs AS v
              FROM max_seen, generate_series(1, max_seen.m) gs
              WHERE NOT EXISTS (
                SELECT 1 FROM ca_outbox o WHERE o.ca_id = :caId AND o.version = gs
              )
            )
            SELECT
              CASE
                WHEN (SELECT m FROM max_seen) = 0 THEN 0
                WHEN EXISTS (SELECT 1 FROM missing) THEN (SELECT MIN(v) FROM missing) - 1
                ELSE (SELECT m FROM max_seen)
              END AS max_contig

': no such table: generate_series

In [128]:
pd.read_sql_query("""
SELECT
        ca_id,
        version,
        LAG(version) OVER (PARTITION BY ca_id ORDER BY version) AS prev_version,
        is_blocked,
        -- Check if THIS specific row starts a gap
        CASE 
            WHEN (LAG(version) OVER (PARTITION BY ca_id ORDER BY version) IS NULL AND version <> 1) THEN 1
            WHEN (version - LAG(version) OVER (PARTITION BY ca_id ORDER BY version) <> 1) THEN 1
            ELSE 0
        END AS is_gap_start
        FROM ca_outbox
""", con)

Unnamed: 0,ca_id,version,prev_version,is_blocked,is_gap_start
0,CA-1,1,,0,0
1,CA-1,2,1.0,0,0
2,CA-1,9,2.0,1,1
3,CA-2,5,,1,1
4,CA-2,9,5.0,1,1
5,CA-2,10,9.0,1,0
6,CA-2,11,10.0,1,0


In [138]:
sql_gap_version = \
"""
WITH local_check AS (
    SELECT
        *,
        LAG(version) OVER (PARTITION BY ca_id ORDER BY version) AS prev_version,
        CASE 
            WHEN (LAG(version) OVER (PARTITION BY ca_id ORDER BY version) IS NULL) and (version <> 1) THEN 1 --'YES'
            WHEN (version - LAG(version) OVER (PARTITION BY ca_id ORDER BY version) <>1)  THEN 1 --'NO'
            ELSE 0 
            END AS
        is_gap_start
        FROM ca_outbox   
),
chain_check AS (
    SELECT 
        *,
        -- Sum up all "gap starts" from the beginning of time for this ca_id
        SUM(is_gap_start) OVER (PARTITION BY ca_id ORDER BY version) AS cumulative_gaps
    FROM local_check
)
SELECT 
    ca_id,
    version,
    prev_version,
    is_blocked,
    CASE WHEN cumulative_gaps > 0 THEN 'NOT_INSYNC' ELSE 'INSYNC' END AS insync_or_not
FROM chain_check
WHERE insync_or_not = 'NOT_INSYNC'
AND ca_id = ?
       
"""

In [142]:
def get_gap_version(ca_id: int) -> list[int]:
    # We use global to reference the string outside the function scope
    global sql_gap_version
    
    # Execute query with positional parameter (passed as a tuple)
    df = pd.read_sql_query(sql_gap_version, con, params=(ca_id,))

    
    # Return the 'version' column as a Python list
    return df[['prev_version', 'version']].values.tolist()

# pd.read_sql_query(sql_gap_version, con, params=('CA-1',))

gap = get_gap_version('CA-2')
gap



[[nan, 5.0], [5.0, 9.0], [9.0, 10.0], [10.0, 11.0]]

In [None]:
 WITH max_seen AS (
              SELECT COALESCE(MAX(version), 0) AS m
              FROM ca_outbox
              WHERE ca_id = :caId
            ),
            missing AS (
              SELECT gs AS v
              FROM max_seen, generate_series(1, max_seen.m) gs
              WHERE NOT EXISTS (
                SELECT 1 FROM ca_outbox o WHERE o.ca_id = :caId AND o.version = gs
              )
            )
            SELECT
              CASE
                WHEN (SELECT m FROM max_seen) = 0 THEN 0
                WHEN EXISTS (SELECT 1 FROM missing) THEN (SELECT MIN(v) FROM missing) - 1
                ELSE (SELECT m FROM max_seen)
              END AS max_contig