A class-based wrapper around job collection, a powerful and easy to use job manager for Meteor.
Adding this package to your Meteor application adds the Job and JobsWorker classes into the global scope.
Server side only (with JobsWorker available on the client side as well).
meteor add peerlibrary:classy-job
The basic use is to extend the Job class and implement the run method:
class ExampleJob extends Job {
  run() {
    this.logInfo("Hello world from: " + this.data.name);
  }
}
ExampleJob.register();
run is expected to be blocking, and when it returns the job is seen as successfully completed. The return value is stored in the job's result. If run throws an exception, the job is marked as failed and the exception is logged.
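The contract described above can be illustrated without Meteor. The sketch below uses a hypothetical stand-in base class, FakeJob, and a hypothetical execute helper; neither is part of this package, and the real worker does more (persistence, logging, retries). It only shows the two outcomes: a returned value becomes the result, and a thrown exception marks the job as failed.

```javascript
// Hypothetical stand-in for the package's Job class, for illustration only.
class FakeJob {
  constructor(data) {
    this.data = data;
  }
}

// Hypothetical helper mimicking what a worker does with run():
// a normal return means "completed", a thrown exception means "failed".
function execute(job) {
  try {
    return { status: 'completed', result: job.run() };
  } catch (error) {
    return { status: 'failed', error: String(error.message) };
  }
}

class SumJob extends FakeJob {
  run() {
    // The return value ends up as the job's result.
    return this.data.a + this.data.b;
  }
}

class BrokenJob extends FakeJob {
  run() {
    // Throwing is how a job signals failure.
    throw new Error('cannot process ' + this.data.name);
  }
}

const ok = execute(new SumJob({ a: 2, b: 3 }));
const failed = execute(new BrokenJob({ name: 'Foo' }));
console.log(ok.status, ok.result);
console.log(failed.status, failed.error);
```

In a real job class you would extend Job instead of FakeJob and let the worker call run for you.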
Then, when you want to put a new instance of the job into the queue, run:
new ExampleJob({name: "Foo"}).enqueue({skipIfExisting: true});
The class constructor receives data, which is then available to the job when it is run. The data is stored in the database and retrieved on a worker, so it should be EJSON-serializable.
enqueue accepts the following options:
skipIfExisting: if true, the job will not be enqueued if a not-completed job already exists
skipIncludingCompleted: if true, together with skipIfExisting, the job will not be enqueued if a job already exists, even if completed
depends: if set, it is passed to job collection's depends
priority: if set, it is passed to job collection's priority
retry: if set, it is passed to job collection's retry
repeat: if set, it is passed to job collection's repeat
delay: if set, it is passed to job collection's delay
after: if set, it is passed to job collection's after
save: if set, it is passed to job collection's save
When using skipIfExisting, a slight race condition is possible. In the worst case some duplicate work will be done. This should not be a problem because jobs ought to be idempotent anyway.
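How skipIfExisting and skipIncludingCompleted combine can be sketched as a small decision function. This is a minimal illustration, not the package's code: shouldEnqueue is a hypothetical helper, it takes the statuses of whatever jobs are considered to "already exist" (how the package matches existing jobs is not described here), and it assumes that every status other than 'completed' counts as not completed.

```javascript
// Hypothetical helper, not part of the package: decides whether a new job
// should be enqueued, given the statuses of matching jobs already stored.
function shouldEnqueue(existingStatuses, options = {}) {
  const { skipIfExisting = false, skipIncludingCompleted = false } = options;
  if (!skipIfExisting) {
    // skipIncludingCompleted only has an effect together with skipIfExisting.
    return true;
  }
  if (skipIncludingCompleted) {
    // Any existing job, completed or not, blocks the new one.
    return existingStatuses.length === 0;
  }
  // Assumption: every status other than 'completed' counts as not completed.
  return !existingStatuses.some((status) => status !== 'completed');
}

console.log(shouldEnqueue(['waiting'], { skipIfExisting: true }));
console.log(shouldEnqueue(['completed'], { skipIfExisting: true }));
console.log(shouldEnqueue(['completed'], {
  skipIfExisting: true,
  skipIncludingCompleted: true
}));
```

If the check and the insert are separate steps, as in this sketch, two callers can both see no existing job and both enqueue, which is presumably the kind of race mentioned above.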
Call JobsWorker.initialize() in your app on both client and server to initialize the worker environment and the JobsWorker.collection collection.
Possible options for JobsWorker.initialize, with defaults:
JobsWorker.initialize({
  collectionName: 'JobQueue',
  workerInstances: parseInt(process.env.WORKER_INSTANCES || '1'),
  stalledJobCheckInterval: 60 * 1000, // ms
  promoteInterval: 15 * 1000 // ms
});
You can use the WORKER_INSTANCES environment variable or the workerInstances option to control how many workers are enabled across all Meteor instances for your app. If set to 0, the current Meteor instance will not run a worker.
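The way the default is derived from the environment can be shown in isolation. The sketch below uses a hypothetical helper, resolveWorkerInstances, which is not part of the package; it mirrors the default expression shown above and assumes that an explicitly passed workerInstances option overrides it.

```javascript
// Hypothetical helper: resolves the worker count the way the documented
// default does, with an explicit option taking precedence (assumption).
function resolveWorkerInstances(env, options = {}) {
  if (options.workerInstances !== undefined) {
    return options.workerInstances;
  }
  // Falls back to 1 when WORKER_INSTANCES is unset or empty.
  return parseInt(env.WORKER_INSTANCES || '1', 10);
}

const unset = resolveWorkerInstances({});
const disabled = resolveWorkerInstances({ WORKER_INSTANCES: '0' });
const overridden = resolveWorkerInstances({ WORKER_INSTANCES: '4' }, { workerInstances: 2 });
console.log(unset, disabled, overridden);
```

Setting WORKER_INSTANCES=0 on an instance that should only serve web requests is one way to keep it from running a worker.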
Call JobsWorker.start on the server to start the worker:
Meteor.startup(function () {
  JobsWorker.start();
});
Calling JobsWorker.start does nothing if workerInstances is 0. Alternatively, you can simply not call JobsWorker.start.
Starting is randomly delayed a bit to distribute the behavior of workers equally inside configured intervals.
Jobs are executed serially inside a given worker, one by one.
Job classes are instantiated automatically every time a job is ready, and their run method is called.
Inside your run method you can call other methods, for example log or progress, to report on the job's progress.
You can use Job.find(query, fields) and Job.findOne(query, fields) to get instances of jobs from the database.
Each resulting object is a proper instance of the job class matching its type.
If you want only jobs of a particular class, you can limit the query yourself:
Job.find({
  type: ExampleJob.type()
});
If you need access to any other functionality of job collection that is not available directly through existing methods, you can call getQueueJob to get the underlying job collection job.
We call job collection's jobs queue jobs.
So, for example, to cancel a job, you can do:
Job.findOne({
  'data.argument': 'foobar'
}).getQueueJob().cancel();
You can use JobsWorker.collection to access the underlying job collection.
To convert one of its jobs (a queue job) to an instance of a class-based job, you can use Job.fromQueueJob(job).