Skip to content

Commit

Permalink
Use upring.replica() correctly.
Browse files Browse the repository at this point in the history
  • Loading branch information
mcollina committed Nov 3, 2016
1 parent 2a759d3 commit 02e7174
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 42 deletions.
6 changes: 5 additions & 1 deletion kv.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,20 @@ UpRingKV.prototype.get = function (key, cb) {
}

UpRingKV.prototype.liveUpdates = function (key) {
return nes.obj((done) => {
const result = nes.obj((done) => {
this.upring.request({ key, ns, cmd: 'liveUpdates' }, function (err, res) {
if (err) {
done(err)
return
}

result.emit('newStream')

done(null, res.streams.updates)
})
})

return result
}

UpRingKV.prototype.whoami = function () {
Expand Down
125 changes: 100 additions & 25 deletions lib/commands.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,86 @@ const Readable = require('readable-stream').Readable

function load (kv) {
const upring = kv.upring
const pairs = new Map()
const db = new Map()
const streams = new Map()
const logger = upring.logger

upring.add('ns:kv,cmd:put', function (req, reply) {
const key = req.key
function setupTracker (entry, reply, sendData) {
if (entry.hasTracker) {
reply()
return
}

const key = entry.key

entry.hasTracker = true
entry.hasReplicator = false

logger.debug({ key }, 'configuring tracker')

const dest = upring._hashring.next(key)
const tracker = upring.track(key, { replica: true })

process.nextTick(function () {
tracker.on('replica', sendData)
})
tracker.on('moved', function () {
entry.hasTracker = false
const rs = streams.get(key) || []
rs.forEach((stream) => stream.destroy())
streams.delete(key)
if (!entry.hasReplicator) {
db.delete(key)
}
})

if (dest) {
sendData(dest, reply)
return false
}

return true
}

function setupReplicator (entry, sendData) {
const key = entry.key
entry.hasReplicator = true
logger.debug({ key }, 'configuring replicator')
upring.replica(key, function () {
entry.hasReplicator = false
setupTracker(entry, bigError, sendData)
})
}

function Entry (key) {
this.key = key
this.hasTracker = false
this.hasReplicator = false
this.value = null
}

upring.add('ns:kv,cmd:put', put)

function put (req, reply) {
const key = req.key
var needReply = true
var entry = db.get(key) || new Entry(key)

entry.value = req.value
db.set(key, entry)

if (upring.allocatedToMe(key)) {
const tracker = upring.track(key, { replica: true })
tracker.on('replica', sendData)
tracker.on('moved', function () {
const rs = streams.get(key) || []
rs.forEach((stream) => stream.destroy())
streams.delete(key)
})
if (dest) {
sendData(dest, reply)
needReply = false
if (!entry.hasTracker) {
needReply = setupTracker(entry, reply, sendData)
}
} else {
if (!entry.hasReplicator) {
setupReplicator(entry, bigError, sendData)
}
}

logger.debug({ key, value: req.value }, 'setting data')

pairs.set(key, req.value, reply)

if (needReply) {
reply()
}
Expand All @@ -57,26 +110,48 @@ function load (kv) {
cb()
})
}
})
}

upring.add('ns:kv,cmd:get', function (req, reply) {
const value = pairs.get(req.key)
const entry = db.get(req.key)
const key = req.key
req.skipList = req.skipList || []
req.skipList.push(upring.whoami())
const dest = upring._hashring.next(key, req.skipList)

if (value || !(dest && upring.allocatedToMe(key))) {
reply(null, { key, value })
if (entry && entry.value || !dest) {
reply(null, { key, value: entry ? entry.value : undefined })
} else {
logger.debug({ key }, 'checking if we are in the middle of a migration')
upring.peerConn(dest)
.request(req, function (err, res) {
if (res && !pairs.has(req.key)) {
if (err) {
reply(err)
return
}

const entry = db.get(key)

if (res && !entry) {
logger.debug({ key }, 'set data because of migration')
pairs.set(req.key, res.value)
put({
ns: 'kv',
cmd: 'put',
key,
value: res.value
}, function (err) {
if (err) {
reply(err)
return
}

reply(null, res)
})

return
}
reply(err, res)

reply(null, res)
})
}
})
Expand All @@ -101,9 +176,9 @@ function load (kv) {

array.push(updates)

const value = pairs.get(req.key)
if (value) {
updates.push(value)
const entry = db.get(req.key)
if (entry && entry.hasTracker) {
updates.push(entry.value)
}

reply(null, { streams: { updates } })
Expand Down
66 changes: 50 additions & 16 deletions test/multi.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ test('get and put', function (t) {
for (var i = 0; i < maxInt && !this.allocatedToMe(key); i += 1) {
key = 'hello' + i
}
// key is now allocated to a b
// key is now allocated to b

a.put(key, 'world', function (err) {
t.error(err)
Expand All @@ -49,7 +49,7 @@ test('get and put', function (t) {
})

test('moving data', function (t) {
t.plan(9)
t.plan(13)

const a = build()
t.tearDown(a.close.bind(a))
Expand All @@ -65,6 +65,11 @@ test('moving data', function (t) {

var key = 'hello'

for (var i = 0; i < maxInt && !a.upring.allocatedToMe(key); i += 1) {
key = 'hello' + i
}
// key is now allocated to a

a.put(key, 'world', function (err) {
t.error(err)
b.upring.join(a.whoami(), function () {
Expand All @@ -74,13 +79,40 @@ test('moving data', function (t) {
t.error(err)
t.equal(value, 'world')

a.close(function () {
t.pass('closed')
var c

b.get(key, function (err, value) {
t.error(err)
t.equal(value, 'world')
b.upring.once('peerDown', function (peer) {
c = build(b)

t.tearDown(c.close.bind(c))

b.upring.on('peerUp', function (peer) {
c.get(key, function (err, value) {
t.error(err)
t.equal(value, 'world')

closeAndGet()
})
})

c.upring.on('up', function () {
t.pass('c joined')
})
})

function closeAndGet () {
b.close(function () {
t.pass('b closed')

c.get(key, function (err, value) {
t.error(err)
t.equal(value, 'world')
})
})
}

a.close(function () {
t.pass('a closed')
})
})
})
Expand All @@ -90,7 +122,7 @@ test('moving data', function (t) {
})

test('liveUpdates', function (t) {
t.plan(8)
t.plan(9)

const a = build()
t.tearDown(a.close.bind(a))
Expand All @@ -109,13 +141,13 @@ test('liveUpdates', function (t) {
for (var i = 0; i < maxInt && !this.allocatedToMe(key); i += 1) {
key = 'bbb' + i
}
// key is now allocated to a b
// key is now allocated to b

a.put(key, 'world', function (err) {
t.error(err)

const stream = a.liveUpdates(key)
const expected = ['matteo', 'luca']
const expected = ['world', 'matteo', 'luca']

stream.on('data', function (chunk) {
t.deepEqual(chunk, expected.shift(), 'chunk matches')
Expand All @@ -125,14 +157,16 @@ test('liveUpdates', function (t) {
t.fail('no error in stream')
})

b.put(key, 'matteo', function (err) {
t.error(err)
stream.once('newStream', function () {
b.put(key, 'matteo', function (err) {
t.error(err)

b.close(function () {
t.pass('closed')
b.close(function () {
t.pass('closed')

a.put(key, 'luca', function (err) {
t.error(err)
a.put(key, 'luca', function (err) {
t.error(err)
})
})
})
})
Expand Down

0 comments on commit 02e7174

Please sign in to comment.