-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(infra): pause workers in bootstrap until queues are ready #3396
feat(infra): pause workers in bootstrap until queues are ready #3396
Conversation
providers: [ | ||
DalServiceHealthIndicator, | ||
QueueService, | ||
QueueServiceHealthIndicator, | ||
TriggerQueueService, | ||
TriggerQueueServiceHealthIndicator, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved to SharedModule
as I needed them there also. NestJS recommends to import it as provider in just one module.
@Injectable() | ||
export class TriggerProcessorQueueService extends TriggerQueueService { | ||
export class TriggerWorkerService extends TriggerQueueService implements INovuWorker { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renaming as here we are instantiating a worker from BullMQ so for me the naming was confusing. Opinionated and debatable. Can roll back if needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The old name was like that because at some point we had two queues for the trigger: producer and processor.
I feel like the name should be similar to what we have for the other queues WorkflowQueueService, WsQueueService
, so like TriggerQueueService
, but I'm not sure why we have TriggerQueueService
and not are reusing the QueueService
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can give it a look regarding not using QueueService. But I still think using Queue when we are instantiating a worker is confusing. Maybe here I am thinking more in AWS terms of architecture (SQS, SNS). Also we can raise that as an open question in the team. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if separating the responsibility of the queue service to Publisher/Subscriber terms could help us in the future and will be more convenient terminology. ( most definitely not related to this pr, i would love to raise a POC PR if you think it will help)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You worded nicely @djabarovgeorge. Indeed it is the terminology that I was aiming with these changes as latest years have worked a lot with Pub/Sub tools and patterns and I personally enjoy that separation of concerns. I also see we have that separation in our implementation (it can be seen in the services created and point of discussion) so to me made sense heading that direction with the naming. But again, something worth to bring to the team.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have been pairing with @LetItRock and we agreed to tackle the proposed refactor in a different PR, as the separation of concerns is a good thing and we can have it less confusing as right now.
@Injectable() | ||
export class WorkflowQueueService extends QueueService<IJobData> { | ||
export class WorkflowWorkerService extends QueueService<IJobData> implements INovuWorker { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renaming as here we are instantiating a worker from BullMQ so for me the naming was confusing. Opinionated and debatable. Can roll back if needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Renaming and refactor will be done in a different PR.
{ | ||
provide: QueueService, | ||
useClass: WorkflowQueueService, | ||
}, | ||
{ | ||
provide: TriggerQueueService, | ||
useClass: TriggerProcessorQueueService, | ||
}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this didn't make sense. Might be very wrong.
CC: @davidsoderberg ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@p-fernandez these are injected to the BULLMQ_LIST
in line 120... also please check all the places where QueueService
and TriggerQueueService
were used before and change there accordingly... from what I remember is that AddJob
use-case is using private queueService: QueueService
in the constructor because the WorkflowQueueService
is not the part of the application-generic
package
@@ -71,6 +95,8 @@ export async function bootstrap(): Promise<INestApplication> { | |||
|
|||
await app.listen(process.env.PORT); | |||
|
|||
await startAppInfra(app); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Doubting if we should enable the worker before or after letting the application to listen to requests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that this order makes sense here
try { | ||
const healths = await Promise.all([ | ||
this.queueServiceHealthIndicator.isHealthy(), | ||
this.triggerQueueServiceHealthIndicator.isHealthy(), | ||
this.wsQueueServiceHealthIndicator.isHealthy(), | ||
]); | ||
|
||
return healths.every((health) => !!health === true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In a µservices world I would do HTTP/gRPC calls to the health check endpoints of the different services we are dependant. In this case we share the health indicators in our app generic package. Underneath all them use the same BullMQ instance with its dependency to a dedicated in-memory database. So we check the 3 even though with one would be enough. This would let it prepared if we create a dedicate in-memory instance for every queue in the future.
0b5a557
to
bfb47ac
Compare
StorageHelperService, | ||
readinessService, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Order matters. If a provider is dependant of other providers, needs to be added later in the array. Learnt it the hard way. 🙁
bfb47ac
to
2161f31
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! Would love to get @davidsoderberg opinion regarding the open question of the WorkflowService imports just in case, but otherwise looks good.
@@ -71,6 +95,8 @@ export async function bootstrap(): Promise<INestApplication> { | |||
|
|||
await app.listen(process.env.PORT); | |||
|
|||
await startAppInfra(app); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that this order makes sense here
async pauseWorkers(workers: INovuWorker[]): Promise<void> { | ||
for (const worker of workers) { | ||
Logger.log(`Pausing worker ${worker.name}...`, LOG_CONTEXT); | ||
await worker.pauseWorker(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I assume that pausing might take time while finishing processing inflights calls, maybe we should do Promise.all here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if you are thinking this is for the graceful shutdown. This is used here: https://github.com/novuhq/novu/pull/3396/files#r1193495088
So it is meant to be used before we even accept calls (before app is ready to listen).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh okey, that makes sense. A small suggestion maybe we can utilize the autorun
https://api.docs.bullmq.io/interfaces/WorkerOptions.html#autorun worker option instead of "pausing" it in the appp bootstrap? So then we only need to enable it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had to revert this change back as I wasn't able to find the reason tests were not working fully. We can try it in the future.
@@ -40,6 +62,8 @@ export async function bootstrap(): Promise<INestApplication> { | |||
app.useLogger(app.get(PinoLogger)); | |||
app.flushLogs(); | |||
|
|||
await prepareAppInfra(app); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We pause the workers here.
1345e22
to
ba80c13
Compare
a0ad2fc
to
534ca5d
Compare
QueueService, | ||
TriggerQueueService, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks wrong to me (but I'm not sure) that we do provide: QueueService, TriggerQueueService
and then also WorkflowWorkerService, TriggerWorkerService
. This way we will have two instances of the same queues because the WorkflowWorkerService extends QueueService
and TriggerWorkerService extends TriggerQueueService
. The WorkflowWorkerService, TriggerWorkerService
instances were injected by their base class because some use cases leave in the application-generic
package but these classes are defined in the worker app.
I feel it's better to have what was there before.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For future readers, I have been pairing with @LetItRock and we agreed to tackle the proposed refactor in a different PR, as the separation of concerns is a good thing and we can have it less confusing as right now. Same for these dependencies and renaming. We will clarify it in that other PR.
534ca5d
to
61eac1c
Compare
8dd9e20
to
6f67938
Compare
What change does this PR introduce?
Pauses the workers till the queues are ready, checked by the health checks, in order to be able to start to process jobs.
Why was this change needed?
Is part of the improvements for the startup and the shutdown of the apps.
Other information (Screenshots)
Manually tested. Might require a better implementation.