Skip to content

Commit

Permalink
Merge 491ddcc into 9e04499
Browse files Browse the repository at this point in the history
  • Loading branch information
haoliangyu authored Jan 20, 2018
2 parents 9e04499 + 491ddcc commit a54789a
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 33 deletions.
40 changes: 26 additions & 14 deletions docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ If the CREATE privilege is not granted (so sad), you can still use the static fu

All job monitoring will be stopped and all subscriptions on this instance will be removed. Basically, it's the opposite of `start()`. Even though `start()` may create new database objects during initialization, `stop()` will never remove anything from the database.

**If you need to uninstall pg-boss from a database, just run the following command.**
**If you need to uninstall pg-boss from a database, just run the following command.**

```sql
DROP SCHEMA $1 CASCADE
Expand All @@ -148,7 +148,7 @@ The opposite of `connect()`. Disconnects from a job database. All subscriptions

**returns: Promise**

Creates a new job and resolves the job's unique identifier (uuid).
Creates a new job and resolves the job's unique identifier (uuid).

> `publish()` will resolve a `null` for job id under some use cases when using [unique jobs](configuration.md#unique-jobs) or [throttling](configuration.md#throttled-jobs). These options are always opt-in on the publish side and therefore don't result in a promise rejection.
Expand Down Expand Up @@ -186,7 +186,7 @@ The request object has the following properties.
| Prop | Type | |
| - | - | -|
|`name`| string | *required*
|`data`| object |
|`data`| object |
|`options` | object | [publish options](configuration.md#publish-options)


Expand All @@ -212,19 +212,19 @@ The default concurrency for `subscribe()` is 1 job per second. Both the interva

**Arguments**
- `name`: string, *required*
- `options`: object
- `options`: object
- `handler`: function(job, callback)

When you provide the `handler` callback, it should have at least 1 argument for the job. The second argument is a convenience 'done' callback that will finish the job before it expires. The job object also has this callback attached as `done()` for convenience.
When you provide the `handler` callback, it should have at least 1 argument for the job. The second argument is a convenience 'done' callback that will finish the job before it expires. The job object also has this callback attached as `done()` for convenience.

The job object has the following properties.

| Prop | Type | |
| - | - | -|
|`id`| string, uuid |
|`name`| string |
|`id`| string, uuid |
|`name`| string |
|`data`| object | data sent from `publish()`
|`done(err, data)` | function | callback function used to mark the job as completed or failed in the database.
|`done(err, data)` | function | callback function used to mark the job as completed or failed in the database.

`done()` accepts optional arguments, the first being an error in typical node fashion. The second argument is an optional `data` argument for usage with [`onComplete()`](#oncompletename--options-handler) state-based subscriptions. If an error is passed, it will mark the job as failed and the data argument will be ignored.

Expand All @@ -242,13 +242,25 @@ boss.subscribe('email-welcome', {teamSize: 5}, job => {
.catch(error => console.error(error));
```

If a batchSize option is given, the handler will be invoked between polling intervals and receive an array of jobs.

```js
boss.subscribe('email-welcome', {batchSize: 5}, jobs => {
// do something with all jobs
myEmailService.sendEmailToAll(jobs)
jobs.forEach(job => job.done());
})
.then(() => console.log('subscription created'))
.catch(error => console.error(error));
```

## State-based subscriptions

Sometimes when a job changes state, it's important enough to trigger other things that should react to it. The following functions work identically to `subscribe()` and allow you to create orchestrations or sagas between jobs that may or may not know about each other. This common messaging pattern allows you to keep multi-job flow logic out of the individual job handlers so you can manage things in a more centralized fashion while not losing your mind. As you most likely already know, asynchronous jobs are complicated enough already.

Internally, all state transitions are also jobs themselves (they have a special suffix of `__state__<state name>`). Since pg-boss creates these, they are considered second class jobs and therefore not subject to the same expiration policies. In addition, the archiver ensures they don't hang around.

> There are some state changes that also trigger [events](#events), and this may be a bit confusing as there is a bit of overlapping concerns. Events can be convenient since you can have as many listeners as desired per event. However, emitting an event doesn't require any listeners, so if no callbacks are registered for them, you will never receive them as they are not persisted.
> There are some state changes that also trigger [events](#events), and this may be a bit confusing as there is a bit of overlapping concerns. Events can be convenient since you can have as many listeners as desired per event. However, emitting an event doesn't require any listeners, so if no callbacks are registered for them, you will never receive them as they are not persisted.
### `onComplete(name [, options], handler)`

Expand Down Expand Up @@ -337,7 +349,7 @@ Same as `unsubscribe()`, but removes an `onFail()` subscription.

## `fetch()`

Typically one would use `subscribe()` for automated polling for new jobs based upon a reasonable interval to finish the most jobs with the lowest latency. While `subscribe()` is a yet another free service we offer and it can be awfully convenient, sometimes you may have a special use case around when a job can be retrieved. Or, perhaps like me, you need to provide jobs via other entry points such as a web API.
Typically one would use `subscribe()` for automated polling for new jobs based upon a reasonable interval to finish the most jobs with the lowest latency. While `subscribe()` is a yet another free service we offer and it can be awfully convenient, sometimes you may have a special use case around when a job can be retrieved. Or, perhaps like me, you need to provide jobs via other entry points such as a web API.

`fetch()` allows you to skip all that polling nonsense that `subscribe()` does and puts you back in control of database traffic. Once you have your shiny job, you'll use either `complete()` or `fail()` to mark it as finished.

Expand Down Expand Up @@ -374,7 +386,7 @@ boss.fetch(jobName, batchSize)

// our magical emailer knows what to do with job.data
let promises = jobs.map(job => emailer.send(job.data).then(() => job.done()));

return Promise.all(promises);
})
.catch(error => console.log(error));
Expand Down Expand Up @@ -417,7 +429,7 @@ The promise will resolve on a successful completion, or reject if the job could

