Skip to content

Commit

Permalink
stores and mambers onto vertex, stores emit meta with seq
Browse files Browse the repository at this point in the history
  • Loading branch information
nomilous committed Nov 6, 2016
1 parent 75fe038 commit 22e6d66
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 39 deletions.
28 changes: 17 additions & 11 deletions lib/Cluster.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
const deepcopy = require('deepcopy');
const {EventEmitter} = require('events');
const {VertexSocket} = require('vertex-transport');
const {createWord} = require('vertex-names');
const {format} = require('util');
Expand All @@ -10,14 +9,15 @@ const {isInt, isntSelf} = require('./utils');
const {VertexError, VertexConfigError, VertexJoinError} = require('./errors');
const constants = require('./constants');

class Cluster extends EventEmitter {
class Cluster {

constructor(vertex, name, config = {}) {
super();

Object.defineProperty(this, 'config', {value: deepcopy(config)});
Object.defineProperty(this, '_vertex', {value: vertex});

this._vertex.members = {};
this._vertex.stores = {};

this._defaults();

this.name = config.name || null;
Expand Down Expand Up @@ -131,13 +131,15 @@ class Cluster extends EventEmitter {
this.members[member.name] = member;
this.log.info('added member %s - now %d',
member.name, Object.keys(this.members).length);
this.emit('member/add', member.name, member);
this._vertex.members[member.name] = member;
this._vertex.emit('members/add', member.name, member);
}
} else {
if (this.members[member.name]) {
delete this.members[member.name];
this.log.debug('quarantine member %s', member.name);
this.emit('member/remove', member.name, member);
delete this._vertex.members[member.name];
this._vertex.emit('members/remove', member.name, member);
this._deleteMember(member);
}
}
Expand All @@ -151,7 +153,7 @@ class Cluster extends EventEmitter {
// - if no consensus is reached that the member is gone then
// this vertex is assumed malfunctioning and stopped!
// - if the member is self, the same
if (this._vertex.stopping) return;
if (this._vertex._stopping) return;

this.log.debug('delete member attempt', member.name);

Expand Down Expand Up @@ -212,19 +214,23 @@ class Cluster extends EventEmitter {
}


_onMemberSet(name, member) {
_onMemberSet(name, member, meta) {
if (name == this._vertex.name) {
// Members are emitted in the order they were created (store seq),
// ignore members that existed before me,
// they join to me, not me to them, the sockets go both ways
this._noConnect = false;
}

if (this._members[member.name]) {
this._members[member.name]._addMemberRecord(member);
let memberInstance;

if (memberInstance = this._members[member.name]) {
memberInstance._addMemberRecord(member, meta);
return;
}
this._members[member.name] = new Member(this._vertex, member);
memberInstance = new Member(this._vertex, member);
this._members[member.name] = memberInstance;
memberInstance._addMemberRecord(member, meta);
}


Expand Down
22 changes: 13 additions & 9 deletions lib/KeyStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,14 @@ class KeyStore {

Object.defineProperty(this, '_onMemberAddListener', {value: this._onMemberAdd.bind(this)});
Object.defineProperty(this, '_onMemberRemoveListener', {value: this._onMemberRemove.bind(this)});
this._cluster.on('member/add', this._onMemberAddListener);
this._cluster.on('member/remove', this._onMemberRemoveListener);
this._cluster._vertex.on('members/add', this._onMemberAddListener);
this._cluster._vertex.on('members/remove', this._onMemberRemoveListener);
}


stop() {
this._cluster.removeListener('member/add', this._onMemberAddListener);
this._cluster.removeListener('member/remove', this._onMemberRemoveListener);
this._cluster._vertex.removeListener('members/add', this._onMemberAddListener);
this._cluster._vertex.removeListener('members/remove', this._onMemberRemoveListener);
}


Expand All @@ -67,7 +67,6 @@ class KeyStore {

// opts:
// {consensus: 0.4, expire: 1000} // set only occurs if > 0.4 ratio of members also set within expire
// {random: 2, expire: 1000} // NOT IMPLEMENTED - returns true to random 2 of the n members that also set within expire
store.actions.set = (key, value, opts) => {
let type = typeof key;
if (type != 'string' && type != 'boolean' && type != 'number') {
Expand All @@ -83,6 +82,8 @@ class KeyStore {

store.actions.has = (key) => !!store.data[key];

// opts:
// {consensus: 0.4, expire: 1000} // set only occurs if > 0.4 ratio of members also del within expire
store.actions.del = (key, opts) => {
let type = typeof key;
if (type != 'string' && type != 'boolean' && type != 'number') {
Expand All @@ -91,6 +92,9 @@ class KeyStore {
return this._doAction(store, 'del', key, undefined, opts);
};

this._cluster._vertex.stores[name] = store.actions;
this._cluster._vertex.emit('stores/add', store.actions);

return store.actions;
}

Expand Down Expand Up @@ -501,7 +505,7 @@ class KeyStore {

// emit last in case of throw in listener
try {
store.actions.emit('set', key, value);
store.actions.emit('set', key, value, {seq: seq});
} catch (error) {
this._cluster._vertex.emit('error', error);
}
Expand Down Expand Up @@ -542,7 +546,7 @@ class KeyStore {
delete this._sequence[existing.seq];
delete this._stores[store].data[key];
try {
theStore.emit(act, key);
theStore.emit(act, key, deepcopy(existing.val), {seq: existing.seq});
} catch (error) {
this._cluster._vertex.emit('error', error);
}
Expand All @@ -568,7 +572,7 @@ class KeyStore {
}

try {
theStore.emit(act, key, deepcopy(value));
theStore.emit(act, key, deepcopy(value), {seq: seq});
} catch (error) {
this._cluster._vertex.emit('error', error);
}
Expand Down Expand Up @@ -633,7 +637,7 @@ class KeyStore {

if (!duplicate) {
try {
this._stores[store].actions.emit('set', key, deepcopy(value.val));
this._stores[store].actions.emit('set', key, deepcopy(value.val), {seq: value.seq});
} catch (error) {
this._cluster._vertex.emit('error', error);
}
Expand Down
8 changes: 7 additions & 1 deletion lib/Member.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,14 @@ class Member {
this._updated(this);
}

_addMemberRecord(member) {
_addMemberRecord(member, meta) {
this._recorded = true;

Object.defineProperty(this, '_seq', {
value: meta.seq,
enumerable: true
});

this._updated(this);
}

Expand Down
54 changes: 46 additions & 8 deletions lib/Vertex.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const deepcopy = require('deepcopy');
const {format} = require('util');
const {EventEmitter} = require('events');
const REPL = require('repl');
const {createWord} = require('vertex-names');
const VertexLogger = require('vertex-logger');

Expand Down Expand Up @@ -42,10 +43,13 @@ class Vertex extends EventEmitter {
value: name || this.config.name || createWord(11, {finished: true}),
enumerable: true
});
Object.defineProperty(this, 'stopping', {
Object.defineProperty(this, '_stopping', {
value: false,
writable: true,
enumerable: true
writable: true
});
Object.defineProperty(this, '_stopped', {
value: false,
writable: true
});

this.log = null;
Expand All @@ -60,12 +64,15 @@ class Vertex extends EventEmitter {

Object.keys(this.config).forEach(serviceName => {
if (this.config[serviceName] == null) return;
if (serviceName[0] == '_') return;
if (serviceName == 'name') return;
if (serviceName == 'logLevel') return;
if (serviceName == 'log') return;
if (serviceName == 'services') return;
if (serviceName == '$start') return;
if (serviceName == '$stop') return;
if (serviceName == 'members') return;
if (serviceName == 'stores') return;
if (serviceName == 'config') return;
if (this._services.indexOf(serviceName) >= 0) return;
this._services.push(serviceName);
});
Expand All @@ -81,6 +88,9 @@ class Vertex extends EventEmitter {
});

this.log.info('assigned name %s', this.name);

// a repl is started if stdout is a TTY
this._startREPL();
}


Expand Down Expand Up @@ -148,9 +158,9 @@ class Vertex extends EventEmitter {


$stop(error) {
if (this.stopping) return Promise.resolve();
if (this.stopped) return Promise.resolve();
this.stopping = true;
if (this._stopping) return Promise.resolve();
if (this._stopped) return Promise.resolve();
this._stopping = true;
this._error = error;

try {
Expand Down Expand Up @@ -183,7 +193,7 @@ class Vertex extends EventEmitter {
Object.keys(this._services).forEach(name => {
delete this._services[name];
});
this.stopped = true;
this._stopped = true;
return resolve();
}

Expand Down Expand Up @@ -232,6 +242,34 @@ class Vertex extends EventEmitter {
}
}


_startREPL() {
if (!process.stdout.isTTY) return;
if (process.stdout.repl) return; // only one

process.stdout.repl = true;

let repl = REPL.start({
ignoreUndefined: true
});

repl.context.vertex = this;

this.log.repl = repl;

this.once('stopped', () => {
this.log._preWrite(this.log.repl);
this.log.repl = undefined;
repl.close();
});

repl.once('exit', () => {
this.log._preWrite(this.log.repl);
this.log.repl = undefined;
this.$stop();
});
}

}

Object.defineProperty(Vertex, 'Server', {value: Server, enumerable: true});
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"dependencies": {
"deep-equal": "^1.0.1",
"deepcopy": "^0.6.3",
"vertex-logger": "^2.2.1",
"vertex-logger": "^2.3.1",
"vertex-names": "^1.1.4",
"vertex-transport": "^3.1.3"
},
Expand Down
36 changes: 32 additions & 4 deletions test/func-KeyStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,27 @@ describe(filename, function () {

});

it('emits set value with meta', done => {

let {servers} = cluster;
let store1 = servers[9].cluster._stores.createStore('test');
let store2 = servers[3].cluster._stores.createStore('test');

let emitted1, emitted2;
store1.on('set', (...args) => emitted1 = args);
store2.on('set', (...args) => emitted2 = args);

store1.set('key', 'value')

.then(() => {
expect(emitted1).to.eql(emitted2);
expect(emitted1).to.eql(['key', 'value', {seq: 10}]);
})

.then(done).catch(done);

});

it('can replace key', done => {

let {servers} = cluster;
Expand Down Expand Up @@ -352,9 +373,14 @@ describe(filename, function () {
it('can delete key', done => {

let {servers} = cluster;
let store = servers[9].cluster._stores.createStore('test');
let store1 = servers[9].cluster._stores.createStore('test');
let store2 = servers[6].cluster._stores.createStore('test');

store.set('key', 1)
let emitted1, emitted2;
store1.on('del', (...args) => emitted1 = args);
store2.on('del', (...args) => emitted2 = args);

store1.set('key', 1)

.then()

Expand All @@ -381,11 +407,13 @@ describe(filename, function () {
})

.then(() => {
return store.del('key');
return store1.del('key');
})

.then(result => {
expect(result.ok).to.be(true);
expect(emitted1).to.eql(emitted2);
expect(emitted1).to.eql(['key', 1, {seq: 10}]);
return servers.map(server => {
return server.cluster._stores._stores.test.data.key
});
Expand Down Expand Up @@ -681,7 +709,7 @@ describe(filename, function () {

});

context('load', function () {
xcontext('load', function () {

// TODO: try with/without deepcopy

Expand Down
2 changes: 2 additions & 0 deletions test/unit-Cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ describe(filename, () => {

beforeEach(() => {
mockVertex = {
on: () => {},
emit: () => {},
server: {
registerService: () => {}
},
Expand Down
Loading

0 comments on commit 22e6d66

Please sign in to comment.