Skip to content

.live(): re-issue queries when underlying data changes.#72

Merged
ryanrasti merged 1 commit intomainfrom
ryan_live
May 2, 2026
Merged

.live(): re-issue queries when underlying data changes.#72
ryanrasti merged 1 commit intomainfrom
ryan_live

Conversation

@ryanrasti
Copy link
Copy Markdown
Owner

for await (const rows of db.live(query)) {
  render(rows);
}

Typegres sits between the client and pg, so mutations and subscribers are both in-process -- no triggers, WAL, or external CDC needed.

This means we see all mutations and have a direct line back to client -- the perfect place for a reactive setup.

How it works:

  1. Extraction (src/live/extractor.ts): top-level AND'd equality predicates from the query's WHERE/ON: col = literal (anchor) or col = other.col (edge).
  2. Predicate query: runs alongside the user's query in the same REPEATABLE READ txn. Returns watched values per (table, col); JS-side equivalence-class closure propagates literals across edges (so users.id=1 flows to notes.user_id even when no matching notes exist yet).
  3. Bus subscription (src/live/bus.ts): per-Database, opt-in via db.startLive(). Reverse index on (table, col, value) → subscriptions; in-memory backfill window; xid-based dispatch (events whose xid is invisible to the sub's snapshot trigger a re-run).
  4. Event emission: every mutation on a Table that opted in via transformer: TypegresLiveEvents.makeTransformer() is wrapped in a CTE that atomically appends to _typegres_live_events.

Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a reactive “live query” capability to Typegres (db.live(query)) by extracting equality predicates from queries, emitting in-transaction mutation events into a shadow table, and using an in-process polling bus to re-run queries when relevant commits become visible.

Changes:

  • Introduces live-query pipeline: predicate extraction/closure, snapshot visibility logic, event bus polling + subscriptions, and Database.startLive()/live()/stopLive().
  • Adds mutation SQL transformation hooks on tables and wraps INSERT/UPDATE/DELETE in event-emitting CTE chains.
  • Updates type/runtime + codegen to support typed parameters and safer primitive-overload matching; refactors tests to use new shared DB helpers.

Reviewed changes

Copilot reviewed 38 out of 69 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
tsconfig.json Bumps TS target to ES2024 (supports new runtime usage like Promise.withResolvers).
src/types/runtime.ts Narrows PgOp to accept Raw operators; aligns runtime types with SQL node changes.
src/types/overrides/any.ts Uses TypedParam for primitive literals; improves column-vs-expression validation for mutations.
src/types/generated/xid.ts Regenerated overloads / primitive matching for xid operators.
src/types/generated/tsvector.ts Regenerated overloads / primitive matching for text search methods.
src/types/generated/tsquery.ts Regenerated overloads / primitive matching for tsquery methods.
src/types/generated/timetz.ts Regenerated overloads / primitive matching changes (notably primitive allowances).
src/types/generated/timestamptz.ts Regenerated overloads / primitive matching changes.
src/types/generated/time.ts Regenerated overloads / primitive matching changes.
src/types/generated/regconfig.ts Regenerated overloads / primitive matching changes.
src/types/generated/polygon.ts Regenerated overloads / primitive matching changes.
src/types/generated/pg_lsn.ts Regenerated overloads / primitive matching changes.
src/types/generated/numeric.ts Regenerated overloads / primitive matching changes.
src/types/generated/money.ts Regenerated overloads / primitive matching changes.
src/types/generated/jsonb.ts Regenerated overloads / primitive matching changes.
src/types/generated/json.ts Regenerated overloads / primitive matching changes.
src/types/generated/interval.ts Regenerated overloads / primitive matching changes.
src/types/generated/int8.ts Regenerated overloads / primitive matching changes.
src/types/generated/inet.ts Regenerated overloads / primitive matching changes.
src/types/generated/circle.ts Regenerated overloads / primitive matching changes.
src/types/generated/bytea.ts Regenerated overloads / primitive matching changes.
src/types/generated/bit.ts Regenerated overloads / primitive matching changes.
src/types/generated/anyrange.ts Regenerated overloads / primitive matching changes for range types.
src/types/generated/anymultirange.ts Regenerated overloads / primitive matching changes for multirange types.
src/types/generated/anycompatiblearray.ts Regenerated overloads / primitive matching changes for compatible arrays.
src/types/generated/anyarray.ts Regenerated overloads / primitive matching changes for arrays.
src/types/generate.ts Reworks “allowPrimitive” codegen logic to avoid ambiguous overload resolution across types sharing TS primitives.
src/test-helpers.ts New shared Vitest DB lifecycle + transaction helpers; adds SQL structural equality helper.
src/table.ts Adds per-table mutation transformer hook and isTableClass helper; updates Table() factory signature.
src/table.test.ts Migrates tests to new shared setupDb() helpers.
src/rpc.test.ts Migrates tests to new shared setupDb() helpers.
src/live/test-helpers.ts Adds helper to create/truncate _typegres_live_events for live tests.
src/live/snapshot.ts Implements snapshot parsing + JS visibility predicate for xid checks.
src/live/snapshot.test.ts Tests snapshot parsing + visibility rules.
src/live/run.ts Implements a single live iteration (repeatable read txn + extractor + query).
src/live/run.test.ts Tests runLiveIteration outputs (rows + cursor + predicate set).
src/live/extractor.ts Implements predicate extraction, alias ordering, extractor SQL generation, and predicate closure.
src/live/extractor.test.ts Tests traversal ordering, SQL shape, and predicate closure behavior.
src/live/events.ts Implements mutation wrappers that append before/after images into _typegres_live_events.
src/live/events.test.ts Tests insert/update/delete event emission and RETURNING passthrough.
src/live/db-live.test.ts End-to-end tests for db.live() reruns on matching commits.
src/live/bus.ts Implements polling bus, reverse-indexed subscription matching, and backfill window handling.
src/live/bus.test.ts Tests bus signaling, cursor visibility skipping, backfill, and CursorTooOldError.
src/live/ISSUES.md Tracks known gaps and follow-ups for live queries.
src/live/AGENTS.md Documents live-query design, scope, and semantics.
src/index.ts Exposes transaction option types publicly.
src/exoeval/tool.ts Removes per-file eslint no-redeclare suppression (handled globally now).
src/exoeval/index.ts Removes per-file eslint no-redeclare suppression (handled globally now).
src/exoeval/expr.ts Removes per-file eslint no-redeclare suppression (handled globally now).
src/database.ts Adds transaction isolation options + nesting rules; adds startLive/live/stopLive APIs.
src/database.test.ts Adds tests for isolation nesting rules; migrates to shared DB helpers.
src/builder/update.ts Introduces FinalizedUpdate, returningMerge, transformer hook support, and shared SET compilation.
src/builder/update.test.ts Migrates to shared DB helpers.
src/builder/test-helper.ts Removes old always-on DB test helper (replaced by src/test-helpers.ts).
src/builder/sql.ts Refines SQL node types (Op uses Raw, Column fields), adds TypedParam, and overloads `sql`` for Raw literals.
src/builder/query.ts Adds FinalizedQuery, exposes opts, adds mergeReturning, and uses isTableClass.
src/builder/query.test.ts Migrates to shared DB helpers.
src/builder/insert.ts Introduces FinalizedInsert, returningMerge, and transformer hook support.
src/builder/insert.test.ts Migrates to shared DB helpers.
src/builder/delete.ts Introduces FinalizedDelete, returningMerge, and transformer hook support.
src/builder/delete.test.ts Migrates to shared DB helpers.
eslint.config.js Disables no-redeclare globally (TS overloads).

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/live/events.ts
Comment on lines +115 to +134
const wrapInsertOrDelete = (
builder: InsertBuilder<any, any, any> | DeleteBuilder<any, any, any>,
side: "before" | "after",
): Sql => {
const tableName = builder.tableName;
const liveKey = `__live_${side}` as const;

// returningMerge() has the same shape on both InsertBuilder and
// DeleteBuilder; we cast to one of them to pick a single overload
// rather than writing a generic structural type.
const innerFinalized = (builder as InsertBuilder<any, any, any>)
.returningMerge(liveAfterReturning(tableName, side))
.finalize();

const beforeRef = side === "before" ? sql.ident(liveKey) : sql`NULL::jsonb`;
const afterRef = side === "after" ? sql.ident(liveKey) : sql`NULL::jsonb`;
const events = eventsInsertCte(tableName, beforeRef, afterRef);
const projection = userReturningProjection(innerFinalized.opts.returning ?? {}, liveKey);
return sql`WITH cte AS (${innerFinalized}), _events AS (${events}) SELECT ${projection} FROM cte`;
};
Copy link

Copilot AI May 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrapInsertOrDelete() always emits WITH cte AS (...), _events AS (...) ... FROM cte using unscoped, generic CTE identifiers. A user table named cte (or a query that already uses those identifiers in a larger composed SQL fragment) can conflict because CTE names shadow relations within the WITH statement. Recommend switching to uniquely prefixed internal CTE names (and quoting them via sql.ident) to avoid accidental name shadowing.

Copilot uses AI. Check for mistakes.
Comment thread src/live/events.ts Outdated
Comment on lines +175 to +187
// FinalizedUpdate.bind() can't be reused: the live wrap needs `FROM "before"
// WHERE foos.ctid = "before".ctid` instead of the user's WHERE inline.
const beforeCte = sql`SELECT *, ctid FROM ${sql.ident(tableName)} AS ${alias} WHERE ${where.toSql()} FOR UPDATE`;
const ctidJoin = sql`${sql.column(alias, sql.ident("ctid"))} = ${sql.ident("before")}.${sql.ident("ctid")}`;
const updateCte = sql`UPDATE ${sql.ident(tableName)} AS ${alias} SET ${sql.join(compileSetClauses(instance, setRow))} FROM ${sql.ident("before")} WHERE ${ctidJoin} RETURNING ${compileSelectList(returning)}`;

const events = eventsInsertCte(tableName, sql.ident("__live_before"), sql.ident("__live_after"));
const projection = userReturningProjection(returning ?? {}, "__live_before", "__live_after");

return sql.withScope(
[alias],
sql`WITH ${sql.ident("before")} AS (${beforeCte}), cte AS (${updateCte}), _events AS (${events}) SELECT ${projection} FROM cte`,
);
Copy link

Copilot AI May 1, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The UPDATE live-events wrapper hard-codes a CTE name "before" and also uses generic CTE names like "cte"/"_events". If the target table is named "before" (or another chosen CTE name), Postgres will resolve references inside the CTE body against the CTE name instead of the base table and error (non-recursive CTE self-reference) or behave ambiguously. Consider using a Typegres-prefixed, collision-resistant CTE name (e.g. "__typegres_before") or generating a fresh Alias/Scope-registered name for these internal CTEs so user table names can’t conflict.

Copilot uses AI. Check for mistakes.
@ryanrasti ryanrasti force-pushed the ryan_live branch 2 times, most recently from 990169e to 0e20519 Compare May 2, 2026 03:14
@ryanrasti ryanrasti merged commit cf8600b into main May 2, 2026
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants