Skip to content

Commit

Permalink
fix(core): Add recoveryInProgress flag file (#6962)
Browse files Browse the repository at this point in the history
Issue: during startup, unfinished executions trigger a recovery process
that, under certain circumstances, can in itself crash the instance
(e.g. by running our of memory), resulting in an infinite recovery loop

This PR aims to change this behaviour by writing a flag file when the
recovery process starts, and removing it when it finishes. In the case
of a crash, this flag will persist and upon the next attempt, the
recovery will instead do the absolute minimal (marking executions as
'crashed'), without attempting any 'crashable' actions.
  • Loading branch information
flipswitchingmonkey committed Aug 18, 2023
1 parent 4fc69b7 commit 7b96820
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 7 deletions.
53 changes: 46 additions & 7 deletions packages/cli/src/eventbus/MessageEventBus/MessageEventBus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import {
} from '../EventMessageClasses/EventMessageGeneric';
import { recoverExecutionDataFromEventLogMessages } from './recoverEvents';
import { METRICS_EVENT_NAME } from '../MessageEventBusDestination/Helpers.ee';
import Container from 'typedi';
import { ExecutionRepository, WorkflowRepository } from '@/databases/repositories';

export type EventMessageReturnMode = 'sent' | 'unsent' | 'all' | 'unfinished';

Expand Down Expand Up @@ -93,6 +95,10 @@ export class MessageEventBus extends EventEmitter {
LoggerProxy.debug('Initializing event writer');
this.logWriter = await MessageEventBusLogWriter.getInstance();

if (!this.logWriter) {
LoggerProxy.warn('Could not initialize event writer');
}

// unsent event check:
// - find unsent messages in current event log(s)
// - cycle event logs and start the logging to a fresh file
Expand All @@ -105,14 +111,47 @@ export class MessageEventBus extends EventEmitter {
this.logWriter?.startLogging();
await this.send(unsentAndUnfinished.unsentMessages);

if (Object.keys(unsentAndUnfinished.unfinishedExecutions).length > 0) {
for (const executionId of Object.keys(unsentAndUnfinished.unfinishedExecutions)) {
await recoverExecutionDataFromEventLogMessages(
executionId,
unsentAndUnfinished.unfinishedExecutions[executionId],
true,
);
const unfinishedExecutionIds = Object.keys(unsentAndUnfinished.unfinishedExecutions);

if (unfinishedExecutionIds.length > 0) {
LoggerProxy.warn(`Found unfinished executions: ${unfinishedExecutionIds.join(', ')}`);
LoggerProxy.info('This could be due to a crash of an active workflow or a restart of n8n.');
const activeWorkflows = await Container.get(WorkflowRepository).find({
where: { active: true },
select: ['id', 'name'],
});
if (activeWorkflows.length > 0) {
LoggerProxy.info('Currently active workflows:');
for (const workflowData of activeWorkflows) {
LoggerProxy.info(` - ${workflowData.name} (ID: ${workflowData.id})`);
}
}
if (this.logWriter?.isRecoveryProcessRunning()) {
// if we end up here, it means that the previous recovery process did not finish
// a possible reason would be that recreating the workflow data itself caused e.g an OOM error
// in that case, we do not want to retry the recovery process, but rather mark the executions as crashed
LoggerProxy.warn('Skipping recover process since it previously failed.');
for (const executionId of unfinishedExecutionIds) {
LoggerProxy.info(`Setting status of execution ${executionId} to crashed`);
await Container.get(ExecutionRepository).updateExistingExecution(executionId, {
status: 'crashed',
stoppedAt: new Date(),
});
}
} else {
// start actual recovery process and write recovery process flag file
this.logWriter?.startRecoveryProcess();
for (const executionId of unfinishedExecutionIds) {
LoggerProxy.warn(`Attempting to recover execution ${executionId}`);
await recoverExecutionDataFromEventLogMessages(
executionId,
unsentAndUnfinished.unfinishedExecutions[executionId],
true,
);
}
}
// remove the recovery process flag file
this.logWriter?.endRecoveryProcess();
}

// if configured, run this test every n ms
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,22 @@ export class MessageEventBusLogWriter {
}
}

startRecoveryProcess() {
if (this.worker) {
this.worker.postMessage({ command: 'startRecoveryProcess', data: {} });
}
}

isRecoveryProcessRunning(): boolean {
return existsSync(this.getRecoveryInProgressFileName());
}

endRecoveryProcess() {
if (this.worker) {
this.worker.postMessage({ command: 'endRecoveryProcess', data: {} });
}
}

private async startThread() {
if (this.worker) {
await this.close();
Expand Down Expand Up @@ -240,6 +256,10 @@ export class MessageEventBusLogWriter {
}
}

getRecoveryInProgressFileName(): string {
return `${MessageEventBusLogWriter.options.logFullBasePath}.recoveryInProgress`;
}

cleanAllLogs() {
for (let i = 0; i <= MessageEventBusLogWriter.options.keepNumberOfFiles; i++) {
if (existsSync(this.getLogFileName(i))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,25 @@ function setKeepFiles(keepNumberOfFiles: number) {
keepFiles = keepNumberOfFiles;
}

function buildRecoveryInProgressFileName(): string {
return `${logFileBasePath}.recoveryInProgress`;
}

function startRecoveryProcess() {
if (existsSync(buildRecoveryInProgressFileName())) {
return false;
}
const fileHandle = openSync(buildRecoveryInProgressFileName(), 'a');
closeSync(fileHandle);
return true;
}

function endRecoveryProcess() {
if (existsSync(buildRecoveryInProgressFileName())) {
rmSync(buildRecoveryInProgressFileName());
}
}

function buildLogFileNameWithCounter(counter?: number): string {
if (counter) {
return `${logFileBasePath}-${counter}.log`;
Expand Down Expand Up @@ -112,6 +131,14 @@ if (!isMainThread) {
cleanAllLogs();
parentPort?.postMessage('cleanedAllLogs');
break;
case 'startRecoveryProcess':
const recoveryStarted = startRecoveryProcess();
parentPort?.postMessage({ command, data: recoveryStarted });
break;
case 'endRecoveryProcess':
endRecoveryProcess();
parentPort?.postMessage({ command, data: true });
break;
default:
break;
}
Expand Down

0 comments on commit 7b96820

Please sign in to comment.