From 461b0d0b8c40bbb15a5be6675121b0d67cafe5e3 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 7 May 2020 09:00:05 +0100 Subject: [PATCH 1/2] fix: re-sort queue after adding tasks When tasks are added to an existing list of tasks for a given peer, we need to sort the queue to ensure the order is correct, otherwise we never process pending tasks as task lists with pending tasks need to be moved up the queue. Fixes the build problems exposed in ipfs/js-ipfs#2992 Also upgrade aegir to a safe version. --- package.json | 2 +- src/decision-engine/req-queue.js | 7 +++---- test/decision-engine/req-queue.spec.js | 25 +++++++++++++++++++++++++ 3 files changed, 29 insertions(+), 5 deletions(-) diff --git a/package.json b/package.json index 4a0a39aa..61660634 100644 --- a/package.json +++ b/package.json @@ -43,7 +43,7 @@ "homepage": "https://github.com/ipfs/js-ipfs-bitswap#readme", "devDependencies": { "@nodeutils/defaults-deep": "^1.1.0", - "aegir": "^21.9.0", + "aegir": "^21.10.1", "async-iterator-all": "^1.0.0", "benchmark": "^2.1.4", "buffer": "^5.6.0", diff --git a/src/decision-engine/req-queue.js b/src/decision-engine/req-queue.js index 5b3bec92..c8e0f937 100644 --- a/src/decision-engine/req-queue.js +++ b/src/decision-engine/req-queue.js @@ -54,12 +54,11 @@ class RequestQueue { */ pushTasks (peerId, tasks) { let peerTasks = this._byPeer.get(peerId.toB58String()) - if (peerTasks) { - peerTasks.pushTasks(tasks) - return + + if (!peerTasks) { + peerTasks = new PeerTasks(peerId, this._taskMerger) } - peerTasks = new PeerTasks(peerId, this._taskMerger) peerTasks.pushTasks(tasks) this._byPeer.set(peerId.toB58String(), peerTasks) } diff --git a/test/decision-engine/req-queue.spec.js b/test/decision-engine/req-queue.spec.js index 1eeb7ec7..b185b46d 100644 --- a/test/decision-engine/req-queue.spec.js +++ b/test/decision-engine/req-queue.spec.js @@ -237,6 +237,31 @@ describe('Request Queue', () => { }) }) + it('resorts queue when new peer tasks are added where peer tasks 
already exist', () => { + const rq = new RequestQueue() + + rq.pushTasks(peerIds[0], [{ + topic: 'a', + size: 0, + priority: 1 + }]) + rq.pushTasks(peerIds[1], [{ + topic: 'a', + size: 0, + priority: 1 + }]) + rq.pushTasks(peerIds[0], [{ + topic: 'a', + size: 1, + priority: 1 + }]) + + // _byPeer map should have been resorted to put peer0 + // first in the queue + const { peerId } = rq.popTasks(16) + expect(peerId).to.eql(peerIds[0]) + }) + describe('remove', () => { it('removes tasks by peer and topic', () => { const rq = new RequestQueue() From 82516b962f5364577525ceea8dec716aad0484bc Mon Sep 17 00:00:00 2001 From: achingbrain Date: Thu, 7 May 2020 11:11:43 +0100 Subject: [PATCH 2/2] fix: survive not being able to send messages to peers Builds on the work in #221 I see intermittent but frequent CI errors with js-ipfs, usually after a test has finished and the nodes are being torn down. Getting it to do some additional logging reveals bitswap is crashing when it cannot send a message to a remote peer due to the libp2p dial failing: ```console ipfs: [stdout] Error: stream ended before 1 bytes became available ipfs: at /home/travis/build/ipfs/js-ipfs/node_modules/it-reader/index.js:37:9 ipfs: at processTicksAndRejections (internal/process/task_queues.js:97:5) ipfs: at async /home/travis/build/ipfs/js-ipfs/node_modules/it-length-prefixed/src/decode.js:80:20 ipfs: at async oneChunk (/home/travis/build/ipfs/js-ipfs/node_modules/multistream-select/src/multistream.js:12:20) ipfs: at async Object.exports.read (/home/travis/build/ipfs/js-ipfs/node_modules/multistream-select/src/multistream.js:34:15) ipfs: at async module.exports (/home/travis/build/ipfs/js-ipfs/node_modules/multistream-select/src/select.js:21:19) ipfs: at async ClassIsWrapper.newStream [as _newStream] (/home/travis/build/ipfs/js-ipfs/node_modules/libp2p/src/upgrader.js:251:40) ipfs: at async ClassIsWrapper.newStream 
(/home/travis/build/ipfs/js-ipfs/node_modules/libp2p-interfaces/src/connection/connection.js:172:34) ipfs: at async Network.sendMessage (/home/travis/build/ipfs/js-ipfs/node_modules/ipfs-bitswap/src/network.js:147:34) ipfs: at async DecisionEngine._processTasks (/home/travis/build/ipfs/js-ipfs/node_modules/ipfs-bitswap/src/decision-engine/index.js:124:5) { ipfs: code: 'ERR_UNSUPPORTED_PROTOCOL', ipfs: buffer: BufferList { _bufs: [], length: 0 } ipfs: } ``` This PR adds a try/catch around network send operations. At the moment it just dumps the request, I'm not sure if we want to add a retry in there or something. --- src/decision-engine/index.js | 18 ++++++---- test/decision-engine/decision-engine.js | 46 +++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 7 deletions(-) diff --git a/src/decision-engine/index.js b/src/decision-engine/index.js index 90565a8a..b873e9dd 100644 --- a/src/decision-engine/index.js +++ b/src/decision-engine/index.js @@ -120,17 +120,21 @@ class DecisionEngine { return } - // Send the message - await this.network.sendMessage(peerId, msg) + try { + // Send the message + await this.network.sendMessage(peerId, msg) + + // Peform sent message accounting + for (const block of blocks.values()) { + this.messageSent(peerId, block) + } + } catch (err) { + this._log.error(err) + } // Free the tasks up from the request queue this._requestQueue.tasksDone(peerId, tasks) - // Peform sent message accounting - for (const block of blocks.values()) { - this.messageSent(peerId, block) - } - // Trigger the next round of task processing this._scheduleProcessTasks() } diff --git a/test/decision-engine/decision-engine.js b/test/decision-engine/decision-engine.js index eb9e5a31..eec8dcf4 100644 --- a/test/decision-engine/decision-engine.js +++ b/test/decision-engine/decision-engine.js @@ -683,4 +683,50 @@ describe('Engine', () => { } } }) + + it('survives not being able to send a message to peer', async () => { + let r + const failToSendPromise = new 
Promise((resolve) => { + r = resolve + }) + + const network = mockNetwork() + network.sendMessage = () => { + r() + throw new Error('Something is b0rken') + } + + // who is in the network + const us = await newEngine(network) + const them = await newEngine() + + // add a block to our blockstore + const data = Buffer.from(`this is message ${Date.now()}`) + const hash = await multihashing(data, 'sha2-256') + const cid = new CID(hash) + const block = new Block(data, cid) + await us.engine.blockstore.put(block) + + // receive a message with a want for our block + await us.engine.messageReceived(them.peer, { + blocks: [], + wantlist: [{ + cid, + priority: 1, + wantType: 'wanty' + }] + }) + + // should have added a task for the remote peer + const tasks = us.engine._requestQueue._byPeer.get(them.peer.toB58String()) + + expect(tasks).to.have.property('_pending').that.has.property('length', 1) + + // wait for us.network.sendMessage to be called + await failToSendPromise + + // should be done processing + expect(tasks).to.have.property('_pending').that.has.property('length', 0) + expect(tasks).to.have.property('_active').that.has.property('size', 0) + }) })