Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions src/fill.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,57 @@ describe("Pgslice.fill", () => {
expect(count.count).toBe(3);
});

test("scopes to partition range on first fill", async ({
pgslice,
transaction,
}) => {
vi.useFakeTimers();
vi.setSystemTime(new Date(Date.UTC(2026, 0, 15)));

try {
await transaction.query(sql.unsafe`
CREATE TABLE posts (
id BIGSERIAL PRIMARY KEY,
created_at DATE NOT NULL,
name TEXT
)
`);
await transaction.query(sql.unsafe`
INSERT INTO posts (created_at, name) VALUES
('2026-01-10', 'in-range'),
('2025-12-15', 'out-of-range')
`);

await pgslice.prep(transaction, {
table: "posts",
column: "created_at",
period: "month",
partition: true,
});
await pgslice.addPartitions(transaction, {
table: "posts",
intermediate: true,
past: 0,
future: 0,
});

for await (const _batch of pgslice.fill(transaction, {
table: "posts",
})) {
// consume batches
}

const rows = await transaction.any(
sql.type(z.object({ name: z.string() }))`
SELECT name FROM posts_intermediate ORDER BY id ASC
`,
);
expect(rows.map((row) => row.name)).toEqual(["in-range"]);
} finally {
vi.useRealTimers();
}
});

