Using Redis for pub/sub background tasks
> npm install nxus-worker-queue --save
"worker_queue": {
"redis_url": "redis://localhost:6379",
"cleanInterval": 3600000
}
It's conventional to use an environment variable to set the Redis URL in the production environment. For example:
let config = {}
if (process.env.REDIS_URL)
  config.worker_queue = { redis_url: process.env.REDIS_URL }
application.start(config)
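As a variant of the snippet above, the override can be isolated in a small function so it is easy to check without starting the application. This is only a sketch: `buildConfig` and `DEFAULTS` are hypothetical names, not part of nxus-worker-queue, and the defaults are simply copied from the sample configuration shown earlier.

```javascript
// Defaults copied from the sample worker_queue configuration (assumption:
// these are the values you want when REDIS_URL is not set).
const DEFAULTS = { redis_url: 'redis://localhost:6379', cleanInterval: 3600000 }

// Hypothetical helper: returns a config object suitable for application.start().
function buildConfig(env) {
  const worker_queue = { ...DEFAULTS }
  // Override the Redis URL only when the environment provides one.
  if (env.REDIS_URL) worker_queue.redis_url = env.REDIS_URL
  return { worker_queue }
}

console.log(buildConfig({ REDIS_URL: 'redis://example.internal:6379' }))
console.log(buildConfig({}))
```

In real use you would pass `process.env` as the argument and hand the result to `application.start()`.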
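For each task, you need to define a unique task name. A worker registers a handler under that name, and a task request made with the same name is routed to that handler.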
import {workerQueue} from 'nxus-worker-queue'
workerQueue.worker('myBackgroundTask', ({data}) => {
  this.log.debug("Hello", data.hi)
})
import {workerQueue} from 'nxus-worker-queue'
let job = workerQueue.task('myBackgroundTask', {hi: 'world'})
The worker queue module interacts with Redis through the intermediary Bull package. This "fastest, most reliable, Redis-based queue for Node" is "carefully written for rock solid stability and atomicity". For documentation, a good place to start is the Reference page.
The task() method returns a Bull Job object that allows you to
interact with the background task.
In particular, the Job object exposes a finished() method that,
when invoked, returns a promise that resolves when the job finishes.
The value of the promise corresponds to the value of the promise
returned by the task handler.
let job = workerQueue.task('myBackgroundTask', {hi: 'world'})
job.finished().then((rslt) => { console.log('background task finished: ', rslt) })
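The same pattern can be written with async/await and error handling. The sketch below runs without Redis by using a stand-in object that models only the `finished()` method described above; `makeFakeJob` and `reportWhenFinished` are hypothetical names, and it assumes the promise rejects when the task fails.

```javascript
// Stand-in for the Job object returned by workerQueue.task(). Only the
// finished() method is modelled; this is not Bull's implementation.
function makeFakeJob(handlerResult) {
  return {
    finished: () => Promise.resolve(handlerResult)
  }
}

// Hypothetical helper: wait for a job, then log and return its result.
async function reportWhenFinished(job) {
  try {
    const rslt = await job.finished()
    console.log('background task finished:', rslt)
    return rslt
  } catch (err) {
    // Assumption: a failed task surfaces here as a rejected promise.
    console.error('background task failed:', err.message)
    throw err
  }
}

// Usage with the stand-in job
reportWhenFinished(makeFakeJob({ ok: true }))
```

With a real queue, the object returned by `workerQueue.task()` would take the place of the stand-in.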
Extends NxusModule
Worker Queue module for background tasks
Provide a task handler
Parameters
taskName (string): Name of the task (channel) to listen for
handler (function): Handler for processing task requests; should return a promise that resolves on completion
opts (optional, default {})
Examples
workerQueue.worker('backgroundJob', (msg) => {})
Request handling of a background task
Parameters
taskName (string): Name of the task (channel) to publish to
message (object): Options for the task worker; must be JSON serializable
opts (optional, default {})
Examples
workerQueue.task('backgroundJob', {hi: 'world'})
Returns object Bull job object
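Because the message must be JSON serializable, it can help to check a payload before queuing it. The following is a minimal sketch using only built-in JavaScript; `survivesJson` and `deepEqual` are hypothetical helpers, not part of nxus-worker-queue or Bull.

```javascript
// Structural comparison for plain values, arrays, and plain objects.
function deepEqual(a, b) {
  if (a === b) return true
  if (typeof a !== 'object' || typeof b !== 'object' || a === null || b === null) return false
  if (Array.isArray(a) !== Array.isArray(b)) return false
  const keysA = Object.keys(a)
  const keysB = Object.keys(b)
  if (keysA.length !== keysB.length) return false
  return keysA.every((k) => deepEqual(a[k], b[k]))
}

// Hypothetical helper: true if the value is unchanged by a JSON round trip.
function survivesJson(value) {
  let copy
  try {
    copy = JSON.parse(JSON.stringify(value))
  } catch (err) {
    return false // for example, circular references
  }
  return deepEqual(value, copy)
}

console.log(survivesJson({ hi: 'world' }))
console.log(survivesJson({ callback: () => {} }))
console.log(survivesJson({ when: new Date() }))
```

Functions are dropped and Date objects become strings during serialization, so the last two payloads would not reach the handler in their original form.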
Cleans the current queue for the given taskName.
Parameters
taskName (string): The queue/task name to clean.
type (String, optional, default 'completed'): The type of message to clean.
delay (Number, optional, default 3600000): The grace period in milliseconds. Messages older than this will be cleaned. Defaults to 1 hour.
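The default delay of 3600000 is one hour expressed in milliseconds. The sketch below only illustrates that arithmetic and the "older than the grace period" rule; `isPastGracePeriod` is a hypothetical helper and does not reflect how the queue performs cleaning internally.

```javascript
// One hour in milliseconds: 60 min * 60 s * 1000 ms = 3600000
const HOUR_MS = 60 * 60 * 1000

// Hypothetical helper: is a message that finished at `finishedAt` older than
// the grace period at time `now`? Both arguments are millisecond timestamps.
function isPastGracePeriod(finishedAt, now, delay = HOUR_MS) {
  return now - finishedAt > delay
}

const now = Date.now()
console.log(isPastGracePeriod(now - 2 * HOUR_MS, now)) // finished two hours ago
console.log(isPastGracePeriod(now - 10 * 60 * 1000, now)) // finished ten minutes ago
```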
Cleans all queues for the specified message type.
Parameters
type (String, optional, default 'completed'): The type of message to clean.
delay (Number, optional, default 3600000): The grace period in milliseconds. Messages older than this will be cleaned. Defaults to 1 hour.
Empties the current queue for the given taskName.
Parameters
taskName (string): The name of the queue to empty. If not provided, all queues are emptied.
Empties all queues.
Parameters
taskName (string): The name of the queue to empty. If not provided, all queues are emptied.