Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 109 additions & 1 deletion internal-packages/replication/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,31 @@ export class LogicalReplicationClient {
return false;
}

if (await this.#doesPublicationExist()) {
const publicationExists = await this.#doesPublicationExist();

if (publicationExists) {
// Validate the existing publication is correctly configured
const validationError = await this.#validatePublicationConfiguration();

if (validationError) {
this.logger.error("Publication exists but is misconfigured", {
name: this.options.name,
table: this.options.table,
slotName: this.options.slotName,
publicationName: this.options.publicationName,
error: validationError,
});

this.events.emit("error", new LogicalReplicationClientError(validationError));
return false;
}

this.logger.info("Publication exists and is correctly configured", {
name: this.options.name,
table: this.options.table,
publicationName: this.options.publicationName,
});

return true;
}

Expand Down Expand Up @@ -459,6 +483,90 @@ export class LogicalReplicationClient {
return res.rows[0].exists;
}

async #validatePublicationConfiguration(): Promise<string | null> {
if (!this.client) {
return "Cannot validate publication configuration: client not connected";
}

// Check if the publication has the correct table
const tablesRes = await this.client.query(
`SELECT schemaname, tablename
FROM pg_publication_tables
WHERE pubname = '${this.options.publicationName}';`
);

const tables = tablesRes.rows;
const expectedTable = this.options.table;

// Check if the table is in the publication
const hasTable = tables.some(
(row) => row.tablename === expectedTable && row.schemaname === "public"
);

if (!hasTable) {
if (tables.length === 0) {
return `Publication '${this.options.publicationName}' exists but has NO TABLES configured. Expected table: "public.${expectedTable}". Run: ALTER PUBLICATION ${this.options.publicationName} ADD TABLE "${expectedTable}";`;
} else {
const tableList = tables.map((t) => `"${t.schemaname}"."${t.tablename}"`).join(", ");
return `Publication '${this.options.publicationName}' exists but does not include the required table "public.${expectedTable}". Current tables: ${tableList}. Run: ALTER PUBLICATION ${this.options.publicationName} ADD TABLE "${expectedTable}";`;
}
}

// Check if the publication has the correct actions configured
if (this.options.publicationActions && this.options.publicationActions.length > 0) {
const actionsRes = await this.client.query(
`SELECT pubinsert, pubupdate, pubdelete, pubtruncate
FROM pg_publication
WHERE pubname = '${this.options.publicationName}';`
);

if (actionsRes.rows.length === 0) {
return `Publication '${this.options.publicationName}' not found when checking actions`;
}

const actualActions = actionsRes.rows[0];
const missingActions: string[] = [];

for (const action of this.options.publicationActions) {
switch (action) {
case "insert":
if (!actualActions.pubinsert) missingActions.push("insert");
break;
case "update":
if (!actualActions.pubupdate) missingActions.push("update");
break;
case "delete":
if (!actualActions.pubdelete) missingActions.push("delete");
break;
case "truncate":
if (!actualActions.pubtruncate) missingActions.push("truncate");
break;
}
}

if (missingActions.length > 0) {
const currentActions: string[] = [];
if (actualActions.pubinsert) currentActions.push("insert");
if (actualActions.pubupdate) currentActions.push("update");
if (actualActions.pubdelete) currentActions.push("delete");
if (actualActions.pubtruncate) currentActions.push("truncate");

return `Publication '${
this.options.publicationName
}' is missing required actions. Expected: [${this.options.publicationActions.join(
", "
)}], Current: [${currentActions.join(", ")}], Missing: [${missingActions.join(
", "
)}]. Run: ALTER PUBLICATION ${
this.options.publicationName
} SET (publish = '${this.options.publicationActions.join(", ")}');`;
}
}

// All validations passed
return null;
}

async #createSlot(): Promise<boolean> {
if (!this.client) {
this.events.emit("error", new LogicalReplicationClientError("Cannot create slot"));
Expand Down