Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#20 - check nodes on ping #172

Merged
merged 6 commits into from Dec 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
95 changes: 94 additions & 1 deletion client.js
Expand Up @@ -13,6 +13,7 @@ var randombytes = require('randombytes')
var simpleSha1 = require('simple-sha1')

var ROTATE_INTERVAL = 5 * 60 * 1000 // rotate secrets every 5 minutes
var BUCKET_OUTDATED_TIMESPAN = 15 * 60 * 1000 // check nodes in bucket in 15 minutes old buckets

inherits(DHT, EventEmitter)

Expand All @@ -39,12 +40,27 @@ function DHT (opts) {
this._verify = opts.verify || null
this._host = opts.host || null
this._interval = setInterval(rotateSecrets, ROTATE_INTERVAL)
this._hash = opts.hash || sha1
this._bucketCheckInterval = null
this._bucketOutdatedTimeSpan = opts.timeBucketOutdated || BUCKET_OUTDATED_TIMESPAN

this.listening = false
this.destroyed = false
this.nodeId = this._rpc.id
this.nodes = this._rpc.nodes

this.nodes.on('ping', function (nodes, contact) {
self._debug('received ping', nodes, contact)
self._checkAndRemoveNodes(nodes, function (_, removed) {
if (removed) {
self._debug('added new node:', contact)
self.addNode(contact)
}

self._debug('no node added, all other nodes ok')
})
})

process.nextTick(bootstrap)

EventEmitter.call(this)
Expand Down Expand Up @@ -81,13 +97,83 @@ function DHT (opts) {
}
}

DHT.prototype._setBucketCheckInterval = function () {
var self = this
var interval = 1 * 60 * 1000 // check age of bucket every minute

this._bucketCheckInterval = setInterval(function () {
const diff = Date.now() - self._rpc.nodes.metadata.lastChange

if (diff >= self._bucketOutdatedTimeSpan) {
self._checkAndRemoveNodes(self.nodes.toArray(), function () {
if (self.nodes.toArray().length < 1) {
// node is currently isolated,
// retry with initial bootstrap nodes
self._bootstrap(true)
}
})
}
}, interval)
}

DHT.prototype.removeBucketCheckInterval = function () {
clearInterval(this._bucketCheckInterval)
}

DHT.prototype.updateBucketTimestamp = function () {
this._rpc.nodes.metadata.lastChange = Date.now()
}

DHT.prototype._checkAndRemoveNodes = function (nodes, cb) {
var self = this

this._checkNodes(nodes, function (_, node) {
if (node) self.removeNode(node.id)
cb(null, node)
})
}

DHT.prototype._checkNodes = function (nodes, cb) {
var self = this

function test (acc) {
if (!acc.length) {
return cb(null)
}

var current = acc.pop()

self._sendPing(current, function (err) {
if (!err) {
self.updateBucketTimestamp()
return test(acc)
}

// retry
self._sendPing(current, function (er) {
if (err) {
return cb(null, current)
}

self.updateBucketTimestamp()
return test(acc)
})
})
}

test(nodes)
}

