Skip to content

Commit

Permalink
merged changelog
Browse files Browse the repository at this point in the history
  • Loading branch information
wesone committed Aug 5, 2021
2 parents 9041c83 + 9a8f7e4 commit 598ae7d
Show file tree
Hide file tree
Showing 5 changed files with 219 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added
- New option `index` for ReadModelStore.defineTable scheme, to allow B-Tree and FULLTEXT indexing of fields
- Database reconnect on connection loss for eventstore-mysql and readmodelstore-mysql

## [1.1.2] - 2021-05-05
### Changed
Expand Down
40 changes: 36 additions & 4 deletions src/adapters/eventstore-mysql/Adapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,17 @@ class Adapter extends EventStoreAdapterInterface
async init()
{
await this.createDatabase();
await this.connect();
await this.createTable();
}

async connect()
{
this.db = await mysql.createConnection(this.config);
this.db.on('error', err => {
this.errorHandler(err, false);
});
await this.db.connect();
await this.createTable();
// see https://github.com/sidorares/node-mysql2/issues/1239
//========MySQL 8.0.22 (and higher) fix========
const originalExecute = this.db.execute.bind(this.db);
Expand All @@ -134,11 +142,25 @@ class Adapter extends EventStoreAdapterInterface
//========/========
}

async execute(sql, values)
{
try
{
if(!this.db)
await this.connect();
return this.db.execute(sql, values);
}
catch(err)
{
this.errorHandler(err);
}
}

async save(event)
{
try
{
const result = await this.db.execute(
const result = await this.execute(
`INSERT INTO events (${Object.keys(event).join(',')}) VALUES (${Object.keys(event).map(() => '?').join(',')})`,
Object.values(event));
return result[0].insertId;
Expand All @@ -147,7 +169,6 @@ class Adapter extends EventStoreAdapterInterface
{
if(e.errno === 1062)
return false;
throw e;
}
}

Expand Down Expand Up @@ -204,7 +225,7 @@ class Adapter extends EventStoreAdapterInterface
}
}
const toExecute = `SELECT * FROM events WHERE ${where.join(' AND ')} ORDER BY position ASC ${limit.join(' ')}`;
const events = await this.db.execute(toExecute, values);
const events = await this.execute(toExecute, values);
return {
events: events[0].map(event => {
event.payload = JSON.parse(event.payload);
Expand Down Expand Up @@ -318,6 +339,17 @@ class Adapter extends EventStoreAdapterInterface
index.forEach(field => queryParts.push(`, INDEX USING BTREE (${field})`));
return queryParts.join(' ');
}

errorHandler(err, shouldThrow = true){
if(err.fatal)
{
this.db = null;
}
if(shouldThrow)
throw err;
else
console.error(err);
}
}

module.exports = Adapter;
29 changes: 26 additions & 3 deletions src/adapters/readmodelstore-mysql/Adapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ class Adapter extends ReadModelStoreAdapterInterface
);
}

errorHandler(err, shouldThrow = true){
if(err.fatal)
{
this.connection = null;
this._conn = null;
}
if(shouldThrow)
throw err;
else
console.error(err);
}

async checkConnection()
{
if(!this.connection)
Expand All @@ -67,6 +79,9 @@ class Adapter extends ReadModelStoreAdapterInterface
return conn;
});
this.connection = await this._conn;
this.connection.on('error', err => {
this.errorHandler(err, false);
});
}

async disconnect()
Expand All @@ -75,6 +90,7 @@ class Adapter extends ReadModelStoreAdapterInterface
{
await this.connection.end();
this.connection = null;
this._conn = null;
}
}

Expand All @@ -92,9 +108,16 @@ class Adapter extends ReadModelStoreAdapterInterface

async exec(sql, parameters)
{
await this.checkConnection();
this.printDebugStatemant(sql, parameters);
return this.connection.execute(sql, parameters);
try
{
await this.checkConnection();
this.printDebugStatemant(sql, parameters);
return this.connection.execute(sql, parameters);
}
catch(err)
{
this.errorHandler(err);
}
}

