Skip to content

Commit

Permalink
Merge branch 'master'
Browse files Browse the repository at this point in the history
  • Loading branch information
weyoss committed Jan 21, 2019
2 parents 9c76d00 + 64087f1 commit 77583a2
Show file tree
Hide file tree
Showing 20 changed files with 275 additions and 112 deletions.
4 changes: 2 additions & 2 deletions src/gc.js
Expand Up @@ -19,7 +19,7 @@ function garbageCollector(dispatcher) {
const config = dispatcher.getConfig();
const events = dispatcher.getEvents();
const keys = dispatcher.getKeys();
const { keyQueueName, keyQueueNameDead, keyGCLock, keyGCLockTmp } = keys;
const { keyQueueName, keyQueueNameProcessingCommon, keyQueueNameDead, keyGCLock, keyGCLockTmp } = keys;

const logger = dispatcher.getLogger();
const lockManager = lockManagerFn(dispatcher, keyGCLock, keyGCLockTmp);
Expand Down Expand Up @@ -122,7 +122,7 @@ function garbageCollector(dispatcher) {
if (err) dispatcher.error(err);
else {
debug('Inspecting processing queues...');
util.getProcessingQueues(client, (e, result) => {
util.getProcessingQueuesOf(client, keyQueueNameProcessingCommon, (e, result) => {
if (e) dispatcher.error(e);
else if (result && result.length) {
debug(`Found [${result.length}] processing queues`);
Expand Down
111 changes: 78 additions & 33 deletions src/redis-keys.js
Expand Up @@ -23,6 +23,17 @@ const keyTypes = {
KEY_TYPE_SCHEDULER_LOCK_TMP: '7.2',
};

/**
*
* @param keys
* @return {*}
*/
function formatKeys(keys) {
const ns = `redis-smq-${namespace}`;
for (const k in keys) keys[k] = `${ns}|@${keys[k]}`;
return keys;
}

module.exports = {

/**
Expand Down Expand Up @@ -57,54 +68,88 @@ module.exports = {
return filtered;
},


/**
*
* @param dispatcher
*/
getKeys(dispatcher = null) {
let queueName = null;
let instanceId = null;
let isConsumer = false;
let isProducer = false;
if (dispatcher) {
instanceId = dispatcher.getInstanceId();
isConsumer = dispatcher.isConsumer();
isProducer = dispatcher.isProducer();
queueName = dispatcher.getQueueName();
if (queueName && queueName.indexOf(`|@${keyTypes.KEY_TYPE_MESSAGE_QUEUE}|`) > 0) {
queueName = queueName.split('|')[2].replace(/[@]/g, '');
const instanceId = dispatcher.getInstanceId();
const queueName = dispatcher.getQueueName();
if (dispatcher.isConsumer()) {
return this.getConsumerKeys(instanceId, queueName);
}
if (dispatcher.isProducer()) {
return this.getProducerKeys(instanceId, queueName);
}
}
return this.getCommonKeys();
},

/**
*
* @param instanceId
* @param queueName
* @return {*}
*/
getConsumerKeys(instanceId, queueName) {
const consumerKeys = {};
consumerKeys.keyQueueNameProcessing = `${keyTypes.KEY_TYPE_PROCESSING_QUEUE}|${queueName}|${instanceId}`;
consumerKeys.keyRateProcessing = `${keyTypes.KEY_TYPE_RATE_PROCESSING}|${queueName}|${instanceId}`;
consumerKeys.keyRateAcknowledged = `${keyTypes.KEY_TYPE_RATE_ACKNOWLEDGED}|${queueName}|${instanceId}`;
consumerKeys.keyRateUnacknowledged = `${keyTypes.KEY_TYPE_RATE_UNACKNOWLEDGED}|${queueName}|${instanceId}`;
const keys = formatKeys(consumerKeys);
Object.assign(keys, this.getQueueKeys(queueName));
Object.assign(keys, this.getCommonKeys());
return keys;
},

/**
*
* @param instanceId
* @param queueName
* @return {*}
*/
getProducerKeys(instanceId, queueName) {
const producerKeys = {};
producerKeys.keyRateInput = `${keyTypes.KEY_TYPE_RATE_INPUT}|${queueName}|${instanceId}`;
const keys = formatKeys(producerKeys);
Object.assign(keys, this.getQueueKeys(queueName));
Object.assign(keys, this.getCommonKeys());
return keys;
},

/**
*
* @param queueName
* @return {*}
*/
getQueueKeys(queueName) {
const keys = {};
keys.keyQueueName = `${keyTypes.KEY_TYPE_MESSAGE_QUEUE}|${queueName}`;
keys.keyQueueNameDelayed = `${keyTypes.KEY_TYPE_MESSAGE_QUEUE_DELAYED}|${queueName}`;
keys.keyQueueNameDead = `${keyTypes.KEY_TYPE_DEAD_LETTER_QUEUE}|${queueName}`;
keys.keyQueueNameProcessingCommon = `${keyTypes.KEY_TYPE_PROCESSING_QUEUE}|${queueName}`;
keys.keyGCLock = `${keyTypes.KEY_TYPE_GC_LOCK}|${queueName}`;
keys.keyGCLockTmp = `${keyTypes.KEY_TYPE_GC_LOCK_TMP}|${queueName}`;
keys.keySchedulerLock = `${keyTypes.KEY_TYPE_SCHEDULER_LOCK}|${queueName}`;
keys.keySchedulerLockTmp = `${keyTypes.KEY_TYPE_SCHEDULER_LOCK_TMP}|${queueName}`;
return formatKeys(keys);
},

/**
*
* @return {*}
*/
getCommonKeys() {
const keys = {};
keys.keyStatsFrontendLock = keyTypes.KEY_TYPE_STATS_FRONTEND_LOCK;
keys.keyRate = keyTypes.KEY_TYPE_RATE;
keys.keyHeartBeat = keyTypes.KEY_TYPE_HEARTBEAT;
keys.keyMessageQueuesIndex = keyTypes.KEY_TYPE_MESSAGE_QUEUES_INDEX;
keys.keyProcessingQueuesIndex = keyTypes.KEY_TYPE_PROCESSING_QUEUES_INDEX;
keys.keyDLQueuesIndex = keyTypes.KEY_TYPE_DEAD_LETTER_QUEUES_INDEX;
if (queueName) {
keys.keyQueueName = `${keyTypes.KEY_TYPE_MESSAGE_QUEUE}|${queueName}`;
keys.keyQueueNameDelayed = `${keyTypes.KEY_TYPE_MESSAGE_QUEUE_DELAYED}|${queueName}`;
keys.keyQueueNameDead = `${keyTypes.KEY_TYPE_DEAD_LETTER_QUEUE}|${queueName}`;
keys.keyQueueNameProcessingCommon = `${keyTypes.KEY_TYPE_PROCESSING_QUEUE}|${queueName}`;
keys.keyGCLock = `${keyTypes.KEY_TYPE_GC_LOCK}|${queueName}`;
keys.keyGCLockTmp = `${keyTypes.KEY_TYPE_GC_LOCK_TMP}|${queueName}`;
keys.keySchedulerLock = `${keyTypes.KEY_TYPE_SCHEDULER_LOCK}|${queueName}`;
keys.keySchedulerLockTmp = `${keyTypes.KEY_TYPE_SCHEDULER_LOCK_TMP}|${queueName}`;
if (isConsumer) {
keys.keyQueueNameProcessing = `${keyTypes.KEY_TYPE_PROCESSING_QUEUE}|${queueName}|${instanceId}`;
keys.keyRateProcessing = `${keyTypes.KEY_TYPE_RATE_PROCESSING}|${queueName}|${instanceId}`;
keys.keyRateAcknowledged = `${keyTypes.KEY_TYPE_RATE_ACKNOWLEDGED}|${queueName}|${instanceId}`;
keys.keyRateUnacknowledged = `${keyTypes.KEY_TYPE_RATE_UNACKNOWLEDGED}|${queueName}|${instanceId}`;
}
if (isProducer) {
keys.keyRateInput = `${keyTypes.KEY_TYPE_RATE_INPUT}|${queueName}|${instanceId}`;
}
}
const ns = `redis-smq-${namespace}`;
for (const k in keys) keys[k] = `${ns}|@${keys[k]}`;
return keys;
return formatKeys(keys);
},

/**
Expand Down
54 changes: 40 additions & 14 deletions src/util.js
@@ -1,6 +1,6 @@
'use strict';

const redisKeys = require('./redis-keys').getKeys();
const redisKeys = require('./redis-keys');


module.exports = {
Expand All @@ -11,30 +11,41 @@ module.exports = {
* @param cb
*/
rememberMessageQueue(redisClient, queueName, cb) {
redisClient.sadd(redisKeys.keyMessageQueuesIndex, queueName, cb);
const { keyMessageQueuesIndex } = redisKeys.getKeys();
redisClient.sadd(keyMessageQueuesIndex, queueName, cb);
},

/**
*
* @param redisClient
* @param queueName
* @param processingQueueName
* @param cb
*/
rememberProcessingQueue(redisClient, queueName, cb) {
redisClient.sadd(redisKeys.keyProcessingQueuesIndex, queueName, cb);
rememberProcessingQueue(redisClient, processingQueueName, cb) {
const { keyProcessingQueuesIndex } = redisKeys.getCommonKeys();
const { queueName, consumerId } = redisKeys.getKeySegments(processingQueueName);
const { keyQueueNameProcessingCommon } = redisKeys.getQueueKeys(queueName);
const multi = redisClient.multi();
multi.hset(keyQueueNameProcessingCommon, processingQueueName, consumerId);
multi.sadd(keyProcessingQueuesIndex, processingQueueName);
multi.exec(cb);
},

/**
*
* @param redisClient
* @param queueName
* @param processingQueueName
* @param cb
*/
purgeProcessingQueue(redisClient, queueName, cb) {
purgeProcessingQueue(redisClient, processingQueueName, cb) {
const { keyProcessingQueuesIndex } = redisKeys.getCommonKeys();
const { queueName } = redisKeys.getKeySegments(processingQueueName);
const { keyQueueNameProcessingCommon } = redisKeys.getQueueKeys(queueName);
const multi = redisClient.multi();
multi.srem(redisKeys.keyProcessingQueuesIndex, queueName);
multi.del(queueName);
multi.exec(err => cb);
multi.srem(keyProcessingQueuesIndex, processingQueueName);
multi.hdel(keyQueueNameProcessingCommon, processingQueueName);
multi.del(processingQueueName);
multi.exec(cb);
},

/**
Expand All @@ -44,7 +55,8 @@ module.exports = {
* @param cb
*/
rememberDLQueue(redisClient, queueName, cb) {
redisClient.sadd(redisKeys.keyDLQueuesIndex, queueName, cb);
const { keyDLQueuesIndex } = redisKeys.getKeys();
redisClient.sadd(keyDLQueuesIndex, queueName, cb);
},

/**
Expand All @@ -53,7 +65,8 @@ module.exports = {
* @param cb
*/
getMessageQueues(redisClient, cb) {
redisClient.smembers(redisKeys.keyMessageQueuesIndex, cb);
const { keyMessageQueuesIndex } = redisKeys.getKeys();
redisClient.smembers(keyMessageQueuesIndex, cb);
},

/**
Expand All @@ -62,7 +75,8 @@ module.exports = {
* @param cb
*/
getDLQueues(redisClient, cb) {
redisClient.smembers(redisKeys.keyDLQueuesIndex, cb);
const { keyDLQueuesIndex } = redisKeys.getKeys();
redisClient.smembers(keyDLQueuesIndex, cb);
},

/**
Expand All @@ -71,6 +85,18 @@ module.exports = {
* @param cb
*/
getProcessingQueues(redisClient, cb) {
redisClient.smembers(redisKeys.keyProcessingQueuesIndex, cb);
const { keyProcessingQueuesIndex } = redisKeys.getKeys();
redisClient.smembers(keyProcessingQueuesIndex, cb);
},

/**
*
* @param redisClient
* @param keyQueueNameProcessingCommon
* @param cb
* @return {*}
*/
getProcessingQueuesOf(redisClient, keyQueueNameProcessingCommon, cb) {
redisClient.hkeys(keyQueueNameProcessingCommon, cb);
},
};
20 changes: 0 additions & 20 deletions test/common/consumers/test-queue-consumer.js

This file was deleted.

53 changes: 35 additions & 18 deletions test/run/hooks.js
Expand Up @@ -4,48 +4,65 @@ const sinon = require('sinon');
const config = require('../common/config');
const redisSMQ = require('../../index');
const redisClient = require('./../../src/redis-client');
const TestQueueConsumer = require('./../common/consumers/test-queue-consumer');

const Consumer = redisSMQ.Consumer;
const Producer = redisSMQ.Producer;
const producer = new Producer('test_queue', config);

const client = redisClient.getNewInstance(config);
const consumersList = [];
const producersList = [];

function cleanConsumers(cb) {
if (consumersList.length) {
let consumer = consumersList.pop();
function clean(set, cb) {
if (set.length) {
let item = set.pop();
const onStopped = () => {
consumer = null;
cleanConsumers(cb);
item = null;
clean(set, cb);
};
if (consumer.isRunning()) {
consumer.on('halt', onStopped);
consumer.stop();
} else onStopped();
if (item.stop) {
if (item.isRunning()) {
item.on('halt', onStopped);
item.stop();
} else onStopped();
} else {
item.shutdown();
onStopped();
}
} else cb();
}

function getConsumer(options) {
const consumer = new TestQueueConsumer(config, options);
function getConsumer(queueName = 'test_queue', options = {}) {
const TemplateClass = class extends Consumer {
consume(message, cb) {
cb();
}
};
TemplateClass.queueName = queueName;
const consumer = new TemplateClass(config, options);
consumersList.push(consumer);
return consumer;
}

function getProducer(queueName = 'test_queue') {
const producer = new Producer(queueName, config);
producersList.push(producer);
return producer;
}

before(function (done) {
this.sandbox = sinon.sandbox.create();
this.sandbox.producer = producer;
this.sandbox.getConsumer = getConsumer;
this.sandbox.getProducer = getProducer;
done();
});

after(function (done) {
this.sandbox.producer.shutdown();
client.end(true);
done();
});

beforeEach(function (done) {
this.sandbox.restore();
this.sandbox.getConsumer = getConsumer;
client.flushall((err) => {
if (err) throw err;
done();
Expand All @@ -54,7 +71,7 @@ beforeEach(function (done) {

afterEach(function (done) {
this.timeout(40000);
cleanConsumers(() => {
done();
clean(consumersList, () => {
clean(producersList, done);
});
});
2 changes: 1 addition & 1 deletion test/run/test02.js
Expand Up @@ -11,7 +11,7 @@ describe('Test 2: Produce and consume 1 message', function() {

it('is OK', function (done) {
this.timeout(20000);
const producer = this.sandbox.producer;
const producer = this.sandbox.getProducer();
const consumer = this.sandbox.getConsumer();
const consume = this.sandbox.spy(consumer, 'consume');
consumer.once('idle', () => {
Expand Down
2 changes: 1 addition & 1 deletion test/run/test03.js
Expand Up @@ -11,7 +11,7 @@ describe('Test 3: Produce and consume 100 messages', function() {

it('is OK', function (done) {
this.timeout(20000);
const producer = this.sandbox.producer;
const producer = this.sandbox.getProducer();
const consumer = this.sandbox.getConsumer();
const consume = this.sandbox.spy(consumer, 'consume');

Expand Down

0 comments on commit 77583a2

Please sign in to comment.