Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exponential back-off for process restarting #462

Merged
merged 2 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 4 additions & 3 deletions bin/concurrently.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ const args = yargs(argsBeforeSep)
type: 'number',
},
'restart-after': {
describe: 'Delay time to respawn the process, in milliseconds.',
describe: 'Delay before restarting the process, in milliseconds, or "exponential".',
default: defaults.restartDelay,
type: 'number',
type: 'string',
},

// Input
Expand Down Expand Up @@ -223,7 +223,8 @@ concurrently(
prefix: args.prefix,
prefixColors: args.prefixColors.split(','),
prefixLength: args.prefixLength,
restartDelay: args.restartAfter,
restartDelay:
args.restartAfter === 'exponential' ? 'exponential' : Number(args.restartAfter),
restartTries: args.restartTries,
successCondition: args.success,
timestampFormat: args.timestampFormat,
Expand Down
46 changes: 41 additions & 5 deletions src/flow-control/restart-process.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createMockInstance } from 'jest-create-mock-instance';
import { TestScheduler } from 'rxjs/testing';
import { VirtualTimeScheduler } from 'rxjs';

import { createFakeCloseEvent, FakeCommand } from '../fixtures/fake-command';
import { Logger } from '../logger';
Expand All @@ -8,12 +8,14 @@ import { RestartProcess } from './restart-process';
let commands: FakeCommand[];
let controller: RestartProcess;
let logger: Logger;
let scheduler: TestScheduler;
let scheduler: VirtualTimeScheduler;
beforeEach(() => {
commands = [new FakeCommand(), new FakeCommand()];

logger = createMockInstance(Logger);
scheduler = new TestScheduler(() => true);

// Don't use TestScheduler as it's hardcoded to a max number of "frames" (time),
// which don't work for some tests in this suite
scheduler = new VirtualTimeScheduler();
Comment on lines +15 to +18
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

controller = new RestartProcess({
logger,
scheduler,
Expand All @@ -34,14 +36,31 @@ it('does not restart processes that complete with success', () => {
expect(commands[1].start).toHaveBeenCalledTimes(0);
});

it('restarts processes that fail after delay has passed', () => {
it('restarts processes that fail immediately, if no delay was passed', () => {
controller = new RestartProcess({ logger, scheduler, tries: 1 });
controller.handle(commands);

commands[0].close.next(createFakeCloseEvent({ exitCode: 1 }));
scheduler.flush();

expect(scheduler.now()).toBe(0);
expect(logger.logCommandEvent).toHaveBeenCalledTimes(1);
expect(logger.logCommandEvent).toHaveBeenCalledWith(
`${commands[0].command} restarted`,
commands[0],
);
expect(commands[0].start).toHaveBeenCalledTimes(1);
});

it('restarts processes that fail after delay ms has passed', () => {
controller.handle(commands);

commands[0].close.next(createFakeCloseEvent({ exitCode: 1 }));
commands[1].close.next(createFakeCloseEvent({ exitCode: 0 }));

scheduler.flush();

expect(scheduler.now()).toBe(100);
expect(logger.logCommandEvent).toHaveBeenCalledTimes(1);
expect(logger.logCommandEvent).toHaveBeenCalledWith(
`${commands[0].command} restarted`,
Expand All @@ -51,6 +70,23 @@ it('restarts processes that fail after delay has passed', () => {
expect(commands[1].start).not.toHaveBeenCalled();
});

it('restarts processes that fail with an exponential back-off', () => {
const tries = 4;
controller = new RestartProcess({ logger, scheduler, tries, delay: 'exponential' });
controller.handle(commands);

let time = 0;
for (let i = 0; i < tries; i++) {
commands[0].close.next(createFakeCloseEvent({ exitCode: 1 }));
scheduler.flush();

time += Math.pow(2, i) * 1000;
expect(scheduler.now()).toBe(time);
expect(logger.logCommandEvent).toHaveBeenCalledTimes(i + 1);
expect(commands[0].start).toHaveBeenCalledTimes(i + 1);
}
});

it('restarts processes up to tries', () => {
controller.handle(commands);

Expand Down
18 changes: 13 additions & 5 deletions src/flow-control/restart-process.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
import * as Rx from 'rxjs';
import { defaultIfEmpty, delay, filter, map, skip, take, takeWhile } from 'rxjs/operators';
import { defaultIfEmpty, delayWhen, filter, map, skip, take, takeWhile } from 'rxjs/operators';

import { Command } from '../command';
import * as defaults from '../defaults';
import { Logger } from '../logger';
import { FlowController } from './flow-controller';

export type RestartDelay = number | 'exponential';

/**
* Restarts commands that fail up to a defined number of times.
*/
export class RestartProcess implements FlowController {
private readonly logger: Logger;
private readonly scheduler?: Rx.SchedulerLike;
readonly delay: number;
private readonly delay: RestartDelay;
readonly tries: number;

constructor({
Expand All @@ -21,13 +23,13 @@ export class RestartProcess implements FlowController {
logger,
scheduler,
}: {
delay?: number;
delay?: RestartDelay;
tries?: number;
logger: Logger;
scheduler?: Rx.SchedulerLike;
}) {
this.logger = logger;
this.delay = delay != null ? +delay : defaults.restartDelay;
this.delay = delay ?? 0;
this.tries = tries != null ? +tries : defaults.restartTries;
this.tries = this.tries < 0 ? Infinity : this.tries;
this.scheduler = scheduler;
Expand All @@ -38,6 +40,12 @@ export class RestartProcess implements FlowController {
return { commands };
}

const delayOperator = delayWhen((_, index) => {
const { delay } = this;
const value = delay === 'exponential' ? Math.pow(2, index) * 1000 : delay;
return Rx.timer(value, this.scheduler);
});

commands
.map((command) =>
command.close.pipe(
Expand All @@ -50,7 +58,7 @@ export class RestartProcess implements FlowController {
// Delay the emission (so that the restarts happen on time),
// explicitly telling the subscriber that a restart is needed
failure.pipe(
delay(this.delay, this.scheduler),
delayOperator,
map(() => true),
),
// Skip the first N emissions (as these would be duplicates of the above),
Expand Down
4 changes: 2 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { LogError } from './flow-control/log-error';
import { LogExit } from './flow-control/log-exit';
import { LogOutput } from './flow-control/log-output';
import { LogTimings } from './flow-control/log-timings';
import { RestartProcess } from './flow-control/restart-process';
import { RestartDelay, RestartProcess } from './flow-control/restart-process';
import { Logger } from './logger';

export type ConcurrentlyOptions = BaseConcurrentlyOptions & {
Expand Down Expand Up @@ -59,7 +59,7 @@ export type ConcurrentlyOptions = BaseConcurrentlyOptions & {
*
* @see RestartProcess
*/
restartDelay?: number;
restartDelay?: RestartDelay;

/**
* How many times commands should be restarted when they exit with a failure.
Expand Down