From b58c18798478e5cb1a741a4b247a6f395b2f56f6 Mon Sep 17 00:00:00 2001 From: Jacopo Scazzosi Date: Tue, 22 Aug 2023 10:21:41 +0200 Subject: [PATCH] reverts to the double-queue approach to minimize setTimeout() calls --- src/InstantRelay.spec.ts | 12 ++++-------- src/InstantRelay.ts | 4 ++-- src/node.ts | 26 ++++++++++++-------------- src/types.ts | 4 +++- 4 files changed, 21 insertions(+), 25 deletions(-) diff --git a/src/InstantRelay.spec.ts b/src/InstantRelay.spec.ts index 6d8eb98..9fa71df 100644 --- a/src/InstantRelay.spec.ts +++ b/src/InstantRelay.spec.ts @@ -89,8 +89,7 @@ describe('instant-relay', () => { return (message) => new Promise(() => {}); }, opts); for (let i = 0; i < 4; i += 1) { - // @ts-ignore - ir.nodeMap.get('r')!.push({ id: i + '', type: 'msg' }); + ir.nodeMap.get('r')!.queue.push({ id: i + '', type: 'msg' }); } }); @@ -105,8 +104,7 @@ describe('instant-relay', () => { let currDelta = 0; let count = 0; const loop = () => { - // @ts-ignore - ir.nodeMap.get('r')!.push({ id: count + '', type: 'msg' }).then(() => { + ir.nodeMap.get('r')!.queue.push({ id: count + '', type: 'msg' }).then(() => { currTstmp = Date.now(); currDelta = currTstmp - prevTstmp; if (count > 0) { @@ -150,8 +148,7 @@ describe('instant-relay', () => { }, {}); setImmediate(() => { - // @ts-ignore - ir.nodeMap.get('a')!.push({ id: '0', type: 'greeting' }); + ir.nodeMap.get('a')!.queue.push({ id: '0', type: 'greeting' }); }); }); @@ -181,8 +178,7 @@ describe('instant-relay', () => { }, {}); setImmediate(() => { - // @ts-ignore - ir.nodeMap.get('a')!.push({ id: '0', type: 'greeting' }, () => {}); + ir.nodeMap.get('a')!.queue.push({ id: '0', type: 'greeting' }); }); }); diff --git a/src/InstantRelay.ts b/src/InstantRelay.ts index 8178003..8334b79 100644 --- a/src/InstantRelay.ts +++ b/src/InstantRelay.ts @@ -4,8 +4,8 @@ import { makeNode } from './node'; export class InstantRelay { - private readonly nodeMap: Map>; - private readonly nodeArr: InternalNode[]; + readonly nodeMap: Map>; + readonly nodeArr: InternalNode[]; constructor() { this.nodeMap = new Map(); diff --git a/src/node.ts b/src/node.ts index 7bbd988..3279556 100644 --- a/src/node.ts +++ b/src/node.ts @@ -20,13 +20,13 @@ const makeSend = (nodeMap: Map>, senderId: string): S crashWithError(new Error(`Unknown node with id "${recipientId}"`)); return; } - await recipient.push(message); + await recipient.queue.push(message); }; }; const makeBroadcast = (nodeArr: InternalNode[], senderId: string): BroadcastMessage => { return async (msg: M) => { - await Promise.all(nodeArr.map(node => node.id !== senderId ? node.push(msg) : null)); + await Promise.all(nodeArr.map(node => node.id !== senderId ? node.queue.push(msg) : null)); }; }; @@ -51,27 +51,25 @@ export const makeNode = ( const broadcast = makeBroadcast(nodeArr, id); const handleMessage = factory(send, broadcast, { ...opts, id }); - let queueLength = 0; + const handlingQueue = fastq.promise(handleMessage, concurrency); - const queue = fastq.promise(async (msg: M) => { - await handleMessage(msg); - }, concurrency); - - queue.error((err, task) => { + handlingQueue.error((err, task) => { if (err !== null) { crashWithError(err); return; } }); - const push = async (msg: M) => { - queue.push(msg); - if ((queueLength = queue.length()) >= highWaterMark) { - await wait(throttle(queueLength)); + let handlingQueueLength = 0; + + const incomingQueue = fastq.promise(async (msg: M) => { + handlingQueue.push(msg); + if ((handlingQueueLength = handlingQueue.length()) >= highWaterMark) { + await wait(throttle(handlingQueueLength)); } - }; + }, 1); - const node: InternalNode = { id, push }; + const node: InternalNode = { id, queue: incomingQueue }; nodeMap.set(id, node); nodeArr.push(node); diff --git a/src/types.ts b/src/types.ts index bc753ea..6e5c6be 100644 --- a/src/types.ts +++ b/src/types.ts @@ -1,4 +1,6 @@ +import fastq from 'fastq'; + export interface HandleMessage { (message: M): Promise; } @@ -23,5 +25,5 @@ export interface AddNodeOpts { export interface InternalNode { readonly id: string; - readonly push: (msg: M) => Promise; + readonly queue: fastq.queueAsPromised; }