/
consumer.js
60 lines (50 loc) · 1.7 KB
/
consumer.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
'use strict';
var async = require('async');
var redis = require('redis');
var redisClient = redis.createClient();
// Redis stream key this script creates the group on and reads from.
const STREAMS_KEY = "weather_sensor:wind";
// Consumer group name used for XGROUP CREATE and XREADGROUP.
const APPLICATION_ID = "iot_application:node_1";
// Consumer name this process identifies itself with inside the group.
const CONSUMER_ID = "consumer:1";
// Create the consumer group on the stream, starting at '$'.
// A BUSYGROUP error means the group already exists and is not a failure;
// any other error is treated as fatal.
// NOTE(review): the command is sent without MKSTREAM, so it presumably fails
// when the stream key does not exist yet — confirm the producer runs first.
redisClient.xgroup("CREATE", STREAMS_KEY, APPLICATION_ID, '$', function(err) {
    if (!err) {
        return;
    }
    if (err.code === 'BUSYGROUP') {
        console.log(`Group ${APPLICATION_ID} already exists`);
        return;
    }
    console.error(err);
    // Exit non-zero so the failure is visible to whatever launched the script.
    process.exit(1);
});
/**
 * Convert one stream entry into a plain object.
 *
 * @param {Array} entry - `[id, [field1, value1, field2, value2, ...]]`.
 * @returns {Object} Object holding `id` plus one property per field/value pair.
 */
function toMessageObject(entry) {
    const [id, fields] = entry;
    const msgObject = { id };
    // Fields arrive as a flat list: name at even index, value at the next one.
    for (let i = 0; i < fields.length; i += 2) {
        msgObject[fields[i]] = fields[i + 1];
    }
    return msgObject;
}

// Read new entries for this consumer in a loop, printing each one as JSON.
// The loop only ends when a read fails, in which case the process exits.
async.forever(
    function(next) {
        redisClient.xreadgroup('GROUP', APPLICATION_ID, CONSUMER_ID, 'BLOCK', 500, 'STREAMS', STREAMS_KEY, '>', function(err, stream) {
            if (err) {
                console.error(err);
                // Return here so `next` is not invoked a second time below.
                return next(err);
            }
            if (stream) {
                // A single stream key is requested, so its entries are at stream[0][1].
                const messages = stream[0][1];
                messages.forEach(function(message) {
                    console.log("Message: " + JSON.stringify(toMessageObject(message)));
                });
            } else {
                // Nothing was returned for this read.
                console.log("No new message...");
            }
            next();
        });
    },
    function(err) {
        console.error(" ERROR " + err);
        // Exit non-zero: reaching this callback means a read failed.
        process.exit(1);
    }
);