feat(infra): graceful shutdown for bull mq #3326
Conversation
NV-2137 Graceful shutdown and bootstrap for workers
Why? (Context)
ECS issues a SIGTERM signal prior to removing a particular task from the service. We need to make sure we gracefully stop the worker from receiving events and finish the currently processing events prior to exiting.
https://docs.nestjs.com/fundamentals/lifecycle-events
https://docs.bullmq.io/guide/workers/graceful-shutdown
https://aws.amazon.com/blogs/containers/graceful-shutdowns-with-ecs/
What?
Definition of Done
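The SIGTERM flow described in the context above can be sketched as follows. This is a hypothetical, simplified TypeScript model, not Novu's actual code: `WorkerLike`, `GracefulWorkerService`, and the stub worker are all illustrative names; the only assumption taken from BullMQ is that `Worker#close()` stops fetching new jobs and waits for in-flight ones to finish.

```typescript
// Hedged sketch: a service that, on shutdown (e.g. via NestJS shutdown hooks
// after ECS sends SIGTERM), closes its BullMQ-like worker so no new jobs are
// fetched and in-flight jobs complete before the process exits.
interface WorkerLike {
  // Mirrors BullMQ's Worker#close(): stop fetching, drain active jobs.
  close(): Promise<void>;
}

class GracefulWorkerService {
  private shuttingDown = false;

  constructor(private readonly worker: WorkerLike) {}

  // NestJS invokes onApplicationShutdown(signal) when enableShutdownHooks()
  // is active and the process receives SIGTERM.
  async onApplicationShutdown(signal?: string): Promise<void> {
    if (this.shuttingDown) return; // idempotent: the signal may arrive twice
    this.shuttingDown = true;
    await this.worker.close();
  }

  get isShuttingDown(): boolean {
    return this.shuttingDown;
  }
}

// Usage: a stub worker that records whether close() was called.
let closed = false;
const stubWorker: WorkerLike = {
  close: async () => {
    closed = true;
  },
};
const service = new GracefulWorkerService(stubWorker);
void service.onApplicationShutdown('SIGTERM');
console.log(`closed=${closed} shuttingDown=${service.isShuttingDown}`);
```

The shutdown flag is set before awaiting `close()`, so a second SIGTERM delivered mid-shutdown is a no-op rather than a double close.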
@@ -4,7 +4,7 @@ import { getRedisPrefix } from '@novu/shared';
 import { InboundEmailParse } from '../usecases/inbound-email-parse/inbound-email-parse.usecase';
 import { InboundEmailParseCommand } from '../usecases/inbound-email-parse/inbound-email-parse.command';
 import { ConnectionOptions } from 'tls';
-import { BullmqService } from '@novu/application-generic';
+import { BullMqService } from '@novu/application-generic';
Renaming based on pattern.
@@ -1,3 +1,2 @@
-export const QUEUE_SERVICE = 'QueueService';
Unused. Leftover from refactor to application-generic.
@@ -118,7 +120,6 @@ const PROVIDERS = [
     return dalService;
   },
 },
-cacheService,
Duplication.
public async gracefulShutdown() {
  // Right now we only want this for testing purposes
  if (process.env.NODE_ENV === 'test') {
    await this.bullMqService.queue.drain();
    await this.bullMqService.worker.close();
  }
}
Used just for testing. Now this will be implemented in the BullMqService that this service depends on.
@@ -1,2 +1 @@
-export const QUEUE_SERVICE = 'QueueService';
Unused. Leftover from refactor to application-generic.
Moved to application generic for reusability.
packages/application-generic/src/health/queue.health-indicator.ts (outdated, resolved)
const runningStatus =
  await this.triggerQueueService.bullMqService.getRunningStatus();

if (!runningStatus.queueIsPaused) {
We only create a queue for the TriggerQueueService.
I guess that our only indication that the service is up is if it is not paused, right?
I wasn't able to find any other way in the BullMQ API to know whether the queue was running, besides this property and checking whether the queue was instantiated.
We should also check here whether the worker is running, by using the flag that you return.
Unfortunately we can't, as only a queue is created in TriggerQueueService, so the worker comes back as undefined. 🙁
packages/application-generic/src/health/ws-queue.health-indicator.ts (outdated, resolved)
constructor() {
  super('ws_socket_queue');
}

public readonly bullMqService: BullmqService;
This is set by QueueService, so this shouldn't be needed here.
🎉
  }
  return undefined;
})
.catch((error) => Logger.error('Metric Job Exists function errored', LOG_CONTEXT, error));
Catching to avoid unexpected unhandled exceptions.
  return resolve();
} catch (error) {
  return reject(error);
Catching to avoid unexpected unhandled exceptions. This could be logged only.
Should we log it here, or do we log it somewhere on the outer scope?
I expect this rejection to be caught by the failed event (line 35) and to be logged through the jobHasFailed function. But I'm not 100% sure.
@p-fernandez @djabarovgeorge yes, this is what will happen after reject
Looks like a really big chunk of amazing work :) Left a couple of comments; apologies for the amount of questions. I am a bit out of context, so I was not sure about a couple of things.
The most concerning comment was related to the initialization of InMemoryProviderService in InMemoryProviderServiceHealthIndicator.
export class InMemoryProviderServiceHealthIndicator extends HealthIndicator {
  private INDICATOR_KEY = 'inMemory';

  constructor(private inMemoryProviderService: InMemoryProviderService) {
I am a bit behind on the current state of the project: how many instances of InMemoryProviderService do we have at the moment, and how are they injected?
My main concern is what this health check will actually verify.
We have 2 instances: one for the CacheService and the other for the DistributedLockService.
Internally, the service configures the in-memory connection for each of those services. This will probably need to change when implementing the MemoryDB connection for the Worker.
throw new HealthCheckError(
  'In-memory Health',
  this.getStatus(this.INDICATOR_KEY, false)
I wonder if we should add a third object, 'data', with the reason. In this case, we know that the client could not be ready.
Would be a good future improvement. 👍🏻
() => this.dalHealthIndicator.isHealthy(),
() => this.inMemoryHealthIndicator.isHealthy(),
isHealthy is an async function; should we make this function async as well and await the result?
NestJS Terminus package takes care of that behind the scenes: https://github.com/nestjs/terminus/blob/410e07bf5e96d38285bc244225563d633ee1b2b5/lib/health-check/health-check-executor.service.ts#L64
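A minimal model of what Terminus does behind the scenes may help here. This is purely illustrative, not the Terminus source: the executor receives an array of indicator functions, awaits each resulting promise itself, and merges the results, which is why the callers can pass plain synchronous arrow functions that return promises.

```typescript
// Simplified model of a Terminus-style health-check executor (illustrative
// names, not the real @nestjs/terminus API surface).
type IndicatorResult = Record<string, { status: string }>;
type Indicator = () => Promise<IndicatorResult>;

async function check(indicators: Indicator[]): Promise<IndicatorResult> {
  // The executor awaits every indicator's promise for the caller.
  const results = await Promise.all(indicators.map((fn) => fn()));
  return Object.assign({}, ...results);
}

// Two stand-in indicators, mirroring the dal and queue checks above.
const dalIndicator: Indicator = async () => ({ db: { status: 'up' } });
const queueIndicator: Indicator = async () => ({ triggerQueue: { status: 'up' } });

check([dalIndicator, queueIndicator]).then((result) => {
  console.log(JSON.stringify(result));
});
```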
throw new HealthCheckError(
  'Trigger Queue Health',
  this.getStatus(this.INDICATOR_KEY, false)
Same here regarding the extra data.
 @Get()
 @HealthCheck()
-async healthCheck() {
+async healthCheck(): Promise<HealthCheckResult> {
Should we add here WsQueueServiceHealthIndicator check?
  workerIsRunning: boolean;
}> {
  const queueIsPaused =
    (this._queue && (await this._queue.isPaused())) || undefined;
It is a bit hard to read in the browser, but doesn't this mean that if the queue is falsy, or isPaused() returns falsy, then undefined is returned? I wonder if that is by design, or whether we should return 'false' in that case.
The problem is that the BullMQ service is used both for creating queues and workers. We have some services that only create a queue and others that only create a worker. I wanted a generic functionality that reports the running status of whatever service instantiates the BullMQ service, hence the undefined return values to handle those cases. Open to a better suggestion though!
Obviously not for this cycle, but maybe we can think of splitting the logic into two classes that extend this service: one producer and one consumer.
IMO we do always call createQueue when initializing any queue, and the workers are tied to the queue and won't do anything without it... so my suggestion is to move the queue creation to the constructor of the BullMqService. But of course we can do this in a separate ticket in the cooldown...
I don't know the reasons why it was done this way and not like you suggest. Maybe @davidsoderberg could give more context.
Though I can infer from the PR he did that the intention was to provide flexibility to create multiple queues independently, as we will only have one worker that will consume them all. If we move all the queue creation into the BullMQ service, it would be harder to configure every queue independently, as we have implemented right now.
It also lets us pass a groupId, which ultimately takes advantage of BullMQ's grouping feature when consuming jobs; that was one of the performance improvement actions we did in the previous release.
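The queue-only/worker-only cases discussed in this thread can be sketched with explicit ternaries, which also addresses the readability concern about the `&& ... || undefined` chain. This is an illustrative sketch, not the actual BullMqService code; `QueueLike` and `RunnableWorkerLike` are assumed stand-ins, with undefined marking a resource that was never instantiated.

```typescript
// Running-status check for a service that may hold only a queue, only a
// worker, or both. undefined means "this resource was never created".
interface QueueLike {
  isPaused(): Promise<boolean>;
}
interface RunnableWorkerLike {
  isRunning(): boolean;
}

async function getRunningStatus(
  queue?: QueueLike,
  worker?: RunnableWorkerLike
): Promise<{ queueIsPaused?: boolean; workerIsRunning?: boolean }> {
  return {
    // Explicit ternaries keep "falsy isPaused()" distinct from "no queue".
    queueIsPaused: queue ? await queue.isPaused() : undefined,
    workerIsRunning: worker ? worker.isRunning() : undefined,
  };
}

// Usage: a queue-only service (like TriggerQueueService) reports no worker.
const queueOnly: QueueLike = { isPaused: async () => false };
getRunningStatus(queueOnly, undefined).then((status) => {
  console.log(JSON.stringify(status));
});
```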
public async gracefulShutdown() {
  // Right now we only want this for testing purposes
  if (process.env.NODE_ENV === 'test') {
    await this.bullMqService.queue.drain();
With the new change we will be missing the queue.drain() call; is that OK in the testing env?
I was the one who implemented that, as a hack to make one Workflow Queue test pass. It seems that hack is not needed anymore.
Co-authored-by: George Djabarov <39195835+djabarovgeorge@users.noreply.github.com>
💫
@@ -106,6 +107,7 @@ const distributedLockService = {
 const PROVIDERS = [
   cacheService,
   distributedLockService,
+  inMemoryProviderService,
Is there any reason why we are creating 2 instances of the InMemoryProviderService? We can inject it like this:

{
  provide: CacheService,
  useFactory: (inMemoryProviderService: InMemoryProviderService) => {
    return new CacheService(inMemoryProviderService);
  },
  inject: [InMemoryProviderService],
},
We are configuring the provider with auto-pipelining for the CacheService and without it for the DistributedLockService. That is one of the reasons. 🙁
@@ -141,6 +142,7 @@ const PROVIDERS = [
     return analyticsService;
   },
 },
+  TriggerQueueService,
Isn't this done by mistake? We are using the TriggerHandlerQueueService in the events.module... we can change it to inject the TriggerQueueService instead if you wish ;)
I recall NestJS complaining that it was not set as a provider, due to the health indicators. Let me try to remove TriggerHandlerQueueService as a dependency and see whether it complains. But I have a feeling it will be required too.
I have just tried, and we need this because the HealthModule, where the health indicators are used, depends on this SharedModule, and TriggerQueueService is a dependency of TriggerQueueServiceHealthIndicator.
How would you suggest doing it instead? I can't think of another option right now, and I'm not sure that what I understand from your suggestion would work.
@@ -64,6 +64,9 @@ export async function bootstrap(): Promise<INestApplication> {
   app.use(bodyParser.json());
   app.use(bodyParser.urlencoded({ extended: true }));

+  // Starts listening for shutdown hooks
+  app.enableShutdownHooks();
you told me some time ago to remove this 😛 hahah
I don't remember it 🙈. This is needed for the graceful shutdown hooks to operate after SIGTERM, so I would need the context on why I could have said that. Maybe I mentioned what NestJS says about suboptimal performance when it is enabled?
What change does this PR introduce?
Introduces graceful shutdown for BullMQ and all the services dependent on it.
Why was this change needed?
Reliability when the system scales up/down, avoiding losing jobs while closing. These changes should help us avoid losing information when shutting down an instance.