Skip to content

Commit

Permalink
Add sequences for deployment alongside starting sth
Browse files Browse the repository at this point in the history
  • Loading branch information
a-tylenda committed Jun 29, 2023
1 parent 320e92e commit 979e310
Show file tree
Hide file tree
Showing 11 changed files with 352 additions and 0 deletions.
74 changes: 74 additions & 0 deletions typescript/mgr-sequence/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Sequence deployment together with starting Hubs

## Sequence description

### timestamp-manager

This Sequence starts multiple instances of the '@scramjet/timestamp-producer' sequence, calculates the time differences between received timestamps and local timestamps, and returns the overall average difference in milliseconds to the Instance output and stdout.

### timestamp-producer

This Sequence generates and sends timestamps to the output stream. Upon receiving the 'start' event, the Sequence starts generating the specified number of timestamps at the given interval and pushes them to the output stream. It uses the 'real-hrtime' module to generate timestamps. After generating the desired number of timestamps, it pushes an end indicator to signify the end of data.

## STH deployment command

```bash
sth -S typescript/mgr-sequence/seq-config.json -E -D typescript/mgr-sequence --runtime-adapter process --config typescript/mgr-sequence/shh-config.json
```

> Optionally you can use `DEVELOPMENT=1` env to see Runner logs.
Options used in the command:

* `-S <path>` or `--startup-config <path>` - option that points at configuration JSON file, which contains metadata needed for starting up the sequence(s). It works only with process adapter. As a `<path>` argument you should provide the location of a config file with the list of Sequences to be started along with the sth.
The example of a startup-config.json file:

```json
{
"sequences": [
{
"id": "sequence-name",
"args": [],
"instanceId": "11111111-2222-3333-4444-555555555555"
}
]
}
```

As the example above shows, you should pass in your own Sequence ID and Instance ID in your config file.

Sequence id given in the config file should be exactly the same as the same if the Sequence directory. Otherwise you will get a WARN like this in STH logs:

```bash
2023-06-16T13:58:54.067Z WARN Host Sequence id not found for startup config [ { id: 'sequence-name', args: [], instanceId: '11111111-2222-3333-4444-555555555555' } ]
```
* `-E` or `--identify-existing` - this option scans the catalog and looks for Sequences, when there are any found they are added to STH and started right after.
· `-D <path>` or `--sequences-root <path>` - this option points at the location of the Sequences that will be started together with sth. This is also where ProcessSequenceAdapter saves new Sequences.
Optionally:
* `-X` or `--exit-with-last-instance` - thanks to this option STH will exit when no more Instances exist.
## Output
The results will be provided by the instance of the `timestamp-manager` Sequence.
Read `stdout` stream (`si inst stdout <instance-id>`), or check out STH logs, you should see something similar to this:
```bash
Channel averages (nanoseconds):
┌─────────┬──────────────┐
│ (index) │ Values │
├─────────┼──────────────┤
│ 0 │ ' 422940'
│ 1 │ ' 423649'
│ 2 │ ' 422948'
│ 3 │ ' 434039'
│ 4 │ ' 412757'
└─────────┴──────────────┘
Average from 5 output streams: 423266 (nanoseconds)
Average of 5 averages: 0.2116 (milliseconds)
```
To find out what each Sequence does look into the comments included in the code.
9 changes: 9 additions & 0 deletions typescript/mgr-sequence/seq-config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"sequences": [
{
"id": "timestamp-manager",
"args": [],
"instanceId": "11111111-2222-3333-4444-555555555555"
}
]
}
21 changes: 21 additions & 0 deletions typescript/mgr-sequence/shh-config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"host": {
"hostname":"0.0.0.0",
"port":9002,
"instancesServerPort":9003,
"id":"shh-1"
},
"description": "My awesome Self-Hosted Hub <3",
"tags": [
"scramjet",
"data streaming"
],
"timings": {
"instanceLifetimeExtensionDelay": 360000
},
"platform": {
"apiKey":"<your-access-key>",
"api":"https://api-shh.scramjet.cloud",
"space":"<your-org-id>:<your-org-id>-manager"
}
}
112 changes: 112 additions & 0 deletions typescript/mgr-sequence/timestamp-manager/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/* eslint-disable no-loop-func */
import { AppConfig, AppContext } from "@scramjet/types";
import { PassThrough, Readable } from "stream";

const rht = require("./real-hrtime.node");

// Default configuration values
let CONFIG = {
tsPerInstance: 1000,
tsInterval: 100,
instancesCount: 5
};

