Skip to content

Commit

Permalink
Fix type errors
Browse files Browse the repository at this point in the history
  • Loading branch information
mugli committed Aug 16, 2019
1 parent 5720171 commit f326469
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 120 deletions.
41 changes: 40 additions & 1 deletion src/commands.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,45 @@
import * as IORedis from 'ioredis';
import IORedis from 'ioredis';
import { defaultOptions } from './defaults';

interface Pipeline extends IORedis.Pipeline {
requeue(
qname: string,
dedupSet: string,
groupName: string,
taskId: string,
taskData: string,
dedupKey: string | null,
retryCount: number
): Pipeline;
dequeue(qname: string, dedupSet: string, groupName: string, taskId: string, taskDedupkey: string): Pipeline;
}

export interface Redis extends IORedis.Redis {
enqueue(
qname: string,
dedupSet: string,
data: string,
dedupKey: string | null,
retryCount: number
): Promise<string | null>;

requeue(
qname: string,
dedupSet: string,
groupName: string,
taskId: string,
taskData: string,
dedupKey: string | null,
retryCount: number
): Promise<string | null>;

dequeue(qname: string, dedupSet: string, groupName: string, taskId: string, taskDedupkey: string): Promise<null>;

delconsumer(qname: string, groupName: string, consumerName: string): Promise<null>;

pipeline(commands?: string[][]): Pipeline;
}

