Skip to content

Commit

Permalink
promise-based API, simpler types
Browse files Browse the repository at this point in the history
  • Loading branch information
jacoscaz committed Aug 22, 2023
1 parent 2ad9075 commit 20a962d
Show file tree
Hide file tree
Showing 14 changed files with 143 additions and 322 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/node.js.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest]
node-version: [12.x, 14.x, 15.x, 16.x]
node-version: [16.x, 18.x, 20.x]
arch: [x64]

runs-on: ${{ matrix.os }}
Expand Down
7 changes: 0 additions & 7 deletions .mocharc.js

This file was deleted.

4 changes: 4 additions & 0 deletions .npmignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
dist/**/*.spec.js
dist/**/*.spec.d.ts
dist/**/*.spec.js.map
dist/howto.*
3 changes: 2 additions & 1 deletion LICENSE
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
MIT License

Copyright (c) 2022 Belay Engineering s.r.l.
Copyright (c) 2022 - 2023 Jacopo Scazzosi <jacopo@scazzosi.com>
Copyright (c) 2022 - 2023 Matteo Murgida <matteo.murgida@gmail.com>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
Expand Down
76 changes: 34 additions & 42 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,74 +10,78 @@ Node.js, with the following priorities in mind:
behavior**, addressed by decoupling message delivery from message
handling and managing backpressure upon delivery.
2. **Simplicity and ease of debugging**, addressed by a small codebase (~ 300
LoCs in total), few dependencies (2 direct, 4 in total) and forcing the use
of unique message identifiers to easily track messages as they are passed
from node to node.
LoCs in total) and few dependencies (1 direct, 2 in total).
3. **Performance**, addressed by selecting fast data structure implementations
and reducing the overall number of allocations per handled message.

`instant-relay` is a work-in-progress and is currently being tested and refined
in pre-production environments. It was born out of the convergence of previous
projects in the space of multi-protocol gateways for the IoT sector.

`instant-relay` development is supported by [Belay Engineering][1]. <br>
[<img src="https://belayeng.com/assets/images/logo/logo-positivo-rgb.svg" alt="Belay Engineerin's logo, the letter B made out of a continuous line" height="70">][1]

`instant-relay` was born out of the convergence of previous projects in the
space of multi-protocol gateways for the IoT sector.

## How to use

A new relay is created through the `InstantRelay` class, which requires a
union of possible message types as a type argument.

Message types _must_ implement the `Message` interface, which mandates the
presence of the `id` and `type` properties.
union of possible message types as a type argument.

New nodes can be added to an instance of the `InstantRelay` class by providing
dedicated factory functions implementing the `NodeFactory` interface.

