Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/gentle-icons-try.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-module-mysql': patch
---

Fixed MySQL version checking to better handle non-semantic version strings
5 changes: 3 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ jobs:
strategy:
fail-fast: false
matrix:
mysql-version: [8.0, 8.4]
mysql-version: [5.7, 8.0, 8.4]

steps:
- uses: actions/checkout@v4
Expand All @@ -167,7 +167,8 @@ jobs:
-d mysql:${{ matrix.mysql-version }} \
--log-bin=/var/lib/mysql/mysql-bin.log \
--gtid_mode=ON \
--enforce_gtid_consistency=ON
--enforce_gtid_consistency=ON \
--server-id=1

- name: Start MongoDB
uses: supercharge/mongodb-github-action@1.8.0
Expand Down
4 changes: 2 additions & 2 deletions modules/module-mysql/src/api/MySQLRouteAPIAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import * as sync_rules from '@powersync/service-sync-rules';
import * as service_types from '@powersync/service-types';
import mysql from 'mysql2/promise';
import * as common from '../common/common-index.js';
import * as mysql_utils from '../utils/mysql_utils.js';
import * as mysql_utils from '../utils/mysql-utils.js';
import * as types from '../types/types.js';
import { toExpressionTypeFromMySQLType } from '../common/common-index.js';

Expand Down Expand Up @@ -326,7 +326,7 @@ export class MySQLRouteAPIAdapter implements api.RouteAPI {
name: result.schema_name,
tables: []
});

