Skip to content

Commit

Permalink
Merge branch 'master' into fix/fix-tslib-dep
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed Dec 21, 2019
2 parents 17c7378 + d8bf244 commit 01818b9
Show file tree
Hide file tree
Showing 34 changed files with 399 additions and 235 deletions.
53 changes: 53 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,56 @@
## [1.6.2](https://github.com/taskforcesh/bullmq/compare/v1.6.1...v1.6.2) (2019-12-16)


### Bug Fixes

* change default QueueEvents lastEventId to $ ([3c5b01d](https://github.com/taskforcesh/bullmq/commit/3c5b01d16ee1442f5802a0fe4e7675c14f7a7f1f))
* ensure QE ready before adding test events ([fd190f4](https://github.com/taskforcesh/bullmq/commit/fd190f4be792b03273481c8aaf73be5ca42663d1))
* explicitly test the behavior of .on and .once ([ea11087](https://github.com/taskforcesh/bullmq/commit/ea11087b292d9325105707b53f92ac61c334a147))

## [1.6.1](https://github.com/taskforcesh/bullmq/compare/v1.6.0...v1.6.1) (2019-12-16)


### Bug Fixes

* check of existing redis instance ([dd466b3](https://github.com/taskforcesh/bullmq/commit/dd466b332b03b430108126531d59ff9e66ce9521))

# [1.6.0](https://github.com/taskforcesh/bullmq/compare/v1.5.0...v1.6.0) (2019-12-12)


### Features

* add generic type to job data and return value ([87c0531](https://github.com/taskforcesh/bullmq/commit/87c0531efc2716db37f8a0886848cdb786709554))

# [1.5.0](https://github.com/taskforcesh/bullmq/compare/v1.4.3...v1.5.0) (2019-11-22)


### Features

* remove delay dependency ([97e1a30](https://github.com/taskforcesh/bullmq/commit/97e1a3015d853e615ddd623af07f12a194ccab2c))
* remove dependence on Bluebird.delay [#67](https://github.com/taskforcesh/bullmq/issues/67) ([bedbaf2](https://github.com/taskforcesh/bullmq/commit/bedbaf25af6479e387cd7548e246dca7c72fc140))

## [1.4.3](https://github.com/taskforcesh/bullmq/compare/v1.4.2...v1.4.3) (2019-11-21)


### Bug Fixes

* check in moveToFinished to use default val for opts.maxLenEvents ([d1118aa](https://github.com/taskforcesh/bullmq/commit/d1118aab77f755b4a65e3dd8ea2e195baf3d2602))

## [1.4.2](https://github.com/taskforcesh/bullmq/compare/v1.4.1...v1.4.2) (2019-11-21)


### Bug Fixes

* avoid Job<->Queue circular json error ([5752727](https://github.com/taskforcesh/bullmq/commit/5752727a6294e1b8d35f6a49e4953375510e10e6))
* avoid the .toJSON serializer interface [#70](https://github.com/taskforcesh/bullmq/issues/70) ([5941b82](https://github.com/taskforcesh/bullmq/commit/5941b82b646e46d53970197a404e5ea54f09d008))

## [1.4.1](https://github.com/taskforcesh/bullmq/compare/v1.4.0...v1.4.1) (2019-11-08)


### Bug Fixes

* default job settings [#58](https://github.com/taskforcesh/bullmq/issues/58) ([667fc6e](https://github.com/taskforcesh/bullmq/commit/667fc6e00ae4d6da639d285a104fb67e01c95bbd))

# [1.4.0](https://github.com/taskforcesh/bullmq/compare/v1.3.0...v1.4.0) (2019-11-06)


Expand Down
2 changes: 1 addition & 1 deletion docs/gitbook/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import { Worker } from 'bullmq'
const worker = new Worker(queueName, async job => {
// Will print { foo: 'bar'} for the first job
// and { qux: 'baz' } for the second.
console.log(job.data):
console.log(job.data);
});

```
Expand Down
2 changes: 1 addition & 1 deletion docs/gitbook/guide/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ In order to use the full potential of Bull queues, it is important to understand

When a job is added to a queue it can be in one of two states, it can either be in the “wait” status, which is, in fact, a waiting list, where all jobs must enter before they can be processed, or it can be in a “delayed” status: a delayed status implies that the job is waiting for some timeout or to be promoted for being processed, however, a delayed job will not be processed directly, instead it will be placed at the beginning of the waiting list and processed as soon as a worker is idle.

The next state for a job I the “active” state. The active state is represented by a set, and are jobs that are currently being processed, i.e. they are running in the `process` function explained in the previous chapter. A job can be in the active state for an unlimited amount of time until the process is completed or an exception is thrown so that the job will end in either the “completed” or the “failed” status.
The next state for a job Is the “active” state. The active state is represented by a set, and are jobs that are currently being processed, i.e. they are running in the `process` function explained in the previous chapter. A job can be in the active state for an unlimited amount of time until the process is completed or an exception is thrown so that the job will end in either the “completed” or the “failed” status.

4 changes: 2 additions & 2 deletions docs/gitbook/guide/connections.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Connections

In order to start working with a Queue, a connection to a Redis instance is necessary. BullMQ uses the node module [ioredis](https://github.com/luin/ioredis), and the options you pass to BullMQ are just passed to the constructor of ioredis. If you do not provide any options, it will default to port 6739 and localhost.
In order to start working with a Queue, a connection to a Redis instance is necessary. BullMQ uses the node module [ioredis](https://github.com/luin/ioredis), and the options you pass to BullMQ are just passed to the constructor of ioredis. If you do not provide any options, it will default to port 6739 and localhost.

Every class will consume at least one redis connection, but it is also possible to reuse connections in some situations. For example, the _Queue_ and _Worker_ classes can accept an existing ioredis instance, and by that reusing that connection, however _QueueScheduler_ and _QueueEvents_ cannot do that because they require blocking connections to Redis, which makes it impossible to reuse them.

Expand All @@ -14,7 +14,7 @@ const myQueue = new Queue('myqueue', { connection: {
host: myredis.taskforce.run,
port: 32856
}});

const myWorker = new Worker('myworker', { connection: {
host: myredis.taskforce.run,
port: 32856
Expand Down
2 changes: 1 addition & 1 deletion docs/gitbook/guide/jobs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

Queues can hold different types of jobs which determine how and when they are processed. In this section we will describe them in detail.

An important thing to consider is that you can mix the different job types in the same queue, so you can add FIFO jobs and at any moment for example add a a LIFO of a delayed job.
An important thing to consider is that you can mix the different job types in the same queue, so you can add FIFO jobs, and at any moment add a LIFO or a delayed job.

2 changes: 1 addition & 1 deletion docs/gitbook/guide/jobs/fifo.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ description: 'First-In, First-Out'

# FIFO

The first type of jobs we are going to describe is the FIFO \(First-In, First-Out\) type. This is the standard type when adding jobs to a queue. The jobs are processed in the order they are inserted into the queue. This order is preserved independently onn the amount of processors you have, however if you have more than one worker or concurrency larger than 1, even though the workers will start the jobs in order, they may be completed in a slightly different order, since some jobs may take more time to complete than others.
The first type of jobs we are going to describe is the FIFO \(First-In, First-Out\) type. This is the standard type when adding jobs to a queue. The jobs are processed in the order they are inserted into the queue. This order is preserved independently on the amount of processors you have, however if you have more than one worker or concurrency larger than 1, even though the workers will start the jobs in order, they may be completed in a slightly different order, since some jobs may take more time to complete than others.



16 changes: 8 additions & 8 deletions docs/gitbook/guide/jobs/stalled.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,24 @@ In order to avoid stalled jobs, make sure that your worker does not keep NodeJS

Another way to reduce the chance for stalled jobs is using so called "sandboxed" processors. In this case, the workers will spawn new separate NodeJS processes, running separately from the main process.

{% code-tabs %}
{% code-tabs-item title="main.ts" %}
{% tabs %}
{% tab title="main.ts" %}
```typescript
import { Worker } from 'bullmq';

const worker = new Worker('Paint', painter);

```
{% endcode-tabs-item %}
{% endcode-tabs %}
{% endtab %}
{% endtabs %}

{% code-tabs %}
{% code-tabs-item title="painter.ts" %}
{% tabs %}
{% tab title="painter.ts" %}
```typescript
export default = (job) => {
// Paint something
}
```
{% endcode-tabs-item %}
{% endcode-tabs %}
{% endtab %}
{% endtabs %}

2 changes: 2 additions & 0 deletions docs/gitbook/what-is-bullmq.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ The library is designed so that it will fulfil the following goals:
* Consistent.
* High performant. Try to get the highest possible throughput from Redis by combining efficient .lua scripts and pipelining.

View the repository, see open issues, and contribute back [on GitHub](https://github.com/taskforcesh/bullmq)!

### **Features**

If you are new to Message Queues, you may wonder why they are needed after all. Queues can solve many different problems in an elegant way, from smoothing out processing peaks to creating robust communication channels between micro-services or offloading heavy work from one server to many smaller workers, and many other cases. Check the [Patterns](patterns/producer-consumer.md) section for getting some inspiration and information about best practices.
Expand Down
5 changes: 1 addition & 4 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bullmq",
"version": "1.4.0",
"version": "1.6.2",
"description": "Queue for messages and jobs based on Redis",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand All @@ -24,7 +24,6 @@
},
"dependencies": {
"@types/ioredis": "^4.0.13",
"bluebird": "^3.5.3",
"cron-parser": "^2.7.3",
"get-port": "^5.0.0",
"ioredis": "^4.3.0",
Expand All @@ -44,7 +43,6 @@
"@semantic-release/github": "^5.5.4",
"@semantic-release/npm": "^5.2.0",
"@semantic-release/release-notes-generator": "^7.3.0",
"@types/bluebird": "^3.5.25",
"@types/chai": "^4.1.7",
"@types/lodash": "^4.14.119",
"@types/mocha": "^5.2.5",
Expand All @@ -55,7 +53,6 @@
"chai": "^4.2.0",
"copyfiles": "^2.1.1",
"coveralls": "^3.0.7",
"delay": "^4.3.0",
"husky": "^3.0.3",
"istanbul": "^0.4.5",
"mocha": "^6.1.4",
Expand Down
3 changes: 1 addition & 2 deletions src/classes/compat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,10 @@ export class Queue3<T = any> extends EventEmitter {
* The name of the queue
*/
name: string;
queueEvents: QueueEvents;

private opts: CommonOptions;

private readonly queue: Queue;
private queueEvents: QueueEvents;
private worker: Worker;
private queueScheduler: QueueScheduler;

Expand Down
9 changes: 7 additions & 2 deletions src/classes/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,12 @@ export class Job<T = any, R = any> {
}
}

toJSON(): JobJson {
toJSON() {
const { queue, ...withoutQueue } = this;
return withoutQueue;
}

asJSON(): JobJson {
return {
id: this.id,
name: this.name,
Expand Down Expand Up @@ -452,7 +457,7 @@ export class Job<T = any, R = any> {
private addJob(client: IORedis.Redis): string {
const queue = this.queue;

const jobData = this.toJSON();
const jobData = this.asJSON();

return Scripts.addJob(client, queue, jobData, this.opts, this.id);
}
Expand Down
5 changes: 2 additions & 3 deletions src/classes/queue-events.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { delay } from 'bluebird';
import { QueueEventsOptions } from '../interfaces';
import { array2obj } from '../utils';
import { array2obj, delay } from '../utils';
import { QueueBase } from './queue-base';

export class QueueEvents extends QueueBase {
Expand All @@ -26,7 +25,7 @@ export class QueueEvents extends QueueBase {
const opts: QueueEventsOptions = this.opts;

const key = this.keys.events;
let id = opts.lastEventId || '0-0';
let id = opts.lastEventId || '$';

while (!this.closing) {
try {
Expand Down
6 changes: 3 additions & 3 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ export class Queue<T = any> extends QueueGetters {
return (await this.repeat).addNextRepeatableJob(
jobName,
data,
{ ...opts, ...this.jobsOpts },
{ ...this.jobsOpts, ...opts },
true,
);
} else {
const job = await Job.create(this, jobName, data, {
...opts,
...this.jobsOpts,
...opts,
});
this.emit('waiting', job);
return job;
Expand All @@ -77,7 +77,7 @@ export class Queue<T = any> extends QueueGetters {
jobs.map(job => ({
name: job.name,
data: job.data,
opts: { ...job.opts, ...this.jobsOpts },
opts: { ...this.jobsOpts, ...job.opts },
})),
);
}
Expand Down
5 changes: 3 additions & 2 deletions src/classes/redis-connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import IORedis, { Redis } from 'ioredis';
import * as semver from 'semver';
import { load } from '../commands';
import { ConnectionOptions, RedisOptions } from '../interfaces';
import { isRedisInstance } from '../utils';

export class RedisConnection extends EventEmitter {
static minimumVersion = '5.0.0';
Expand All @@ -13,7 +14,7 @@ export class RedisConnection extends EventEmitter {
constructor(private opts?: ConnectionOptions) {
super();

if (!(opts instanceof IORedis)) {
if (!isRedisInstance(opts)) {
this.opts = {
port: 6379,
host: '127.0.0.1',
Expand All @@ -23,7 +24,7 @@ export class RedisConnection extends EventEmitter {
...opts,
};
} else {
this._client = opts;
this._client = <Redis>opts;
}

this.initializing = this.init();
Expand Down
2 changes: 1 addition & 1 deletion src/classes/sandbox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const sandbox = (processFile: any, childPool: any) => {

child.send({
cmd: 'start',
job: job.toJSON(),
job: job.asJSON(),
});

const done = new Promise((resolve, reject) => {
Expand Down
18 changes: 5 additions & 13 deletions src/classes/worker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import * as Bluebird from 'bluebird';
import fs from 'fs';
import IORedis from 'ioredis';
import { Redis } from 'ioredis';
import path from 'path';
import { Processor, WorkerOptions } from '../interfaces';
import { QueueBase, Repeat } from './';
Expand All @@ -11,6 +10,7 @@ import sandbox from './sandbox';
import { Scripts } from './scripts';
import uuid from 'uuid';
import { TimerManager } from './timer-manager';
import { isRedisInstance } from '../utils';

// note: sandboxed processors would also like to define concurrency per process
// for better resource utilization.
Expand Down Expand Up @@ -52,8 +52,8 @@ export class Worker<T = any> extends QueueBase {
this.opts.lockRenewTime || this.opts.lockDuration / 2;

this.blockingConnection = new RedisConnection(
opts instanceof IORedis
? (<IORedis.Redis>opts.connection).duplicate()
isRedisInstance(opts.connection)
? (<Redis>opts.connection).duplicate()
: opts.connection,
);
this.blockingConnection.on('error', this.emit.bind(this));
Expand Down Expand Up @@ -282,16 +282,8 @@ export class Worker<T = any> extends QueueBase {
};

const handleFailed = async (err: Error) => {
let error = err;
if (
error instanceof Bluebird.OperationalError &&
(<any>error).cause instanceof Error
) {
error = (<any>error).cause; // Handle explicit rejection
}

await job.moveToFailed(err, token);
this.emit('failed', job, error, 'active');
this.emit('failed', job, err, 'active');
};

// TODO: how to cancel the processing? (null -> job.cancel() => throw CancelError()void)
Expand Down
7 changes: 5 additions & 2 deletions src/commands/moveToFinished-7.lua
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists
rcall("XADD", KEYS[6], "*", "event", ARGV[5], "jobId", ARGV[1], ARGV[3],
ARGV[4])

-- Try to get next job to avoid an extra roundtrip if the queue is not closing,
-- Try to get next job to avoid an extra roundtrip if the queue is not closing,
-- and not rate limited.
if (ARGV[8] == "1") then
-- move from wait to active
-- move from wait to active
local jobId = rcall("RPOPLPUSH", KEYS[4], KEYS[1])
if jobId then
local jobKey = ARGV[9] .. jobId
Expand All @@ -104,6 +104,9 @@ if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists
end

local maxEvents = rcall("HGET", KEYS[7], "opts.maxLenEvents")
if (maxEvents == false) then
maxEvents = 10000
end
rcall("XTRIM", KEYS[6], "MAXLEN", "~", maxEvents)

return 0
Expand Down
5 changes: 5 additions & 0 deletions src/test/fixtures/delay.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module.exports = function delay(ms) {
return new Promise(function(resolve) {
return setTimeout(resolve, ms);
});
};
2 changes: 1 addition & 1 deletion src/test/fixtures/fixture_processor.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/
'use strict';

const delay = require('delay');
const delay = require('./delay');

module.exports = function(/*job*/) {
return delay(500).then(() => {
Expand Down
2 changes: 1 addition & 1 deletion src/test/fixtures/fixture_processor_bar.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/
'use strict';

const delay = require('delay');
const delay = require('./delay');

module.exports = function(/*job*/) {
return delay(500).then(() => {
Expand Down
2 changes: 1 addition & 1 deletion src/test/fixtures/fixture_processor_exit.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/
'use strict';

const delay = require('delay');
const delay = require('./delay');

module.exports = function(/*job*/) {
return delay(500).then(() => {
Expand Down
Loading

0 comments on commit 01818b9

Please sign in to comment.