From ad9a57181b262711f3d1a1fc65cfb2f96c297a91 Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Wed, 9 Nov 2016 19:08:48 +0000 Subject: [PATCH] Fixed replica issues. --- bin.js | 2 +- lib/commands.js | 35 +++++---- test/helper.js | 2 +- test/multi-replica-die-twice.test.js | 104 +++++++++++++++++++++++++++ 4 files changed, 126 insertions(+), 17 deletions(-) create mode 100644 test/multi-replica-die-twice.test.js diff --git a/bin.js b/bin.js index d1247ad..0b523b5 100755 --- a/bin.js +++ b/bin.js @@ -15,7 +15,7 @@ const args = require('minimist')(process.argv.slice(2), { default: { port: 0, points: 100, - timeout: 200, + timeout: 1000, verbose: false }, alias: { diff --git a/lib/commands.js b/lib/commands.js index a7513d2..0e9a6b7 100644 --- a/lib/commands.js +++ b/lib/commands.js @@ -7,7 +7,6 @@ function load (kv) { const upring = kv.upring const db = new Map() const streams = new Map() - const logger = upring.logger function setupTracker (entry, reply, sendData) { if (entry.hasTracker) { @@ -20,7 +19,7 @@ function load (kv) { entry.hasTracker = true entry.hasReplicator = false - logger.debug({ key }, 'configuring tracker') + upring.logger.debug({ key }, 'configuring tracker') const dest = upring._hashring.next(key) const tracker = upring.track(key, { replica: true }) @@ -39,7 +38,7 @@ function load (kv) { }) streams.delete(key) setTimeout(function () { - if (!entry.hasReplicator) { + if (!entry.hasReplicator && !entry.hasTracker) { db.delete(key) } }, 30000).unref() @@ -56,7 +55,7 @@ function load (kv) { function setupReplicator (entry, sendData) { const key = entry.key entry.hasReplicator = true - logger.debug({ key }, 'configuring replicator') + upring.logger.debug({ key }, 'configuring replicator') upring.replica(key, function () { entry.hasReplicator = false setupTracker(entry, noop, sendData) @@ -91,7 +90,7 @@ function load (kv) { } } - logger.debug({ key, value: req.value }, 'setting data') + upring.logger.debug({ key, value: req.value }, 'setting data') if (needReply) { reply() @@ -109,7 +108,7 @@ function load (kv) { function sendData (peer, cb) { if (typeof cb !== 'function') { - cb = bigError + cb = retry } const entry = db.get(key) @@ -131,11 +130,23 @@ function load (kv) { return } - logger.debug({ key, value: entry.value }, 'replicated key') + upring.logger.debug({ key, value: entry.value, to: peer }, 'replicated key') cb() }) } + + function retry (err) { + if (err) { + upring.logger.error(err) + const dest = upring._hashring.next(key) + if (!dest) { + return upring.emit('error', err) + } + + sendData(dest) + } + } } upring.add('ns:kv,cmd:get', function (req, reply) { @@ -148,7 +159,7 @@ function load (kv) { if (entry && entry.value || !dest) { reply(null, { key, value: entry ? entry.value : undefined }) } else { - logger.debug({ key }, 'checking if we are in the middle of a migration') + upring.logger.debug({ key }, 'checking if we are in the middle of a migration') upring.peerConn(dest) .request(req, function (err, res) { if (err) { @@ -159,7 +170,7 @@ function load (kv) { const entry = db.get(key) if (res && !entry) { - logger.debug({ key }, 'set data because of migration') + upring.logger.debug({ key }, 'set data because of migration') put({ ns: 'kv', cmd: 'put', @@ -209,12 +220,6 @@ function load (kv) { reply(null, { streams: { updates } }) }) - - function bigError (err) { - if (err) { - upring.emit('error', err) - } - } } function noop () {} diff --git a/test/helper.js b/test/helper.js index 78d4e04..51191ea 100644 --- a/test/helper.js +++ b/test/helper.js @@ -12,7 +12,7 @@ function build (main) { return UpRingKV({ base, - logLevel: 'error', + logLevel: 'fatal', hashring: { joinTimeout } diff --git a/test/multi-replica-die-twice.test.js b/test/multi-replica-die-twice.test.js new file mode 100644 index 0000000..0dd1232 --- /dev/null +++ b/test/multi-replica-die-twice.test.js @@ -0,0 +1,104 @@ +'use strict' + +const build = require('./helper').build +const t = require('tap') +const maxInt = Math.pow(2, 32) - 1 + +t.plan(16) + +var a = build() +t.tearDown(a.close.bind(a)) + +var c +var b +var key + +a.upring.on('up', function () { + t.pass('a up') + + join(a, function (instance) { + t.pass('b up') + b = instance + + key = 'hello' + + for (var i = 0; i < maxInt && !a.upring.allocatedToMe(key); i += 1) { + key = 'hello' + i + } + // key is now allocated to a + + a.put(key, 'world', function (err) { + t.error(err) + + b.get(key, function (err, value) { + t.error(err) + t.equal(value, 'world') + + afterDown(a, b, function () { + t.pass('a closed') + + join(b, function (instance) { + t.pass('c joined') + c = instance + + c.get(key, function (err, value) { + t.error(err) + t.equal(value, 'world') + + closeBAndGet() + }) + }) + }) + }) + }) + }) +}) + +function afterDown (prev, next, cb) { + var count = 0 + next.upring.once('peerDown', function () { + if (++count === 2) { + cb() + } + }) + prev.close(function () { + if (++count === 2) { + cb() + } + }) +} + +function join (main, cb) { + const instance = build(main) + + t.tearDown(instance.close.bind(instance)) + + instance.upring.on('up', function () { + cb(instance) + }) +} + +function closeBAndGet () { + afterDown(b, c, function () { + t.pass('b closed') + + c.get(key, function (err, value) { + t.error(err) + t.equal(value, 'world') + + join(c, function (d) { + t.pass('d up') + setTimeout(function () { + afterDown(c, d, function () { + t.pass('c closed') + + d.get(key, function (err, value) { + t.error(err) + t.equal(value, 'world') + }) + }) + }, 1000) + }) + }) + }) +}