Skip to content

Commit

Permalink
fix(core): Wait for worker tasks to complete on app shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelbromley committed Jun 19, 2019
1 parent 33b2fe1 commit 2a9fb0b
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 9 deletions.
15 changes: 11 additions & 4 deletions packages/core/src/app.module.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
import { MiddlewareConsumer, Module, NestModule, OnModuleDestroy } from '@nestjs/common';
import { MiddlewareConsumer, Module, NestModule, OnApplicationShutdown, OnModuleDestroy } from '@nestjs/common';
import cookieSession = require('cookie-session');
import { RequestHandler } from 'express';
import { GraphQLDateTime } from 'graphql-iso-date';

import { ApiModule } from './api/api.module';
import { ConfigModule } from './config/config.module';
import { ConfigService } from './config/config.service';
import { Logger } from './config/logger/vendure-logger';
import { validateCustomFieldsConfig } from './entity/custom-entity-fields';
import { I18nModule } from './i18n/i18n.module';
import { I18nService } from './i18n/i18n.service';

@Module({
imports: [ConfigModule, I18nModule, ApiModule],
})
export class AppModule implements NestModule, OnModuleDestroy {
constructor(private configService: ConfigService, private i18nService: I18nService) {}
export class AppModule implements NestModule, OnModuleDestroy, OnApplicationShutdown {
constructor(private configService: ConfigService,
private i18nService: I18nService) {}

configure(consumer: MiddlewareConsumer) {
const { adminApiPath, shopApiPath } = this.configService;
Expand Down Expand Up @@ -51,6 +52,12 @@ export class AppModule implements NestModule, OnModuleDestroy {
}
}

onApplicationShutdown(signal?: string) {
if (signal) {
Logger.info('Received shutdown signal:' + signal);
}
}

/**
* Groups middleware handlers together in an object with the route as the key.
*/
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export async function bootstrap(userConfig: Partial<VendureConfig>): Promise<INe
app.useLogger(new Logger());
await runPluginOnBootstrapMethods(config, app);
await app.listen(config.port, config.hostname);
app.enableShutdownHooks();
if (config.workerOptions.runInMainProcess) {
const worker = await bootstrapWorkerInternal(config);
Logger.warn(`Worker is running in main process. This is not recommended for production.`);
Expand Down Expand Up @@ -77,6 +78,7 @@ async function bootstrapWorkerInternal(userConfig: Partial<VendureConfig>): Prom
});
DefaultLogger.restoreOriginalLogLevel();
workerApp.useLogger(new Logger());
workerApp.enableShutdownHooks();
await workerApp.listenAsync();
workerWelcomeMessage(config);
return workerApp;
Expand Down
4 changes: 1 addition & 3 deletions packages/core/src/plugin/plugin.module.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { DynamicModule, Module } from '@nestjs/common';
import { ClientProxyFactory, ClientsModule, Transport } from '@nestjs/microservices';
import { ClientProxyFactory } from '@nestjs/microservices';
import { Type } from '@vendure/common/lib/shared-types';
import { notNullOrUndefined } from '@vendure/common/lib/shared-utils';

Expand Down Expand Up @@ -55,11 +55,9 @@ export class PluginModule {
}

static forWorker(): DynamicModule {
const controllers = getWorkerControllers();
return {
module: PluginModule,
imports: [ServiceModule.forWorker()],
controllers,
};
}
}
Expand Down
25 changes: 25 additions & 0 deletions packages/core/src/worker/message-interceptor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { CallHandler, ExecutionContext, Injectable, NestInterceptor } from '@nestjs/common';
import { Observable } from 'rxjs';
import { finalize, tap } from 'rxjs/operators';

import { WorkerMonitor } from './worker-monitor';

/**
* This interceptor is used to keep track of open worker tasks, so that the WorkerModule
* is not allowed to be destroyed while tasks are in progress.
*/
@Injectable()
export class MessageInterceptor implements NestInterceptor {
constructor(private monitor: WorkerMonitor) {}

intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
this.monitor.increment();
return next
.handle()
.pipe(
finalize(() => {
this.monitor.decrement();
}),
);
}
}
37 changes: 37 additions & 0 deletions packages/core/src/worker/worker-monitor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import { Injectable } from '@nestjs/common';
import { BehaviorSubject } from 'rxjs';
import { debounceTime, takeWhile, tap } from 'rxjs/operators';

import { Logger } from '../config/logger/vendure-logger';

/**
* This service is responsible for keeping track of incomplete worker tasks
* to ensure that the WorkerModule is not destroyed before active tasks complete.
*/
@Injectable()
export class WorkerMonitor {
openTasks = new BehaviorSubject<number>(0);
get openTaskCount(): number {
return this.openTasks.value;
}
increment() {
this.openTasks.next(this.openTasks.value + 1);
}
decrement() {
this.openTasks.next(this.openTasks.value - 1);
}
waitForOpenTasksToComplete(): Promise<number> {
if (0 < this.openTaskCount) {
Logger.info('Waiting for open worker tasks to complete...');
}
return this.openTasks.asObservable().pipe(
tap(count => {
if (0 < count) {
Logger.info(`${count} tasks open`);
}
}),
debounceTime(100),
takeWhile(value => value > 0),
).toPromise();
}
}
38 changes: 36 additions & 2 deletions packages/core/src/worker/worker.module.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,48 @@
import { Module } from '@nestjs/common';
import { Module, OnApplicationShutdown, OnModuleDestroy } from '@nestjs/common';
import { APP_INTERCEPTOR } from '@nestjs/core';
import { notNullOrUndefined } from '@vendure/common/lib/shared-utils';

import { getConfig } from '../config/config-helpers';
import { ConfigModule } from '../config/config.module';
import { Logger } from '../config/logger/vendure-logger';
import { PluginModule } from '../plugin/plugin.module';
import { ServiceModule } from '../service/service.module';

import { MessageInterceptor } from './message-interceptor';
import { WorkerMonitor } from './worker-monitor';

@Module({
imports: [
ConfigModule,
ServiceModule.forWorker(),
PluginModule.forWorker(),
],
providers: [
WorkerMonitor,
{
provide: APP_INTERCEPTOR,
useClass: MessageInterceptor,
},
],
controllers: getWorkerControllers(),
})
export class WorkerModule {}
export class WorkerModule implements OnModuleDestroy, OnApplicationShutdown {
constructor(private monitor: WorkerMonitor) {}
onModuleDestroy() {
return this.monitor.waitForOpenTasksToComplete();
}

onApplicationShutdown(signal?: string) {
if (signal) {
Logger.info('Worker Received shutdown signal:' + signal);
}
}
}

function getWorkerControllers() {
const plugins = getConfig().plugins;
return plugins
.map(p => (p.defineWorkers ? p.defineWorkers() : undefined))
.filter(notNullOrUndefined)
.reduce((flattened, controllers) => flattened.concat(controllers), []);
}

0 comments on commit 2a9fb0b

Please sign in to comment.