Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
LSIF: Replace node-resque with bull (#6062)
- Loading branch information
Showing
9 changed files
with
175 additions
and
440 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,27 +1,43 @@ | ||
import { Queue } from 'node-resque' | ||
import Bull, { Queue, Job } from 'bull' | ||
import { Span, Tracer, FORMAT_TEXT_MAP } from 'opentracing' | ||
import { Logger } from 'winston' | ||
|
||
/** | ||
* Enqueue a job to be run by the worker. | ||
* Creates a queue instance. | ||
* | ||
* @param name The name of the queue. | ||
* @param endpoint The host:port redis address. | ||
* @param logger The logger instance. | ||
*/ | ||
export function createQueue(name: string, endpoint: string, logger: Logger): Queue { | ||
const [host, port] = endpoint.split(':', 2) | ||
|
||
const redis = { | ||
host, | ||
port: parseInt(port, 10), | ||
namespace: `lsif_${name}`, | ||
} | ||
|
||
const queue = new Bull(name, { redis }) | ||
queue.on('error', (error: Error) => logger.error('queue error', { error })) | ||
queue.on('global:stalled', (id: string) => logger.error('job stalled', { jobId: id })) | ||
|
||
return queue | ||
} | ||
|
||
/** | ||
* Enqueue a job to be run by a worker. | ||
* | ||
* @param queue The job queue. | ||
* @param job The job name. | ||
* @param args The job arguments. | ||
* @param tracer The tracer instance. | ||
* @param span The parent span. | ||
*/ | ||
export const enqueue = ( | ||
queue: Queue, | ||
job: string, | ||
args: { [K: string]: any }, | ||
tracer?: Tracer, | ||
span?: Span | ||
): Promise<void> => { | ||
export const enqueue = (queue: Queue, args: object, tracer?: Tracer, span?: Span): Promise<Job> => { | ||
const tracing = {} | ||
if (tracer && span) { | ||
const tracing = {} | ||
tracer.inject(span, FORMAT_TEXT_MAP, tracing) | ||
args.tracing = tracing | ||
} | ||
|
||
return queue.enqueue('lsif', job, [args]) | ||
return queue.add({ args, tracing }) | ||
} |
Oops, something went wrong.