From 8c89a2be65b09bcc26ceb8a748bc555ff12c629d Mon Sep 17 00:00:00 2001 From: Roland Teichert Date: Tue, 7 Jan 2025 17:19:50 +0200 Subject: [PATCH 1/3] Added heartbeat to mysql binlog listener connection. --- .../src/replication/BinLogStream.ts | 29 +++++++++++++++++-- .../src/replication/zongji/zongji.d.ts | 10 +++++++ 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts index 65f0ced80..6251eb5e3 100644 --- a/modules/module-mysql/src/replication/BinLogStream.ts +++ b/modules/module-mysql/src/replication/BinLogStream.ts @@ -476,11 +476,36 @@ AND table_type = 'BASE TABLE';`, return; } - logger.info(`Reading binlog from: ${binLogPositionState.filename}:${binLogPositionState.offset}`); + // Set a heartbeat interval for the Zongji replication connection + // Zongji does not explicitly handle the heartbeat events - they are categorized as event:unknown + // The heartbeat events are enough to keep the connection alive for setTimeout to work on the socket. + await new Promise((resolve, reject) => { + zongji.connection.query( + // In nanoseconds, 10^9 = 1s + 'set @master_heartbeat_period=28*1000000000', + function (error: any, results: any, fields: any) { + if (error) { + reject(error); + } else { + resolve(results); + } + } + ); + }); + logger.info('Successfully set up replication connection heartbeat...'); + // The _socket member is only set after a query is run on the connection, so we set the timeout after setting the heartbeat. + // The timeout here must be greater than the master_heartbeat_period. + const socket = zongji.connection._socket!; + socket.setTimeout(60_000, () => { + socket.destroy(new Error('Replication connection timeout.')); + }); + + logger.info(`Reading binlog from: ${binLogPositionState.filename}:${binLogPositionState.offset}`); // Only listen for changes to tables in the sync rules const includedTables = [...this.tableCache.values()].map((table) => table.table); zongji.start({ + // We ignore the unknown/heartbeat event since it currently serves no purpose other than to keep the collection alive includeEvents: ['tablemap', 'writerows', 'updaterows', 'deleterows', 'xid', 'rotate', 'gtidlog'], excludeEvents: [], includeSchema: { [this.defaultSchema]: includedTables }, @@ -492,7 +517,7 @@ AND table_type = 'BASE TABLE';`, // Forever young await new Promise((resolve, reject) => { zongji.on('error', (error) => { - logger.error('Error on Binlog listener:', error); + logger.error('Binlog listener error:', error); zongji.stop(); queue.kill(); reject(error); diff --git a/modules/module-mysql/src/replication/zongji/zongji.d.ts b/modules/module-mysql/src/replication/zongji/zongji.d.ts index 9a17f15e9..f5640497e 100644 --- a/modules/module-mysql/src/replication/zongji/zongji.d.ts +++ b/modules/module-mysql/src/replication/zongji/zongji.d.ts @@ -1,4 +1,6 @@ declare module '@powersync/mysql-zongji' { + import { Socket } from 'net'; + export type ZongjiOptions = { host: string; user: string; @@ -108,7 +110,15 @@ declare module '@powersync/mysql-zongji' { export type BinLogEvent = BinLogRotationEvent | BinLogGTIDLogEvent | BinLogXidEvent | BinLogMutationEvent; + // @vlasky/mysql Connection + export interface MySQLConnection { + _socket?: Socket; + /** There are other forms of this method as well - this is the most basic one. */ + query(sql: string, callback: (error: any, results: any, fields: any) => void): void; + } + export default class ZongJi { + connection: MySQLConnection; constructor(options: ZongjiOptions); start(options: StartOptions): void; From 16c383901f9ad25d707464317c4f4df3bcb79fa2 Mon Sep 17 00:00:00 2001 From: Roland Teichert Date: Tue, 7 Jan 2025 19:51:49 +0200 Subject: [PATCH 2/3] Update modules/module-mysql/src/replication/BinLogStream.ts Fixed comment Co-authored-by: Ralf Kistner --- modules/module-mysql/src/replication/BinLogStream.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts index 6251eb5e3..483f292f9 100644 --- a/modules/module-mysql/src/replication/BinLogStream.ts +++ b/modules/module-mysql/src/replication/BinLogStream.ts @@ -505,7 +505,7 @@ AND table_type = 'BASE TABLE';`, // Only listen for changes to tables in the sync rules const includedTables = [...this.tableCache.values()].map((table) => table.table); zongji.start({ - // We ignore the unknown/heartbeat event since it currently serves no purpose other than to keep the collection alive + // We ignore the unknown/heartbeat event since it currently serves no purpose other than to keep the connection alive includeEvents: ['tablemap', 'writerows', 'updaterows', 'deleterows', 'xid', 'rotate', 'gtidlog'], excludeEvents: [], includeSchema: { [this.defaultSchema]: includedTables }, From 2e33b35d62a510892cd9af1c650ac7dc2215a595 Mon Sep 17 00:00:00 2001 From: Roland Teichert Date: Tue, 7 Jan 2025 19:52:43 +0200 Subject: [PATCH 3/3] Added changeset --- .changeset/grumpy-cameras-breathe.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/grumpy-cameras-breathe.md diff --git a/.changeset/grumpy-cameras-breathe.md b/.changeset/grumpy-cameras-breathe.md new file mode 100644 index 000000000..264e7be63 --- /dev/null +++ b/.changeset/grumpy-cameras-breathe.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-module-mysql': patch +--- + +Added a heartbeat mechanism to the MySQL binlog listener replication connection to detect connection timeouts.