Large diffs are not rendered by default.

@@ -1,12 +1,12 @@
{
"name": "mysqle",
"description": "Mysql events.",
"version": "1.0.0",
"name": "stomatopod",
"description": "Spear events from mysql binlog.",
"version": "1.0.0-ALPHA",
"main": "index.js",
"license": "BSD-3-Clause",
"repository": {
"type": "git",
"url": "https://github.com/rodrigogs/mysqle"
"url": "https://github.com/rodrigogs/stomatopod"
},
"keywords": [],
"scripts": {
@@ -15,7 +15,15 @@
"test": "mocha"
},
"dependencies": {
"axios": "^0.18.0",
"debuggler": "^1.0.0",
"dotenv": "^6.0.0",
"koa": "^2.5.1",
"koa-bodyparser": "^4.2.1",
"koa-router": "^7.4.0",
"lowdb": "^1.0.0",
"mkdirp": "^0.5.1",
"mysql": "^2.15.0",
"mysql-events": "^0.0.11"
},
"devDependencies": {
@@ -0,0 +1,11 @@
const MEM_STORAGE = {};

const Cache = {
get: key => MEM_STORAGE[key],

set: (key, value) => {
MEM_STORAGE[key] = value;
},
};

module.exports = Cache;
@@ -0,0 +1,22 @@
const debug = require('debuggler')();

const os = require('os');
const fs = require('fs');
const path = require('path');
const low = require('lowdb');
const mkdirp = require('mkdirp');
const FileAsync = require('lowdb/adapters/FileAsync');

const dbDir = path.join(os.homedir(), 'stomatopod');
const dbFile = path.join(dbDir, 'db.json');

if (!fs.existsSync(dbDir)) {
debug('creating directory', dbDir);
mkdirp.sync(dbDir);
}

debug('initiating lowdb on', dbFile);
const adapter = new FileAsync(dbFile);
const db = low(adapter);

module.exports = db;
@@ -0,0 +1,60 @@
const path = require('path');

/**
* @see https://github.com/motdotla/dotenv#usage
*/
if (process.env.NODE_ENV === 'test') {
require('dotenv').config({ path: path.resolve(__filename, '../../.process.env.test') });
} if (process.env.NODE_ENV !== 'production') {
require('dotenv').config({ path: path.resolve(__filename, '../../.env') });
}

class Env {
/**
* @default 'development'
* @return {String}
*/
static get NODE_ENV() {
return process.env.NODE_ENV || 'development';
}

/**
* @default 3000
* @return {Number}
*/
static get PORT() {
return process.env.PORT || 3000;
}

/**
* @default 'localhost'
* @return {String}
*/
static get MYSQL_HOST() {
return process.env.MYSQL_HOST || 'localhost';
}

/**
* @default 3306
* @return {Number}
*/
static get MYSQL_PORT() {
return process.env.MYSQL_PORT || 3306;
}

/**
* @return {String}
*/
static get MYSQL_USER() {
return process.env.MYSQL_USER;
}

/**
* @return {String}
*/
static get MYSQL_PASSWORD() {
return process.env.MYSQL_PASSWORD;
}
}

module.exports = Env;
@@ -0,0 +1 @@
module.exports = require('./web');
@@ -0,0 +1,30 @@
const debug = require('debuggler')();
const low = require('./db');
const watch = require('./watch');

const init = async () => {
const db = await low;
await db
.defaults({
watchers: [],
})
.write();

const watchers = await db
.get('watchers')
.value();

if (!watchers) return;

return Promise.all(watchers.map(async (watcher) => {
if (!watcher.destinations) return;

return Promise.all(watcher.destinations.map(async (destination) => {
debug('initializing watcher for expression', watcher.expression, 'destination', destination);

// await watch(watcher.expression, destination);
}));
}));
};

module.exports = init;
@@ -0,0 +1,21 @@
const debug = require('debuggler')();
const MySQLEvents = require('mysql-events');

const {
MYSQL_HOST: host,
MYSQL_PORT: port,
MYSQL_USER: user,
MYSQL_PASSWORD: password,
} = require('./env');

const options = {
host,
port,
user,
password,
};

debug('connecting to', `${options.host}:${options.port}`);
const Shrimp = MySQLEvents(options);

module.exports = Shrimp;
@@ -0,0 +1,66 @@
const debug = require('debuggler')();
const low = require('./db');
const Shrimp = require('./shrimp');
const Cache = require('./cache');

const unwatch = async (expression, destination) => {
if (!expression) {
const err = new Error('Bad Request');
err.status = 400;
throw err;
}

const db = await low;

debug(
'removing watcher for expression', expression,
'for destination', destination,
);

if (!destination) {
await db.get('watchers').remove({ expression }).write();
Shrimp.remove(expression);
Cache.set(expression, undefined);
return;
}

let watcher = await db.get('watchers').find({ expression }).value();
if (watcher) {
debug('watcher exists for expression', expression);

const index = watcher.destinations.indexOf(destination);
if (index === -1) {
const err = new Error(`Destination ${destination} is not registered for watcher ${expression}`);
err.status = 404;
throw err;
}

watcher = await db.get('watchers')
.find({ expression })
.update('destinations', (dests) => {
dests.splice(index, 1);
return dests;
})
.write();

debug('removed destination', destination, 'for watcher');

if (watcher.destinations.length === 0) {
debug('no remaining destinations for watcher with expression', expression);
await db.get('watchers')
.remove({ expression })
.write();

return;
}

Cache.set(expression, watcher);
return watcher;
}

await db.get('watchers').remove({ expression }).write();
Shrimp.remove(expression);
Cache.set(expression, undefined);
};

module.exports = unwatch;
@@ -0,0 +1,65 @@
const debug = require('debuggler')();
const low = require('./db');
const Shrimp = require('./shrimp');
const Cache = require('./cache');
const axios = require('axios');

const watch = async (expression, destination) => {
if (!expression || !destination) {
const err = new Error('Bad Request');
err.status = 400;
throw err;
}

const db = await low;

debug(
'adding watcher for expression', expression,
'for destination', destination,
);

let watcher = await db.get('watchers').find({ expression }).value();
if (watcher) {
debug('watcher already exists for expression', expression);

const hasDest = watcher.destinations.indexOf(destination) !== -1;
if (hasDest) {
debug('watcher already has destination', destination);
return watcher;
}

debug('adding destination', destination, 'for watcher');
watcher = await db.get('watchers')
.find({ expression })
.update('destinations', dests => dests.push(destination))
.write();

return watcher;
}

debug('creating watcher');
watcher = await db
.get('watchers')
.push({ expression, destinations: [destination] })
.find({ expression })
.write();

Cache.set(expression, JSON.stringify(watcher));

Shrimp.add(expression, async (oldRow, newRow, event) => {
const watcher = JSON.parse(Cache.get(expression));
if (!watcher) return debug('found orphan watcher for expression', expression);

await Promise.all(watcher.destinations.map(async (dest) => {
axios.post(dest, {
oldRow,
newRow,
event,
});
}));
});

return watcher;
};

module.exports = watch;
@@ -0,0 +1,59 @@
const debug = require('debuggler')();

const Koa = require('koa');
const Router = require('koa-router');
const bodyParser = require('koa-bodyparser');
const watch = require('./watch');
const unwatch = require('./unwatch');
const init = require('./init');

const Application = async () => {
debug('bootstrapping application');

await init();

const app = new Koa();
const router = new Router();

app.use(bodyParser({
jsonLimit: '10mb',
}));

router.post('/watch', async (ctx) => {
const {
expression,
destination,
} = ctx.request.body;

const watcher = await watch(expression, destination);
if (!watcher) {
ctx.status = 204;
return;
}

ctx.status = 200;
ctx.body = watcher;
});

router.post('/unwatch', async (ctx) => {
const {
expression,
destination,
} = ctx.request.body;

const watcher = await unwatch(expression, destination);
if (!watcher) {
ctx.status = 204;
return;
}

ctx.status = 200;
ctx.body = watcher;
});

app.use(router.routes(), router.allowedMethods());

return app;
};

module.exports = Application;
@@ -6,7 +6,7 @@ before(() => {
chai.should();
});

describe('mysqle', () => {
describe('stomatopod', () => {

it('should do something', () => {
});