getAffectedCount([results])
Expand Down
70 changes: 69 additions & 1 deletion test/adapters/eventstore-mysql/Adapter.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@ const Adapter = require('../../../src/adapters/eventstore-mysql/Adapter');
const {EVENT_LIMIT_REPLAY} = require('../../../src/core/Constants');
const Event = require('../../../src/core/Event');
const mysql = require('mysql2/promise');
let errorCallback;
jest.mock('mysql2/promise', () => {
const mockConnect = jest.fn();
const mockExecute = jest.fn();
const mockEnd = jest.fn();
const mockOn = jest.fn((name, cb) => {
if(name === 'error')
errorCallback = cb;
});

const object = { // Object to spy on
mockConnect, mockExecute
Expand All @@ -14,7 +19,8 @@ jest.mock('mysql2/promise', () => {
return {
connect: object.mockConnect,
execute: object.mockExecute, // does not get executed in init() but still needed for a bind
end: mockEnd
end: mockEnd,
on: mockOn
};
});
return {
Expand Down Expand Up @@ -326,6 +332,68 @@ describe('Test save', () => {
expect(spyExecute).toHaveBeenCalled();
});
});
test('Throw error and reconnect', async () => {
const testObj = new Adapter(testConfig);
const error = new Error('test error');
error.fatal = true;
const mockConnection = {execute: jest.fn(() => {throw error;})};
testObj.db = mockConnection;
const data = {
aggregateId: '001',
aggregateVersion: 0,
type: 'USER_UPDATED',
correlationId: '111',
causation: '100',
payload: 'TEST'
};
const testEvent = new Event(data);

try
{
await testObj.save(testEvent);
}
catch(error)
{
expect(error).not.toBe(undefined);
}

const spyExecute = jest.spyOn(mockConnection, 'execute');
expect(spyExecute).toHaveBeenCalled();

expect(testObj.db).toBe(null);

});


test('Reconnect on connection loss', async () => {
const error = new Error('test error');
error.fatal = true;
errorCallback = null;
const testObj = new Adapter(testConfig);

await testObj.connect();

expect(errorCallback).not.toBe(null);
const originalError = console.error;
console.error = jest.fn();

errorCallback(error);
expect(console.error).toHaveBeenCalled();
console.error = originalError;
});


test('Reconnect on execute', async () => {
const testObj = new Adapter(testConfig);

testObj.connect = jest.fn(() => testObj.db = { execute: jest.fn()});
testObj.db = null;

await testObj.execute();

expect(testObj.db).not.toBe(null);

});

describe('Test load', () => {
test('Check for correct loading of events', async () => {
Expand Down
87 changes: 87 additions & 0 deletions test/adapters/readmodelstore-mysql/Reconnect.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import Adapter from '../../../src/adapters/readmodelstore-mysql/Adapter';
let errorCallback;
jest.mock('mysql2/promise', () => {
const mockConnect = jest.fn();
const mockExecute = jest.fn();
const mockEnd = jest.fn();
const mockOn = jest.fn((name, cb) => {
if(name === 'error')
errorCallback = cb;
});

const object = { // Object to spy on
mockConnect, mockExecute
};
const mockCreateConnection = jest.fn(() => {
return {
connect: object.mockConnect,
execute: object.mockExecute, // does not get executed in init() but still needed for a bind
end: mockEnd,
on: mockOn
};
});
return {
createConnection: () => Promise.resolve((mockCreateConnection()))
};
});

const testConfig = {
host: 'localhost',
database: 'eventstore',
user: 'root',
password: '1234'
};

test('Throw error and reconnect', async () => {
const testObj = new Adapter(testConfig);
const error = new Error('test error');
error.fatal = true;
const mockConnection = {execute: jest.fn(() => {throw error;})};
testObj.connection = mockConnection;

try
{
await testObj.find('test', {id: 1});
}
catch(error)
{
expect(error).not.toBe(undefined);
}

const spyExecute = jest.spyOn(mockConnection, 'execute');
expect(spyExecute).toHaveBeenCalled();

expect(testObj.connection).toBe(null);

});


test('Reconnect on connection loss', async () => {
const error = new Error('test error');
error.fatal = true;
errorCallback = null;
const testObj = new Adapter(testConfig);

await testObj.connect();

expect(errorCallback).not.toBe(null);
const originalError = console.error;
console.error = jest.fn();

errorCallback(error);
expect(console.error).toHaveBeenCalled();
console.error = originalError;
});


test('Reconnect on execute', async () => {
const testObj = new Adapter(testConfig);

testObj.connect = jest.fn(() => testObj.connection = { execute: jest.fn()});
testObj.connection = null;

await testObj.findOne('test', {id: 1});

expect(testObj.connection).not.toBe(null);

});

0 comments on commit 598ae7d

Please sign in to comment.