/**
* This Sequence finds the Sequence with the name '@scramjet/timestamp-producer' on the Sequence list and starts it multiple times.
* Each Instance of the '@scramjet/timestamp-producer' Sequence will send a 'start' event, triggering the data flow. The Instances will write
* timestamps to the output stream based on the provided number and interval arguments.
* The Sequence reads the output from each Instance, calculates the time differences, and returns the overall average to the output stream.
*
* @param {AppContext} this - Application context
* @param {any}_stream - A dummy input stream
* @param {number} instancesCount - The number of instances to start
* @param {number} tsPerInstance - The number of timestamps to be sent to the output
* @param {number} tsInterval - The interval at which the timestamps are sent
* @returns {PassThrough} A PassThrough stream containing the overall average
*/

export default [
async function (this: AppContext<AppConfig, any>, _stream: any, instancesCount: number, tsPerInstance: number, tsInterval: number) {
// Wait for 5 seconds
await new Promise((res) => setTimeout(res, 5000));

// Update the configuration values if provided, otherwise use default values
CONFIG.instancesCount ??= instancesCount;
CONFIG.tsInterval ??= tsInterval;
CONFIG.tsPerInstance ??= tsPerInstance;

// Retrieve the Sequence ID by its name and create Sequence client
const sequenceName = "@scramjet/timestamp-producer";
const hostClient = this.hub!;
const seqList = await hostClient.listSequences();
const producerSequence = seqList.find((seq) => seq.config.name === sequenceName);
const seqClient = hostClient.getSequenceClient(producerSequence!.id);

// Start multiple instances of the sequence with specified arguments in CONFIG
const instances = await Promise.all(
new Array(CONFIG.instancesCount).fill(null)
.map((_e, i) => {
return new Promise((res) => setTimeout(res, i * 1000))
.then(__ => seqClient.start({ args: [CONFIG.tsPerInstance, CONFIG.tsInterval], appConfig: {} }));
})
);

// Get the "output" stream from each instance
const outputs = await Promise.all(
instances.map((instanceClient, i) =>
new Promise((res) => setTimeout(res, i * 16))
.then(() => instanceClient.getStream("output")
)
)
) as Readable[];

// Send "start" event to each instance
instances.forEach(async (ic, i) => {
await new Promise((res) => setTimeout(res, i * 8))
ic.sendEvent("start", "");
});

// Calculate the differences between timestamps received from instances and the local timestamps
const outputDiffs = await Promise.all(outputs.map((o, i) =>
new Promise<bigint[]>(res => {
const diffs: BigInt[] = []; // Array to store the differences between timestamps

// Event-based processing of data from the output stream
o.on("data", (d) => {
const rx = d.toString().replace("\n", ""); // Convert received data to string and remove "\n"

// If end indicator received, resolve with the differences array
if (rx === "0") {
res(diffs as bigint[]);
return;
}

const rxTs = BigInt(rx); // Convert received timestamp to BigInt
const ts = rht.bigint() as bigint; // Get local timestamp using rht module

diffs.push((ts - rxTs) as bigint); // Calculate difference and push it in the diffs array
});
})
));

// Calculate the average difference for each output
const outputDiffsAvg = outputDiffs.map((d: bigint[]) => d.reduce((partialSum: bigint, a: bigint) => partialSum + a, BigInt(0)) / BigInt(d.length));

console.log("Channel averages (nanoseconds):");
console.table(outputDiffsAvg.map(a => Number(a).toFixed(0).toString().padStart(10, " ")));

// Calculate the overall average difference in nanoseconds
const avg = outputDiffsAvg.reduce((partialSum: bigint, a: bigint) => partialSum + a, BigInt(0)) / BigInt(outputDiffsAvg.length);
console.log(`Average from ${CONFIG.instancesCount} output streams:`, Number(avg), "(nanoseconds)");

// Calculate the ultimate average in milliseconds
const ultimateAvg = (Number(avg) / 1000000 / 2).toFixed(4);
console.log(`Average of ${outputDiffsAvg.length} averages:`, ultimateAvg, "(milliseconds)\n");

// Create a new PassThrough stream and write the overall average in ms
const ps = new PassThrough({ encoding: "utf-8" });
ps.write(ultimateAvg.toString()+ "\n");

return ps;
}
];
35 changes: 35 additions & 0 deletions typescript/mgr-sequence/timestamp-manager/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"name": "@scramjet/timestamp-manager",
"version": "0.1.0",
"main": "dist/index.js",
"author": "Scramjet <open-source@scramjet.org>",
"license": "ISC",
"description": "This package contains a Scramjet Sequence that starts multiple instances of the '@scramjet/timestamp-producer' sequence, calculates the time differences between received timestamps and local timestamps, and returns the overall average difference in milliseconds to the Instance output and stdout.",
"keywords": [
"sample",
"easy",
"streaming",
"Data Producer"
],
"repository": {
"type": "git",
"url": "https://github.com/scramjetorg/platform-samples/tree/main/typescript/mgr-sequence"
},
"assets": [
"real-hrtime.node"
],
"scripts": {
"build": "tsc -p tsconfig.json",
"postbuild": "cp package.json dist/ && cp -r real-hrtime.node dist/ && (cd dist && npm i --omit=dev)",
"pack": "si seq pack ./dist/",
"clean": "rm -rf ./dist ./*.tar.gz"
},
"devDependencies": {
"@scramjet/types": "^0.34.0",
"@types/node": "15.12.5"
},
"dependencies": {
"@scramjet/api-client": "^0.34.0",
"scramjet": "^4.36.9"
}
}
Binary file not shown.
10 changes: 10 additions & 0 deletions typescript/mgr-sequence/timestamp-manager/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"compilerOptions": {
"outDir": "./dist",
},
"exclude": [
"node_modules",
"node_modules/@types/node/globals.d.ts"
]
}

