Skip to content

Commit

Permalink
fix: data fetcher add graceful shutdown timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
vasyl-ivanchuk committed Mar 4, 2024
1 parent 0d96121 commit 417b044
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 19 deletions.
2 changes: 2 additions & 0 deletions packages/data-fetcher/.env.example
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
LOG_LEVEL=debug
PORT=3040

GRACEFUL_SHUTDOWN_TIMEOUT_MS=0

BLOCKCHAIN_RPC_URL=http://localhost:3050

RPC_CALLS_DEFAULT_RETRY_TIMEOUT=30000
Expand Down
2 changes: 2 additions & 0 deletions packages/data-fetcher/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ export default () => {
WS_MAX_CONNECTIONS,
USE_WEBSOCKETS_FOR_TRANSACTIONS,
MAX_BLOCKS_BATCH_SIZE,
GRACEFUL_SHUTDOWN_TIMEOUT_MS,
} = process.env;

return {
Expand All @@ -31,5 +32,6 @@ export default () => {
useWebSocketsForTransactions: USE_WEBSOCKETS_FOR_TRANSACTIONS === "true",
},
maxBlocksBatchSize: parseInt(MAX_BLOCKS_BATCH_SIZE, 10) || 20,
gracefulShutdownTimeoutMs: parseInt(GRACEFUL_SHUTDOWN_TIMEOUT_MS, 10) || 0,
};
};
17 changes: 14 additions & 3 deletions packages/data-fetcher/src/health/health.controller.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
import { Logger, Controller, Get, OnApplicationShutdown, BeforeApplicationShutdown } from "@nestjs/common";
import { HealthCheckService, HealthCheck, HealthCheckResult } from "@nestjs/terminus";
import { HttpAdapterHost } from "@nestjs/core";
import { ConfigService } from "@nestjs/config";
import { JsonRpcHealthIndicator } from "./jsonRpcProvider.health";
import { setTimeout } from "node:timers/promises";

@Controller(["health", "ready"])
export class HealthController implements OnApplicationShutdown, BeforeApplicationShutdown {
private readonly logger: Logger;
private readonly gracefulShutdownTimeoutMs: number;

constructor(
private readonly healthCheckService: HealthCheckService,
private readonly jsonRpcHealthIndicator: JsonRpcHealthIndicator,
private readonly httpAdapter: HttpAdapterHost
private readonly httpAdapter: HttpAdapterHost,
configService: ConfigService
) {
this.logger = new Logger(HealthController.name);
this.gracefulShutdownTimeoutMs = configService.get<number>("gracefulShutdownTimeoutMs");
}

@Get()
Expand All @@ -27,24 +32,30 @@ export class HealthController implements OnApplicationShutdown, BeforeApplicatio
}

async beforeApplicationShutdown(signal?: string): Promise<void> {
this.logger.debug(`Received termination signal ${signal || ""}`);
const httpServer = this.httpAdapter.httpAdapter.getHttpServer();
await new Promise((resolve) => {
httpServer.getConnections((_, count) => {
this.logger.debug({ message: `Number of active connections: ${count}` });
resolve(true);
});
});
this.logger.debug({ message: "Before application shutdown called", signal });

if (signal === "SIGTERM") {
this.logger.debug(`Awaiting ${this.gracefulShutdownTimeoutMs}ms before shutdown`);
await setTimeout(this.gracefulShutdownTimeoutMs);
this.logger.debug(`Timeout reached, shutting down now`);
}
}

async onApplicationShutdown(signal?: string): Promise<void> {
this.logger.debug({ message: "On application shutdown called", signal });
const httpServer = this.httpAdapter.httpAdapter.getHttpServer();
await new Promise((resolve) => {
httpServer.getConnections((_, count) => {
this.logger.debug({ message: `Number of active connections: ${count}` });
resolve(true);
});
});
this.logger.debug({ message: "On application shutdown called", signal });
}
}
4 changes: 0 additions & 4 deletions packages/data-fetcher/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ async function bootstrap() {
app.enableShutdownHooks();
app.useGlobalInterceptors(new ResponseTransformInterceptor());
await app.listen(configService.get("port"));

setInterval(() => {
logger.debug({ message: "I'm alive", context: "Main" });
}, 1000);
}

bootstrap();
17 changes: 5 additions & 12 deletions packages/worker/src/block/block.watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,11 @@ export class BlockWatcher implements OnModuleInit, OnModuleDestroy {
private async getBlockInfoFromBlockChain(blockNumber: number): Promise<BlockData> {
this.logger.debug({ message: "Getting block from the blockchain", blockNumber });

//const stopGetBlockInfoDurationMetric = this.getBlockInfoDurationMetric.startTimer();
this.dataFetchService
.getBlockData(blockNumber)
.then((_) => {
this.logger.debug({ message: "Received block from the blockchain", blockNumber });
})
.catch((e) => {
this.logger.error(e);
});
//stopGetBlockInfoDurationMetric();

return null;
const stopGetBlockInfoDurationMetric = this.getBlockInfoDurationMetric.startTimer();
const blockData = await this.dataFetchService.getBlockData(blockNumber);
stopGetBlockInfoDurationMetric();

return blockData;
}

public async onModuleInit(): Promise<void> {
Expand Down

0 comments on commit 417b044

Please sign in to comment.