diff --git a/src/decision-engine/index.js b/src/decision-engine/index.js index 90565a8a..b873e9dd 100644 --- a/src/decision-engine/index.js +++ b/src/decision-engine/index.js @@ -120,17 +120,21 @@ class DecisionEngine { return } - // Send the message - await this.network.sendMessage(peerId, msg) + try { + // Send the message + await this.network.sendMessage(peerId, msg) + + // Peform sent message accounting + for (const block of blocks.values()) { + this.messageSent(peerId, block) + } + } catch (err) { + this._log.error(err) + } // Free the tasks up from the request queue this._requestQueue.tasksDone(peerId, tasks) - // Peform sent message accounting - for (const block of blocks.values()) { - this.messageSent(peerId, block) - } - // Trigger the next round of task processing this._scheduleProcessTasks() } diff --git a/test/decision-engine/decision-engine.js b/test/decision-engine/decision-engine.js index eb9e5a31..eec8dcf4 100644 --- a/test/decision-engine/decision-engine.js +++ b/test/decision-engine/decision-engine.js @@ -683,4 +683,50 @@ describe('Engine', () => { } } }) + + it('survives not being able to send a message to peer', async () => { + let r + const failToSendPromise = new Promise((resolve) => { + r = resolve + }) + + const network = mockNetwork() + network.sendMessage = () => { + r() + throw new Error('Something is b0rken') + } + + // who is in the network + const us = await newEngine(network) + const them = await newEngine() + + // add a block to our blockstore + const data = Buffer.from(`this is message ${Date.now()}`) + const hash = await multihashing(data, 'sha2-256') + const cid = new CID(hash) + const block = new Block(data, cid) + await us.engine.blockstore.put(block) + + // receive a message with a want for our block + await us.engine.messageReceived(them.peer, { + blocks: [], + wantlist: [{ + cid, + priority: 1, + wantType: 'wanty' + }] + }) + + // should have added a task for the remote peer + const tasks = us.engine._requestQueue._byPeer.get(them.peer.toB58String()) + + expect(tasks).to.have.property('_pending').that.has.property('length', 1) + + // wait for us.network.sendMessage to be called + await failToSendPromise + + // should be done processing + expect(tasks).to.have.property('_pending').that.has.property('length', 0) + expect(tasks).to.have.property('_active').that.has.property('size', 0) + }) })