diff --git a/javascript-nodejs-stream/README.md b/javascript-nodejs-stream/README.md index 433bb3e3..dde542f2 100644 --- a/javascript-nodejs-stream/README.md +++ b/javascript-nodejs-stream/README.md @@ -14,7 +14,18 @@ Apart from [Node.js](https://nodejs.org/en/download/), these examples use the [` Code examples are executed via `npm`: - npm run publish - npm run receive +[Tutorial one: "Hello World!"](https://www.rabbitmq.com/tutorials/tutorial-one-javascript-stream): + +```shell +npm run send +npm run receive +``` + +[Tutorial two: Offset Tracking](https://www.rabbitmq.com/tutorials/tutorial-two-javascript-stream): + +```shell +npm run offset-tracking-publish +npm run offset-tracking-receive +``` To learn more, see [`coders51/rabbitmq-stream-js-client`](https://github.com/coders51/rabbitmq-stream-js-client). diff --git a/javascript-nodejs-stream/offset_tracking_receive.js b/javascript-nodejs-stream/offset_tracking_receive.js new file mode 100644 index 00000000..b401b11b --- /dev/null +++ b/javascript-nodejs-stream/offset_tracking_receive.js @@ -0,0 +1,60 @@ +const rabbit = require("rabbitmq-stream-js-client"); + +const sleep = (ms) => new Promise((r) => setTimeout(r, ms)); + +async function main() { + const streamName = "stream-offset-tracking-javascript"; + + console.log("Connecting..."); + const client = await rabbit.connect({ + hostname: "localhost", + port: 5552, + username: "guest", + password: "guest", + vhost: "/", + }); + + const consumerRef = "offset-tracking-tutorial"; + let firstOffset = undefined; + let offsetSpecification = rabbit.Offset.first(); + try { + const offset = await client.queryOffset({ reference: consumerRef, stream: streamName }); + offsetSpecification = rabbit.Offset.offset(offset + 1n); + } catch (e) {} + + let lastOffset = offsetSpecification.value; + let messageCount = 0; + const consumer = await client.declareConsumer( + { stream: streamName, offset: offsetSpecification, consumerRef }, + async (message) => { + messageCount++; + if (!firstOffset && messageCount === 1) { + firstOffset = message.offset; + console.log("First message received"); + } + if (messageCount % 10 === 0) { + await consumer.storeOffset(message.offset); + } + if (message.content.toString() === "marker") { + console.log("Marker found"); + lastOffset = message.offset; + await consumer.storeOffset(message.offset); + await consumer.close(true); + } + } + ); + + console.log(`Start consuming...`); + await sleep(2000); + console.log(`Done consuming, first offset was ${firstOffset}, last offset was ${lastOffset}`); + process.exit(0); +} + +main() + .then(async () => { + await new Promise(function () {}); + }) + .catch((res) => { + console.log("Error while receiving message!", res); + process.exit(-1); + }); diff --git a/javascript-nodejs-stream/offset_tracking_send.js b/javascript-nodejs-stream/offset_tracking_send.js new file mode 100644 index 00000000..52113878 --- /dev/null +++ b/javascript-nodejs-stream/offset_tracking_send.js @@ -0,0 +1,36 @@ +const rabbit = require("rabbitmq-stream-js-client"); + +async function main() { + console.log("Connecting..."); + const client = await rabbit.connect({ + vhost: "/", + port: 5552, + hostname: "localhost", + username: "guest", + password: "guest", + }); + + console.log("Making sure the stream exists..."); + const streamName = "stream-offset-tracking-javascript"; + await client.createStream({ stream: streamName, arguments: {} }); + + console.log("Creating the publisher..."); + const publisher = await client.declarePublisher({ stream: streamName }); + + const messageCount = 100; + console.log(`Publishing ${messageCount} messages`); + for (let i = 0; i < messageCount; i++) { + const body = i === messageCount - 1 ? "marker" : `hello ${i}`; + await publisher.send(Buffer.from(body)); + } + + console.log("Closing the connection..."); + await client.close(); +} + +main() + .then(() => console.log("done!")) + .catch((res) => { + console.log("Error in publishing message!", res); + process.exit(-1); + }); diff --git a/javascript-nodejs-stream/package-lock.json b/javascript-nodejs-stream/package-lock.json index 30d25692..96b1091c 100644 --- a/javascript-nodejs-stream/package-lock.json +++ b/javascript-nodejs-stream/package-lock.json @@ -8,7 +8,7 @@ "name": "rabbitmq-stream-node-tutorial", "version": "1.0.0", "dependencies": { - "rabbitmq-stream-js-client": "^0.3.1" + "rabbitmq-stream-js-client": "^0.4.1" } }, "node_modules/lru-cache": { @@ -23,9 +23,9 @@ } }, "node_modules/rabbitmq-stream-js-client": { - "version": "0.3.1", - "resolved": "https://registry.npmjs.org/rabbitmq-stream-js-client/-/rabbitmq-stream-js-client-0.3.1.tgz", - "integrity": "sha512-x2xfH+otHquRNPzzClWMuMa2njvqgbrG0YRY/AE51aL0PFCXlv0NZ9OXR7Y73X63+9kUzoYLDWBX4bw1rTfX8Q==", + "version": "0.4.1", + "resolved": "https://registry.npmjs.org/rabbitmq-stream-js-client/-/rabbitmq-stream-js-client-0.4.1.tgz", + "integrity": "sha512-Dny3vFup/TQMcWXIKQUl3hdQQC1/ixeUEf4uEgzvwaFK/dIaUhsBT4J7i0mD581TUbCNhXFw4uWEXle9bXdmtA==", "dependencies": { "semver": "^7.5.4" } @@ -60,9 +60,9 @@ } }, "rabbitmq-stream-js-client": { - "version": "0.3.1", - "resolved": "https://registry.npmjs.org/rabbitmq-stream-js-client/-/rabbitmq-stream-js-client-0.3.1.tgz", - "integrity": "sha512-x2xfH+otHquRNPzzClWMuMa2njvqgbrG0YRY/AE51aL0PFCXlv0NZ9OXR7Y73X63+9kUzoYLDWBX4bw1rTfX8Q==", + "version": "0.4.1", + "resolved": "https://registry.npmjs.org/rabbitmq-stream-js-client/-/rabbitmq-stream-js-client-0.4.1.tgz", + "integrity": "sha512-Dny3vFup/TQMcWXIKQUl3hdQQC1/ixeUEf4uEgzvwaFK/dIaUhsBT4J7i0mD581TUbCNhXFw4uWEXle9bXdmtA==", "requires": { "semver": "^7.5.4" } diff --git a/javascript-nodejs-stream/package.json b/javascript-nodejs-stream/package.json index d7fcb4cd..3d1a354d 100644 --- a/javascript-nodejs-stream/package.json +++ b/javascript-nodejs-stream/package.json @@ -3,10 +3,12 @@ "version": "1.0.0", "description": "Tutorial for the nodejs RabbitMQ stream client", "scripts": { + "offset-tracking-publish": "node offset_tracking_send.js", + "offset-tracking-receive": "node offset_tracking_receive.js", "send": "node send.js", "receive": "node receive.js" }, "dependencies": { - "rabbitmq-stream-js-client": "^0.3.1" + "rabbitmq-stream-js-client": "^0.4.1" } }