```typescript
import { InstantRelay, Message, NodeFactory, uid } from 'instant-relay';
import { uid } from 'uid';
import { InstantRelay, NodeFactory } from 'instant-relay';

interface Request extends Message { type: 'req'; }
interface Response extends Message { type: 'res'; reqId: string; }
type Messages = Request | Response;
interface Request { id: string; type: 'req'; }
interface Response { type: 'res'; reqId: string; }

type Message = Request | Response;

const wait = (delay: number) => {
return new Promise((resolve) => {
setTimeout(resolve, delay);
});
};

const relay = new InstantRelay<Messages>();
const relay = new InstantRelay<Message>();

const serverFactory: NodeFactory<Messages, {}> = (send, broadcast, opts) => {
const serverFactory: NodeFactory<Message, {}> = (send, broadcast, opts) => {

return (message, done) => {
return async (message) => {
switch (message.type) {
case 'req':
console.log(`server received request ${message.id}`);
setTimeout(() => {
send('client', { id: uid(), type: 'res', reqId: message.id }, done);
}, Math.random() * 200);
await wait(Math.random() * 1000);
await send('client', { type: 'res', reqId: message.id });
break;
default:
done();
}
};
};

relay.addNode('server', serverFactory, {});

const clientFactory: NodeFactory<Messages, {}> = (send, broadcast, opts) => {
const clientFactory: NodeFactory<Message, {}> = (send, broadcast, opts) => {

let count = 0;

const loop = () => {
send('server', { id: uid(), type: 'req' }, loop);
send('server', { id: uid(), type: 'req' }).then(() => {
if (count < 10) {
count += 1;
loop()
} else {
count = 0;
setImmediate(loop);
}
});
};

setImmediate(loop);

return (message, done) => {
return async (message) => {
switch (message.type) {
case 'res':
console.log(`client received a response for request ${message.reqId}`);
done();
break;
default:
done();
}

};
Expand All @@ -89,18 +93,6 @@ relay.addNode('client', clientFactory, {});
Due to backpressure support, the loop that sends requests to the server node
will quickly slow down to a rate compatible with the artificial latency.

## Debug

`instant-relay` uses the `debug` module with the `instant-relay` namespace.
Debug messages can be enabled by setting the `DEBUG` environment variable as
follows:

```shell
DEBUG="instant-relay*" node index.js
```

## License

Licensed under [MIT](./LICENSE).

[1]: https://belayeng.com/en
25 changes: 11 additions & 14 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
{
"name": "instant-relay",
"version": "0.2.4",
"version": "0.3.0",
"description": "An opinionated library for asynchronous communication between nodes. Focuses on backpressure management, simplicity, performance.",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"ts:watch": "tsc --build --watch .",
"ts:build": "tsc --build .",
"build": "npm run ts:build",
"test": "mocha"
"build": "rm -rf ./dist && rm -rf ./tsconfig.tsbuildinfo && npm run ts:build",
"test": "mocha dist/**/*.spec.js"
},
"author": "Belay Engineering s.r.l. <info@belayeng.com>",
"author": "Jacopo Scazzosi <jacopo@scazzosi.com>",
"contributors": [
"Jacopo Scazzosi <jacopo@belayeng.com>",
"Matteo Murgida <matteo@belayeng.com>"
"Jacopo Scazzosi <jacopo@scazzosi.com>",
"Matteo Murgida <matteo.murgida@gmail.com>"
],
"license": "MIT",
"files": [
Expand Down Expand Up @@ -43,15 +43,12 @@
"node.js"
],
"devDependencies": {
"@types/debug": "^4.1.7",
"@types/mocha": "^9.0.0",
"@types/node": "^14.14.32",
"mocha": "^9.1.4",
"ts-node": "^10.4.0",
"typescript": "^4.2.3"
"@types/mocha": "^10.0.1",
"@types/node": "^20.5.1",
"mocha": "^10.2.0",
"typescript": "^5.1.6"
},
"dependencies": {
"debug": "^4.3.3",
"fastq": "^1.13.0"
"fastq": "^1.15.0"
}
}
67 changes: 31 additions & 36 deletions test/main.spec.ts → src/InstantRelay.spec.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@

import assert, { strictEqual } from 'assert';
import { InstantRelay, Message, NodeFactory } from '..';

const noop = () => {};
import { InstantRelay } from './InstantRelay';
import { NodeFactory } from './types';

describe('instant-relay', () => {

Expand All @@ -13,14 +12,13 @@ describe('instant-relay', () => {
const msg = { id: '1', type: 'msg' };
ir.addNode('a', (send, broadcast) => {
setImmediate(() => {
send('b', msg, noop);
send('b', msg);
});
return () => {};
return async () => {};
}, {});
ir.addNode('b', (send, broadcast) => {
return (message, done) => {
return async (message) => {
strictEqual(message, msg);
done();
testDone();
};
}, {});
Expand All @@ -35,14 +33,13 @@ describe('instant-relay', () => {
const msg = { id: '1', type: 'msg'};
ir.addNode('a', (send, broadcast) => {
setImmediate(() => {
broadcast(msg, noop);
broadcast(msg);
});
return () => {};
return async () => {};
}, {});
ir.addNode('b', (send, broadcast) => {
return (message, done) => {
return async (message) => {
strictEqual(message, msg, 'unexpected message');
done();
testDone();
};
}, {});
Expand All @@ -52,12 +49,11 @@ describe('instant-relay', () => {
const ir = new InstantRelay();
let receivedCount = 0;
const receiverQty = 3;
const msg: Message = { id: '1', type: 'msg'};
const msg = { id: '1', type: 'msg'};
const receiverFactory: NodeFactory<any, {}> = () => {
return (receivedMessage, done) => {
return async (receivedMessage) => {
strictEqual(msg, receivedMessage);
receivedCount += 1;
done();
if (receivedCount >= receiverQty) {
testDone();
}
Expand All @@ -68,9 +64,9 @@ describe('instant-relay', () => {
}
ir.addNode('broadcaster', (send, broadcast) => {
setImmediate(() => {
broadcast(msg, noop);
broadcast(msg);
});
return () => {};
return async () => {};
}, {});
});

Expand All @@ -90,42 +86,43 @@ describe('instant-relay', () => {
},
};
ir.addNode('r', (send, broadcast) => {
return (message, done) => {};
return (message) => new Promise(() => {});
}, opts);
for (let i = 0; i < 4; i += 1) {
// @ts-ignore
ir.nodes.get('r')!.incomingQueue.push({ id: i + '', type: 'msg' }, () => {});
ir.nodeMap.get('r')!.push({ id: i + '', type: 'msg' });
}
});

it('throttling should result in higher latencies for pushers', (testDone) => {
const ir = new InstantRelay();
ir.addNode('r', (send, broadcast) => {
return (message, done) => {};
return (message) => new Promise(() => {});
}, { highWaterMark: 1, throttle: (len: number) => len * 5 });
let prevTstmp = Date.now();
let currTstmp = prevTstmp;
let prevDelta = 0;
let currDelta = 0;
let count = 0;
const loop = () => {
count += 1;
// @ts-ignore
ir.nodes.get('r')!.incomingQueue.push({ id: count + '', type: 'msg' }, () => {
ir.nodeMap.get('r')!.push({ id: count + '', type: 'msg' }).then(() => {
currTstmp = Date.now();
currDelta = currTstmp - prevTstmp;
assert(currDelta > prevDelta);
if (count > 0) {
assert(currDelta > prevDelta, 'Incoherent deltas');
}
prevTstmp = currTstmp;
prevDelta = currDelta;
count += 1;
if (count === 10) {
testDone();
} else {
loop();
}
});
}).catch(testDone);
};
// @ts-ignore
ir.nodes.get('r')!.incomingQueue.push({ id: count + '', type: 'msg' }, loop);
loop();
});

});
Expand All @@ -138,24 +135,23 @@ describe('instant-relay', () => {

ir.addNode('a', (send, broadcast) => {
let recvd = 0;
return (message, done) => {
return async (message) => {
recvd += 1;
if (recvd === 20) {
done();
testDone();
return;
}
send('b', message, done);
await send('b', message);
};
}, {});

ir.addNode('b', (send, broadcast) => {
return (message, done) => { send('a', message, done); };
return async (message) => { await send('a', message); };
}, {});

setImmediate(() => {
// @ts-ignore
ir.nodes.get('a')!.incomingQueue.push({ id: '0', type: 'greeting' }, () => {});
ir.nodeMap.get('a')!.push({ id: '0', type: 'greeting' });
});

});
Expand All @@ -166,28 +162,27 @@ describe('instant-relay', () => {

ir.addNode('a', (send, broadcast) => {
let recvd = 0;
return (message, done) => {
return async (message) => {
recvd += 1;
if (recvd === 20) {
done();
testDone();
return;
}
send('b', message, done);
await send('b', message);
};
}, {});

ir.addNode('b', (send, broadcast) => {
return (message, done) => { send('c', message, done); };
return async (message) => { await send('c', message); };
}, {});

ir.addNode('c', (send, broadcast) => {
return (message, done) => { send('a', message, done); };
return async (message) => { await send('a', message); };
}, {});

setImmediate(() => {
// @ts-ignore
ir.nodes.get('a')!.incomingQueue.push({ id: '0', type: 'greeting' }, () => {});
ir.nodeMap.get('a')!.push({ id: '0', type: 'greeting' }, () => {});
});

});
Expand Down
Loading

0 comments on commit 20a962d

Please sign in to comment.