44 changes: 44 additions & 0 deletions typescript/mgr-sequence/timestamp-producer/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/* eslint-disable no-loop-func */

import { AppConfig, AppContext } from "@scramjet/types";
import { Readable } from "stream";

const rht = require("./real-hrtime.node");

/**
* This Sequence is responsible for generating and sending timestamps to the output stream.
* It listens for the "start" event and, upon receiving it, starts generating timestamps.
* The Sequence will generate the specified number of timestamps at the given interval and push them to the output stream.
* After generating the desired number of timestamps, it will push an end indicator to signify the end of data.
*
* * @param {AppContext} this - Application context
* @param {any} _stream - A dummy input stream
* @param {number} tsQuantity - The number of timestamps to be sent to the output (default: 128)
* @param {number} delay - The delay between each timestamp generation in milliseconds (default: 100)
* @returns {Readable} A Readable stream containing the generated timestamps
*/

export default [
async function (this: AppContext<AppConfig, any>, _stream: any, tsQuantity: number = 128, delay: number = 100) {
this.logger.info(`Sending ${tsQuantity} timestamps to output`);
const p = new Readable({ read: () => {}});
let l = 0;

// Event listener for the "start" event
this.on("start", async () => {
while (l++ < tsQuantity) {
await new Promise(res => setTimeout(res, delay));

if (!p.push(`${rht.stringified()}\n`)) {
await new Promise(res => p.once("drain", res));
}
}

await new Promise(res => setTimeout(res, 10));

p.push("0\n");
});

return p;
}
];
37 changes: 37 additions & 0 deletions typescript/mgr-sequence/timestamp-producer/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
{
"name": "@scramjet/timestamp-producer",
"version": "0.1.0",
"main": "dist/index.js",
"author": "Scramjet <open-source@scramjet.org>",
"license": "ISC",
"description": "This package contains a Scramjet Sequence that generates and sends timestamps to the output stream. Upon receiving the 'start' event, the Sequence starts generating the specified number of timestamps at the given interval and pushes them to the output stream. It uses the 'real-hrtime' module to generate timestamps. After generating the desired number of timestamps, it pushes an end indicator to signify the end of data.",
"keywords": [
"sample",
"easy",
"streaming",
"Data Producer"
],
"repository": {
"type": "git",
"url": "https://github.com/scramjetorg/platform-samples/tree/main/typescript/mgr-sequence"
},
"assets": [
"real-hrtime.node"
], "scripts": {
"build": "tsc -p tsconfig.json",
"postbuild": "cp package.json dist/ && cp -r real-hrtime.node dist/ && (cd dist && npm i --omit=dev)",
"pack": "si seq pack ./dist/",
"clean": "rm -rf ./dist ./*.tar.gz"
},
"engines": {
"node": ">=14"
},
"devDependencies": {
"@scramjet/types": "^0.34.0",
"@types/node": "15.12.5"
},
"dependencies": {
"@scramjet/api-client": "^0.34.0",
"scramjet": "^4.36.9"
}
}
Binary file not shown.
10 changes: 10 additions & 0 deletions typescript/mgr-sequence/timestamp-producer/tsconfig.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"compilerOptions": {
"outDir": "./dist"
},
"exclude": [
"node_modules",
"node_modules/@types/node/globals.d.ts"
]
}

0 comments on commit 979e310

Please sign in to comment.