From 14ba7a5f4805431cd4397f40b8a7e53bb51356db Mon Sep 17 00:00:00 2001 From: Gustavo Henke Date: Fri, 12 Jan 2024 14:14:35 -0300 Subject: [PATCH 1/2] Implement exponential back-off --- bin/concurrently.ts | 7 ++-- src/flow-control/restart-process.spec.ts | 46 +++++++++++++++++++++--- src/flow-control/restart-process.ts | 18 +++++++--- src/index.ts | 4 +-- 4 files changed, 60 insertions(+), 15 deletions(-) diff --git a/bin/concurrently.ts b/bin/concurrently.ts index 13cfe271..9c5d6aca 100755 --- a/bin/concurrently.ts +++ b/bin/concurrently.ts @@ -165,9 +165,9 @@ const args = yargs(argsBeforeSep) type: 'number', }, 'restart-after': { - describe: 'Delay time to respawn the process, in milliseconds.', + describe: 'Delay before restarting the process, in milliseconds, or "exponential".', default: defaults.restartDelay, - type: 'number', + type: 'string', }, // Input @@ -223,7 +223,8 @@ concurrently( prefix: args.prefix, prefixColors: args.prefixColors.split(','), prefixLength: args.prefixLength, - restartDelay: args.restartAfter, + restartDelay: + args.restartAfter === 'exponential' ? 'exponential' : Number(args.restartAfter), restartTries: args.restartTries, successCondition: args.success, timestampFormat: args.timestampFormat, diff --git a/src/flow-control/restart-process.spec.ts b/src/flow-control/restart-process.spec.ts index f0759faa..b46192f1 100644 --- a/src/flow-control/restart-process.spec.ts +++ b/src/flow-control/restart-process.spec.ts @@ -1,5 +1,5 @@ import { createMockInstance } from 'jest-create-mock-instance'; -import { TestScheduler } from 'rxjs/testing'; +import { VirtualTimeScheduler } from 'rxjs'; import { createFakeCloseEvent, FakeCommand } from '../fixtures/fake-command'; import { Logger } from '../logger'; @@ -8,12 +8,14 @@ import { RestartProcess } from './restart-process'; let commands: FakeCommand[]; let controller: RestartProcess; let logger: Logger; -let scheduler: TestScheduler; +let scheduler: VirtualTimeScheduler; beforeEach(() => { commands = [new FakeCommand(), new FakeCommand()]; - logger = createMockInstance(Logger); - scheduler = new TestScheduler(() => true); + + // Don't use TestScheduler as it's hardcoded to a max number of "frames" (time), + // which don't work for some tests in this suite + scheduler = new VirtualTimeScheduler(); controller = new RestartProcess({ logger, scheduler, @@ -34,7 +36,23 @@ it('does not restart processes that complete with success', () => { expect(commands[1].start).toHaveBeenCalledTimes(0); }); -it('restarts processes that fail after delay has passed', () => { +it('restarts processes that fail immediately, if no delay was passed', () => { + controller = new RestartProcess({ logger, scheduler, tries: 1 }); + controller.handle(commands); + + commands[0].close.next(createFakeCloseEvent({ exitCode: 1 })); + scheduler.flush(); + + expect(scheduler.now()).toBe(0); + expect(logger.logCommandEvent).toHaveBeenCalledTimes(1); + expect(logger.logCommandEvent).toHaveBeenCalledWith( + `${commands[0].command} restarted`, + commands[0], + ); + expect(commands[0].start).toHaveBeenCalledTimes(1); +}); + +it('restarts processes that fail after delay ms has passed', () => { controller.handle(commands); commands[0].close.next(createFakeCloseEvent({ exitCode: 1 })); @@ -42,6 +60,7 @@ it('restarts processes that fail after delay has passed', () => { scheduler.flush(); + expect(scheduler.now()).toBe(100); expect(logger.logCommandEvent).toHaveBeenCalledTimes(1); expect(logger.logCommandEvent).toHaveBeenCalledWith( `${commands[0].command} restarted`, @@ -51,6 +70,23 @@ it('restarts processes that fail after delay has passed', () => { expect(commands[1].start).not.toHaveBeenCalled(); }); +it('restarts processes that fail with an exponential back-off', () => { + const tries = 4; + controller = new RestartProcess({ logger, scheduler, tries, delay: 'exponential' }); + controller.handle(commands); + + let time = 0; + for (let i = 0; i < tries; i++) { + commands[0].close.next(createFakeCloseEvent({ exitCode: 1 })); + scheduler.flush(); + + time += Math.pow(2, i) * 1000; + expect(scheduler.now()).toBe(time); + expect(logger.logCommandEvent).toHaveBeenCalledTimes(i + 1); + expect(commands[0].start).toHaveBeenCalledTimes(i + 1); + } +}); + it('restarts processes up to tries', () => { controller.handle(commands); diff --git a/src/flow-control/restart-process.ts b/src/flow-control/restart-process.ts index d6afe33d..b65a5b87 100644 --- a/src/flow-control/restart-process.ts +++ b/src/flow-control/restart-process.ts @@ -1,18 +1,20 @@ import * as Rx from 'rxjs'; -import { defaultIfEmpty, delay, filter, map, skip, take, takeWhile } from 'rxjs/operators'; +import { defaultIfEmpty, delayWhen, filter, map, skip, take, takeWhile } from 'rxjs/operators'; import { Command } from '../command'; import * as defaults from '../defaults'; import { Logger } from '../logger'; import { FlowController } from './flow-controller'; +export type RestartDelay = number | 'exponential'; + /** * Restarts commands that fail up to a defined number of times. */ export class RestartProcess implements FlowController { private readonly logger: Logger; private readonly scheduler?: Rx.SchedulerLike; - readonly delay: number; + private readonly delay: RestartDelay; readonly tries: number; constructor({ @@ -21,13 +23,13 @@ export class RestartProcess implements FlowController { logger, scheduler, }: { - delay?: number; + delay?: RestartDelay; tries?: number; logger: Logger; scheduler?: Rx.SchedulerLike; }) { this.logger = logger; - this.delay = delay != null ? +delay : defaults.restartDelay; + this.delay = delay != null ? delay : 0; this.tries = tries != null ? +tries : defaults.restartTries; this.tries = this.tries < 0 ? Infinity : this.tries; this.scheduler = scheduler; @@ -38,6 +40,12 @@ export class RestartProcess implements FlowController { return { commands }; } + const delayOperator = delayWhen((_, index) => { + const { delay } = this; + const value = delay === 'exponential' ? Math.pow(2, index) * 1000 : delay; + return Rx.timer(value, this.scheduler); + }); + commands .map((command) => command.close.pipe( @@ -50,7 +58,7 @@ export class RestartProcess implements FlowController { // Delay the emission (so that the restarts happen on time), // explicitly telling the subscriber that a restart is needed failure.pipe( - delay(this.delay, this.scheduler), + delayOperator, map(() => true), ), // Skip the first N emissions (as these would be duplicates of the above), diff --git a/src/index.ts b/src/index.ts index 3dce77b6..11d89f38 100644 --- a/src/index.ts +++ b/src/index.ts @@ -15,7 +15,7 @@ import { LogError } from './flow-control/log-error'; import { LogExit } from './flow-control/log-exit'; import { LogOutput } from './flow-control/log-output'; import { LogTimings } from './flow-control/log-timings'; -import { RestartProcess } from './flow-control/restart-process'; +import { RestartDelay, RestartProcess } from './flow-control/restart-process'; import { Logger } from './logger'; export type ConcurrentlyOptions = BaseConcurrentlyOptions & { @@ -59,7 +59,7 @@ export type ConcurrentlyOptions = BaseConcurrentlyOptions & { * * @see RestartProcess */ - restartDelay?: number; + restartDelay?: RestartDelay; /** * How many times commands should be restarted when they exit with a failure. From 597a1f80cf160b8e7a1f6ab6926c44cb43793fca Mon Sep 17 00:00:00 2001 From: Gustavo Henke Date: Fri, 12 Jan 2024 16:59:30 -0300 Subject: [PATCH 2/2] Update src/flow-control/restart-process.ts --- src/flow-control/restart-process.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/flow-control/restart-process.ts b/src/flow-control/restart-process.ts index b65a5b87..7a843d35 100644 --- a/src/flow-control/restart-process.ts +++ b/src/flow-control/restart-process.ts @@ -29,7 +29,7 @@ export class RestartProcess implements FlowController { scheduler?: Rx.SchedulerLike; }) { this.logger = logger; - this.delay = delay != null ? delay : 0; + this.delay = delay ?? 0; this.tries = tries != null ? +tries : defaults.restartTries; this.tries = this.tries < 0 ? Infinity : this.tries; this.scheduler = scheduler;