Skip to content

Commit

Permalink
Fixed replica issues.
Browse files Browse the repository at this point in the history
  • Loading branch information
mcollina committed Nov 9, 2016
1 parent d283306 commit ad9a571
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 17 deletions.
2 changes: 1 addition & 1 deletion bin.js
Expand Up @@ -15,7 +15,7 @@ const args = require('minimist')(process.argv.slice(2), {
default: {
port: 0,
points: 100,
timeout: 200,
timeout: 1000,
verbose: false
},
alias: {
Expand Down
35 changes: 20 additions & 15 deletions lib/commands.js
Expand Up @@ -7,7 +7,6 @@ function load (kv) {
const upring = kv.upring
const db = new Map()
const streams = new Map()
const logger = upring.logger

function setupTracker (entry, reply, sendData) {
if (entry.hasTracker) {
Expand All @@ -20,7 +19,7 @@ function load (kv) {
entry.hasTracker = true
entry.hasReplicator = false

logger.debug({ key }, 'configuring tracker')
upring.logger.debug({ key }, 'configuring tracker')

const dest = upring._hashring.next(key)
const tracker = upring.track(key, { replica: true })
Expand All @@ -39,7 +38,7 @@ function load (kv) {
})
streams.delete(key)
setTimeout(function () {
if (!entry.hasReplicator) {
if (!entry.hasReplicator && !entry.hasTracker) {
db.delete(key)
}
}, 30000).unref()
Expand All @@ -56,7 +55,7 @@ function load (kv) {
function setupReplicator (entry, sendData) {
const key = entry.key
entry.hasReplicator = true
logger.debug({ key }, 'configuring replicator')
upring.logger.debug({ key }, 'configuring replicator')
upring.replica(key, function () {
entry.hasReplicator = false
setupTracker(entry, noop, sendData)
Expand Down Expand Up @@ -91,7 +90,7 @@ function load (kv) {
}
}

logger.debug({ key, value: req.value }, 'setting data')
upring.logger.debug({ key, value: req.value }, 'setting data')

if (needReply) {
reply()
Expand All @@ -109,7 +108,7 @@ function load (kv) {

function sendData (peer, cb) {
if (typeof cb !== 'function') {
cb = bigError
cb = retry
}

const entry = db.get(key)
Expand All @@ -131,11 +130,23 @@ function load (kv) {
return
}

logger.debug({ key, value: entry.value }, 'replicated key')
upring.logger.debug({ key, value: entry.value, to: peer }, 'replicated key')

cb()
})
}

function retry (err) {
if (err) {
upring.logger.error(err)
const dest = upring._hashring.next(key)
if (!dest) {
return upring.emit('error', err)
}

sendData(dest)
}
}
}

upring.add('ns:kv,cmd:get', function (req, reply) {
Expand All @@ -148,7 +159,7 @@ function load (kv) {
if (entry && entry.value || !dest) {
reply(null, { key, value: entry ? entry.value : undefined })
} else {
logger.debug({ key }, 'checking if we are in the middle of a migration')
upring.logger.debug({ key }, 'checking if we are in the middle of a migration')
upring.peerConn(dest)
.request(req, function (err, res) {
if (err) {
Expand All @@ -159,7 +170,7 @@ function load (kv) {
const entry = db.get(key)

if (res && !entry) {
logger.debug({ key }, 'set data because of migration')
upring.logger.debug({ key }, 'set data because of migration')
put({
ns: 'kv',
cmd: 'put',
Expand Down Expand Up @@ -209,12 +220,6 @@ function load (kv) {

reply(null, { streams: { updates } })
})

function bigError (err) {
if (err) {
upring.emit('error', err)
}
}
}

function noop () {}
Expand Down
2 changes: 1 addition & 1 deletion test/helper.js
Expand Up @@ -12,7 +12,7 @@ function build (main) {

return UpRingKV({
base,
logLevel: 'error',
logLevel: 'fatal',
hashring: {
joinTimeout
}
Expand Down
104 changes: 104 additions & 0 deletions test/multi-replica-die-twice.test.js
@@ -0,0 +1,104 @@
'use strict'

const build = require('./helper').build
const t = require('tap')
const maxInt = Math.pow(2, 32) - 1

t.plan(16)

var a = build()
t.tearDown(a.close.bind(a))

var c
var b
var key

a.upring.on('up', function () {
t.pass('a up')

join(a, function (instance) {
t.pass('b up')
b = instance

key = 'hello'

for (var i = 0; i < maxInt && !a.upring.allocatedToMe(key); i += 1) {
key = 'hello' + i
}
// key is now allocated to a

a.put(key, 'world', function (err) {
t.error(err)

b.get(key, function (err, value) {
t.error(err)
t.equal(value, 'world')

afterDown(a, b, function () {
t.pass('a closed')

join(b, function (instance) {
t.pass('c joined')
c = instance

c.get(key, function (err, value) {
t.error(err)
t.equal(value, 'world')

closeBAndGet()
})
})
})
})
})
})
})

function afterDown (prev, next, cb) {
var count = 0
next.upring.once('peerDown', function () {
if (++count === 2) {
cb()
}
})
prev.close(function () {
if (++count === 2) {
cb()
}
})
}

function join (main, cb) {
const instance = build(main)

t.tearDown(instance.close.bind(instance))

instance.upring.on('up', function () {
cb(instance)
})
}

function closeBAndGet () {
afterDown(b, c, function () {
t.pass('b closed')

c.get(key, function (err, value) {
t.error(err)
t.equal(value, 'world')

join(c, function (d) {
t.pass('d up')
setTimeout(function () {
afterDown(c, d, function () {
t.pass('c closed')

d.get(key, function (err, value) {
t.error(err)
t.equal(value, 'world')
})
})
}, 1000)
})
})
})
}

0 comments on commit ad9a571

Please sign in to comment.