diff --git a/src/commands.ts b/src/commands.ts index f573903..28b55cb 100644 --- a/src/commands.ts +++ b/src/commands.ts @@ -1,6 +1,45 @@ -import * as IORedis from 'ioredis'; +import IORedis from 'ioredis'; import { defaultOptions } from './defaults'; +interface Pipeline extends IORedis.Pipeline { + requeue( + qname: string, + dedupSet: string, + groupName: string, + taskId: string, + taskData: string, + dedupKey: string | null, + retryCount: number + ): Pipeline; + dequeue(qname: string, dedupSet: string, groupName: string, taskId: string, taskDedupkey: string): Pipeline; +} + +export interface Redis extends IORedis.Redis { + enqueue( + qname: string, + dedupSet: string, + data: string, + dedupKey: string | null, + retryCount: number + ): Promise; + + requeue( + qname: string, + dedupSet: string, + groupName: string, + taskId: string, + taskData: string, + dedupKey: string | null, + retryCount: number + ): Promise; + + dequeue(qname: string, dedupSet: string, groupName: string, taskId: string, taskDedupkey: string): Promise; + + delconsumer(qname: string, groupName: string, consumerName: string): Promise; + + pipeline(commands?: string[][]): Pipeline; +} + export function initScripts(redis: IORedis.Redis) { const pArr = []; diff --git a/src/consumer-unit.ts b/src/consumer-unit.ts index 1023ccb..12685de 100644 --- a/src/consumer-unit.ts +++ b/src/consumer-unit.ts @@ -1,37 +1,71 @@ -const IORedis = require('ioredis'); -const lodash = require('lodash'); -const shortid = require('shortid'); - -const { ReplyError } = require('redis-errors'); -const initScripts = require('./commands'); -const { waitUntilInitialized } = require('./common'); -const { parseStreamResponse, parseXPendingResponse } = require('./redis-transformers'); -const Task = require('./task'); -const { TimeoutError } = require('./errors'); +import IORedis from 'ioredis'; +import lodash from 'lodash'; +import * as shortid from 'shortid'; + +import { ReplyError } from 'redis-errors'; +import { initScripts, Redis } from './commands'; +import { waitUntilInitialized } from './common'; +import { parseStreamResponse, parseXPendingResponse, StreamValue } from './redis-transformers'; +import { Task } from './task'; +import { TimeoutError } from './errors'; + +import { defaultOptions, LoggingOptions, ConsumerOptions } from './defaults'; + +export interface ConsumerUnitOptions { + redisOptions?: IORedis.RedisOptions; + redisClient?: IORedis.Redis; + loggingOptions?: LoggingOptions; + consumerOptions?: ConsumerOptions; +} -const defaults = require('./defaults'); +export interface Metadata { + id: string; + qname: string; + retryCount: number; + consumerName: string; +} -class ConsumerUnit { - constructor(qname, workerFn, { consumerOptions, redisOptions, redisClient, loggingOptions } = {}) { +export class ConsumerUnit { + _paused: boolean; + _QNAME: string; + _DEDUPSET: string; + qname: string; + _GRPNAME: string; + workerFn: Function; + _pendingTasks: Task[]; + _totalTasks: number; + consumerOptions: ConsumerOptions; + loggingOptions: LoggingOptions; + _redis: Redis; + redisOptions: IORedis.RedisOptions = defaultOptions.redisOptions; + _name: string = ''; + _isInitialized: boolean = false; + _loopStarted: boolean = false; + + constructor( + qname: string, + workerFn: Function, + { consumerOptions, redisOptions, redisClient, loggingOptions }: ConsumerUnitOptions = {} + ) { this._paused = true; - this._QNAME = `${defaults.NAMESPACE}:queue:${qname}`; - this._DEDUPSET = `${defaults.NAMESPACE}:queue:${qname}:dedupset`; + this._QNAME = `${defaultOptions.NAMESPACE}:queue:${qname}`; + this._DEDUPSET = `${defaultOptions.NAMESPACE}:queue:${qname}:dedupset`; this.qname = qname; - this._GRPNAME = `${defaults.NAMESPACE}:queue:${qname}:cg`; + this._GRPNAME = `${defaultOptions.NAMESPACE}:queue:${qname}:cg`; this.workerFn = workerFn; this._pendingTasks = []; this._totalTasks = 0; - this.consumerOptions = lodash.merge({}, defaults.consumerOptions, consumerOptions); - this.loggingOptions = lodash.merge({}, defaults.loggingOptions, loggingOptions); + this.consumerOptions = lodash.merge({}, defaultOptions.consumerOptions, consumerOptions); + this.loggingOptions = lodash.merge({}, defaultOptions.loggingOptions, loggingOptions); if (redisClient) { - this._redis = redisClient.duplicate(); + this._redis = redisClient.duplicate() as Redis; } else { - this.redisOptions = lodash.merge({}, defaults.redisOptions, redisOptions); - this._redis = new IORedis(this.redisOptions); + this.redisOptions = lodash.merge({}, defaultOptions.redisOptions, redisOptions); + this._redis = new IORedis(this.redisOptions) as Redis; } this._initialize(); @@ -54,9 +88,9 @@ class ConsumerUnit { this.start(); } - _log(...msg) { - if (this.loggingOptions.enabled) { - this.loggingOptions.loggerFn(`Orkid :: ${this._name}`, ...msg); + _log(msg: string, ...optionalParams: any[]) { + if (this.loggingOptions.enabled && this.loggingOptions.loggerFn) { + this.loggingOptions.loggerFn(`Orkid :: ${this._name}`, msg, ...optionalParams); } } @@ -110,7 +144,8 @@ class ConsumerUnit { ); const taskObj = parseStreamResponse(redisReply); - const tasks = [].concat(...Object.values(taskObj)); + //@ts-ignore + const tasks: StreamValue[] = [].concat(...Object.values(taskObj)); for (const t of tasks) { const task = new Task(t.id, t.data); @@ -151,7 +186,7 @@ class ConsumerUnit { */ async _cleanUp() { /* Returns items that are present in setA but not in setB */ - function difference(setA, setB) { + function difference(setA: Set, setB: Set) { const _difference = new Set(setA); for (const elem of setB) { _difference.delete(elem); @@ -162,9 +197,9 @@ class ConsumerUnit { // xinfo: https://redis.io/commands/xinfo // Get the list of every consumer in a specific consumer group const info = await this._redis.xinfo('CONSUMERS', this._QNAME, this._GRPNAME); - const consumerInfo = {}; + const consumerInfo: Record = {}; for (const inf of info) { - const data = {}; + const data: Record = {}; for (let i = 0; i < inf.length; i += 2) { data[inf[i]] = inf[i + 1]; } @@ -172,14 +207,14 @@ class ConsumerUnit { } const consumerNames = Object.keys(consumerInfo); - const pendingConsumerNames = new Set(); - const emptyConsumerNames = new Set(); + const pendingConsumerNames: Set = new Set(); + const emptyConsumerNames: Set = new Set(); // Separate consumers with some pending tasks and no pending tasks for (const con of consumerNames) { if (consumerInfo[con].pending) { pendingConsumerNames.add(con); - } else if (consumerInfo[con].idle > this.consumerOptions.workerFnTimeoutMs * 5) { + } else if (consumerInfo[con].idle > this.consumerOptions.workerFnTimeoutMs * 5) { // Just to be safe, only delete really old consumers emptyConsumerNames.add(con); } @@ -187,7 +222,7 @@ class ConsumerUnit { // https://redis.io/commands/client-list const clients = (await this._redis.client('LIST')).split('\n'); - const activeWorkers = new Set(); + const activeWorkers: Set = new Set(); // Orkid consumers always set a name to redis connection // Filter active connections those have names @@ -209,22 +244,27 @@ class ConsumerUnit { // Workers that have not pending tasks and also are not active anymore in redis const orphanEmptyWorkers = difference(emptyConsumerNames, activeWorkers); - const claimInfo = {}; + const claimInfo: Record = {}; for (const w of orphanWorkers) { // xpending: https://redis.io/commands/xpending const redisXPendingReply = await this._redis.xpending(this._QNAME, this._GRPNAME, '-', '+', 1000, w); const pendingTasks = parseXPendingResponse(redisXPendingReply); - const ids = pendingTasks.map(t => t.id); + let ids: string[] = []; + if (Array.isArray(pendingTasks)) { + ids = pendingTasks.map(t => t.id); + } // xclaim: https://redis.io/commands/xclaim - const claim = await this._redis.xclaim( - this._QNAME, - this._GRPNAME, - this._name, - this.consumerOptions.workerFnTimeoutMs * 2, - ...ids, - 'JUSTID' + const claim = >( + await this._redis.xclaim( + this._QNAME, + this._GRPNAME, + this._name, + this.consumerOptions.workerFnTimeoutMs * 2, + ...ids, + 'JUSTID' + ) ); claimInfo[w] = claim.length; @@ -285,12 +325,12 @@ class ConsumerUnit { return; } - const task = this._pendingTasks.shift(); + const task = this._pendingTasks.shift() as Task; this._log('Starting to process task', task); this._totalTasks++; // TODO: Update queue specific total processed stat - await this._redis.hincrby(defaults.STAT, 'processed', 1); + await this._redis.hincrby(defaultOptions.STAT, 'processed', 1); const metadata = { id: task.id, qname: this.qname, retryCount: task.retryCount, consumerName: this._name }; try { @@ -307,7 +347,7 @@ class ConsumerUnit { } } - async _processSuccess(task, result) { + async _processSuccess(task: Task, result: any) { this._log(`Worker ${task.id} returned`, result); const resultVal = JSON.stringify({ @@ -324,12 +364,12 @@ class ConsumerUnit { await this._redis .pipeline() .dequeue(this._QNAME, this._DEDUPSET, this._GRPNAME, task.id, task.dedupKey) // Remove from queue - .lpush(defaults.RESULTLIST, resultVal) - .ltrim(defaults.RESULTLIST, 0, defaults.queueOptions.maxResultListSize - 1) + .lpush(defaultOptions.RESULTLIST, resultVal) + .ltrim(defaultOptions.RESULTLIST, 0, defaultOptions.queueOptions.maxResultListSize - 1) .exec(); } - async _processFailure(task, error) { + async _processFailure(task: Task, error: Error) { const info = JSON.stringify({ id: task.id, qname: this.qname, @@ -344,13 +384,13 @@ class ConsumerUnit { at: new Date().toISOString() }); - if (task.retryCount < this.consumerOptions.maxRetry) { + if (task.retryCount < this.consumerOptions.maxRetry) { task.incrRetry(); // Send again to the queue await this._redis .pipeline() .requeue(this._QNAME, this._DEDUPSET, this._GRPNAME, task.id, task.dataString, task.dedupKey, task.retryCount) - .hincrby(defaults.STAT, 'retries', 1) + .hincrby(defaultOptions.STAT, 'retries', 1) .exec(); // TODO: Update queue specific total retries stat } else { @@ -358,9 +398,9 @@ class ConsumerUnit { await this._redis .pipeline() .dequeue(this._QNAME, this._DEDUPSET, this._GRPNAME, task.id, task.dedupKey) // Remove from queue - .lpush(defaults.DEADLIST, info) - .ltrim(defaults.DEADLIST, 0, defaults.queueOptions.maxDeadListSize - 1) - .hincrby(defaults.STAT, 'dead', 1) + .lpush(defaultOptions.DEADLIST, info) + .ltrim(defaultOptions.DEADLIST, 0, defaultOptions.queueOptions.maxDeadListSize - 1) + .hincrby(defaultOptions.STAT, 'dead', 1) .exec(); // TODO: Update queue specific total dead stat } @@ -368,16 +408,16 @@ class ConsumerUnit { // Add to failed list in all cases await this._redis .pipeline() - .lpush(defaults.FAILEDLIST, info) - .ltrim(defaults.FAILEDLIST, 0, defaults.queueOptions.maxFailedListSize - 1) - .hincrby(defaults.STAT, 'failed', 1) + .lpush(defaultOptions.FAILEDLIST, info) + .ltrim(defaultOptions.FAILEDLIST, 0, defaultOptions.queueOptions.maxFailedListSize - 1) + .hincrby(defaultOptions.STAT, 'failed', 1) .exec(); // TODO: Update queue specific total failed stat } - _wrapWorkerFn(data, metadata) { + _wrapWorkerFn(data: any, metadata: Metadata) { const timeoutMs = this.consumerOptions.workerFnTimeoutMs; - const timeoutP = new Promise((resolve, reject) => { + const timeoutP = new Promise((_, reject) => { const to = setTimeout(() => { clearTimeout(to); reject(new TimeoutError(`Task timed out after ${timeoutMs}ms`)); @@ -395,5 +435,3 @@ class ConsumerUnit { await this._redis.disconnect(); } } - -module.exports = ConsumerUnit; diff --git a/src/consumer.ts b/src/consumer.ts index 5acc883..3f25d3c 100644 --- a/src/consumer.ts +++ b/src/consumer.ts @@ -1,14 +1,18 @@ -const lodash = require('lodash'); -const ConsumerUnit = require('./consumer-unit'); +import * as lodash from 'lodash'; +import { ConsumerUnit, ConsumerUnitOptions } from './consumer-unit'; -const defaults = require('./defaults'); +import { defaultOptions, ConsumerOptions } from './defaults'; -const { InvalidConfigError } = require('./errors'); +import { InvalidConfigError } from './errors'; -class Consumer { - constructor(qname, workerFn, options = {}) { - this.consumerOptions = lodash.merge({}, defaults.consumerOptions, options.consumerOptions); - this.concurrency = this.consumerOptions.concurrencyPerInstance; +export class Consumer { + consumerOptions: ConsumerOptions; + concurrency: number; + consumers: ConsumerUnit[]; + + constructor(qname: string, workerFn: Function, options: ConsumerUnitOptions = {}) { + this.consumerOptions = lodash.merge({}, defaultOptions.consumerOptions, options.consumerOptions); + this.concurrency = this.consumerOptions.concurrencyPerInstance as number; if (this.concurrency < 1) { throw new InvalidConfigError('Concurrency cannot be less than 1'); @@ -43,5 +47,3 @@ class Consumer { } } } - -module.exports = Consumer; diff --git a/src/errors.ts b/src/errors.ts index 1411e88..6254e7b 100644 --- a/src/errors.ts +++ b/src/errors.ts @@ -1,18 +1,13 @@ -class TimeoutError extends Error { - constructor(...args) { +export class TimeoutError extends Error { + constructor(...args: string[]) { super(...args); Error.captureStackTrace(this, TimeoutError); } } -class InvalidConfigError extends Error { - constructor(...args) { +export class InvalidConfigError extends Error { + constructor(...args: string[]) { super(...args); Error.captureStackTrace(this, InvalidConfigError); } } - -module.exports = { - InvalidConfigError, - TimeoutError -}; diff --git a/src/index.ts b/src/index.ts index 260ad11..b5a1546 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,7 +1,4 @@ -const Producer = require('./producer'); -const Consumer = require('./consumer'); +import { Producer } from './producer'; +import { Consumer } from './consumer'; -module.exports = { - Producer, - Consumer -}; +export { Producer, Consumer }; diff --git a/src/producer.ts b/src/producer.ts index feffbe0..d3fd86e 100644 --- a/src/producer.ts +++ b/src/producer.ts @@ -1,7 +1,7 @@ import IORedis from 'ioredis'; import * as lodash from 'lodash'; -import { initScripts } from './commands'; +import { initScripts, Redis } from './commands'; import { waitUntilInitialized } from './common'; import { defaultOptions } from './defaults'; @@ -11,28 +11,18 @@ export interface ProducerOptions { redisClient?: IORedis.Redis; } -interface Redis extends IORedis.Redis { - enqueue( - qname: string, - dedupSet: string, - data: string, - dedupKey: string | null, - retryCount: number - ): Promise; -} - export class Producer { _redis: Redis; _QNAME: string; _DEDUPSET: string; _isInitialized: boolean = false; - _redisOptions: IORedis.RedisOptions = defaults.redisOptions; + _redisOptions: IORedis.RedisOptions = defaultOptions.redisOptions; constructor(qname: string, { redisOptions, redisClient }: ProducerOptions = {}) { if (redisClient) { this._redis = redisClient.duplicate() as Redis; } else { - this._redisOptions = lodash.merge({}, defaults.redisOptions, redisOptions); + this._redisOptions = lodash.merge({}, defaultOptions.redisOptions, redisOptions); this._redis = new IORedis(this._redisOptions) as Redis; } diff --git a/src/redis-transformers.ts b/src/redis-transformers.ts index 1056208..d5657f4 100644 --- a/src/redis-transformers.ts +++ b/src/redis-transformers.ts @@ -2,21 +2,22 @@ Source: https://github.com/luin/ioredis/issues/747#issuecomment-500735545 */ -function parseObjectResponse(reply) { +function parseObjectResponse(reply: Array) { if (!Array.isArray(reply)) { return reply; } - const data = {}; + const data: Record = {}; for (let i = 0; i < reply.length; i += 2) { data[reply[i]] = reply[i + 1]; } return data; } -function parseMessageResponse(reply) { +function parseMessageResponse(reply: Array): StreamValue[] { if (!Array.isArray(reply)) { return []; } + return reply.map(message => { return { id: message[0], @@ -25,45 +26,52 @@ function parseMessageResponse(reply) { }); } -function parseStreamResponse(reply) { +export interface StreamValue { + id: string; + data: any; +} + +function parseStreamResponse(reply: Array) { if (!Array.isArray(reply)) { return reply; } - const object = {}; + const object: Record = {}; for (const stream of reply) { object[stream[0]] = parseMessageResponse(stream[1]); } + return object; } -const parseXPendingResponse = reply => { +const parseXPendingResponse = (reply: Array) => { if (!reply || reply.length === 0) { return []; } - if (reply.length === 4 && !Number.isNaN(reply[0])) + + const consumers: Array> = (reply[3] || []).map((consumer: Array) => { + return { + name: consumer[0], + count: parseInt(consumer[1]) + }; + }); + + if (reply.length === 4 && !Number.isNaN(reply[0])) { return { count: parseInt(reply[0]), minId: reply[1], maxId: reply[2], - consumers: (reply[3] || []).map(consumer => { - return { - name: consumer[0], - count: parseInt(consumer[1]) - }; - }) + consumers }; + } + return reply.map(message => { return { - id: message[0], - consumerName: message[1], - elapsedMilliseconds: parseInt(message[2]), - deliveryCount: parseInt(message[3]) + id: message[0], + consumerName: message[1], + elapsedMilliseconds: parseInt(message[2]), + deliveryCount: parseInt(message[3]) }; }); }; -export = { - parseStreamResponse, - parseMessageResponse, - parseXPendingResponse -}; +export { parseStreamResponse, parseMessageResponse, parseXPendingResponse };