Skip to content

Commit

Permalink
Merge branch 'singleton' into pool
Browse files Browse the repository at this point in the history
# Conflicts:
#	package.json
#	src/contractor.js
#	test/migrationTest.js
#	test/testHelper.js
  • Loading branch information
timgit committed Mar 9, 2017
2 parents a5ec330 + 5ce0e56 commit 4bf2e48
Show file tree
Hide file tree
Showing 12 changed files with 318 additions and 60 deletions.
7 changes: 4 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ Queueing jobs in Node.js using PostgreSQL like a boss.
[![PostgreSql Version](https://img.shields.io/badge/PostgreSQL-9.5+-blue.svg?maxAge=2592000)](http://www.postgresql.org)

```js
var PgBoss = require('pg-boss');
var boss = new PgBoss('postgres://username:password@localhost/database');
const PgBoss = require('pg-boss');
const boss = new PgBoss('postgres://username:password@localhost/database');

boss.start()
.then(ready)
Expand Down Expand Up @@ -38,7 +38,8 @@ which significantly enhances its ability to act as a reliable, distributed messa
* Guaranteed delivery and finalizing of jobs using a promise API
* Delayed jobs
* Job retries
* Job throttling (rate limiting)
* Job throttling (singleton jobs and rate limiting)
* Configurable worker concurrency
* Distributed and/or clustered workers
* Automatic provisioning of required storage into a dedicated schema
* Automatic monitoring for expired jobs
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "pg-boss",
"version": "0.5.1",
"version": "1.0.0-beta2",
"description": "Queueing jobs in Node.js using PostgreSQL like a boss",
"main": "./lib/index.js",
"dependencies": {
Expand Down
11 changes: 4 additions & 7 deletions src/contractor.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
const assert = require('assert');
const EventEmitter = require('events').EventEmitter; //node 0.10 compatibility

const plans = require('./plans');
const migrations = require('./migrations');
const schemaVersion = require('../version.json').schema;
const Promise = require("bluebird");

class Contractor extends EventEmitter {
class Contractor {

static constructionPlans(schema){
let exportPlans = plans.createAll(schema);
exportPlans.push(plans.insertVersion(schema).replace('$1', schemaVersion));
let exportPlans = plans.create(schema);
exportPlans.push(plans.insertVersion(schema).replace('$1', `'${schemaVersion}'`));

return exportPlans.join(';\n\n');
}
Expand All @@ -22,7 +20,6 @@ class Contractor extends EventEmitter {
}

constructor(db, config){
super();
this.config = config;
this.db = db;
}
Expand Down Expand Up @@ -50,7 +47,7 @@ class Contractor extends EventEmitter {
}

create(){
return Promise.each(plans.createAll(this.config.schema), command => this.db.executeSql(command))
return Promise.each(plans.create(this.config.schema), command => this.db.executeSql(command))
.then(() => this.db.executeSql(plans.insertVersion(this.config.schema), schemaVersion));
}

Expand Down
23 changes: 20 additions & 3 deletions src/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class Manager extends EventEmitter {
return Promise.resolve({name, data, options});
}

function insertJob(name, data, options){
function insertJob(name, data, options, singletonOffset){
let startIn =
(options.startIn > 0) ? '' + options.startIn
: (typeof options.startIn == 'string') ? options.startIn
Expand All @@ -187,10 +187,27 @@ class Manager extends EventEmitter {
retryLimit = options.retryLimit || 0,
expireIn = options.expireIn || '15 minutes';

let values = [id, name, retryLimit, startIn, expireIn, data, singletonSeconds];
let singletonKey = options.singletonKey || null;

let values = [id, name, retryLimit, startIn, expireIn, data, singletonKey, singletonSeconds, singletonOffset || 0];

return self.db.executeSql(self.insertJobCommand, values)
.then(result => result.rowCount === 1 ? id : null);
.then(result => {
if(result.rowCount === 1)
return id;

if(!options.singletonNextSlot)
return null;

// delay starting by the offset to honor throttling config
options.startIn = singletonSeconds;
// toggle off next slot config for round 2
options.singletonNextSlot = false;

let singletonOffset = singletonSeconds;

return insertJob(name, data, options, singletonOffset);
});
}

}
Expand Down
32 changes: 32 additions & 0 deletions src/migrations.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,38 @@ function getMigrations(schema) {
`ALTER TABLE ${schema}.job DROP CONSTRAINT job_singleton`,
`ALTER TABLE ${schema}.job DROP COLUMN singletonOn`
]
},
{
version: '2',
previous: '0.1.0',
install: [
`CREATE TYPE ${schema}.job_state AS ENUM ('created','retry','active','complete','expired','cancelled')`,
`ALTER TABLE ${schema}.job ALTER COLUMN state SET DATA TYPE ${schema}.job_state USING state::${schema}.job_state`,
`ALTER TABLE ${schema}.job DROP CONSTRAINT job_singleton`,
`ALTER TABLE ${schema}.job ADD singletonKey text`,
`CREATE UNIQUE INDEX job_singletonKey ON ${schema}.job (name, singletonKey) WHERE state < 'complete' AND singletonOn IS NULL`,
`CREATE UNIQUE INDEX job_singletonOn ON ${schema}.job (name, singletonOn) WHERE state < 'expired' AND singletonKey IS NULL`,
`CREATE UNIQUE INDEX job_singletonKeyOn ON ${schema}.job (name, singletonOn, singletonKey) WHERE state < 'expired'`,
// migrate data to use retry state
`UPDATE ${schema}.job SET state = 'retry' WHERE state = 'expired' AND retryCount < retryLimit`,
// expired jobs weren't being archived in prev schema
`UPDATE ${schema}.job SET completedOn = now() WHERE state = 'expired' and retryLimit = retryCount`,
// just using good ole fashioned completedOn
`ALTER TABLE ${schema}.job DROP COLUMN expiredOn`
],
uninstall: [
`ALTER TABLE ${schema}.job ADD expiredOn timestamp without time zone`,
`DROP INDEX ${schema}.job_singletonKey`,
`DROP INDEX ${schema}.job_singletonOn`,
`DROP INDEX ${schema}.job_singletonKeyOn`,
`ALTER TABLE ${schema}.job DROP COLUMN singletonKey`,
`ALTER TABLE ${schema}.job ALTER COLUMN state SET DATA TYPE text`,
`DROP TYPE ${schema}.job_state`,
// restoring prev unique constraint
`ALTER TABLE ${schema}.job ADD CONSTRAINT job_singleton UNIQUE(name, singletonOn)`,
// roll retry state back to expired
`UPDATE ${schema}.job SET state = 'expired' where state = 'retry'`
]
}
];
}
133 changes: 91 additions & 42 deletions src/plans.js
Original file line number Diff line number Diff line change
@@ -1,29 +1,53 @@
module.exports = {
createAll: createAll,
createSchema: createSchema,
createJobTable: createJobTable,
createVersionTable: createVersionTable,
insertVersion: insertVersion,
getVersion: getVersion,
versionTableExists: versionTableExists,
fetchNextJob: fetchNextJob,
expireJob: expireJob,
completeJob: completeJob,
cancelJob: cancelJob,
insertJob: insertJob,
archive: archive
create,
insertVersion,
getVersion,
versionTableExists,
fetchNextJob,
expireJob,
completeJob,
cancelJob,
insertJob,
archive
};

function createAll(schema) {
function create(schema) {
return [
createSchema(schema),
createVersionTable(schema),
createJobStateEnum(schema),
createJobTable(schema),
createVersionTable(schema)
createIndexSingletonOn(schema),
createIndexSingletonKeyOn(schema),
createIndexSingletonKey(schema)
];
}

function createSchema(schema) {
return `CREATE SCHEMA IF NOT EXISTS ${schema}`;
return `
CREATE SCHEMA IF NOT EXISTS ${schema}
`;
}

function createVersionTable(schema) {
return `
CREATE TABLE IF NOT EXISTS ${schema}.version (
version text primary key
)`;
}

function createJobStateEnum(schema) {
// ENUM definition order is important
// base type is numeric and first values are less than last values
return `
CREATE TYPE ${schema}.job_state AS ENUM (
'created',
'retry',
'active',
'complete',
'expired',
'cancelled'
)`;
}

function createJobTable(schema) {
Expand All @@ -32,33 +56,50 @@ function createJobTable(schema) {
id uuid primary key not null,
name text not null,
data jsonb,
state text not null,
state ${schema}.job_state not null,
retryLimit integer not null default(0),
retryCount integer not null default(0),
startIn interval,
startedOn timestamp without time zone,
singletonKey text,
singletonOn timestamp without time zone,
expireIn interval,
expiredOn timestamp without time zone,
createdOn timestamp without time zone not null default now(),
completedOn timestamp without time zone,
CONSTRAINT job_singleton UNIQUE(name, singletonOn)
completedOn timestamp without time zone
)`;
}

function getVersion(schema) {
return `select version from ${schema}.version`;
function createIndexSingletonKey(schema){
// anything with singletonKey means "only 1 job can be queued or active at a time"
return `
CREATE UNIQUE INDEX job_singletonKey ON ${schema}.job (name, singletonKey) WHERE state < 'complete' AND singletonOn IS NULL
`;
}

function createVersionTable(schema) {
function createIndexSingletonOn(schema){
// anything with singletonOn means "only 1 job within this time period, queued, active or completed"
return `
CREATE TABLE IF NOT EXISTS ${schema}.version (
version text primary key
)`;
CREATE UNIQUE INDEX job_singletonOn ON ${schema}.job (name, singletonOn) WHERE state < 'expired' AND singletonKey IS NULL
`;
}

function createIndexSingletonKeyOn(schema){
// anything with both singletonOn and singletonKey means "only 1 job within this time period with this key, queued, active or completed"
return `
CREATE UNIQUE INDEX job_singletonKeyOn ON ${schema}.job (name, singletonOn, singletonKey) WHERE state < 'expired'
`;
}

function getVersion(schema) {
return `
SELECT version from ${schema}.version
`;
}

function versionTableExists(schema) {
return `select to_regclass('${schema}.version') as name`;
return `
SELECT to_regclass('${schema}.version') as name
`;
}

function insertVersion(schema) {
Expand All @@ -70,7 +111,7 @@ function fetchNextJob(schema) {
WITH nextJob as (
SELECT id
FROM ${schema}.job
WHERE (state = 'created' OR (state = 'expired' AND retryCount < retryLimit))
WHERE state < 'active'
AND name = $1
AND (createdOn + startIn) < now()
ORDER BY createdOn, id
Expand All @@ -80,19 +121,21 @@ function fetchNextJob(schema) {
UPDATE ${schema}.job SET
state = 'active',
startedOn = now(),
retryCount = CASE WHEN state = 'expired' THEN retryCount + 1 ELSE retryCount END
retryCount = CASE WHEN state = 'retry' THEN retryCount + 1 ELSE retryCount END
FROM nextJob
WHERE ${schema}.job.id = nextJob.id
RETURNING ${schema}.job.id, ${schema}.job.data`;
RETURNING ${schema}.job.id, ${schema}.job.data
`;
}

function expireJob(schema) {
return `
UPDATE ${schema}.job
SET state = 'expired',
expiredOn = now()
SET state = CASE WHEN retryCount < retryLimit THEN 'retry'::${schema}.job_state ELSE 'expired'::${schema}.job_state END,
completedOn = CASE WHEN retryCount < retryLimit THEN NULL ELSE now() END
WHERE state = 'active'
AND (startedOn + expireIn) < now()`;
AND (startedOn + expireIn) < now()
`;
}

function completeJob(schema){
Expand All @@ -101,7 +144,8 @@ function completeJob(schema){
SET completedOn = now(),
state = 'complete'
WHERE id = $1
AND state = 'active'`;
AND state = 'active'
`;
}

function cancelJob(schema){
Expand All @@ -110,18 +154,23 @@ function cancelJob(schema){
SET completedOn = now(),
state = 'cancelled'
WHERE id = $1
AND state IN ('created','active')`;
AND state < 'complete'
`;
}

function insertJob(schema) {
return `INSERT INTO ${schema}.job (id, name, state, retryLimit, startIn, expireIn, data, singletonOn)
VALUES (
$1, $2, 'created', $3, CAST($4 as interval), CAST($5 as interval), $6,
CASE WHEN $7::integer IS NOT NULL THEN 'epoch'::timestamp + '1 second'::interval * ($7 * floor((date_part('epoch', now())) / $7)) ELSE NULL END
)
ON CONFLICT ON CONSTRAINT job_singleton DO NOTHING`;
return `
INSERT INTO ${schema}.job (id, name, state, retryLimit, startIn, expireIn, data, singletonKey, singletonOn)
VALUES (
$1, $2, 'created', $3, CAST($4 as interval), CAST($5 as interval), $6, $7,
CASE WHEN $8::integer IS NOT NULL THEN 'epoch'::timestamp + '1 second'::interval * ($8 * floor((date_part('epoch', now()) + $9) / $8)) ELSE NULL END
)
ON CONFLICT DO NOTHING
`;
}

function archive(schema) {
return `DELETE FROM ${schema}.job WHERE completedOn + CAST($1 as interval) < now()`;
return `
DELETE FROM ${schema}.job WHERE completedOn + CAST($1 as interval) < now()
`;
}
Loading

0 comments on commit 4bf2e48

Please sign in to comment.