const columns = JSON.parse(result.columns).map((column: { data_type: string; column_name: string }) => ({
name: column.column_name,
type: column.data_type,
Expand Down
2 changes: 1 addition & 1 deletion modules/module-mysql/src/common/ReplicatedGTID.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import mysql from 'mysql2/promise';
import * as uuid from 'uuid';
import * as mysql_utils from '../utils/mysql_utils.js';
import * as mysql_utils from '../utils/mysql-utils.js';

export type BinLogPosition = {
filename: string;
Expand Down
19 changes: 9 additions & 10 deletions modules/module-mysql/src/common/check-source-configuration.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
import mysqlPromise from 'mysql2/promise';
import * as mysql_utils from '../utils/mysql_utils.js';
import * as mysql_utils from '../utils/mysql-utils.js';

const MIN_SUPPORTED_VERSION = '5.7.0';

export async function checkSourceConfiguration(connection: mysqlPromise.Connection): Promise<string[]> {
const errors: string[] = [];

const version = await mysql_utils.getMySQLVersion(connection);
if (!mysql_utils.isVersionAtLeast(version, MIN_SUPPORTED_VERSION)) {
errors.push(`MySQL versions older than ${MIN_SUPPORTED_VERSION} are not supported. Your version is: ${version}.`);
}

const [[result]] = await mysql_utils.retriedQuery({
connection,
query: `
Expand Down Expand Up @@ -48,12 +56,3 @@ export async function checkSourceConfiguration(connection: mysqlPromise.Connecti

return errors;
}

export async function getMySQLVersion(connection: mysqlPromise.Connection): Promise<string> {
const [[versionResult]] = await mysql_utils.retriedQuery({
connection,
query: `SELECT VERSION() as version`
});

return versionResult.version as string;
}
2 changes: 1 addition & 1 deletion modules/module-mysql/src/common/get-replication-columns.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { storage } from '@powersync/service-core';
import mysqlPromise from 'mysql2/promise';
import * as mysql_utils from '../utils/mysql_utils.js';
import * as mysql_utils from '../utils/mysql-utils.js';

export type GetReplicationColumnsOptions = {
connection: mysqlPromise.Connection;
Expand Down
14 changes: 5 additions & 9 deletions modules/module-mysql/src/common/read-executed-gtid.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,22 @@
import mysqlPromise from 'mysql2/promise';
import * as mysql_utils from '../utils/mysql_utils.js';
import { gte } from 'semver';

import * as mysql_utils from '../utils/mysql-utils.js';
import { ReplicatedGTID } from './ReplicatedGTID.js';
import { getMySQLVersion } from './check-source-configuration.js';

/**
* Gets the current master HEAD GTID
*/
export async function readExecutedGtid(connection: mysqlPromise.Connection): Promise<ReplicatedGTID> {
const version = await getMySQLVersion(connection);
const version = await mysql_utils.getMySQLVersion(connection);

let binlogStatus: mysqlPromise.RowDataPacket;
if (gte(version, '8.4.0')) {
// Get the BinLog status
if (mysql_utils.isVersionAtLeast(version, '8.4.0')) {
// Syntax for the below query changed in 8.4.0
const [[binLogResult]] = await mysql_utils.retriedQuery({
connection,
query: `SHOW BINARY LOG STATUS`
});
binlogStatus = binLogResult;
} else {
// TODO Check if this works for version 5.7
// Get the BinLog status
const [[binLogResult]] = await mysql_utils.retriedQuery({
connection,
query: `SHOW MASTER STATUS`
Expand Down
2 changes: 1 addition & 1 deletion modules/module-mysql/src/replication/BinLogStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import * as zongji_utils from './zongji/zongji-utils.js';
import { MySQLConnectionManager } from './MySQLConnectionManager.js';
import { isBinlogStillAvailable, ReplicatedGTID, toColumnDescriptors } from '../common/common-index.js';
import mysqlPromise from 'mysql2/promise';
import { createRandomServerId } from '../utils/mysql_utils.js';
import { createRandomServerId } from '../utils/mysql-utils.js';

export interface BinLogStreamOptions {
connections: MySQLConnectionManager;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { NormalizedMySQLConnectionConfig } from '../types/types.js';
import mysqlPromise from 'mysql2/promise';
import mysql, { FieldPacket, RowDataPacket } from 'mysql2';
import * as mysql_utils from '../utils/mysql_utils.js';
import * as mysql_utils from '../utils/mysql-utils.js';
import ZongJi from '@powersync/mysql-zongji';
import { logger } from '@powersync/lib-services-framework';

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { logger } from '@powersync/lib-services-framework';
import mysql from 'mysql2';
import mysqlPromise from 'mysql2/promise';
import * as types from '../types/types.js';
import { coerce, gte } from 'semver';

export type RetriedQueryOptions = {
connection: mysqlPromise.Connection;
Expand Down Expand Up @@ -60,3 +61,24 @@ export function createPool(config: types.NormalizedMySQLConnectionConfig, option
export function createRandomServerId(syncRuleId: number): number {
return Number.parseInt(`${syncRuleId}00${Math.floor(Math.random() * 10000)}`);
}

export async function getMySQLVersion(connection: mysqlPromise.Connection): Promise<string> {
const [[versionResult]] = await retriedQuery({
connection,
query: `SELECT VERSION() as version`
});

return versionResult.version as string;
}

/**
* Check if the current MySQL version is newer or equal to the target version.
* @param version
* @param minimumVersion
*/
export function isVersionAtLeast(version: string, minimumVersion: string): boolean {
const coercedVersion = coerce(version);
const coercedMinimumVersion = coerce(minimumVersion);

return gte(coercedVersion!, coercedMinimumVersion!, { loose: true });
}
76 changes: 25 additions & 51 deletions modules/module-mysql/test/src/BinLogStream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { MONGO_STORAGE_FACTORY } from '@core-tests/util.js';
import { BucketStorageFactory, Metrics } from '@powersync/service-core';
import { describe, expect, test } from 'vitest';
import { binlogStreamTest } from './BinlogStreamUtils.js';
import { v4 as uuid } from 'uuid';

type StorageFactory = () => Promise<BucketStorageFactory>;

Expand Down Expand Up @@ -32,9 +33,7 @@ function defineBinlogStreamTests(factory: StorageFactory) {
data:
- SELECT id, description, num FROM "test_data"`);

await connectionManager.query(
`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY DEFAULT (UUID()), description TEXT, num BIGINT)`
);
await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT, num BIGINT)`);

await context.replicateSnapshot();

Expand All @@ -44,11 +43,10 @@ function defineBinlogStreamTests(factory: StorageFactory) {
(await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;

context.startStreaming();
await connectionManager.query(`INSERT INTO test_data(description, num) VALUES('test1', 1152921504606846976)`);
const [[result]] = await connectionManager.query(
`SELECT id AS test_id FROM test_data WHERE description = 'test1' AND num = 1152921504606846976`
const testId = uuid();
await connectionManager.query(
`INSERT INTO test_data(id, description, num) VALUES('${testId}', 'test1', 1152921504606846976)`
);
const testId = result.test_id;
const data = await context.getBucketData('global[]');

expect(data).toMatchObject([putOp('test_data', { id: testId, description: 'test1', num: 1152921504606846976n })]);
Expand All @@ -71,9 +69,7 @@ function defineBinlogStreamTests(factory: StorageFactory) {
- SELECT id, description FROM "test_DATA"
`);

await connectionManager.query(
`CREATE TABLE test_DATA (id CHAR(36) PRIMARY KEY DEFAULT (UUID()), description text)`
);
await connectionManager.query(`CREATE TABLE test_DATA (id CHAR(36) PRIMARY KEY, description text)`);

await context.replicateSnapshot();

Expand All @@ -84,11 +80,8 @@ function defineBinlogStreamTests(factory: StorageFactory) {

context.startStreaming();

await connectionManager.query(`INSERT INTO test_DATA(description) VALUES('test1')`);
const [[result]] = await connectionManager.query(
`SELECT id AS test_id FROM test_DATA WHERE description = 'test1'`
);
const testId = result.test_id;
const testId = uuid();
await connectionManager.query(`INSERT INTO test_DATA(id, description) VALUES('${testId}','test1')`);

const data = await context.getBucketData('global[]');

Expand Down Expand Up @@ -144,24 +137,18 @@ function defineBinlogStreamTests(factory: StorageFactory) {
const { connectionManager } = context;
await context.updateSyncRules(BASIC_SYNC_RULES);

await connectionManager.query(
`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY DEFAULT (UUID()), description text)`
);
await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description text)`);

await context.replicateSnapshot();
context.startStreaming();

await connectionManager.query(`INSERT INTO test_data(description) VALUES('test1')`);
const [[result1]] = await connectionManager.query(
`SELECT id AS test_id FROM test_data WHERE description = 'test1'`
);
const testId1 = result1.test_id;
const testId1 = uuid();
await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('${testId1}','test1')`);

await connectionManager.query(`UPDATE test_data SET id = UUID(), description = 'test2a' WHERE id = '${testId1}'`);
const [[result2]] = await connectionManager.query(
`SELECT id AS test_id FROM test_data WHERE description = 'test2a'`
const testId2 = uuid();
await connectionManager.query(
`UPDATE test_data SET id = '${testId2}', description = 'test2a' WHERE id = '${testId1}'`
);
const testId2 = result2.test_id;

// This update may fail replicating with:
// Error: Update on missing record public.test_data:074a601e-fc78-4c33-a15d-f89fdd4af31d :: {"g":1,"t":"651e9fbe9fec6155895057ec","k":"1a0b34da-fb8c-5e6f-8421-d7a3c5d4df4f"}
Expand Down Expand Up @@ -192,15 +179,10 @@ function defineBinlogStreamTests(factory: StorageFactory) {
const { connectionManager } = context;
await context.updateSyncRules(BASIC_SYNC_RULES);

await connectionManager.query(
`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY DEFAULT (UUID()), description text)`
);
await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description text)`);

await connectionManager.query(`INSERT INTO test_data(description) VALUES('test1')`);
const [[result]] = await connectionManager.query(
`SELECT id AS test_id FROM test_data WHERE description = 'test1'`
);
const testId = result.test_id;
const testId = uuid();
await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('${testId}','test1')`);

await context.replicateSnapshot();

Expand All @@ -221,16 +203,13 @@ function defineBinlogStreamTests(factory: StorageFactory) {
`);

await connectionManager.query(
`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY DEFAULT (UUID()), description TEXT, date DATE, datetime DATETIME, timestamp TIMESTAMP)`
`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT, date DATE, datetime DATETIME, timestamp TIMESTAMP)`
);

const testId = uuid();
await connectionManager.query(`
INSERT INTO test_data(description, date, datetime, timestamp) VALUES('testDates', '2023-03-06', '2023-03-06 15:47', '2023-03-06 15:47')
INSERT INTO test_data(id, description, date, datetime, timestamp) VALUES('${testId}','testDates', '2023-03-06', '2023-03-06 15:47', '2023-03-06 15:47')
`);
const [[result]] = await connectionManager.query(
`SELECT id AS test_id FROM test_data WHERE description = 'testDates'`
);
const testId = result.test_id;

await context.replicateSnapshot();

Expand Down Expand Up @@ -259,7 +238,7 @@ function defineBinlogStreamTests(factory: StorageFactory) {
`);

await connectionManager.query(
`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY DEFAULT (UUID()), description TEXT, date DATE, datetime DATETIME, timestamp TIMESTAMP)`
`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT, date DATE, datetime DATETIME, timestamp TIMESTAMP)`
);

await context.replicateSnapshot();
Expand All @@ -271,13 +250,10 @@ function defineBinlogStreamTests(factory: StorageFactory) {

context.startStreaming();

const testId = uuid();
await connectionManager.query(`
INSERT INTO test_data(description, date, datetime, timestamp) VALUES('testDates', '2023-03-06', '2023-03-06 15:47', '2023-03-06 15:47')
INSERT INTO test_data(id, description, date, datetime, timestamp) VALUES('${testId}','testDates', '2023-03-06', '2023-03-06 15:47', '2023-03-06 15:47')
`);
const [[result]] = await connectionManager.query(
`SELECT id AS test_id FROM test_data WHERE description = 'testDates'`
);
const testId = result.test_id;

const data = await context.getBucketData('global[]');
expect(data).toMatchObject([
Expand All @@ -303,9 +279,7 @@ function defineBinlogStreamTests(factory: StorageFactory) {
const { connectionManager } = context;
await context.updateSyncRules(BASIC_SYNC_RULES);

await connectionManager.query(
`CREATE TABLE test_donotsync (id CHAR(36) PRIMARY KEY DEFAULT (UUID()), description text)`
);
await connectionManager.query(`CREATE TABLE test_donotsync (id CHAR(36) PRIMARY KEY, description text)`);

await context.replicateSnapshot();

Expand All @@ -316,7 +290,7 @@ function defineBinlogStreamTests(factory: StorageFactory) {

context.startStreaming();

await connectionManager.query(`INSERT INTO test_donotsync(description) VALUES('test1')`);
await connectionManager.query(`INSERT INTO test_donotsync(id, description) VALUES('${uuid()}','test1')`);
const data = await context.getBucketData('global[]');

expect(data).toMatchObject([]);
Expand Down
17 changes: 17 additions & 0 deletions modules/module-mysql/test/src/mysql-utils.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { describe, expect, test } from 'vitest';
import { isVersionAtLeast } from '@module/utils/mysql-utils.js';

describe('MySQL Utility Tests', () => {
test('Minimum version checking ', () => {
const newerVersion = '8.4.0';
const olderVersion = '5.7';
const sameVersion = '8.0';
// Improperly formatted semantic versions should be handled gracefully if possible
const improperSemver = '5.7.42-0ubuntu0.18.04.1-log';

expect(isVersionAtLeast(newerVersion, '8.0')).toBeTruthy();
expect(isVersionAtLeast(sameVersion, '8.0')).toBeTruthy();
expect(isVersionAtLeast(olderVersion, '8.0')).toBeFalsy();
expect(isVersionAtLeast(improperSemver, '5.7')).toBeTruthy();
});
});
9 changes: 2 additions & 7 deletions modules/module-mysql/test/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ import { BucketStorageFactory, Metrics, MongoBucketStorage } from '@powersync/se
import { env } from './env.js';
import mysqlPromise from 'mysql2/promise';
import { connectMongo } from '@core-tests/util.js';
import { getMySQLVersion } from '@module/common/check-source-configuration.js';
import { gte } from 'semver';
import { getMySQLVersion, isVersionAtLeast } from '@module/utils/mysql-utils.js';

export const TEST_URI = env.MYSQL_TEST_URI;

Expand Down Expand Up @@ -38,7 +37,7 @@ export const INITIALIZED_MONGO_STORAGE_FACTORY: StorageFactory = async () => {

export async function clearTestDb(connection: mysqlPromise.Connection) {
const version = await getMySQLVersion(connection);
if (gte(version, '8.4.0')) {
if (isVersionAtLeast(version, '8.4.0')) {
await connection.query('RESET BINARY LOGS AND GTIDS');
} else {
await connection.query('RESET MASTER');
Expand All @@ -55,7 +54,3 @@ export async function clearTestDb(connection: mysqlPromise.Connection) {
}
}
}

export function connectMySQLPool(): mysqlPromise.Pool {
return mysqlPromise.createPool(TEST_URI);
}
Loading