Skip to content

Commit

Permalink
serialized keystore - support multiple concurrent join cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
nomilous committed Oct 30, 2016
1 parent 5181c61 commit 95a1fde
Show file tree
Hide file tree
Showing 12 changed files with 435 additions and 179 deletions.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ config = {
logLevel: 'info',
server: {
listen: '0.0.0.0:65535'
},
cluster: {
seed: false,
sync: {
timeout: 2000,
limit: 42
},
join: {
0: '1.2.3.4:65535',
1: '1.2.3.5:65535',
2: '1.2.3.6:65535'
}
}
}
```
Expand All @@ -52,3 +64,6 @@ Configure the server.

Host and port to listen.

#### config.cluster

...
3 changes: 1 addition & 2 deletions TODO.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
* loglevel as word
* resolve dns names before connect/listen
* store set moves sequence on replace
* stored values while sync arrive at sync target
* serialize store to prevent del and set racing
* store updates _next on all
* store delete
* ping pong sys socket to detect net segmentation (because not timeouts on store messages, relying on close promise reject)
* handle master fails immediately after join2
* handle master fails midway through stores sync (perhaps distribute sync)
* sync error should stop member
109 changes: 39 additions & 70 deletions lib/Cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const {VertexSocket} = require('vertex-transport');
const {createWord} = require('vertex-names');
const {format} = require('util');

const Distributor = require('./Distributor');
const KeyStore = require('./KeyStore');
const Member = require('./Member');
const {isInt, isntSelf} = require('./utils');
const {VertexConfigError, VertexJoinError} = require('./errors');
Expand Down Expand Up @@ -37,7 +37,7 @@ class Cluster extends EventEmitter {
});

Object.defineProperty(this, '_waitingMaster', {value: []});
Object.defineProperty(this, '_stores', {value: new Distributor(this)});
Object.defineProperty(this, '_stores', {value: new KeyStore(this)});
Object.defineProperty(this, '_memberStore', {value: this._stores.createStore('members')});
Object.defineProperty(this, '_members', {value: {}});
Object.defineProperty(this, '_waitingReady', {value: null, writable: true});
Expand All @@ -49,6 +49,7 @@ class Cluster extends EventEmitter {
enumerable: false
});

Object.defineProperty(this, '_noConnect', {value: true, writable: true});
Object.defineProperty(this, '_onMemberSetListener', {value: this._onMemberSet.bind(this)});
Object.defineProperty(this, '_onMemberDelListener', {value: this._onMemberDel.bind(this)});
this._memberStore.on('set', this._onMemberSetListener);
Expand All @@ -64,7 +65,7 @@ class Cluster extends EventEmitter {
.then(() => {
this._waitingReady = (error) => {
if (error) return reject(error);
this.log.info('done sync cluster at %d members', Object.keys(this.members).length);
this.log.info('done sync at %d members', Object.keys(this.members).length);
resolve();
}
})
Expand All @@ -78,6 +79,7 @@ class Cluster extends EventEmitter {
Object.keys(this.$handlers).forEach(key => delete this.$handlers[key]);
this._memberStore.removeListener('set', this._onMemberSetListener);
this._memberStore.removeListener('del', this._onMemberDelListener);
this._stores.stop();

// TODO: delete members

Expand Down Expand Up @@ -108,6 +110,7 @@ class Cluster extends EventEmitter {
this.$handlers[constants.CLUSTER_HANDLER_JOIN_2] = this._onJoin2.bind(this);
this.$handlers[constants.CLUSTER_HANDLER_JOIN_3] = this._onJoin3.bind(this);
this.$handlers[constants.CLUSTER_HANDLER_JOIN_4] = this._onJoin4.bind(this);
this.$handlers[constants.CLUSTER_HANDLER_JOIN_5] = this._onJoin5.bind(this);
this.$handlers[constants.CLUSTER_HANDLER_SYNC_1] = this._onSync1.bind(this);
this.$handlers[constants.CLUSTER_HANDLER_STORE_1] = this._onStore1.bind(this);
this.$handlers[constants.CLUSTER_HANDLER_STORE_2] = this._onStore2.bind(this);
Expand All @@ -132,9 +135,12 @@ class Cluster extends EventEmitter {
this.emit('member/remove', member.name, member);
}
}
this._startedMaybe();
}

// TODO: member.error(s)

_startedMaybe() {
// TODO: member.error(s)
if (this._ready) {
if (this._waitingReady) {
this._waitingReady();
Expand All @@ -145,6 +151,13 @@ class Cluster extends EventEmitter {


_onMemberSet(name, member) {
if (name == this._vertex.name) {
// Members are emitted in the order they were created (store seq),
// ignore members that existed before me,
// they join to me, not me to them, the sockets go both ways
this._noConnect = false;
}

if (this._members[member.name]) {
this._members[member.name]._addMemberRecord(member);
return;
Expand Down Expand Up @@ -282,9 +295,6 @@ class Cluster extends EventEmitter {
reply('join2', new Promise((resolve, reject) => {
this._memberStore.set(data.name, data)
.then(result => {

console.log('SET RESULT', result);

resolve();
})
.catch(reject)
Expand All @@ -294,6 +304,7 @@ class Cluster extends EventEmitter {

_onJoin3(session, data, meta, reply) {
var address = session.socket.remoteAddress();
let member = this._members[data.name];

this.log.debug(
'join3 (sys) attempt <- %s:%d %s',
Expand All @@ -302,8 +313,8 @@ class Cluster extends EventEmitter {
data.name
);

if (this._members[data.name]) {
this._members[data.name]._addSysSocket(session.socket);
if (member) {
member._addSysSocket(session.socket);
return;
}

Expand All @@ -313,75 +324,42 @@ class Cluster extends EventEmitter {

_onJoin4(session, data, meta, reply) {
var address = session.socket.remoteAddress();

this.log.debug(
'join4 (usr) attempt <- %s:%d %s',
address.address,
address.port,
data.name
);

if (!this._members[data.name]) {
return reply('nak', new Error('Missing member record'));
}

this._members[data.name]._addUsrSocket(session.socket);
reply('join4', {master: this._master});
}


_syncStores(master) {
return new Promise((resolve, reject) => {
this.log.debug('syncing stores');

let bytes = 0;
let getListPartsRecurse = (next) => {
master.socket.send(this._sync1Payload(next))
.then(result => {
if (!result.sync1 && typeof result.sync1.next !== 'number') {
return reject(new VertexJoinError('Unexpected sync response'));
}
bytes += result.meta.len;
this._stores._syncIn(result.sync1);
if (result.sync1.last) {
this.log.info('done sync stores at %d bytes', bytes);
this._stores._ready = true;
if (this._ready) {
if (this._waitingReady) {
this._waitingReady();
this._waitingReady = null;
}
}
return resolve();
}
getListPartsRecurse(result.sync1.next);
})
.catch(reject)
};

getListPartsRecurse(0);
});
_onJoin5(session, data, meta, reply) {
let member = this._members[data.name];
var address = session.socket.remoteAddress();
this.log.debug(
'join5 (usr) attempt <- %s:%d %s',
address.address,
address.port,
data.name
);
member._joined = true;
if (member.socket) {
member._updated(member);
return;
}
member._addUsrSocket(session.socket);
}


_onSync1(session, data, meta, reply) {
// TODO: if not master, redirect to master
// (only necessary if master is demoted, joining members already
// syncs to the master it joined to)
if (typeof data.next != 'number') {
return reply('nak', new Error('Missing next'));
}

let limit = 3; // TODO: configurable...
let replyData = this._stores._syncOut(data.next, limit);

reply('sync1', replyData);
session.local.syncBytes = session.local.syncBytes || 0;
session.local.syncBytes += meta.len;
this._stores._syncIn(data, session.local);
}


_onStore1(session, update, meta, reply) {
// let {store, act, key, value} = update;
reply('store1', this._stores._doUpdateDistribute(update));
reply('store1', this._stores._updateDistribute(update));
}


Expand Down Expand Up @@ -443,15 +421,6 @@ class Cluster extends EventEmitter {
}


_sync1Payload(nextInSequence = 0) {
return [
this._payloadHeader(constants.CLUSTER_HANDLER_SYNC_1), {
next: nextInSequence
}
]
}


_defaults() {
if (!this._vertex.server) {
throw new VertexConfigError('Cluster requires server');
Expand Down

0 comments on commit 95a1fde

Please sign in to comment.