Skip to content

Commit

Permalink
feat: changed from stream to sorted set
Browse files Browse the repository at this point in the history
  • Loading branch information
lukas8219 committed Aug 12, 2023
1 parent 69fa8fd commit 0a8c5b5
Showing 1 changed file with 23 additions and 20 deletions.
43 changes: 23 additions & 20 deletions src/cluster/listener.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,35 @@ export class ClusterListener extends EventEmitter {

init(){
this._addToStream(() => {
this.currentTimestamp = Date.now();
redisClient.xrange('websocket:cluster:nodes', '-', `${this.currentTimestamp}`, (_, values) => {
for(const [timestamp, [key, value]] of values){
this.emit('newHost', value);
redisClient.zrange('websocket:cluster:nodes', 0, -1, (err, values) => {
if(err){
throw err;
}
this.currentTimestamp = Date.now();
for(const host of values){
this.emit('newHost', host);
}
setInterval(() => {
const usedTimestamp = Date.now();
redisClient.zrangebyscore('websocket:cluster:nodes', `${this.currentTimestamp}`, `${usedTimestamp}`, (err, values) => {
if(err){
throw err;
}
if(values.length){
this.currentTimestamp = usedTimestamp;
}
for(const host of values){
this.emit('newHost', host);
}

})
}, 1000)
})

setInterval(() => {
const usedTimestamp = Date.now();

redisClient.xrange('websocket:cluster:nodes', `${this.currentTimestamp}`, `${usedTimestamp}`, (_, values) => {
if(values.length){
this.currentTimestamp = Date.now();
}
for(const [timestamp, [key, value]] of values){
this.emit('newHost', value);
}

})
}, 1000)
});
}

async _addToStream(cb){
const PORT = await getPubPort();
this.redisClient.xadd('websocket:cluster:nodes','*', 'host', `127.0.0.1:${PORT}`, cb);
this.currentTimestamp = Date.now();
this.redisClient.zadd('websocket:cluster:nodes', this.currentTimestamp, `127.0.0.1:${PORT}`, cb);
}
}

0 comments on commit 0a8c5b5

Please sign in to comment.