DHT.prototype.addNode = function (node) {
var self = this
if (node.id) {
node.id = toBuffer(node.id)
var old = !!this._rpc.nodes.get(node.id)
this._rpc.nodes.add(node)
if (!old) this.emit('node', node)
if (!old) {
this.emit('node', node)
this.updateBucketTimestamp()
}
return
}
this._sendPing(node, function (_, node) {
Expand All @@ -106,6 +192,7 @@ DHT.prototype._sendPing = function (node, cb) {
if (!pong.r || !pong.r.id || !Buffer.isBuffer(pong.r.id) || pong.r.id.length !== self._hashLength) {
return cb(new Error('Bad reply'))
}
self.updateBucketTimestamp()
cb(null, {
id: pong.r.id,
host: node.host || node.address,
Expand Down Expand Up @@ -358,6 +445,9 @@ DHT.prototype.address = function () {
// listen([port], [address], [onlistening])
DHT.prototype.listen = function () {
this._rpc.bind.apply(this._rpc, arguments)

this.updateBucketTimestamp()
this._setBucketCheckInterval()
}

DHT.prototype.destroy = function (cb) {
Expand All @@ -368,6 +458,7 @@ DHT.prototype.destroy = function (cb) {
this.destroyed = true
var self = this
clearInterval(this._interval)
clearInterval(this._bucketCheckInterval)
this._debug('destroying')
this._rpc.destroy(function () {
self.emit('close')
Expand Down Expand Up @@ -530,6 +621,8 @@ DHT.prototype._bootstrap = function (populate) {
}, ready)

function ready () {
if (self.ready) return

self._debug('emit ready')
self.ready = true
self.emit('ready')
Expand Down
2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -15,7 +15,7 @@
"buffer-equals": "^1.0.3",
"debug": "^3.1.0",
"inherits": "^2.0.1",
"k-bucket": "^3.0.1",
"k-bucket": "^3.3.0",
"k-rpc": "^4.1.0",
"lru": "^3.1.0",
"randombytes": "^2.0.5",
Expand Down
175 changes: 175 additions & 0 deletions test/updated-bucket.js
@@ -0,0 +1,175 @@
var common = require('./common')
var DHT = require('../')
var test = require('tape')

test('adding a node updates the lastChange property', function (t) {
t.plan(3)

var now = Date.now()
var dht = new DHT({ bootstrap: false })

t.notOk(dht._rpc.nodes.metadata.lastChange, 'lastChanged not set')

setTimeout(function () {
dht.addNode({host: '127.0.0.1', port: 9999, id: common.randomId()})
t.equal(typeof dht._rpc.nodes.metadata.lastChange, 'number')
t.ok(
dht._rpc.nodes.metadata.lastChange > now,
'lastChange timestamp is older'
)
dht.destroy()
}, 1)
})

test('same node doesn´t change the lastChange property', function (t) {
t.plan(3)

var dht = new DHT({ bootstrap: false })

t.notOk(dht._rpc.nodes.metadata.lastChange, 'lastChanged not set')

var nodeId = common.randomId()
var lastChanged
setTimeout(function () {
dht.addNode({host: '127.0.0.1', port: 9999, id: nodeId})

t.equal(typeof dht._rpc.nodes.metadata.lastChange, 'number')
lastChanged = dht._rpc.nodes.metadata.lastChange

setTimeout(function () {
dht.addNode({host: '127.0.0.1', port: 9999, id: nodeId})
t.equal(dht._rpc.nodes.metadata.lastChange, lastChanged)
dht.destroy()
}, 1)
}, 1)
})

test('same node doesn´t change the lastChange property', function (t) {
t.plan(3)

var dht = new DHT({ bootstrap: false })

t.notOk(dht._rpc.nodes.metadata.lastChange, 'lastChanged not set')

var nodeId = common.randomId()
var lastChanged
setTimeout(function () {
dht.addNode({host: '127.0.0.1', port: 9999, id: nodeId})

t.equal(typeof dht._rpc.nodes.metadata.lastChange, 'number')
lastChanged = dht._rpc.nodes.metadata.lastChange

setTimeout(function () {
dht.addNode({host: '127.0.0.1', port: 9999, id: nodeId})
t.equal(dht._rpc.nodes.metadata.lastChange, lastChanged)
dht.destroy()
}, 1)
}, 1)
})

test('_checkNodes: skips good nodes', function (t) {
t.plan(5)
var dht1 = new DHT({ bootstrap: false })
common.failOnWarningOrError(t, dht1)

dht1.on('ready', function () {
t.pass('dht1 `ready` event fires because { bootstrap: false }')
t.equal(dht1.ready, true)

dht1.listen(function () {
var port = dht1.address().port
t.pass('dht1 listening on port ' + port)

// dht2 will get all 3 nodes from dht1 and should also emit a `ready` event
var dht2 = new DHT({ bootstrap: '127.0.0.1:' + port })
common.failOnWarningOrError(t, dht2)

dht2.on('ready', function () {
var nodes = dht1.nodes.toArray()

dht1._checkNodes(nodes, function (err, data) {
t.notOk(err, 'no error')
t.notOk(data, 'no broken nodes')
dht1.destroy()
dht2.destroy()
})
})
})
})
})

test('_checkNodes: returns the bad one', function (t) {
t.plan(5)
var dht1 = new DHT({ bootstrap: false })
common.failOnWarningOrError(t, dht1)

dht1.on('ready', function () {
t.pass('dht1 `ready` event fires because { bootstrap: false }')
t.equal(dht1.ready, true)

var nodeId = common.randomId()
var badNode = {host: '127.0.0.1', port: 9999, id: nodeId}
dht1.addNode(badNode)

dht1.listen(function () {
var port = dht1.address().port
t.pass('dht1 listening on port ' + port)

// dht2 will get all 3 nodes from dht1 and should also emit a `ready` event
var dht2 = new DHT({ bootstrap: '127.0.0.1:' + port })
common.failOnWarningOrError(t, dht2)

dht2.on('ready', function () {
var goodNodes = dht1.nodes.toArray()
var goodNode = goodNodes[0]
var nodes = [goodNode, goodNode, badNode, goodNode]

dht1._checkNodes(nodes, function (err, data) {
t.notOk(err, 'no error')
t.equal(data.id, badNode.id)
dht1.destroy()
dht2.destroy()
})
})
})
})
})

test('_checkAndRemoveNodes: removes bad nodes', function (t) {
t.plan(6)
var dht1 = new DHT({ bootstrap: false })
common.failOnWarningOrError(t, dht1)

dht1.on('ready', function () {
t.pass('dht1 `ready` event fires because { bootstrap: false }')
t.equal(dht1.ready, true)

var nodeId = common.randomId()

dht1.listen(function () {
var port = dht1.address().port
t.pass('dht1 listening on port ' + port)

// dht2 will get all 3 nodes from dht1 and should also emit a `ready` event
var dht2 = new DHT({ bootstrap: '127.0.0.1:' + port })
common.failOnWarningOrError(t, dht2)

dht2.on('ready', function () {
t.equal(dht1.nodes.toArray().length, 1)
var goodNodes = dht1.nodes.toArray()
var goodNode = goodNodes[0]
var badNode = {host: '127.0.0.1', port: 9999, id: nodeId}
dht1.addNode(badNode)

t.equal(dht1.nodes.toArray().length, 2)

var nodes = [goodNode, goodNode, badNode, goodNode]
dht1._checkAndRemoveNodes(nodes, function (_, data) {
t.equal(dht1.nodes.toArray().length, 1)
dht1.destroy()
dht2.destroy()
})
})
})
})
})