diff --git a/typescript/mgr-sequence/README.md b/typescript/mgr-sequence/README.md deleted file mode 100644 index c83b00b..0000000 --- a/typescript/mgr-sequence/README.md +++ /dev/null @@ -1,77 +0,0 @@ -# Sequence deployment together with starting Hubs - -## Sequence description - -### timestamp-manager - -This Sequence starts multiple instances of the '@scramjet/timestamp-producer' sequence, calculates the time differences between received timestamps and local timestamps, and returns the overall average difference in milliseconds to the Instance output and stdout. - -### timestamp-producer - -This Sequence generates and sends timestamps to the output stream. Upon receiving the 'start' event, the Sequence starts generating the specified number of timestamps at the given interval and pushes them to the output stream. It uses the 'real-hrtime' module to generate timestamps. After generating the desired number of timestamps, it pushes an end indicator to signify the end of data. - -## STH deployment command - ->**Build each Sequence before deployment (`npm run build`)** - -```bash -sth -S typescript/mgr-sequence/seq-config.json -E -D typescript/mgr-sequence --runtime-adapter process --config typescript/mgr-sequence/shh-config.json -``` - -> Optionally you can use `DEVELOPMENT=1` env to see Runner logs. - -Options used in the command: - -* `-S ` or `--startup-config ` - option that points at configuration JSON file, which contains metadata needed for starting up the sequence(s). It works only with process adapter. As a `` argument you should provide the location of a config file with the list of Sequences to be started along with the sth. -The example of a startup-config.json file: - -```json -{ - "sequences": [ - { - "id": "sequence-name", - "args": [], - "instanceId": "11111111-2222-3333-4444-555555555555" - } - ] -} -``` - -As the example above shows, you should pass in your own Sequence ID and Instance ID in your config file. - -Sequence id given in the config file should be exactly the same as the same if the Sequence directory. Otherwise you will get a WARN like this in STH logs: - -```bash -2023-06-16T13:58:54.067Z WARN Host Sequence id not found for startup config [ { id: 'sequence-name', args: [], instanceId: '11111111-2222-3333-4444-555555555555' } ] -``` - -* `-E` or `--identify-existing` - this option scans the catalog and looks for Sequences, when there are any found they are added to STH and started right after. - -· `-D ` or `--sequences-root ` - this option points at the location of the Sequences that will be started together with sth. This is also where ProcessSequenceAdapter saves new Sequences. - -Optionally: - -* `-X` or `--exit-with-last-instance` - thanks to this option STH will exit when no more Instances exist. - -## Output - -The results will be provided by the instance of the `timestamp-manager` Sequence. -Read `stdout` stream (`si inst stdout `), or check out STH logs, you should see something similar to this: - -```bash -Channel averages (nanoseconds): -┌─────────┬──────────────┐ -│ (index) │ Values │ -├─────────┼──────────────┤ -│ 0 │ ' 422940' │ -│ 1 │ ' 423649' │ -│ 2 │ ' 422948' │ -│ 3 │ ' 434039' │ -│ 4 │ ' 412757' │ -└─────────┴──────────────┘ -Average from 5 output streams: 423266 (nanoseconds) -Average of 5 averages: 0.2116 (milliseconds) -``` - -To find out what each Sequence does look into the comments included in the code. -___ diff --git a/typescript/mgr-sequence/seq-config.json b/typescript/mgr-sequence/seq-config.json deleted file mode 100644 index 07e1e62..0000000 --- a/typescript/mgr-sequence/seq-config.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "sequences": [ - { - "id": "timestamp-manager", - "args": [], - "instanceId": "11111111-2222-3333-4444-555555555555" - } - ] -} diff --git a/typescript/mgr-sequence/shh-config.json b/typescript/mgr-sequence/shh-config.json deleted file mode 100644 index 9691387..0000000 --- a/typescript/mgr-sequence/shh-config.json +++ /dev/null @@ -1,21 +0,0 @@ -{ - "host": { - "hostname":"0.0.0.0", - "port":9002, - "instancesServerPort":9003, - "id":"shh-1" - }, - "description": "My awesome Self-Hosted Hub <3", - "tags": [ - "scramjet", - "data streaming" - ], - "timings": { - "instanceLifetimeExtensionDelay": 360000 - }, - "platform": { - "apiKey":"", - "api":"https://api-shh.scramjet.cloud", - "space":":-manager" - } -} diff --git a/typescript/mgr-sequence/timestamp-manager/index.ts b/typescript/mgr-sequence/timestamp-manager/index.ts deleted file mode 100644 index 6e1f991..0000000 --- a/typescript/mgr-sequence/timestamp-manager/index.ts +++ /dev/null @@ -1,112 +0,0 @@ -/* eslint-disable no-loop-func */ -import { AppConfig, AppContext } from "@scramjet/types"; -import { PassThrough, Readable } from "stream"; - -const rht = require("./real-hrtime.node"); - -// Default configuration values -let CONFIG = { - tsPerInstance: 1000, - tsInterval: 100, - instancesCount: 5 -}; - -/** - * This Sequence finds the Sequence with the name '@scramjet/timestamp-producer' on the Sequence list and starts it multiple times. - * Each Instance of the '@scramjet/timestamp-producer' Sequence will send a 'start' event, triggering the data flow. The Instances will write - * timestamps to the output stream based on the provided number and interval arguments. - * The Sequence reads the output from each Instance, calculates the time differences, and returns the overall average to the output stream. - * - * @param {AppContext} this - Application context - * @param {any}_stream - A dummy input stream - * @param {number} instancesCount - The number of instances to start - * @param {number} tsPerInstance - The number of timestamps to be sent to the output - * @param {number} tsInterval - The interval at which the timestamps are sent - * @returns {PassThrough} A PassThrough stream containing the overall average - */ - -export default [ - async function (this: AppContext, _stream: any, instancesCount: number, tsPerInstance: number, tsInterval: number) { - // Wait for 5 seconds - await new Promise((res) => setTimeout(res, 5000)); - - // Update the configuration values if provided, otherwise use default values - CONFIG.instancesCount ??= instancesCount; - CONFIG.tsInterval ??= tsInterval; - CONFIG.tsPerInstance ??= tsPerInstance; - - // Retrieve the Sequence ID by its name and create Sequence client - const sequenceName = "@scramjet/timestamp-producer"; - const hostClient = this.hub!; - const seqList = await hostClient.listSequences(); - const producerSequence = seqList.find((seq) => seq.config.name === sequenceName); - const seqClient = hostClient.getSequenceClient(producerSequence!.id); - - // Start multiple instances of the sequence with specified arguments in CONFIG - const instances = await Promise.all( - new Array(CONFIG.instancesCount).fill(null) - .map((_e, i) => { - return new Promise((res) => setTimeout(res, i * 1000)) - .then(__ => seqClient.start({ args: [CONFIG.tsPerInstance, CONFIG.tsInterval], appConfig: {} })); - }) - ); - - // Get the "output" stream from each instance - const outputs = await Promise.all( - instances.map((instanceClient, i) => - new Promise((res) => setTimeout(res, i * 16)) - .then(() => instanceClient.getStream("output") - ) - ) - ) as Readable[]; - - // Send "start" event to each instance - instances.forEach(async (ic, i) => { - await new Promise((res) => setTimeout(res, i * 8)) - ic.sendEvent("start", ""); - }); - - // Calculate the differences between timestamps received from instances and the local timestamps - const outputDiffs = await Promise.all(outputs.map((o, i) => - new Promise(res => { - const diffs: BigInt[] = []; // Array to store the differences between timestamps - - // Event-based processing of data from the output stream - o.on("data", (d) => { - const rx = d.toString().replace("\n", ""); // Convert received data to string and remove "\n" - - // If end indicator received, resolve with the differences array - if (rx === "0") { - res(diffs as bigint[]); - return; - } - - const rxTs = BigInt(rx); // Convert received timestamp to BigInt - const ts = rht.bigint() as bigint; // Get local timestamp using rht module - - diffs.push((ts - rxTs) as bigint); // Calculate difference and push it in the diffs array - }); - }) - )); - - // Calculate the average difference for each output - const outputDiffsAvg = outputDiffs.map((d: bigint[]) => d.reduce((partialSum: bigint, a: bigint) => partialSum + a, BigInt(0)) / BigInt(d.length)); - - console.log("Channel averages (nanoseconds):"); - console.table(outputDiffsAvg.map(a => Number(a).toFixed(0).toString().padStart(10, " "))); - - // Calculate the overall average difference in nanoseconds - const avg = outputDiffsAvg.reduce((partialSum: bigint, a: bigint) => partialSum + a, BigInt(0)) / BigInt(outputDiffsAvg.length); - console.log(`Average from ${CONFIG.instancesCount} output streams:`, Number(avg), "(nanoseconds)"); - - // Calculate the ultimate average in milliseconds - const ultimateAvg = (Number(avg) / 1000000 / 2).toFixed(4); - console.log(`Average of ${outputDiffsAvg.length} averages:`, ultimateAvg, "(milliseconds)\n"); - - // Create a new PassThrough stream and write the overall average in ms - const ps = new PassThrough({ encoding: "utf-8" }); - ps.write(ultimateAvg.toString()+ "\n"); - - return ps; - } -]; diff --git a/typescript/mgr-sequence/timestamp-manager/package.json b/typescript/mgr-sequence/timestamp-manager/package.json deleted file mode 100644 index 7b5a7bf..0000000 --- a/typescript/mgr-sequence/timestamp-manager/package.json +++ /dev/null @@ -1,35 +0,0 @@ -{ - "name": "@scramjet/timestamp-manager", - "version": "0.1.0", - "main": "index.js", - "author": "Scramjet ", - "license": "ISC", - "description": "This package contains a Scramjet Sequence that starts multiple instances of the '@scramjet/timestamp-producer' sequence, calculates the time differences between received timestamps and local timestamps, and returns the overall average difference in milliseconds to the Instance output and stdout.", - "keywords": [ - "sample", - "easy", - "streaming", - "Data Producer" - ], - "repository": { - "type": "git", - "url": "https://github.com/scramjetorg/platform-samples/tree/main/typescript/mgr-sequence/timestamp-manager" - }, - "assets": [ - "real-hrtime.node" - ], - "scripts": { - "build": "tsc -p tsconfig.json", - "postbuild": "cp package.json dist/ && cp -r real-hrtime.node dist/ && (cd dist && npm i --omit=dev)", - "pack": "si seq pack ./dist/", - "clean": "rm -rf ./dist ./*.tar.gz" - }, - "devDependencies": { - "@scramjet/types": "^0.34.0", - "@types/node": "15.12.5" - }, - "dependencies": { - "@scramjet/api-client": "^0.34.0", - "scramjet": "^4.36.9" - } -} diff --git a/typescript/mgr-sequence/timestamp-manager/real-hrtime.node b/typescript/mgr-sequence/timestamp-manager/real-hrtime.node deleted file mode 100644 index ec5b214..0000000 Binary files a/typescript/mgr-sequence/timestamp-manager/real-hrtime.node and /dev/null differ diff --git a/typescript/mgr-sequence/timestamp-manager/tsconfig.json b/typescript/mgr-sequence/timestamp-manager/tsconfig.json deleted file mode 100644 index e1b54a0..0000000 --- a/typescript/mgr-sequence/timestamp-manager/tsconfig.json +++ /dev/null @@ -1,10 +0,0 @@ -{ - "compilerOptions": { - "outDir": "./dist", - }, - "exclude": [ - "node_modules", - "node_modules/@types/node/globals.d.ts" - ] -} - diff --git a/typescript/mgr-sequence/timestamp-producer/index.ts b/typescript/mgr-sequence/timestamp-producer/index.ts deleted file mode 100644 index 464fc4e..0000000 --- a/typescript/mgr-sequence/timestamp-producer/index.ts +++ /dev/null @@ -1,44 +0,0 @@ -/* eslint-disable no-loop-func */ - -import { AppConfig, AppContext } from "@scramjet/types"; -import { Readable } from "stream"; - -const rht = require("./real-hrtime.node"); - -/** - * This Sequence is responsible for generating and sending timestamps to the output stream. - * It listens for the "start" event and, upon receiving it, starts generating timestamps. - * The Sequence will generate the specified number of timestamps at the given interval and push them to the output stream. - * After generating the desired number of timestamps, it will push an end indicator to signify the end of data. - * - * * @param {AppContext} this - Application context - * @param {any} _stream - A dummy input stream - * @param {number} tsQuantity - The number of timestamps to be sent to the output (default: 128) - * @param {number} delay - The delay between each timestamp generation in milliseconds (default: 100) - * @returns {Readable} A Readable stream containing the generated timestamps - */ - -export default [ - async function (this: AppContext, _stream: any, tsQuantity: number = 128, delay: number = 100) { - this.logger.info(`Sending ${tsQuantity} timestamps to output`); - const p = new Readable({ read: () => {}}); - let l = 0; - - // Event listener for the "start" event - this.on("start", async () => { - while (l++ < tsQuantity) { - await new Promise(res => setTimeout(res, delay)); - - if (!p.push(`${rht.stringified()}\n`)) { - await new Promise(res => p.once("drain", res)); - } - } - - await new Promise(res => setTimeout(res, 10)); - - p.push("0\n"); - }); - - return p; - } -]; diff --git a/typescript/mgr-sequence/timestamp-producer/package.json b/typescript/mgr-sequence/timestamp-producer/package.json deleted file mode 100644 index 4042224..0000000 --- a/typescript/mgr-sequence/timestamp-producer/package.json +++ /dev/null @@ -1,37 +0,0 @@ -{ - "name": "@scramjet/timestamp-producer", - "version": "0.1.0", - "main": "index.js", - "author": "Scramjet ", - "license": "ISC", - "description": "This package contains a Scramjet Sequence that generates and sends timestamps to the output stream. Upon receiving the 'start' event, the Sequence starts generating the specified number of timestamps at the given interval and pushes them to the output stream. It uses the 'real-hrtime' module to generate timestamps. After generating the desired number of timestamps, it pushes an end indicator to signify the end of data.", - "keywords": [ - "sample", - "easy", - "streaming", - "Data Producer" - ], - "repository": { - "type": "git", - "url": "https://github.com/scramjetorg/platform-samples/tree/main/typescript/mgr-sequence/timestamp-producer" - }, - "assets": [ - "real-hrtime.node" - ], "scripts": { - "build": "tsc -p tsconfig.json", - "postbuild": "cp package.json dist/ && cp -r real-hrtime.node dist/ && (cd dist && npm i --omit=dev)", - "pack": "si seq pack ./dist/", - "clean": "rm -rf ./dist ./*.tar.gz" - }, - "engines": { - "node": ">=14" - }, - "devDependencies": { - "@scramjet/types": "^0.34.0", - "@types/node": "15.12.5" - }, - "dependencies": { - "@scramjet/api-client": "^0.34.0", - "scramjet": "^4.36.9" - } -} diff --git a/typescript/mgr-sequence/timestamp-producer/real-hrtime.node b/typescript/mgr-sequence/timestamp-producer/real-hrtime.node deleted file mode 100644 index ec5b214..0000000 Binary files a/typescript/mgr-sequence/timestamp-producer/real-hrtime.node and /dev/null differ diff --git a/typescript/mgr-sequence/timestamp-producer/tsconfig.json b/typescript/mgr-sequence/timestamp-producer/tsconfig.json deleted file mode 100644 index 8bdc5b6..0000000 --- a/typescript/mgr-sequence/timestamp-producer/tsconfig.json +++ /dev/null @@ -1,10 +0,0 @@ -{ - "compilerOptions": { - "outDir": "./dist" - }, - "exclude": [ - "node_modules", - "node_modules/@types/node/globals.d.ts" - ] -} -