Skip to content

Commit

Permalink
feat(agenda): add drainJobsBeforeClose option to agenda settings
Browse files Browse the repository at this point in the history
  • Loading branch information
Romakita committed Jan 3, 2024
1 parent 8c8472f commit 3c10b05
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 7 deletions.
2 changes: 2 additions & 0 deletions docs/tutorials/agenda.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const mongoConnectionString = "mongodb://127.0.0.1/agenda";
@Configuration({
agenda: {
enabled: true, // Enable Agenda jobs for this instance.
// drainJobsBeforeStop: true, // Wait for jobs to finish before stopping the agenda process.
// disableJobProcessing: true, // Prevents jobs from being processed.
// pass any options that you would normally pass to new Agenda(), e.g.
db: {
address: mongoConnectionString
Expand Down
2 changes: 1 addition & 1 deletion packages/third-parties/agenda/jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ module.exports = {
coverageThreshold: {
global: {
statements: 100,
branches: 91.48,
branches: 91.66,
functions: 100,
lines: 100
}
Expand Down
2 changes: 2 additions & 0 deletions packages/third-parties/agenda/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ const mongoConnectionString = "mongodb://127.0.0.1/agenda";
@Configuration({
agenda: {
enabled: true, // Enable Agenda jobs for this instance.
// drainJobsBeforeStop: true, // Wait for jobs to finish before stopping the agenda process.
// disableJobProcessing: true, // Prevents jobs from being processed.
// pass any options that you would normally pass to new Agenda(), e.g.
db: {
address: mongoConnectionString
Expand Down
22 changes: 22 additions & 0 deletions packages/third-parties/agenda/src/AgendaModule.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ jest.mock("agenda", () => {
Agenda: class {
close = jest.fn();
stop = jest.fn();
drain = jest.fn();
define = jest.fn();
every = jest.fn();
schedule = jest.fn();
Expand Down Expand Up @@ -131,11 +132,32 @@ describe("AgendaModule", () => {

await agendaModule.$onDestroy();

expect(agendaModule.agenda.stop).toHaveBeenCalledWith();
expect(agendaModule.agenda.close).toHaveBeenCalledWith({force: true});
});
});
});
describe("when agenda is enabled and drainJobsBeforeClose = true", () => {
beforeEach(() =>
PlatformTest.create({
agenda: {
enabled: true,
drainJobsBeforeClose: true
}
})
);
afterEach(() => PlatformTest.reset());
describe("$onDestroy()", () => {
it("should close agenda", async () => {
const agendaModule = PlatformTest.get<any>(AgendaModule)!;

await agendaModule.$onDestroy();

expect(agendaModule.agenda.drain).toHaveBeenCalledWith();
expect(agendaModule.agenda.close).toHaveBeenCalledWith({force: true});
});
});
});
describe("when agenda is enabled but disableJobProcessing = true", () => {
beforeEach(() =>
PlatformTest.create({
Expand Down
32 changes: 26 additions & 6 deletions packages/third-parties/agenda/src/AgendaModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ export class AgendaModule implements OnDestroy, AfterListen {
@Constant("agenda.enabled", false)
private enabled: boolean;

@Constant("agenda.drainJobsBeforeClose", false)
private drainJobsBeforeClose: boolean;

@Constant("agenda.disableJobProcessing", false)
private disableJobProcessing: boolean;

Expand All @@ -28,7 +31,10 @@ export class AgendaModule implements OnDestroy, AfterListen {
const providers = this.getProviders();

if (!this.disableJobProcessing) {
this.logger.info("Agenda add definitions...");
this.logger.info({
event: "AGENDA_ADD_DEFINITIONS",
message: "Agenda add definitions"
});
providers.forEach((provider) => this.addAgendaDefinitionsForProvider(provider));

await this.injector.emit("$beforeAgendaStart");
Expand All @@ -37,22 +43,36 @@ export class AgendaModule implements OnDestroy, AfterListen {
await this.agenda.start();

if (!this.disableJobProcessing) {
this.logger.info("Agenda add scheduled jobs...");
this.logger.info({
event: "AGENDA_ADD_JOBS",
message: "Agenda add scheduled jobs"
});
await Promise.all(providers.map((provider) => this.scheduleJobsForProvider(provider)));

await this.injector.emit("$afterAgendaStart");
}
} else {
this.logger.info("Agenda disabled...");
}
}

async $onDestroy(): Promise<any> {
if (this.enabled) {
await this.agenda.stop();
if (this.drainJobsBeforeClose) {
this.logger.info({
event: "AGENDA_DRAIN",
message: "Agenda is draining all jobs before close"
});
await this.agenda.drain();
} else {
this.logger.info({
event: "AGENDA_STOP",
message: "Agenda is stopping"
});
await this.agenda.stop();
}

await this.agenda.close({force: true});

this.logger.info("Agenda stopped...");
this.logger.info({event: "AGENDA_STOP", message: "Agenda stopped"});
}
}

Expand Down
1 change: 1 addition & 0 deletions packages/third-parties/agenda/src/interfaces/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ declare global {
*/
enabled?: boolean;
disableJobProcessing?: boolean;
drainJobsBeforeClose?: boolean;
} & AgendaConfig;
}
}
Expand Down

0 comments on commit 3c10b05

Please sign in to comment.