export function initScripts(redis: IORedis.Redis) {
const pArr = [];

Expand Down
156 changes: 97 additions & 59 deletions src/consumer-unit.ts
Original file line number Diff line number Diff line change
@@ -1,37 +1,71 @@
const IORedis = require('ioredis');
const lodash = require('lodash');
const shortid = require('shortid');

const { ReplyError } = require('redis-errors');
const initScripts = require('./commands');
const { waitUntilInitialized } = require('./common');
const { parseStreamResponse, parseXPendingResponse } = require('./redis-transformers');
const Task = require('./task');
const { TimeoutError } = require('./errors');
import IORedis from 'ioredis';
import lodash from 'lodash';
import * as shortid from 'shortid';

import { ReplyError } from 'redis-errors';
import { initScripts, Redis } from './commands';
import { waitUntilInitialized } from './common';
import { parseStreamResponse, parseXPendingResponse, StreamValue } from './redis-transformers';
import { Task } from './task';
import { TimeoutError } from './errors';

import { defaultOptions, LoggingOptions, ConsumerOptions } from './defaults';

export interface ConsumerUnitOptions {
redisOptions?: IORedis.RedisOptions;
redisClient?: IORedis.Redis;
loggingOptions?: LoggingOptions;
consumerOptions?: ConsumerOptions;
}

const defaults = require('./defaults');
export interface Metadata {
id: string;
qname: string;
retryCount: number;
consumerName: string;
}

class ConsumerUnit {
constructor(qname, workerFn, { consumerOptions, redisOptions, redisClient, loggingOptions } = {}) {
export class ConsumerUnit {
_paused: boolean;
_QNAME: string;
_DEDUPSET: string;
qname: string;
_GRPNAME: string;
workerFn: Function;
_pendingTasks: Task[];
_totalTasks: number;
consumerOptions: ConsumerOptions;
loggingOptions: LoggingOptions;
_redis: Redis;
redisOptions: IORedis.RedisOptions = defaultOptions.redisOptions;
_name: string = '';
_isInitialized: boolean = false;
_loopStarted: boolean = false;

constructor(
qname: string,
workerFn: Function,
{ consumerOptions, redisOptions, redisClient, loggingOptions }: ConsumerUnitOptions = {}
) {
this._paused = true;

this._QNAME = `${defaults.NAMESPACE}:queue:${qname}`;
this._DEDUPSET = `${defaults.NAMESPACE}:queue:${qname}:dedupset`;
this._QNAME = `${defaultOptions.NAMESPACE}:queue:${qname}`;
this._DEDUPSET = `${defaultOptions.NAMESPACE}:queue:${qname}:dedupset`;
this.qname = qname;
this._GRPNAME = `${defaults.NAMESPACE}:queue:${qname}:cg`;
this._GRPNAME = `${defaultOptions.NAMESPACE}:queue:${qname}:cg`;

this.workerFn = workerFn;
this._pendingTasks = [];
this._totalTasks = 0;

this.consumerOptions = lodash.merge({}, defaults.consumerOptions, consumerOptions);
this.loggingOptions = lodash.merge({}, defaults.loggingOptions, loggingOptions);
this.consumerOptions = lodash.merge({}, defaultOptions.consumerOptions, consumerOptions);
this.loggingOptions = lodash.merge({}, defaultOptions.loggingOptions, loggingOptions);

if (redisClient) {
this._redis = redisClient.duplicate();
this._redis = redisClient.duplicate() as Redis;
} else {
this.redisOptions = lodash.merge({}, defaults.redisOptions, redisOptions);
this._redis = new IORedis(this.redisOptions);
this.redisOptions = lodash.merge({}, defaultOptions.redisOptions, redisOptions);
this._redis = new IORedis(this.redisOptions) as Redis;
}

this._initialize();
Expand All @@ -54,9 +88,9 @@ class ConsumerUnit {
this.start();
}

_log(...msg) {
if (this.loggingOptions.enabled) {
this.loggingOptions.loggerFn(`Orkid :: ${this._name}`, ...msg);
_log(msg: string, ...optionalParams: any[]) {
if (this.loggingOptions.enabled && this.loggingOptions.loggerFn) {
this.loggingOptions.loggerFn(`Orkid :: ${this._name}`, msg, ...optionalParams);
}
}

Expand Down Expand Up @@ -110,7 +144,8 @@ class ConsumerUnit {
);

const taskObj = parseStreamResponse(redisReply);
const tasks = [].concat(...Object.values(taskObj));
//@ts-ignore
const tasks: StreamValue[] = [].concat(...Object.values(taskObj));

for (const t of tasks) {
const task = new Task(t.id, t.data);
Expand Down Expand Up @@ -151,7 +186,7 @@ class ConsumerUnit {
*/
async _cleanUp() {
/* Returns items that are present in setA but not in setB */
function difference(setA, setB) {
function difference(setA: Set<string>, setB: Set<string>) {
const _difference = new Set(setA);
for (const elem of setB) {
_difference.delete(elem);
Expand All @@ -162,32 +197,32 @@ class ConsumerUnit {
// xinfo: https://redis.io/commands/xinfo
// Get the list of every consumer in a specific consumer group
const info = await this._redis.xinfo('CONSUMERS', this._QNAME, this._GRPNAME);
const consumerInfo = {};
const consumerInfo: Record<string, any> = {};
for (const inf of info) {
const data = {};
const data: Record<string, any> = {};
for (let i = 0; i < inf.length; i += 2) {
data[inf[i]] = inf[i + 1];
}
consumerInfo[inf[1]] = data;
}

const consumerNames = Object.keys(consumerInfo);
const pendingConsumerNames = new Set();
const emptyConsumerNames = new Set();
const pendingConsumerNames: Set<string> = new Set();
const emptyConsumerNames: Set<string> = new Set();

// Separate consumers with some pending tasks and no pending tasks
for (const con of consumerNames) {
if (consumerInfo[con].pending) {
pendingConsumerNames.add(con);
} else if (consumerInfo[con].idle > this.consumerOptions.workerFnTimeoutMs * 5) {
} else if (consumerInfo[con].idle > <number>this.consumerOptions.workerFnTimeoutMs * 5) {
// Just to be safe, only delete really old consumers
emptyConsumerNames.add(con);
}
}

// https://redis.io/commands/client-list
const clients = (await this._redis.client('LIST')).split('\n');
const activeWorkers = new Set();
const activeWorkers: Set<string> = new Set();

// Orkid consumers always set a name to redis connection
// Filter active connections those have names
Expand All @@ -209,22 +244,27 @@ class ConsumerUnit {
// Workers that have not pending tasks and also are not active anymore in redis
const orphanEmptyWorkers = difference(emptyConsumerNames, activeWorkers);

const claimInfo = {};
const claimInfo: Record<string, number> = {};
for (const w of orphanWorkers) {
// xpending: https://redis.io/commands/xpending
const redisXPendingReply = await this._redis.xpending(this._QNAME, this._GRPNAME, '-', '+', 1000, w);
const pendingTasks = parseXPendingResponse(redisXPendingReply);

const ids = pendingTasks.map(t => t.id);
let ids: string[] = [];
if (Array.isArray(pendingTasks)) {
ids = pendingTasks.map(t => t.id);
}

// xclaim: https://redis.io/commands/xclaim
const claim = await this._redis.xclaim(
this._QNAME,
this._GRPNAME,
this._name,
this.consumerOptions.workerFnTimeoutMs * 2,
...ids,
'JUSTID'
const claim = <Array<any>>(
await this._redis.xclaim(
this._QNAME,
this._GRPNAME,
this._name,
<number>this.consumerOptions.workerFnTimeoutMs * 2,
...ids,
'JUSTID'
)
);

claimInfo[w] = claim.length;
Expand Down Expand Up @@ -285,12 +325,12 @@ class ConsumerUnit {
return;
}

const task = this._pendingTasks.shift();
const task = this._pendingTasks.shift() as Task;
this._log('Starting to process task', task);
this._totalTasks++;

// TODO: Update queue specific total processed stat
await this._redis.hincrby(defaults.STAT, 'processed', 1);
await this._redis.hincrby(defaultOptions.STAT, 'processed', 1);

const metadata = { id: task.id, qname: this.qname, retryCount: task.retryCount, consumerName: this._name };
try {
Expand All @@ -307,7 +347,7 @@ class ConsumerUnit {
}
}

async _processSuccess(task, result) {
async _processSuccess(task: Task, result: any) {
this._log(`Worker ${task.id} returned`, result);

const resultVal = JSON.stringify({
Expand All @@ -324,12 +364,12 @@ class ConsumerUnit {
await this._redis
.pipeline()
.dequeue(this._QNAME, this._DEDUPSET, this._GRPNAME, task.id, task.dedupKey) // Remove from queue
.lpush(defaults.RESULTLIST, resultVal)
.ltrim(defaults.RESULTLIST, 0, defaults.queueOptions.maxResultListSize - 1)
.lpush(defaultOptions.RESULTLIST, resultVal)
.ltrim(defaultOptions.RESULTLIST, 0, <number>defaultOptions.queueOptions.maxResultListSize - 1)
.exec();
}

async _processFailure(task, error) {
async _processFailure(task: Task, error: Error) {
const info = JSON.stringify({
id: task.id,
qname: this.qname,
Expand All @@ -344,40 +384,40 @@ class ConsumerUnit {
at: new Date().toISOString()
});

if (task.retryCount < this.consumerOptions.maxRetry) {
if (task.retryCount < <number>this.consumerOptions.maxRetry) {
task.incrRetry();
// Send again to the queue
await this._redis
.pipeline()
.requeue(this._QNAME, this._DEDUPSET, this._GRPNAME, task.id, task.dataString, task.dedupKey, task.retryCount)
.hincrby(defaults.STAT, 'retries', 1)
.hincrby(defaultOptions.STAT, 'retries', 1)
.exec();
// TODO: Update queue specific total retries stat
} else {
// Move to deadlist
await this._redis
.pipeline()
.dequeue(this._QNAME, this._DEDUPSET, this._GRPNAME, task.id, task.dedupKey) // Remove from queue
.lpush(defaults.DEADLIST, info)
.ltrim(defaults.DEADLIST, 0, defaults.queueOptions.maxDeadListSize - 1)
.hincrby(defaults.STAT, 'dead', 1)
.lpush(defaultOptions.DEADLIST, info)
.ltrim(defaultOptions.DEADLIST, 0, <number>defaultOptions.queueOptions.maxDeadListSize - 1)
.hincrby(defaultOptions.STAT, 'dead', 1)
.exec();
// TODO: Update queue specific total dead stat
}

// Add to failed list in all cases
await this._redis
.pipeline()
.lpush(defaults.FAILEDLIST, info)
.ltrim(defaults.FAILEDLIST, 0, defaults.queueOptions.maxFailedListSize - 1)
.hincrby(defaults.STAT, 'failed', 1)
.lpush(defaultOptions.FAILEDLIST, info)
.ltrim(defaultOptions.FAILEDLIST, 0, <number>defaultOptions.queueOptions.maxFailedListSize - 1)
.hincrby(defaultOptions.STAT, 'failed', 1)
.exec();
// TODO: Update queue specific total failed stat
}

_wrapWorkerFn(data, metadata) {
_wrapWorkerFn(data: any, metadata: Metadata) {
const timeoutMs = this.consumerOptions.workerFnTimeoutMs;
const timeoutP = new Promise((resolve, reject) => {
const timeoutP = new Promise((_, reject) => {
const to = setTimeout(() => {
clearTimeout(to);
reject(new TimeoutError(`Task timed out after ${timeoutMs}ms`));
Expand All @@ -395,5 +435,3 @@ class ConsumerUnit {
await this._redis.disconnect();
}
}

module.exports = ConsumerUnit;
22 changes: 12 additions & 10 deletions src/consumer.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
const lodash = require('lodash');
const ConsumerUnit = require('./consumer-unit');
import * as lodash from 'lodash';
import { ConsumerUnit, ConsumerUnitOptions } from './consumer-unit';

const defaults = require('./defaults');
import { defaultOptions, ConsumerOptions } from './defaults';

const { InvalidConfigError } = require('./errors');
import { InvalidConfigError } from './errors';

class Consumer {
constructor(qname, workerFn, options = {}) {
this.consumerOptions = lodash.merge({}, defaults.consumerOptions, options.consumerOptions);
this.concurrency = this.consumerOptions.concurrencyPerInstance;
export class Consumer {
consumerOptions: ConsumerOptions;
concurrency: number;
consumers: ConsumerUnit[];

constructor(qname: string, workerFn: Function, options: ConsumerUnitOptions = {}) {
this.consumerOptions = lodash.merge({}, defaultOptions.consumerOptions, options.consumerOptions);
this.concurrency = this.consumerOptions.concurrencyPerInstance as number;

if (this.concurrency < 1) {
throw new InvalidConfigError('Concurrency cannot be less than 1');
Expand Down Expand Up @@ -43,5 +47,3 @@ class Consumer {
}
}
}

module.exports = Consumer;

0 comments on commit f326469

Please sign in to comment.