test("returns nothing to fill for empty source", async ({
pgslice,
transaction,
Expand Down
63 changes: 5 additions & 58 deletions src/filler.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { CommonQueryMethods, sql } from "slonik";
import { z } from "zod";
import { Table } from "./table.js";
import { advanceDate, parsePartitionDate } from "./date-ranges.js";
import { Table, transformIdValue } from "./table.js";
import type {
FillBatchResult,
FillOptions,
Expand All @@ -15,29 +14,6 @@ import { formatDateForSql } from "./sql-utils.js";
*/
const idValueSchema = z.union([z.bigint(), z.number(), z.string()]).nullable();

/**
* Transforms a raw ID value from the database into the proper IdValue type.
* Numbers and numeric strings become bigint, ULID strings stay as strings.
*/
function transformIdValue(
val: bigint | number | string | null,
): IdValue | null {
if (val === null) {
return null;
}
if (typeof val === "bigint") {
return val;
}
if (typeof val === "number") {
return BigInt(val);
}
// val is string - check if it's a numeric string or ULID
if (/^\d+$/.test(val)) {
return BigInt(val);
}
return val;
}

export interface FillerOptions {
source: Table;
dest: Table;
Expand Down Expand Up @@ -97,42 +73,13 @@ export class Filler {
throw new Error(`Table not found: ${destTable.toString()}`);
}

// Get partition settings from dest table for time filtering
const settings = await destTable.fetchSettings(tx);

// Determine time filter if dest is partitioned
let timeFilter: TimeFilter | undefined;
if (settings) {
const partitions = await destTable.partitions(tx);
if (partitions.length > 0) {
const firstPartition = partitions[0];
const lastPartition = partitions[partitions.length - 1];

const startingTime = parsePartitionDate(
firstPartition.name,
settings.period,
);
const lastPartitionDate = parsePartitionDate(
lastPartition.name,
settings.period,
);
const endingTime = advanceDate(lastPartitionDate, settings.period, 1);

timeFilter = {
column: settings.column,
cast: settings.cast,
startingTime,
endingTime,
};
}
}
const { settings, partitions, timeFilter } =
await destTable.partitionContext(tx);

// Determine which table to get the schema (columns, primary key) from
let schemaTable: Table;
if (settings) {
const partitions = await destTable.partitions(tx);
schemaTable =
partitions.length > 0 ? partitions[partitions.length - 1] : table;
if (settings && partitions.length > 0) {
schemaTable = partitions[partitions.length - 1];
} else {
schemaTable = table;
}
Expand Down
56 changes: 55 additions & 1 deletion src/synchronizer.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { describe, expect } from "vitest";
import { describe, expect, vi } from "vitest";
import { sql } from "slonik";
import { z } from "zod";

Expand Down Expand Up @@ -289,6 +289,60 @@ describe("Synchronizer", () => {
expect(batches).toEqual([expect.objectContaining({ rowsInserted: 3 })]);
});

test("defaults to partition range when start is omitted", async ({
transaction,
pgslice,
}) => {
vi.useFakeTimers();
vi.setSystemTime(new Date(Date.UTC(2026, 0, 15)));

try {
await transaction.query(sql.unsafe`
CREATE TABLE posts (
id BIGSERIAL PRIMARY KEY,
created_at DATE NOT NULL,
name TEXT
)
`);
await transaction.query(sql.unsafe`
INSERT INTO posts (created_at, name) VALUES
('2026-01-10', 'in-range'),
('2025-12-15', 'out-of-range')
`);

await pgslice.prep(transaction, {
table: "posts",
column: "created_at",
period: "month",
partition: true,
});
await pgslice.addPartitions(transaction, {
table: "posts",
intermediate: true,
past: 0,
future: 0,
});

const synchronizer = await Synchronizer.init(transaction, {
table: "posts",
windowSize: 10,
});

for await (const _batch of synchronizer.synchronize(transaction)) {
// consume batches
}

const rows = await transaction.any(
sql.type(z.object({ name: z.string() }))`
SELECT name FROM posts_intermediate ORDER BY id ASC
`,
);
expect(rows.map((row) => row.name)).toEqual(["in-range"]);
} finally {
vi.useRealTimers();
}
});

test("reports matching rows", async ({ transaction }) => {
await transaction.query(sql.unsafe`
CREATE TABLE posts (id BIGSERIAL PRIMARY KEY, name TEXT)
Expand Down
60 changes: 38 additions & 22 deletions src/synchronizer.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,16 @@
import { CommonQueryMethods } from "slonik";
import { z } from "zod";

import { Table } from "./table.js";
import { sql, valueToSql } from "./sql-utils.js";
import { Table, transformIdValue } from "./table.js";
import { formatDateForSql, sql, valueToSql } from "./sql-utils.js";
import type {
ColumnInfo,
IdValue,
SynchronizeBatchResult,
SynchronizeOptions,
TimeFilter,
} from "./types.js";

/**
* Transforms a raw ID value from the database into the proper IdValue type.
* Numbers and numeric strings become bigint, ULID strings stay as strings.
*/
function transformIdValue(val: bigint | number | string): IdValue {
if (typeof val === "bigint") {
return val;
}
if (typeof val === "number") {
return BigInt(val);
}
// val is string - check if it's a numeric string or ULID
if (/^\d+$/.test(val)) {
return BigInt(val);
}
return val;
}

/**
* Zod schema for a row from the database. All values are nullable.
*/
Expand All @@ -41,6 +24,7 @@ interface SynchronizerOptions {
windowSize: number;
startingId: IdValue;
dryRun: boolean;
timeFilter?: TimeFilter;
}

/**
Expand All @@ -55,6 +39,7 @@ export class Synchronizer {
readonly #windowSize: number;
readonly #startingId: IdValue;
readonly #dryRun: boolean;
readonly #timeFilter?: TimeFilter;

private constructor(options: SynchronizerOptions) {
this.#source = options.source;
Expand All @@ -64,6 +49,7 @@ export class Synchronizer {
this.#windowSize = options.windowSize;
this.#startingId = options.startingId;
this.#dryRun = options.dryRun;
this.#timeFilter = options.timeFilter;
}

get source(): Table {
Expand Down Expand Up @@ -117,11 +103,22 @@ export class Synchronizer {

const primaryKeyColumn = await sourceTable.primaryKey(tx);

const timeFilter = await targetTable.partitionTimeFilter(tx);

let startingId: IdValue;
if (options.start !== undefined) {
startingId = transformIdValue(options.start);
} else {
const minId = await sourceTable.minId(tx);
const minId = await sourceTable.minId(
tx,
timeFilter
? {
column: timeFilter.column,
cast: timeFilter.cast,
startingTime: timeFilter.startingTime,
}
: undefined,
);
if (minId === null) {
throw new Error("No rows found in source table");
}
Expand All @@ -136,6 +133,7 @@ export class Synchronizer {
windowSize: options.windowSize ?? 1000,
startingId,
dryRun: options.dryRun ?? false,
timeFilter,
});
}

Expand Down Expand Up @@ -274,12 +272,30 @@ export class Synchronizer {
);

const operator = includeStart ? sql.fragment`>=` : sql.fragment`>`;
const conditions = [sql.fragment`${pkCol} ${operator} ${startingId}`];

if (this.#timeFilter) {
const timeCol = sql.identifier([this.#timeFilter.column]);
const startDate = formatDateForSql(
this.#timeFilter.startingTime,
this.#timeFilter.cast,
);
const endDate = formatDateForSql(
this.#timeFilter.endingTime,
this.#timeFilter.cast,
);
conditions.push(
sql.fragment`${timeCol} >= ${startDate} AND ${timeCol} < ${endDate}`,
);
}

const whereClause = sql.join(conditions, sql.fragment` AND `);

const result = await connection.any(
sql.type(rowSchema)`
SELECT ${columnList}
FROM ${table.sqlIdentifier}
WHERE ${pkCol} ${operator} ${startingId}
WHERE ${whereClause}
ORDER BY ${pkCol}
LIMIT ${this.#windowSize}
`,
Expand Down
Loading