Completes a set of active jobs.

The promise will resolve on a successful completion, or reject if not all of the requested jobs could not be marked as completed.
The promise will resolve on a successful completion, or reject if not all of the requested jobs could not be marked as completed.

> See comments above on `cancel([ids])` regarding when the promise will resolve or reject because of a batch operation.
Expand All @@ -437,7 +449,7 @@ The promise will resolve on a successful failure state assignment, or reject if
# Events

As explained in the introduction above, each instance of pg-boss is an EventEmitter. You can run multiple instances of pg-boss for a variety of use cases including distribution and load balancing. Each instance has the freedom to subscribe to whichever jobs you need. Because of this diversity, the job activity of one instance could be drastically different from another. Therefore, **all of the events raised by pg-boss are instance-bound.**
As explained in the introduction above, each instance of pg-boss is an EventEmitter. You can run multiple instances of pg-boss for a variety of use cases including distribution and load balancing. Each instance has the freedom to subscribe to whichever jobs you need. Because of this diversity, the job activity of one instance could be drastically different from another. Therefore, **all of the events raised by pg-boss are instance-bound.**

> For example, if you were to subscribe to `error` in instance A, it will not receive an `error` event from instance B. The same concept applies to all events. If a job is subscribed in instance A, the `job` event will be raised alongside the `subscribe()` callback only for instance A. There is currently no such thing as a global `job` event across instances.
Expand All @@ -457,7 +469,7 @@ boss.on('error', error => logger.error(error));
> **Note: Since error events are only raised during internal housekeeping activities, they are not raised for direct API calls, where promise `catch()` handlers should be used.**
## `job`
When a job is processed, the subscriber's callback is called *and* a `job` event is raised. Adding a listener to the job event is completely optional, but you may wish to use it for logging or tracking purposes per instance.
When a job is processed, the subscriber's callback is called *and* a `job` event is raised. Adding a listener to the job event is completely optional, but you may wish to use it for logging or tracking purposes per instance.

The payload is the same job object that the subscriber's handler function receives with id, name and data properties.

Expand Down
2 changes: 1 addition & 1 deletion src/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class Db extends EventEmitter {
ssl: !!params.query.ssl
};
}

}

close(){
Expand Down
36 changes: 25 additions & 11 deletions src/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const failedJobSuffix = plans.failedJobSuffix;

const events = {
job: 'job',
jobs: 'jobs',
failed: 'failed',
error: 'error'
};
Expand Down Expand Up @@ -88,7 +89,7 @@ class Manager extends EventEmitter {
watch(name, options, callback){
assert(!(name in this.subscriptions), 'this job has already been subscribed on this instance.');

options.batchSize = options.batchSize || options.teamSize;
let batchSize = options.batchSize || options.teamSize;

if('newJobCheckInterval' in options || 'newJobCheckIntervalSeconds' in options)
options = Attorney.applyNewJobCheckInterval(options);
Expand All @@ -99,7 +100,7 @@ class Manager extends EventEmitter {

let onError = error => this.emit(events.error, error);

let complete = (job, error, response) => {
let jobComplete = (job, error, response) => {
if(!error)
return this.complete(job.id, response);

Expand All @@ -114,22 +115,35 @@ class Manager extends EventEmitter {
jobs = [jobs];

setImmediate(() => {
jobs.forEach(job => {
this.emit(events.job, job);
job.done = (error, response) => complete(job, error, response);
if (options.batchSize) {
jobs.forEach(job => {
job.done = (error, response) => jobComplete(job, error, response);
})

this.emit(events.jobs, jobs);

try {
callback(job, job.done);
}
catch (error) {
this.emit(events.failed, {job, error})
callback(jobs);
} catch (error) {
this.emit(events.failed, {jobs, error})
}
});
} else {
jobs.forEach(job => {
this.emit(events.job, job);
job.done = (error, response) => jobComplete(job, error, response);

try {
callback(job, job.done);
} catch (error) {
this.emit(events.failed, {job, error})
}
});
}
});

};

let fetch = () => this.fetch(name, options.batchSize);
let fetch = () => this.fetch(name, batchSize);

let workerConfig = {
name,
Expand Down
28 changes: 21 additions & 7 deletions test/subscribeTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ describe('subscribe', function(){
boss.unsubscribe().catch(() => finished());
});

it('should handle a batch of jobs', function(finished){
it('should handle a batch of jobs individually', function(finished){
const jobName = 'subscribe-batch';
const batchSize = 4;
const teamSize = 4;
let subscribeCount = 0;

Promise.join(
Expand All @@ -98,16 +98,33 @@ describe('subscribe', function(){
boss.publish(jobName),
boss.publish(jobName)
)
.then(() => boss.subscribe(jobName, {batchSize}, job => {
.then(() => boss.subscribe(jobName, {teamSize}, job => {
subscribeCount++;

// idea here is that the test would time out if it had to wait for 4 intervals
if(subscribeCount === batchSize)
if(subscribeCount === teamSize)
finished();
})
);
});

it('should handle a batch of jobs collectively', function(finished){
const jobName = 'subscribe-batch-2';
const batchSize = 4;

Promise.join(
boss.publish(jobName),
boss.publish(jobName),
boss.publish(jobName),
boss.publish(jobName)
)
.then(() => boss.subscribe(jobName, {batchSize}, jobs => {
assert(jobs.length, batchSize)
finished();
})
);
})

it('should have a done callback for single job subscriptions', function(finished){
const name = 'subscribe-single';

Expand All @@ -117,6 +134,3 @@ describe('subscribe', function(){
});

});



0 comments on commit a54789a

Please sign in to comment.