diff --git a/src/fill.test.ts b/src/fill.test.ts index 34d2832..b177813 100644 --- a/src/fill.test.ts +++ b/src/fill.test.ts @@ -279,6 +279,57 @@ describe("Pgslice.fill", () => { expect(count.count).toBe(3); }); + test("scopes to partition range on first fill", async ({ + pgslice, + transaction, + }) => { + vi.useFakeTimers(); + vi.setSystemTime(new Date(Date.UTC(2026, 0, 15))); + + try { + await transaction.query(sql.unsafe` + CREATE TABLE posts ( + id BIGSERIAL PRIMARY KEY, + created_at DATE NOT NULL, + name TEXT + ) + `); + await transaction.query(sql.unsafe` + INSERT INTO posts (created_at, name) VALUES + ('2026-01-10', 'in-range'), + ('2025-12-15', 'out-of-range') + `); + + await pgslice.prep(transaction, { + table: "posts", + column: "created_at", + period: "month", + partition: true, + }); + await pgslice.addPartitions(transaction, { + table: "posts", + intermediate: true, + past: 0, + future: 0, + }); + + for await (const _batch of pgslice.fill(transaction, { + table: "posts", + })) { + // consume batches + } + + const rows = await transaction.any( + sql.type(z.object({ name: z.string() }))` + SELECT name FROM posts_intermediate ORDER BY id ASC + `, + ); + expect(rows.map((row) => row.name)).toEqual(["in-range"]); + } finally { + vi.useRealTimers(); + } + }); + test("returns nothing to fill for empty source", async ({ pgslice, transaction, diff --git a/src/filler.ts b/src/filler.ts index 49f0c4a..9ed4843 100644 --- a/src/filler.ts +++ b/src/filler.ts @@ -1,7 +1,6 @@ import { CommonQueryMethods, sql } from "slonik"; import { z } from "zod"; -import { Table } from "./table.js"; -import { advanceDate, parsePartitionDate } from "./date-ranges.js"; +import { Table, transformIdValue } from "./table.js"; import type { FillBatchResult, FillOptions, @@ -15,29 +14,6 @@ import { formatDateForSql } from "./sql-utils.js"; */ const idValueSchema = z.union([z.bigint(), z.number(), z.string()]).nullable(); -/** - * Transforms a raw ID value from the database into the proper IdValue type. - * Numbers and numeric strings become bigint, ULID strings stay as strings. - */ -function transformIdValue( - val: bigint | number | string | null, -): IdValue | null { - if (val === null) { - return null; - } - if (typeof val === "bigint") { - return val; - } - if (typeof val === "number") { - return BigInt(val); - } - // val is string - check if it's a numeric string or ULID - if (/^\d+$/.test(val)) { - return BigInt(val); - } - return val; -} - export interface FillerOptions { source: Table; dest: Table; @@ -97,42 +73,13 @@ export class Filler { throw new Error(`Table not found: ${destTable.toString()}`); } - // Get partition settings from dest table for time filtering - const settings = await destTable.fetchSettings(tx); - - // Determine time filter if dest is partitioned - let timeFilter: TimeFilter | undefined; - if (settings) { - const partitions = await destTable.partitions(tx); - if (partitions.length > 0) { - const firstPartition = partitions[0]; - const lastPartition = partitions[partitions.length - 1]; - - const startingTime = parsePartitionDate( - firstPartition.name, - settings.period, - ); - const lastPartitionDate = parsePartitionDate( - lastPartition.name, - settings.period, - ); - const endingTime = advanceDate(lastPartitionDate, settings.period, 1); - - timeFilter = { - column: settings.column, - cast: settings.cast, - startingTime, - endingTime, - }; - } - } + const { settings, partitions, timeFilter } = + await destTable.partitionContext(tx); // Determine which table to get the schema (columns, primary key) from let schemaTable: Table; - if (settings) { - const partitions = await destTable.partitions(tx); - schemaTable = - partitions.length > 0 ? partitions[partitions.length - 1] : table; + if (settings && partitions.length > 0) { + schemaTable = partitions[partitions.length - 1]; } else { schemaTable = table; } diff --git a/src/synchronizer.test.ts b/src/synchronizer.test.ts index 9712239..def7eb0 100644 --- a/src/synchronizer.test.ts +++ b/src/synchronizer.test.ts @@ -1,4 +1,4 @@ -import { describe, expect } from "vitest"; +import { describe, expect, vi } from "vitest"; import { sql } from "slonik"; import { z } from "zod"; @@ -289,6 +289,60 @@ describe("Synchronizer", () => { expect(batches).toEqual([expect.objectContaining({ rowsInserted: 3 })]); }); + test("defaults to partition range when start is omitted", async ({ + transaction, + pgslice, + }) => { + vi.useFakeTimers(); + vi.setSystemTime(new Date(Date.UTC(2026, 0, 15))); + + try { + await transaction.query(sql.unsafe` + CREATE TABLE posts ( + id BIGSERIAL PRIMARY KEY, + created_at DATE NOT NULL, + name TEXT + ) + `); + await transaction.query(sql.unsafe` + INSERT INTO posts (created_at, name) VALUES + ('2026-01-10', 'in-range'), + ('2025-12-15', 'out-of-range') + `); + + await pgslice.prep(transaction, { + table: "posts", + column: "created_at", + period: "month", + partition: true, + }); + await pgslice.addPartitions(transaction, { + table: "posts", + intermediate: true, + past: 0, + future: 0, + }); + + const synchronizer = await Synchronizer.init(transaction, { + table: "posts", + windowSize: 10, + }); + + for await (const _batch of synchronizer.synchronize(transaction)) { + // consume batches + } + + const rows = await transaction.any( + sql.type(z.object({ name: z.string() }))` + SELECT name FROM posts_intermediate ORDER BY id ASC + `, + ); + expect(rows.map((row) => row.name)).toEqual(["in-range"]); + } finally { + vi.useRealTimers(); + } + }); + test("reports matching rows", async ({ transaction }) => { await transaction.query(sql.unsafe` CREATE TABLE posts (id BIGSERIAL PRIMARY KEY, name TEXT) diff --git a/src/synchronizer.ts b/src/synchronizer.ts index d257cdc..6e94ae2 100644 --- a/src/synchronizer.ts +++ b/src/synchronizer.ts @@ -1,33 +1,16 @@ import { CommonQueryMethods } from "slonik"; import { z } from "zod"; -import { Table } from "./table.js"; -import { sql, valueToSql } from "./sql-utils.js"; +import { Table, transformIdValue } from "./table.js"; +import { formatDateForSql, sql, valueToSql } from "./sql-utils.js"; import type { ColumnInfo, IdValue, SynchronizeBatchResult, SynchronizeOptions, + TimeFilter, } from "./types.js"; -/** - * Transforms a raw ID value from the database into the proper IdValue type. - * Numbers and numeric strings become bigint, ULID strings stay as strings. - */ -function transformIdValue(val: bigint | number | string): IdValue { - if (typeof val === "bigint") { - return val; - } - if (typeof val === "number") { - return BigInt(val); - } - // val is string - check if it's a numeric string or ULID - if (/^\d+$/.test(val)) { - return BigInt(val); - } - return val; -} - /** * Zod schema for a row from the database. All values are nullable. */ @@ -41,6 +24,7 @@ interface SynchronizerOptions { windowSize: number; startingId: IdValue; dryRun: boolean; + timeFilter?: TimeFilter; } /** @@ -55,6 +39,7 @@ export class Synchronizer { readonly #windowSize: number; readonly #startingId: IdValue; readonly #dryRun: boolean; + readonly #timeFilter?: TimeFilter; private constructor(options: SynchronizerOptions) { this.#source = options.source; @@ -64,6 +49,7 @@ export class Synchronizer { this.#windowSize = options.windowSize; this.#startingId = options.startingId; this.#dryRun = options.dryRun; + this.#timeFilter = options.timeFilter; } get source(): Table { @@ -117,11 +103,22 @@ export class Synchronizer { const primaryKeyColumn = await sourceTable.primaryKey(tx); + const timeFilter = await targetTable.partitionTimeFilter(tx); + let startingId: IdValue; if (options.start !== undefined) { startingId = transformIdValue(options.start); } else { - const minId = await sourceTable.minId(tx); + const minId = await sourceTable.minId( + tx, + timeFilter + ? { + column: timeFilter.column, + cast: timeFilter.cast, + startingTime: timeFilter.startingTime, + } + : undefined, + ); if (minId === null) { throw new Error("No rows found in source table"); } @@ -136,6 +133,7 @@ export class Synchronizer { windowSize: options.windowSize ?? 1000, startingId, dryRun: options.dryRun ?? false, + timeFilter, }); } @@ -274,12 +272,30 @@ export class Synchronizer { ); const operator = includeStart ? sql.fragment`>=` : sql.fragment`>`; + const conditions = [sql.fragment`${pkCol} ${operator} ${startingId}`]; + + if (this.#timeFilter) { + const timeCol = sql.identifier([this.#timeFilter.column]); + const startDate = formatDateForSql( + this.#timeFilter.startingTime, + this.#timeFilter.cast, + ); + const endDate = formatDateForSql( + this.#timeFilter.endingTime, + this.#timeFilter.cast, + ); + conditions.push( + sql.fragment`${timeCol} >= ${startDate} AND ${timeCol} < ${endDate}`, + ); + } + + const whereClause = sql.join(conditions, sql.fragment` AND `); const result = await connection.any( sql.type(rowSchema)` SELECT ${columnList} FROM ${table.sqlIdentifier} - WHERE ${pkCol} ${operator} ${startingId} + WHERE ${whereClause} ORDER BY ${pkCol} LIMIT ${this.#windowSize} `, diff --git a/src/table.ts b/src/table.ts index 9bacd45..892cede 100644 --- a/src/table.ts +++ b/src/table.ts @@ -5,7 +5,14 @@ import { IdentifierSqlToken, } from "slonik"; import { z } from "zod"; -import type { Cast, ColumnInfo, IdValue, SequenceInfo } from "./types.js"; +import { advanceDate, parsePartitionDate } from "./date-ranges.js"; +import type { + Cast, + ColumnInfo, + IdValue, + SequenceInfo, + TimeFilter, +} from "./types.js"; import { TableSettings } from "./table-settings.js"; import { formatDateForSql } from "./sql-utils.js"; @@ -20,7 +27,11 @@ const idValueSchema = z.union([z.bigint(), z.number(), z.string()]).nullable(); * Transforms a raw ID value from the database into the proper IdValue type. * Numbers and numeric strings become bigint, ULID strings stay as strings. */ -function transformIdValue( +export function transformIdValue(val: bigint | number | string): IdValue; +export function transformIdValue( + val: bigint | number | string | null, +): IdValue | null; +export function transformIdValue( val: bigint | number | string | null, ): IdValue | null { if (val === null) { @@ -55,6 +66,37 @@ function dataTypeToCast(dataType: string): Cast | null { } } +function derivePartitionTimeFilter( + settings: TableSettings, + partitions: Table[], +): TimeFilter | undefined { + if (partitions.length === 0) { + return undefined; + } + + const firstPartition = partitions[0]; + const lastPartition = partitions[partitions.length - 1]; + const startingTime = parsePartitionDate(firstPartition.name, settings.period); + const lastPartitionDate = parsePartitionDate( + lastPartition.name, + settings.period, + ); + const endingTime = advanceDate(lastPartitionDate, settings.period, 1); + + return { + column: settings.column, + cast: settings.cast, + startingTime, + endingTime, + }; +} + +export interface PartitionContext { + settings: TableSettings | null; + partitions: Table[]; + timeFilter?: TimeFilter; +} + /** * Gets the server version number. */ @@ -307,7 +349,7 @@ export class Table { /** * Gets all child partitions of this table. */ - async partitions(tx: DatabaseTransactionConnection): Promise