From 285fc9fd225a6bf8e007df95ee08135032b2537c Mon Sep 17 00:00:00 2001 From: Todd Anderson Date: Fri, 7 Feb 2025 15:56:33 -0600 Subject: [PATCH 01/10] feat: adds CompositeDataSource for FDv2 --- .../DataSystem/CompositeDataSource.test.ts | 208 ++++++++++++++++++ .../subsystem/DataSystem/CallbackHandler.ts | 93 ++++++++ .../DataSystem/CompositeDataSource.ts | 153 +++++++++++++ .../api/subsystem/DataSystem/DataSource.ts | 35 +++ .../DataSystem/DataSystemInitializer.ts | 24 ++ .../DataSystem/DataSystemSynchronizer.ts | 24 ++ .../src/api/subsystem/DataSystem/index.ts | 3 + .../sdk-server/src/options/Configuration.ts | 9 +- 8 files changed, 545 insertions(+), 4 deletions(-) create mode 100644 packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts create mode 100644 packages/shared/common/src/api/subsystem/DataSystem/CallbackHandler.ts create mode 100644 packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts create mode 100644 packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts create mode 100644 packages/shared/common/src/api/subsystem/DataSystem/DataSystemInitializer.ts create mode 100644 packages/shared/common/src/api/subsystem/DataSystem/DataSystemSynchronizer.ts create mode 100644 packages/shared/common/src/api/subsystem/DataSystem/index.ts diff --git a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts new file mode 100644 index 0000000000..bb4f9ef49e --- /dev/null +++ b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts @@ -0,0 +1,208 @@ +import { CompositeDataSource } from '../../../src/api/subsystem/DataSystem/CompositeDataSource'; +import { Data, HealthStatus } from '../../../src/api/subsystem/DataSystem/DataSource'; +import { + DataSystemInitializer, + InitializerFactory, +} from '../../../src/api/subsystem/DataSystem/DataSystemInitializer'; +import { + DataSystemSynchronizer, + SynchronizerFactory, +} from '../../../src/api/subsystem/DataSystem/DataSystemSynchronizer'; + +function makeInitializerFactory(internal: DataSystemInitializer): InitializerFactory { + return { + create: () => internal, + }; +} + +function makeSynchronizerFactory(internal: DataSystemSynchronizer): SynchronizerFactory { + return { + create: () => internal, + }; +} + +it('initializer gets basis, switch to syncrhonizer', async () => { + const mockInitializer1 = { + run: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: Data) => void, + _statusCallback: (status: HealthStatus, durationMS: number) => void, + _errorHander: (err: Error) => void, + ) => { + _dataCallback(true, { key: 'init1' }); + }, + ), + stop: jest.fn(), + }; + + const mockSynchronizer1 = { + run: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: Data) => void, + _statusCallback: (status: HealthStatus, durationMS: number) => void, + _errorHander: (err: Error) => void, + ) => { + _dataCallback(false, { key: 'sync1' }); + }, + ), + stop: jest.fn(), + }; + + const underTest = new CompositeDataSource( + [makeInitializerFactory(mockInitializer1)], + [makeSynchronizerFactory(mockSynchronizer1)], + ); + const callback = jest.fn(); + underTest.run(callback, jest.fn()); + + // pause so scheduler can resolve awaits + await new Promise((f) => { + setTimeout(f, 1); + }); + + expect(mockInitializer1.run).toHaveBeenCalledTimes(1); + expect(mockSynchronizer1.run).toHaveBeenCalledTimes(1); + expect(callback).toHaveBeenCalledTimes(2); + expect(callback).toHaveBeenNthCalledWith(1, true, { key: 'init1' }); + expect(callback).toHaveBeenNthCalledWith(2, false, { key: 'sync1' }); +}); + +it('initializer gets basis, switch to synchronizer 1, fallback to synchronizer 2, recover to synchronizer 1', async () => { + const mockInitializer1: DataSystemInitializer = { + run: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: Data) => void, + _statusCallback: (status: HealthStatus, durationMS: number) => void, + _errorHander: (err: Error) => void, + ) => { + _dataCallback(true, { key: 'init1' }); + }, + ), + stop: jest.fn(), + }; + + let sync1RunCount = 0; + const mockSynchronizer1 = { + run: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: Data) => void, + _statusCallback: (status: HealthStatus, durationMS: number) => void, + _errorHander: (err: Error) => void, + ) => { + if (sync1RunCount === 0) { + _errorHander({ name: 'Error', message: 'I am error...man!' }); // error that will lead to fallback + } else { + _dataCallback(false, { key: 'sync1' }); // second run will lead to data + } + sync1RunCount += 1; + }, + ), + stop: jest.fn(), + }; + + const mockSynchronizer2 = { + run: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: Data) => void, + _statusCallback: (status: HealthStatus, durationMS: number) => void, + _errorHander: (err: Error) => void, + ) => { + _dataCallback(false, { key: 'sync2' }); + _statusCallback(HealthStatus.Online, Number.MAX_VALUE); // this should lead to recovery + }, + ), + stop: jest.fn(), + }; + + const underTest = new CompositeDataSource( + [makeInitializerFactory(mockInitializer1)], + [makeSynchronizerFactory(mockSynchronizer1), makeSynchronizerFactory(mockSynchronizer2)], + ); + const callback = jest.fn(); + underTest.run(callback, jest.fn()); + + // pause so scheduler can resolve awaits + await new Promise((f) => { + setTimeout(f, 1); + }); + expect(mockInitializer1.run).toHaveBeenCalledTimes(1); + expect(mockSynchronizer1.run).toHaveBeenCalledTimes(2); + expect(mockSynchronizer2.run).toHaveBeenCalledTimes(1); + expect(callback).toHaveBeenCalledTimes(3); + expect(callback).toHaveBeenNthCalledWith(1, true, { key: 'init1' }); + expect(callback).toHaveBeenNthCalledWith(2, false, { key: 'sync2' }); // sync1 errors and fallsback + expect(callback).toHaveBeenNthCalledWith(3, false, { key: 'sync1' }); // sync2 recovers back to sync1 +}); + +it('it reports error when all initializers fail', async () => { + const mockInitializer1: DataSystemInitializer = { + run: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: Data) => void, + _statusCallback: (status: HealthStatus, durationMS: number) => void, + _errorHander: (err: Error) => void, + ) => { + _errorHander({ name: 'Error', message: 'I am initializer1 error!' }); + }, + ), + stop: jest.fn(), + }; + + const mockInitializer2: DataSystemInitializer = { + run: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: Data) => void, + _statusCallback: (status: HealthStatus, durationMS: number) => void, + _errorHander: (err: Error) => void, + ) => { + _errorHander({ name: 'Error', message: 'I am initializer2 error!' }); + }, + ), + stop: jest.fn(), + }; + + const underTest = new CompositeDataSource( + [makeInitializerFactory(mockInitializer1), makeInitializerFactory(mockInitializer2)], + [], // no synchronizers for this test + ); + + const dataCallback = jest.fn(); + const errorCallback = jest.fn(); + underTest.run(dataCallback, errorCallback); + + // pause so scheduler can resolve awaits + await new Promise((f) => { + setTimeout(f, 1); + }); + + expect(mockInitializer1.run).toHaveBeenCalledTimes(1); + expect(mockInitializer2.run).toHaveBeenCalledTimes(1); + expect(dataCallback).toHaveBeenCalledTimes(0); + expect(errorCallback).toHaveBeenCalledTimes(3); + expect(errorCallback).toHaveBeenNthCalledWith(1, { + name: 'Error', + message: 'I am initializer1 error!', + }); + expect(errorCallback).toHaveBeenNthCalledWith(2, { + name: 'Error', + message: 'I am initializer2 error!', + }); + expect(errorCallback).toHaveBeenNthCalledWith(3, { + name: 'ExhaustedDataSources', + message: 'CompositeDataSource has exhausted all configured datasources.', + }); +}); diff --git a/packages/shared/common/src/api/subsystem/DataSystem/CallbackHandler.ts b/packages/shared/common/src/api/subsystem/DataSystem/CallbackHandler.ts new file mode 100644 index 0000000000..29212222df --- /dev/null +++ b/packages/shared/common/src/api/subsystem/DataSystem/CallbackHandler.ts @@ -0,0 +1,93 @@ +import { Data, HealthStatus } from './DataSource'; + +/** + * Represents a transition between data sources. + */ +export enum Transition { + /** + * Transition from current data source to the first synchronizer. + */ + SwitchToSync, + + /** + * Transition to the next data source of the same kind. + */ + Fallback, + + /** + * Transition to the first data source of the same kind. + */ + Recover, + + /** + * A no-op transition. + */ + None, +} + +/** + * Evaluated to determine if a transition should occur. + */ +export type TransitionCondition = (status: HealthStatus, durationMS: number) => boolean; + +/** + * Handler that connects the current {@link DataSource} to the {@link CompositeDataSource}. A single + * {@link CallbackHandler} should only be given to one {@link DataSource}. Once an instance of + * a {@link CallbackHandler} triggers a transition, it will disable itself so that future invocatons + * on it are no-op. + */ +export class CallbackHandler { + private _disabled: boolean = false; + + constructor( + private readonly _dataCallback: (basis: boolean, data: Data) => void, + private readonly _errorCallback: (err: any) => void, + private readonly _triggerTransition: (value: Transition | PromiseLike) => void, + private readonly _isInitializer: boolean, + private readonly _recoveryCondition: (status: HealthStatus, durationMS: number) => boolean, + private readonly _fallbackCondition: (status: HealthStatus, durationMS: number) => boolean, + ) {} + + dataHanlder = async (basis: boolean, data: Data) => { + if (this._disabled) { + return; + } + + // report data up + this._dataCallback(basis, data); + + // TODO: SDK-1044 track selector for future synchronizer to use + if (basis && this._isInitializer) { + this._disabled = true; // getting basis means this initializer has done its job, time to move on to sync! + this._triggerTransition(Transition.SwitchToSync); + } + }; + + statusHandler = async (status: HealthStatus, durationMS: number) => { + if (this._disabled) { + return; + } + + if (this._recoveryCondition(status, durationMS)) { + this._disabled = true; + this._triggerTransition(Transition.Recover); + } else if (this._fallbackCondition(status, durationMS)) { + this._disabled = true; + this._triggerTransition(Transition.Fallback); + } + }; + + errorHandler = async (err: any) => { + // TODO: unrecoverable error handling + if (this._disabled) { + return; + } + this._disabled = true; + + // TODO: should this error be reported or contained silently if we have a fallback? + // report error up, discuss with others on team. + this._errorCallback(err); + + this._triggerTransition(Transition.Fallback); + }; +} diff --git a/packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts b/packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts new file mode 100644 index 0000000000..132b3f7095 --- /dev/null +++ b/packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts @@ -0,0 +1,153 @@ +import { CallbackHandler, Transition, TransitionCondition } from './CallbackHandler'; +import { Data, DataSource, HealthStatus } from './DataSource'; +import { DataSystemInitializer, InitializerFactory } from './DataSystemInitializer'; +import { DataSystemSynchronizer, SynchronizerFactory } from './DataSystemSynchronizer'; + +/** + * The {@link CompositeDataSource} can combine a number of {@link DataSystemInitializer}s and {@link DataSystemSynchronizer}s + * into a single {@link DataSource}, implementing fallback and recovery logic internally to choose where data is sourced from. + */ +export class CompositeDataSource implements DataSource { + // TODO: SDK-856 async notification if initializer takes too long + // TODO: SDK-1044 utilize selector from initializers + + // TODO: add datasource status APIs + + private readonly _defaultFallbackTimeMs = 2 * 60 * 1000; + private readonly _defaultRecoveryTimeMs = 5 * 60 * 1000; + + private _initPhaseActive: boolean = true; + private _currentPosition: number = 0; + + private _stopped: boolean = true; + private _externalStopPromise: Promise; + private _externalStopResolve?: (value: Transition) => void; + + /** + * @param _initializers factories to create {@link DataSystemInitializer}s, in priority order. + * @param _synchronizers factories to create {@link DataSystemSynchronizer}s, in priority order. + */ + constructor( + private readonly _initializers: InitializerFactory[], + private readonly _synchronizers: SynchronizerFactory[], + ) { + this._externalStopPromise = new Promise((transition) => { + this._externalStopResolve = transition; + }); + this._initPhaseActive = true; + this._currentPosition = 0; + } + + async run( + dataCallback: (basis: boolean, data: Data) => void, + errorCallback: (err: Error) => void, + ): Promise { + if (!this._stopped) { + // don't allow multiple simultaneous runs + return; + } + this._stopped = false; + + let transition: Transition = Transition.None; // first loop has no transition + while (!this._stopped) { + const current: DataSystemInitializer | DataSystemSynchronizer | undefined = + this._pickDataSource(transition); + if (current === undefined) { + errorCallback({ + name: 'ExhaustedDataSources', + message: 'CompositeDataSource has exhausted all configured datasources.', + }); + return; + } + + const internalTransitionPromise = new Promise((transitionResolve) => { + const recoveryCondition = this._makeRecoveryCondition(); + const fallbackCondition = this._makeFallbackCondition(); + const callbackHandler = new CallbackHandler( + dataCallback, + errorCallback, + transitionResolve, + this._initPhaseActive, + recoveryCondition, + fallbackCondition, + ); + current.run( + callbackHandler.dataHanlder, + callbackHandler.statusHandler, + callbackHandler.errorHandler, + ); + }); + + // await transition triggered by internal data source or an external stop request + // eslint-disable-next-line no-await-in-loop + transition = await Promise.race([internalTransitionPromise, this._externalStopPromise]); + + // stop the current datasource before transitioning to next state + current.stop(); + } + + // reset so that run can be called again in the future + this._reset(); + } + + async stop() { + this._stopped = true; + this._externalStopResolve?.(Transition.None); // TODO: this feels a little hacky. + } + + private _reset() { + this._initPhaseActive = true; + this._currentPosition = 0; + this._externalStopPromise = new Promise((transition) => { + this._externalStopResolve = transition; + }); + } + + private _pickDataSource( + transition: Transition | undefined, + ): DataSystemInitializer | DataSystemSynchronizer | undefined { + switch (transition) { + case Transition.SwitchToSync: + this._initPhaseActive = false; // one way toggle to false, unless this class is reset() + this._currentPosition = 0; + break; + case Transition.Fallback: + this._currentPosition += 1; + break; + case Transition.Recover: + this._currentPosition = 0; + break; + case Transition.None: + default: + // don't do anything in this case + break; + } + + if (this._initPhaseActive) { + // if outside range of initializers, don't loop back to start, instead return undefined + if (this._currentPosition > this._initializers.length - 1) { + return undefined; + } + + return this._initializers[this._currentPosition].create(); + } + + // getting here indicates we are using a synchronizer + this._currentPosition %= this._synchronizers.length; // modulate position to loop back to start + if (this._currentPosition > this._synchronizers.length - 1) { + // this is only possible if no synchronizers were provided + return undefined; + } + return this._synchronizers[this._currentPosition].create(); + } + + private _makeFallbackCondition(): TransitionCondition { + return (status: HealthStatus, durationMS: number) => + status === HealthStatus.Interrupted && durationMS >= this._defaultFallbackTimeMs; + } + + private _makeRecoveryCondition(): TransitionCondition { + return (status: HealthStatus, durationMS: number) => + status === HealthStatus.Online && durationMS >= this._defaultRecoveryTimeMs; + } +} diff --git a/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts b/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts new file mode 100644 index 0000000000..3e80f7324a --- /dev/null +++ b/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts @@ -0,0 +1,35 @@ +export interface Data {} + +export enum HealthStatus { + Online, + Interrupted, +} + +export interface DataSource { + /** + * May be called any number of times, if already started, has no effect + * @param cb may be invoked many times + * @returns + */ + run(dataCallback: (basis: boolean, data: Data) => void, errorHander: (err: Error) => void): void; + + /** + * May be called any number of times, if already stopped, has no effect. + * @param cb + * @returns + */ + stop(): void; +} + +export interface DataSourceWithStatus { + /** + * May be called any number of times, if already started, has no effect + * @param cb may be invoked many times + * @returns + */ + run( + dataCallback: (basis: boolean, data: Data) => void, + statusCallback: (status: HealthStatus, durationMS: number) => void, + errorHander: (err: any) => void, + ): void; +} diff --git a/packages/shared/common/src/api/subsystem/DataSystem/DataSystemInitializer.ts b/packages/shared/common/src/api/subsystem/DataSystem/DataSystemInitializer.ts new file mode 100644 index 0000000000..4d53b79bd5 --- /dev/null +++ b/packages/shared/common/src/api/subsystem/DataSystem/DataSystemInitializer.ts @@ -0,0 +1,24 @@ +import { Data, HealthStatus } from './DataSource'; + +/** + * Will make best effort to retrieve all data. Data recieved will be reported via the {@link dataCallback}. Status changes + * will be reported via the {@link statusCallback}. Errors will be reported via the {@link errorCallback}. + */ +export interface DataSystemInitializer { + run( + dataCallback: (basis: boolean, data: Data) => void, + statusCallback: (status: HealthStatus, durationMS: number) => void, + errorCallback: (err: Error) => void, + ): void; + + /** + * May be called any number of times, if already stopped, has no effect. + * @param cb + * @returns + */ + stop(): void; +} + +export interface InitializerFactory { + create(): DataSystemInitializer; +} diff --git a/packages/shared/common/src/api/subsystem/DataSystem/DataSystemSynchronizer.ts b/packages/shared/common/src/api/subsystem/DataSystem/DataSystemSynchronizer.ts new file mode 100644 index 0000000000..09babe13ea --- /dev/null +++ b/packages/shared/common/src/api/subsystem/DataSystem/DataSystemSynchronizer.ts @@ -0,0 +1,24 @@ +import { Data, HealthStatus } from './DataSource'; + +/** + * Will make best effort to retrieve data. Data recieved will be reported via the {@link dataCallback}. Status changes + * will be reported via the {@link statusCallback}. Errors will be reported via the {@link errorCallback}. + */ +export interface DataSystemSynchronizer { + run( + dataCallback: (basis: boolean, data: Data) => void, + statusCallback: (status: HealthStatus, durationMS: number) => void, + errorCallback: (err: Error) => void, + ): void; + + /** + * May be called any number of times, if already stopped, has no effect. + * @param cb + * @returns + */ + stop(): void; +} + +export interface SynchronizerFactory { + create(): DataSystemSynchronizer; +} diff --git a/packages/shared/common/src/api/subsystem/DataSystem/index.ts b/packages/shared/common/src/api/subsystem/DataSystem/index.ts new file mode 100644 index 0000000000..8cedfee2a4 --- /dev/null +++ b/packages/shared/common/src/api/subsystem/DataSystem/index.ts @@ -0,0 +1,3 @@ +export { DataSystemInitializer, InitializerFactory } from './DataSystemInitializer'; +export { DataSystemSynchronizer, SynchronizerFactory } from './DataSystemSynchronizer'; +export { CompositeDataSource } from './CompositeDataSource'; diff --git a/packages/shared/sdk-server/src/options/Configuration.ts b/packages/shared/sdk-server/src/options/Configuration.ts index 77baf0de39..fe14a43f53 100644 --- a/packages/shared/sdk-server/src/options/Configuration.ts +++ b/packages/shared/sdk-server/src/options/Configuration.ts @@ -247,10 +247,6 @@ export default class Configuration { this.pollInterval = validatedOptions.pollInterval; this.proxyOptions = validatedOptions.proxyOptions; - this.offline = validatedOptions.offline; - this.stream = validatedOptions.stream; - this.streamInitialReconnectDelay = validatedOptions.streamInitialReconnectDelay; - this.useLdd = validatedOptions.useLdd; this.sendEvents = validatedOptions.sendEvents; this.allAttributesPrivate = validatedOptions.allAttributesPrivate; this.privateAttributes = validatedOptions.privateAttributes; @@ -264,6 +260,11 @@ export default class Configuration { this.tags = new ApplicationTags(validatedOptions); this.diagnosticRecordingInterval = validatedOptions.diagnosticRecordingInterval; + this.offline = validatedOptions.offline; + this.stream = validatedOptions.stream; + this.streamInitialReconnectDelay = validatedOptions.streamInitialReconnectDelay; + this.useLdd = validatedOptions.useLdd; + if (TypeValidators.Function.is(validatedOptions.updateProcessor)) { // @ts-ignore this.updateProcessorFactory = validatedOptions.updateProcessor; From 3e9953b4a8b5f66f50a122a4000021e34c3f2c02 Mon Sep 17 00:00:00 2001 From: Todd Anderson Date: Fri, 21 Feb 2025 12:51:57 -0600 Subject: [PATCH 02/10] implementing status based scheduled transitions and backoff support --- .../src/platform/DefaultBrowserEventSource.ts | 2 +- packages/sdk/browser/tsconfig.json | 2 +- .../DataSystem/CompositeDataSource.test.ts | 348 +++++++++++++++--- .../subsystem/DataSystem/CallbackHandler.ts | 78 +--- .../DataSystem/CompositeDataSource.ts | 228 +++++++++--- .../api/subsystem/DataSystem/DataSource.ts | 39 +- .../DataSystem/DataSystemInitializer.ts | 24 -- .../DataSystem/DataSystemSynchronizer.ts | 24 -- .../common/src/datasource}/Backoff.ts | 0 9 files changed, 511 insertions(+), 234 deletions(-) delete mode 100644 packages/shared/common/src/api/subsystem/DataSystem/DataSystemInitializer.ts delete mode 100644 packages/shared/common/src/api/subsystem/DataSystem/DataSystemSynchronizer.ts rename packages/{sdk/browser/src/platform => shared/common/src/datasource}/Backoff.ts (100%) diff --git a/packages/sdk/browser/src/platform/DefaultBrowserEventSource.ts b/packages/sdk/browser/src/platform/DefaultBrowserEventSource.ts index 3ecdeb3a12..4d7af58fe4 100644 --- a/packages/sdk/browser/src/platform/DefaultBrowserEventSource.ts +++ b/packages/sdk/browser/src/platform/DefaultBrowserEventSource.ts @@ -6,7 +6,7 @@ import { EventSource as LDEventSource, } from '@launchdarkly/js-client-sdk-common'; -import Backoff from './Backoff'; +import Backoff from '../../../../shared/common/src/datasource/Backoff'; /** * Implementation Notes: diff --git a/packages/sdk/browser/tsconfig.json b/packages/sdk/browser/tsconfig.json index 7306c5b0c6..82f7f65361 100644 --- a/packages/sdk/browser/tsconfig.json +++ b/packages/sdk/browser/tsconfig.json @@ -18,7 +18,7 @@ "types": ["node", "jest"], "allowJs": true }, - "include": ["src"], + "include": ["src", "../../shared/common/src/datasource/Backoff.ts"], "exclude": [ "vite.config.ts", "__tests__", diff --git a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts index bb4f9ef49e..89097461ab 100644 --- a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts +++ b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts @@ -1,13 +1,16 @@ -import { CompositeDataSource } from '../../../src/api/subsystem/DataSystem/CompositeDataSource'; -import { Data, HealthStatus } from '../../../src/api/subsystem/DataSystem/DataSource'; import { - DataSystemInitializer, - InitializerFactory, -} from '../../../src/api/subsystem/DataSystem/DataSystemInitializer'; + CompositeDataSource, + Transition, + TransitionConditions, +} from '../../../src/api/subsystem/DataSystem/CompositeDataSource'; import { + Data, + DataSourceState, + DataSystemInitializer, DataSystemSynchronizer, + InitializerFactory, SynchronizerFactory, -} from '../../../src/api/subsystem/DataSystem/DataSystemSynchronizer'; +} from '../../../src/api/subsystem/DataSystem/DataSource'; function makeInitializerFactory(internal: DataSystemInitializer): InitializerFactory { return { @@ -21,6 +24,27 @@ function makeSynchronizerFactory(internal: DataSystemSynchronizer): Synchronizer }; } +function makeTestTransitionConditions(): TransitionConditions { + return { + [DataSourceState.Initializing]: { + durationMS: 10000, + transition: Transition.Fallback, + }, + [DataSourceState.Interrupted]: { + durationMS: 10000, + transition: Transition.Fallback, + }, + [DataSourceState.Closed]: { + durationMS: 10000, + transition: Transition.Fallback, + }, + [DataSourceState.Valid]: { + durationMS: 10000, + transition: Transition.Fallback, + }, + }; +} + it('initializer gets basis, switch to syncrhonizer', async () => { const mockInitializer1 = { run: jest @@ -28,8 +52,7 @@ it('initializer gets basis, switch to syncrhonizer', async () => { .mockImplementation( ( _dataCallback: (basis: boolean, data: Data) => void, - _statusCallback: (status: HealthStatus, durationMS: number) => void, - _errorHander: (err: Error) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, ) => { _dataCallback(true, { key: 'init1' }); }, @@ -37,16 +60,16 @@ it('initializer gets basis, switch to syncrhonizer', async () => { stop: jest.fn(), }; + const mockSynchronizer1Data = { key: 'sync1' }; const mockSynchronizer1 = { run: jest .fn() .mockImplementation( ( _dataCallback: (basis: boolean, data: Data) => void, - _statusCallback: (status: HealthStatus, durationMS: number) => void, - _errorHander: (err: Error) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, ) => { - _dataCallback(false, { key: 'sync1' }); + _dataCallback(false, mockSynchronizer1Data); }, ), stop: jest.fn(), @@ -55,13 +78,20 @@ it('initializer gets basis, switch to syncrhonizer', async () => { const underTest = new CompositeDataSource( [makeInitializerFactory(mockInitializer1)], [makeSynchronizerFactory(mockSynchronizer1)], + makeTestTransitionConditions(), + 0, + 0, ); - const callback = jest.fn(); - underTest.run(callback, jest.fn()); - // pause so scheduler can resolve awaits - await new Promise((f) => { - setTimeout(f, 1); + let callback; + await new Promise((resolve) => { + callback = jest.fn((_: boolean, data: Data) => { + if (data === mockSynchronizer1Data) { + resolve(); + } + }); + + underTest.run(callback, jest.fn()); }); expect(mockInitializer1.run).toHaveBeenCalledTimes(1); @@ -78,8 +108,7 @@ it('initializer gets basis, switch to synchronizer 1, fallback to synchronizer 2 .mockImplementation( ( _dataCallback: (basis: boolean, data: Data) => void, - _statusCallback: (status: HealthStatus, durationMS: number) => void, - _errorHander: (err: Error) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, ) => { _dataCallback(true, { key: 'init1' }); }, @@ -88,19 +117,22 @@ it('initializer gets basis, switch to synchronizer 1, fallback to synchronizer 2 }; let sync1RunCount = 0; + const mockSynchronizer1Data = { key: 'sync1' }; const mockSynchronizer1 = { run: jest .fn() .mockImplementation( ( _dataCallback: (basis: boolean, data: Data) => void, - _statusCallback: (status: HealthStatus, durationMS: number) => void, - _errorHander: (err: Error) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, ) => { if (sync1RunCount === 0) { - _errorHander({ name: 'Error', message: 'I am error...man!' }); // error that will lead to fallback + _statusCallback(DataSourceState.Closed, { + name: 'Error', + message: 'I am error...man!', + }); // error that will lead to fallback } else { - _dataCallback(false, { key: 'sync1' }); // second run will lead to data + _dataCallback(false, mockSynchronizer1Data); // second run will lead to data } sync1RunCount += 1; }, @@ -108,17 +140,17 @@ it('initializer gets basis, switch to synchronizer 1, fallback to synchronizer 2 stop: jest.fn(), }; + const mockSynchronizer2Data = { key: 'sync2' }; const mockSynchronizer2 = { run: jest .fn() .mockImplementation( ( _dataCallback: (basis: boolean, data: Data) => void, - _statusCallback: (status: HealthStatus, durationMS: number) => void, - _errorHander: (err: Error) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, ) => { - _dataCallback(false, { key: 'sync2' }); - _statusCallback(HealthStatus.Online, Number.MAX_VALUE); // this should lead to recovery + _dataCallback(false, mockSynchronizer2Data); + _statusCallback(DataSourceState.Valid, null); // this should lead to recovery }, ), stop: jest.fn(), @@ -127,14 +159,22 @@ it('initializer gets basis, switch to synchronizer 1, fallback to synchronizer 2 const underTest = new CompositeDataSource( [makeInitializerFactory(mockInitializer1)], [makeSynchronizerFactory(mockSynchronizer1), makeSynchronizerFactory(mockSynchronizer2)], + makeTestTransitionConditions(), + 0, + 0, ); - const callback = jest.fn(); - underTest.run(callback, jest.fn()); - // pause so scheduler can resolve awaits - await new Promise((f) => { - setTimeout(f, 1); + let callback; + await new Promise((resolve) => { + callback = jest.fn((_: boolean, data: Data) => { + if (data === mockSynchronizer1Data) { + resolve(); + } + }); + + underTest.run(callback, jest.fn()); }); + expect(mockInitializer1.run).toHaveBeenCalledTimes(1); expect(mockSynchronizer1.run).toHaveBeenCalledTimes(2); expect(mockSynchronizer2.run).toHaveBeenCalledTimes(1); @@ -145,31 +185,37 @@ it('initializer gets basis, switch to synchronizer 1, fallback to synchronizer 2 }); it('it reports error when all initializers fail', async () => { + const mockInitializer1Error = { + name: 'Error', + message: 'I am initializer1 error!', + }; const mockInitializer1: DataSystemInitializer = { run: jest .fn() .mockImplementation( ( _dataCallback: (basis: boolean, data: Data) => void, - _statusCallback: (status: HealthStatus, durationMS: number) => void, - _errorHander: (err: Error) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, ) => { - _errorHander({ name: 'Error', message: 'I am initializer1 error!' }); + _statusCallback(DataSourceState.Closed, mockInitializer1Error); }, ), stop: jest.fn(), }; + const mockInitializer2Error = { + name: 'Error', + message: 'I am initializer2 error!', + }; const mockInitializer2: DataSystemInitializer = { run: jest .fn() .mockImplementation( ( _dataCallback: (basis: boolean, data: Data) => void, - _statusCallback: (status: HealthStatus, durationMS: number) => void, - _errorHander: (err: Error) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, ) => { - _errorHander({ name: 'Error', message: 'I am initializer2 error!' }); + _statusCallback(DataSourceState.Closed, mockInitializer2Error); }, ), stop: jest.fn(), @@ -178,31 +224,231 @@ it('it reports error when all initializers fail', async () => { const underTest = new CompositeDataSource( [makeInitializerFactory(mockInitializer1), makeInitializerFactory(mockInitializer2)], [], // no synchronizers for this test + makeTestTransitionConditions(), + 0, + 0, ); const dataCallback = jest.fn(); - const errorCallback = jest.fn(); - underTest.run(dataCallback, errorCallback); + let statusCallback; + await new Promise((resolve) => { + statusCallback = jest.fn((_: DataSourceState, err?: any) => { + if (err?.name === 'ExhaustedDataSources') { + resolve(); + } + }); - // pause so scheduler can resolve awaits - await new Promise((f) => { - setTimeout(f, 1); + underTest.run(dataCallback, statusCallback); }); expect(mockInitializer1.run).toHaveBeenCalledTimes(1); expect(mockInitializer2.run).toHaveBeenCalledTimes(1); expect(dataCallback).toHaveBeenCalledTimes(0); - expect(errorCallback).toHaveBeenCalledTimes(3); - expect(errorCallback).toHaveBeenNthCalledWith(1, { - name: 'Error', - message: 'I am initializer1 error!', + expect(statusCallback).toHaveBeenCalledTimes(3); + expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Interrupted, null); + expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Interrupted, null); + expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Closed, { + name: 'ExhaustedDataSources', + message: + 'CompositeDataSource has exhausted all configured datasources (2 initializers, 0 synchronizers).', }); - expect(errorCallback).toHaveBeenNthCalledWith(2, { - name: 'Error', - message: 'I am initializer2 error!', +}); + +it('it can be stopped when in thrashing synchronizer fallback loop', async () => { + const mockInitializer1 = { + run: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: Data) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _dataCallback(true, { key: 'init1' }); + }, + ), + stop: jest.fn(), + }; + + const mockSynchronizer1 = { + run: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: Data) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Closed, { name: 'Error', message: 'I am error...man!' }); // error that will lead to fallback + }, + ), + stop: jest.fn(), + }; + + const underTest = new CompositeDataSource( + [makeInitializerFactory(mockInitializer1)], + [makeSynchronizerFactory(mockSynchronizer1)], // will continuously fallback onto itself + makeTestTransitionConditions(), + 0, + 0, + ); + + const dataCallback = jest.fn(); + let statusCallback; + await new Promise((resolve) => { + let statusCount = 0; + statusCallback = jest.fn((_1: DataSourceState, _2: any) => { + statusCount += 1; + if (statusCount >= 5) { + resolve(); + } + }); + + underTest.run(dataCallback, statusCallback); }); - expect(errorCallback).toHaveBeenNthCalledWith(3, { + + expect(mockInitializer1.run).toHaveBeenCalled(); + expect(mockSynchronizer1.run).toHaveBeenCalled(); + expect(dataCallback).toHaveBeenNthCalledWith(1, true, { key: 'init1' }); + + // wait for trashing to occur + await new Promise((f) => { + setTimeout(f, 1); + }).then(() => underTest.stop()); + + // wait for stop to take effect before checking status is closed + await new Promise((f) => { + setTimeout(f, 1); + }); + + expect(statusCallback).toHaveBeenLastCalledWith(DataSourceState.Closed, null); +}); + +it('it can be stopped and restarted', async () => { + const mockInitializer1Data = { key: 'init1' }; + const mockInitializer1 = { + run: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: Data) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _dataCallback(true, mockInitializer1Data); + }, + ), + stop: jest.fn(), + }; + + const mockSynchronizer1Data = { key: 'sync1' }; + const mockSynchronizer1 = { + run: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: Data) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _dataCallback(false, mockSynchronizer1Data); + }, + ), + stop: jest.fn(), + }; + + const underTest = new CompositeDataSource( + [makeInitializerFactory(mockInitializer1)], + [makeSynchronizerFactory(mockSynchronizer1)], + makeTestTransitionConditions(), + 0, + 0, + ); + + let callback1; + await new Promise((resolve) => { + callback1 = jest.fn((_: boolean, data: Data) => { + if (data === mockSynchronizer1Data) { + underTest.stop(); + resolve(); + } + }); + // first run + underTest.run(callback1, jest.fn()); + }); + + // check first run triggered underlying data sources + expect(mockInitializer1.run).toHaveBeenCalledTimes(1); + expect(mockSynchronizer1.run).toHaveBeenCalledTimes(1); + expect(callback1).toHaveBeenCalledTimes(2); + + let callback2; + await new Promise((resolve) => { + callback2 = jest.fn((_: boolean, data: Data) => { + if (data === mockSynchronizer1Data) { + resolve(); + } + }); + // second run + underTest.run(callback2, jest.fn()); + }); + + // check that second run triggers underlying data sources again + expect(mockInitializer1.run).toHaveBeenCalledTimes(2); + expect(mockSynchronizer1.run).toHaveBeenCalledTimes(2); + expect(callback2).toHaveBeenCalledTimes(4); +}); + +it('it is well behaved with no initializers and no synchronizers configured', async () => { + const underTest = new CompositeDataSource([], [], makeTestTransitionConditions(), 0, 0); + + let statusCallback; + await new Promise((resolve) => { + statusCallback = jest.fn((_1: DataSourceState, _2: any) => { + resolve(); + }); + + underTest.run(jest.fn(), statusCallback); + }); + + expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Closed, { + name: 'ExhaustedDataSources', + message: + 'CompositeDataSource has exhausted all configured datasources (0 initializers, 0 synchronizers).', + }); +}); + +it('it is well behaved with an initializer and no synchronizers configured', async () => { + const mockInitializer1 = { + run: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: Data) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _dataCallback(true, { key: 'init1' }); + }, + ), + stop: jest.fn(), + }; + + const underTest = new CompositeDataSource( + [makeInitializerFactory(mockInitializer1)], + [], + makeTestTransitionConditions(), + 0, + 0, + ); + + let statusCallback; + await new Promise((resolve) => { + statusCallback = jest.fn((_1: DataSourceState, _2: any) => { + resolve(); + }); + + underTest.run(jest.fn(), statusCallback); + }); + + expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Closed, { name: 'ExhaustedDataSources', - message: 'CompositeDataSource has exhausted all configured datasources.', + message: + 'CompositeDataSource has exhausted all configured datasources (1 initializers, 0 synchronizers).', }); }); diff --git a/packages/shared/common/src/api/subsystem/DataSystem/CallbackHandler.ts b/packages/shared/common/src/api/subsystem/DataSystem/CallbackHandler.ts index 29212222df..7ad446ddcd 100644 --- a/packages/shared/common/src/api/subsystem/DataSystem/CallbackHandler.ts +++ b/packages/shared/common/src/api/subsystem/DataSystem/CallbackHandler.ts @@ -1,93 +1,37 @@ -import { Data, HealthStatus } from './DataSource'; - -/** - * Represents a transition between data sources. - */ -export enum Transition { - /** - * Transition from current data source to the first synchronizer. - */ - SwitchToSync, - - /** - * Transition to the next data source of the same kind. - */ - Fallback, - - /** - * Transition to the first data source of the same kind. - */ - Recover, - - /** - * A no-op transition. - */ - None, -} - -/** - * Evaluated to determine if a transition should occur. - */ -export type TransitionCondition = (status: HealthStatus, durationMS: number) => boolean; +import { Data, DataSourceState } from './DataSource'; /** * Handler that connects the current {@link DataSource} to the {@link CompositeDataSource}. A single - * {@link CallbackHandler} should only be given to one {@link DataSource}. Once an instance of - * a {@link CallbackHandler} triggers a transition, it will disable itself so that future invocatons - * on it are no-op. + * {@link CallbackHandler} should only be given to one {@link DataSource}. Use {@link disable()} to + * prevent additional callback events. */ export class CallbackHandler { private _disabled: boolean = false; constructor( private readonly _dataCallback: (basis: boolean, data: Data) => void, - private readonly _errorCallback: (err: any) => void, - private readonly _triggerTransition: (value: Transition | PromiseLike) => void, - private readonly _isInitializer: boolean, - private readonly _recoveryCondition: (status: HealthStatus, durationMS: number) => boolean, - private readonly _fallbackCondition: (status: HealthStatus, durationMS: number) => boolean, + private readonly _statusCallback: (status: DataSourceState, err?: any) => void, ) {} + disable() { + this._disabled = true; + } + dataHanlder = async (basis: boolean, data: Data) => { if (this._disabled) { return; } + // TODO: SDK-1044 track selector for future synchronizer to use // report data up this._dataCallback(basis, data); - - // TODO: SDK-1044 track selector for future synchronizer to use - if (basis && this._isInitializer) { - this._disabled = true; // getting basis means this initializer has done its job, time to move on to sync! - this._triggerTransition(Transition.SwitchToSync); - } - }; - - statusHandler = async (status: HealthStatus, durationMS: number) => { - if (this._disabled) { - return; - } - - if (this._recoveryCondition(status, durationMS)) { - this._disabled = true; - this._triggerTransition(Transition.Recover); - } else if (this._fallbackCondition(status, durationMS)) { - this._disabled = true; - this._triggerTransition(Transition.Fallback); - } }; - errorHandler = async (err: any) => { - // TODO: unrecoverable error handling + statusHandler = async (status: DataSourceState, err?: any) => { if (this._disabled) { return; } - this._disabled = true; - - // TODO: should this error be reported or contained silently if we have a fallback? - // report error up, discuss with others on team. - this._errorCallback(err); - this._triggerTransition(Transition.Fallback); + this._statusCallback(status, err); }; } diff --git a/packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts b/packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts index 132b3f7095..73e71d6c9e 100644 --- a/packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts +++ b/packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts @@ -1,7 +1,49 @@ -import { CallbackHandler, Transition, TransitionCondition } from './CallbackHandler'; -import { Data, DataSource, HealthStatus } from './DataSource'; -import { DataSystemInitializer, InitializerFactory } from './DataSystemInitializer'; -import { DataSystemSynchronizer, SynchronizerFactory } from './DataSystemSynchronizer'; +/* eslint-disable no-await-in-loop */ +import Backoff from '../../../datasource/Backoff'; +import { CallbackHandler } from './CallbackHandler'; +import { + Data, + DataSource, + DataSourceState, + InitializerFactory, + SynchronizerFactory, +} from './DataSource'; + +/** + * Represents a transition between data sources. + */ +export enum Transition { + /** + * A no-op transition. + */ + None, + /** + * Transition from current data source to the first synchronizer. + */ + SwitchToSync, + + /** + * Transition to the next data source of the same kind. + */ + Fallback, + + /** + * Transition to the first data source of the same kind. + */ + Recover, +} + +/** + * Given a {@link DataSourceState}, how long to wait before transitioning. + */ +export type TransitionConditions = { + [k in DataSourceState]?: { durationMS: number; transition: Transition }; +}; + +interface TransitionRequest { + transition: Transition; + err?: Error; +} /** * The {@link CompositeDataSource} can combine a number of {@link DataSystemInitializer}s and {@link DataSystemSynchronizer}s @@ -10,18 +52,16 @@ import { DataSystemSynchronizer, SynchronizerFactory } from './DataSystemSynchro export class CompositeDataSource implements DataSource { // TODO: SDK-856 async notification if initializer takes too long // TODO: SDK-1044 utilize selector from initializers - - // TODO: add datasource status APIs - private readonly _defaultFallbackTimeMs = 2 * 60 * 1000; private readonly _defaultRecoveryTimeMs = 5 * 60 * 1000; private _initPhaseActive: boolean = true; private _currentPosition: number = 0; + private _backoff: Backoff; private _stopped: boolean = true; - private _externalStopPromise: Promise; - private _externalStopResolve?: (value: Transition) => void; + private _externalStopPromise: Promise; + private _externalStopResolve?: (value: TransitionRequest) => void; /** * @param _initializers factories to create {@link DataSystemInitializer}s, in priority order. @@ -30,17 +70,21 @@ export class CompositeDataSource implements DataSource { constructor( private readonly _initializers: InitializerFactory[], private readonly _synchronizers: SynchronizerFactory[], + private readonly _transitionConditions: TransitionConditions, + initialRetryDelayMillis: number, + retryResetIntervalMillis: number, ) { - this._externalStopPromise = new Promise((transition) => { - this._externalStopResolve = transition; + this._externalStopPromise = new Promise((tr) => { + this._externalStopResolve = tr; }); this._initPhaseActive = true; this._currentPosition = 0; + this._backoff = new Backoff(initialRetryDelayMillis, retryResetIntervalMillis); } async run( dataCallback: (basis: boolean, data: Data) => void, - errorCallback: (err: Error) => void, + statusCallback: (status: DataSourceState, err?: any) => void, ): Promise { if (!this._stopped) { // don't allow multiple simultaneous runs @@ -48,42 +92,91 @@ export class CompositeDataSource implements DataSource { } this._stopped = false; - let transition: Transition = Transition.None; // first loop has no transition - while (!this._stopped) { - const current: DataSystemInitializer | DataSystemSynchronizer | undefined = - this._pickDataSource(transition); + let lastTransition: Transition = Transition.None; + // eslint-disable-next-line no-constant-condition + while (true) { + if (this._stopped) { + // report we are closed, no error as this was due to stop breaking the loop + statusCallback(DataSourceState.Closed, null); + break; + } + + const current: DataSource | undefined = this._pickDataSource(lastTransition); if (current === undefined) { - errorCallback({ + statusCallback(DataSourceState.Closed, { name: 'ExhaustedDataSources', - message: 'CompositeDataSource has exhausted all configured datasources.', + message: `CompositeDataSource has exhausted all configured datasources (${this._initializers.length} initializers, ${this._synchronizers.length} synchronizers).`, }); - return; + break; } - const internalTransitionPromise = new Promise((transitionResolve) => { - const recoveryCondition = this._makeRecoveryCondition(); - const fallbackCondition = this._makeFallbackCondition(); + const internalTransitionPromise = new Promise((transitionResolve) => { + // these local variables are used for handling automatic transition related to data source status (ex: recovering to primary after + // secondary has been valid for N many minutes) + let lastState: DataSourceState | undefined; + let cancelScheduledTransition: (() => void) | undefined; + const callbackHandler = new CallbackHandler( - dataCallback, - errorCallback, - transitionResolve, - this._initPhaseActive, - recoveryCondition, - fallbackCondition, - ); - current.run( - callbackHandler.dataHanlder, - callbackHandler.statusHandler, - callbackHandler.errorHandler, + (basis: boolean, data: Data) => { + dataCallback(basis, data); + if (basis && this._initPhaseActive) { + // transition to sync if we get basis during init + callbackHandler.disable(); + cancelScheduledTransition?.(); + transitionResolve({ transition: Transition.SwitchToSync }); + } + }, + (state: DataSourceState, err?: any) => { + // When we get a status update, we want to fallback if it is an error. We also want to schedule a transition for some + // time in the future if this status remains for some duration (ex: Recover to primary synchronizer after the secondary + // synchronizer has been Valid for some time). These scheduled transitions are configurable in the constructor. + + if (err || state === DataSourceState.Closed) { + callbackHandler.disable(); + statusCallback(DataSourceState.Interrupted, null); // underlying errors or closed states are masked as interrupted while we transition + cancelScheduledTransition?.(); + transitionResolve({ transition: Transition.Fallback, err }); // unrecoverable error has occurred, so fallback + } else { + if (state !== lastState) { + lastState = state; + cancelScheduledTransition?.(); // cancel previously scheduled status transition if one was scheduled + const excludeRecovery = this._currentPosition === 0; // primary source cannot recover to itself, so exclude it + const condition = this._lookupTransitionCondition(state, excludeRecovery); + if (condition) { + const { promise, cancel } = this._cancellableDelay(condition.durationMS); + cancelScheduledTransition = cancel; + promise.then(() => { + callbackHandler.disable(); + transitionResolve({ transition: condition.transition }); + }); + } else { + // this data source state does not have a transition condition, so don't schedule any transition + } + } + statusCallback(state, null); // report the status upward + } + }, ); + current.run(callbackHandler.dataHanlder, callbackHandler.statusHandler); }); // await transition triggered by internal data source or an external stop request - // eslint-disable-next-line no-await-in-loop - transition = await Promise.race([internalTransitionPromise, this._externalStopPromise]); + const transitionRequest = await Promise.race([ + internalTransitionPromise, + this._externalStopPromise, + ]); - // stop the current datasource before transitioning to next state + // if the transition was due to an error, throttle the transition + if (transitionRequest.err) { + const delay = this._backoff.fail(); + await new Promise((resolve) => { + setTimeout(resolve, delay); + }); + } + + // stop the underlying datasource before transitioning to next state current.stop(); + lastTransition = transitionRequest.transition; } // reset so that run can be called again in the future @@ -92,20 +185,19 @@ export class CompositeDataSource implements DataSource { async stop() { this._stopped = true; - this._externalStopResolve?.(Transition.None); // TODO: this feels a little hacky. + this._externalStopResolve?.({ transition: Transition.None }); // the value here doesn't matter, just needs to break the loop's await } private _reset() { this._initPhaseActive = true; this._currentPosition = 0; - this._externalStopPromise = new Promise((transition) => { - this._externalStopResolve = transition; + this._externalStopPromise = new Promise((tr) => { + this._externalStopResolve = tr; }); + // intentionally not resetting the backoff to avoid a code path that could circumvent throttling } - private _pickDataSource( - transition: Transition | undefined, - ): DataSystemInitializer | DataSystemSynchronizer | undefined { + private _pickDataSource(transition: Transition | undefined): DataSource | undefined { switch (transition) { case Transition.SwitchToSync: this._initPhaseActive = false; // one way toggle to false, unless this class is reset() @@ -124,16 +216,20 @@ export class CompositeDataSource implements DataSource { } if (this._initPhaseActive) { - // if outside range of initializers, don't loop back to start, instead return undefined + // We don't loop back through initializers, so if outside range of initializers, instead return undefined. if (this._currentPosition > this._initializers.length - 1) { return undefined; } return this._initializers[this._currentPosition].create(); } - // getting here indicates we are using a synchronizer - this._currentPosition %= this._synchronizers.length; // modulate position to loop back to start + + // if no synchronizers, return undefined + if (this._synchronizers.length <= 0) { + return undefined; + } + this._currentPosition %= this._synchronizers.length; // modulate position to loop back to start if necessary if (this._currentPosition > this._synchronizers.length - 1) { // this is only possible if no synchronizers were provided return undefined; @@ -141,13 +237,41 @@ export class CompositeDataSource implements DataSource { return this._synchronizers[this._currentPosition].create(); } - private _makeFallbackCondition(): TransitionCondition { - return (status: HealthStatus, durationMS: number) => - status === HealthStatus.Interrupted && durationMS >= this._defaultFallbackTimeMs; - } + /** + * @returns the transition condition for the provided data source state or undefined + * if there is no transition condition + */ + private _lookupTransitionCondition( + state: DataSourceState, + excludeRecover: boolean, + ): { durationMS: number; transition: Transition } | undefined { + const condition = this._transitionConditions[state]; - private _makeRecoveryCondition(): TransitionCondition { - return (status: HealthStatus, durationMS: number) => - status === HealthStatus.Online && durationMS >= this._defaultRecoveryTimeMs; + // exclude recovery can happen for certain initializers/synchronizers (ex: the primary synchronizer shouldn't recover to itself) + if (!condition || (excludeRecover && condition.transition === Transition.Recover)) { + return undefined; + } + + return condition; } + + private _cancellableDelay = (delayMS: number) => { + let timeout: ReturnType | undefined; + let reject: ((reason?: any) => void) | undefined; + const promise = new Promise((res, rej) => { + timeout = setTimeout(res, delayMS); + reject = rej; + }); + return { + promise, + cancel() { + if (timeout) { + clearTimeout(timeout); + reject?.(); + timeout = undefined; + reject = undefined; + } + }, + }; + }; } diff --git a/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts b/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts index 3e80f7324a..352d668f6b 100644 --- a/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts +++ b/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts @@ -1,8 +1,11 @@ export interface Data {} -export enum HealthStatus { - Online, +// TODO: refactor client-sdk to use this enum +export enum DataSourceState { + Initializing, + Valid, Interrupted, + Closed, } export interface DataSource { @@ -11,7 +14,10 @@ export interface DataSource { * @param cb may be invoked many times * @returns */ - run(dataCallback: (basis: boolean, data: Data) => void, errorHander: (err: Error) => void): void; + run( + dataCallback: (basis: boolean, data: Data) => void, + statusCallback: (status: DataSourceState, err?: any) => void, + ): void; /** * May be called any number of times, if already stopped, has no effect. @@ -21,15 +27,20 @@ export interface DataSource { stop(): void; } -export interface DataSourceWithStatus { - /** - * May be called any number of times, if already started, has no effect - * @param cb may be invoked many times - * @returns - */ - run( - dataCallback: (basis: boolean, data: Data) => void, - statusCallback: (status: HealthStatus, durationMS: number) => void, - errorHander: (err: any) => void, - ): void; +/** + * A data source that can be used to fetch the basis. + */ +export interface DataSystemInitializer extends DataSource {} + +/** + * A data source that can be used to fetch the basis or ongoing data changes. + */ +export interface DataSystemSynchronizer extends DataSource {} + +export interface InitializerFactory { + create(): DataSystemInitializer; +} + +export interface SynchronizerFactory { + create(): DataSystemSynchronizer; } diff --git a/packages/shared/common/src/api/subsystem/DataSystem/DataSystemInitializer.ts b/packages/shared/common/src/api/subsystem/DataSystem/DataSystemInitializer.ts deleted file mode 100644 index 4d53b79bd5..0000000000 --- a/packages/shared/common/src/api/subsystem/DataSystem/DataSystemInitializer.ts +++ /dev/null @@ -1,24 +0,0 @@ -import { Data, HealthStatus } from './DataSource'; - -/** - * Will make best effort to retrieve all data. Data recieved will be reported via the {@link dataCallback}. Status changes - * will be reported via the {@link statusCallback}. Errors will be reported via the {@link errorCallback}. - */ -export interface DataSystemInitializer { - run( - dataCallback: (basis: boolean, data: Data) => void, - statusCallback: (status: HealthStatus, durationMS: number) => void, - errorCallback: (err: Error) => void, - ): void; - - /** - * May be called any number of times, if already stopped, has no effect. - * @param cb - * @returns - */ - stop(): void; -} - -export interface InitializerFactory { - create(): DataSystemInitializer; -} diff --git a/packages/shared/common/src/api/subsystem/DataSystem/DataSystemSynchronizer.ts b/packages/shared/common/src/api/subsystem/DataSystem/DataSystemSynchronizer.ts deleted file mode 100644 index 09babe13ea..0000000000 --- a/packages/shared/common/src/api/subsystem/DataSystem/DataSystemSynchronizer.ts +++ /dev/null @@ -1,24 +0,0 @@ -import { Data, HealthStatus } from './DataSource'; - -/** - * Will make best effort to retrieve data. Data recieved will be reported via the {@link dataCallback}. Status changes - * will be reported via the {@link statusCallback}. Errors will be reported via the {@link errorCallback}. - */ -export interface DataSystemSynchronizer { - run( - dataCallback: (basis: boolean, data: Data) => void, - statusCallback: (status: HealthStatus, durationMS: number) => void, - errorCallback: (err: Error) => void, - ): void; - - /** - * May be called any number of times, if already stopped, has no effect. - * @param cb - * @returns - */ - stop(): void; -} - -export interface SynchronizerFactory { - create(): DataSystemSynchronizer; -} diff --git a/packages/sdk/browser/src/platform/Backoff.ts b/packages/shared/common/src/datasource/Backoff.ts similarity index 100% rename from packages/sdk/browser/src/platform/Backoff.ts rename to packages/shared/common/src/datasource/Backoff.ts From 80582bc267b126fa6e6bd3f5fdabb7e48f46f224 Mon Sep 17 00:00:00 2001 From: Todd Anderson Date: Mon, 24 Feb 2025 15:18:04 -0600 Subject: [PATCH 03/10] Refactoring backoff and transitioning handling logic to handle more edge cases. Added tests. --- .../src/platform/DefaultBrowserEventSource.ts | 10 +- .../__tests__/datasource}/Backoff.test.ts | 18 +- .../DataSystem/CompositeDataSource.test.ts | 95 ++++++---- .../DataSystem/CompositeDataSource.ts | 174 ++++++++++-------- .../src/api/subsystem/DataSystem/index.ts | 8 +- .../shared/common/src/datasource/Backoff.ts | 7 +- .../shared/common/src/datasource/index.ts | 3 + packages/shared/common/src/index.ts | 4 + 8 files changed, 189 insertions(+), 130 deletions(-) rename packages/{sdk/browser/__tests__/platform => shared/common/__tests__/datasource}/Backoff.test.ts (79%) diff --git a/packages/sdk/browser/src/platform/DefaultBrowserEventSource.ts b/packages/sdk/browser/src/platform/DefaultBrowserEventSource.ts index 4d7af58fe4..742f1f1654 100644 --- a/packages/sdk/browser/src/platform/DefaultBrowserEventSource.ts +++ b/packages/sdk/browser/src/platform/DefaultBrowserEventSource.ts @@ -5,8 +5,7 @@ import { HttpErrorResponse, EventSource as LDEventSource, } from '@launchdarkly/js-client-sdk-common'; - -import Backoff from '../../../../shared/common/src/datasource/Backoff'; +import { DefaultBackoff } from '@launchdarkly/js-sdk-common'; /** * Implementation Notes: @@ -22,7 +21,7 @@ import Backoff from '../../../../shared/common/src/datasource/Backoff'; */ export default class DefaultBrowserEventSource implements LDEventSource { private _es?: EventSource; - private _backoff: Backoff; + private _backoff: DefaultBackoff; private _errorFilter: (err: HttpErrorResponse) => boolean; // The type of the handle can be platform specific and we treat is opaquely. @@ -34,7 +33,10 @@ export default class DefaultBrowserEventSource implements LDEventSource { private readonly _url: string, options: EventSourceInitDict, ) { - this._backoff = new Backoff(options.initialRetryDelayMillis, options.retryResetIntervalMillis); + this._backoff = new DefaultBackoff( + options.initialRetryDelayMillis, + options.retryResetIntervalMillis, + ); this._errorFilter = options.errorFilter; this._openConnection(); } diff --git a/packages/sdk/browser/__tests__/platform/Backoff.test.ts b/packages/shared/common/__tests__/datasource/Backoff.test.ts similarity index 79% rename from packages/sdk/browser/__tests__/platform/Backoff.test.ts rename to packages/shared/common/__tests__/datasource/Backoff.test.ts index 49cdd901cb..c4a0e25abf 100644 --- a/packages/sdk/browser/__tests__/platform/Backoff.test.ts +++ b/packages/shared/common/__tests__/datasource/Backoff.test.ts @@ -1,29 +1,29 @@ -import Backoff from '../../src/platform/Backoff'; +import DefaultBackoff from '../../src/datasource/Backoff'; const noJitter = (): number => 0; const maxJitter = (): number => 1; const defaultResetInterval = 60 * 1000; it.each([1, 1000, 5000])('has the correct starting delay', (initialDelay) => { - const backoff = new Backoff(initialDelay, defaultResetInterval, noJitter); + const backoff = new DefaultBackoff(initialDelay, defaultResetInterval, noJitter); expect(backoff.fail()).toEqual(initialDelay); }); it.each([1, 1000, 5000])('doubles delay on consecutive failures', (initialDelay) => { - const backoff = new Backoff(initialDelay, defaultResetInterval, noJitter); + const backoff = new DefaultBackoff(initialDelay, defaultResetInterval, noJitter); expect(backoff.fail()).toEqual(initialDelay); expect(backoff.fail()).toEqual(initialDelay * 2); expect(backoff.fail()).toEqual(initialDelay * 4); }); it('stops increasing delay when the max backoff is encountered', () => { - const backoff = new Backoff(5000, defaultResetInterval, noJitter); + const backoff = new DefaultBackoff(5000, defaultResetInterval, noJitter); expect(backoff.fail()).toEqual(5000); expect(backoff.fail()).toEqual(10000); expect(backoff.fail()).toEqual(20000); expect(backoff.fail()).toEqual(30000); - const backoff2 = new Backoff(1000, defaultResetInterval, noJitter); + const backoff2 = new DefaultBackoff(1000, defaultResetInterval, noJitter); expect(backoff2.fail()).toEqual(1000); expect(backoff2.fail()).toEqual(2000); expect(backoff2.fail()).toEqual(4000); @@ -33,12 +33,12 @@ it('stops increasing delay when the max backoff is encountered', () => { }); it('handles an initial retry delay longer than the maximum retry delay', () => { - const backoff = new Backoff(40000, defaultResetInterval, noJitter); + const backoff = new DefaultBackoff(40000, defaultResetInterval, noJitter); expect(backoff.fail()).toEqual(30000); }); it('jitters the backoff value', () => { - const backoff = new Backoff(1000, defaultResetInterval, maxJitter); + const backoff = new DefaultBackoff(1000, defaultResetInterval, maxJitter); expect(backoff.fail()).toEqual(500); expect(backoff.fail()).toEqual(1000); expect(backoff.fail()).toEqual(2000); @@ -51,7 +51,7 @@ it.each([10 * 1000, 60 * 1000])( 'resets the delay when the last successful connection was connected greater than the retry reset interval', (retryResetInterval) => { let time = 1000; - const backoff = new Backoff(1000, retryResetInterval, noJitter); + const backoff = new DefaultBackoff(1000, retryResetInterval, noJitter); expect(backoff.fail(time)).toEqual(1000); time += 1; backoff.success(time); @@ -69,7 +69,7 @@ it.each([10 * 1000, 60 * 1000])( it.each([10 * 1000, 60 * 1000])( 'does not reset the delay when the connection did not persist longer than the retry reset interval', (retryResetInterval) => { - const backoff = new Backoff(1000, retryResetInterval, noJitter); + const backoff = new DefaultBackoff(1000, retryResetInterval, noJitter); let time = 1000; expect(backoff.fail(time)).toEqual(1000); diff --git a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts index 89097461ab..70d917879e 100644 --- a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts +++ b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts @@ -11,6 +11,7 @@ import { InitializerFactory, SynchronizerFactory, } from '../../../src/api/subsystem/DataSystem/DataSource'; +import DefaultBackoff, { Backoff } from '../../../src/datasource/Backoff'; function makeInitializerFactory(internal: DataSystemInitializer): InitializerFactory { return { @@ -27,24 +28,35 @@ function makeSynchronizerFactory(internal: DataSystemSynchronizer): Synchronizer function makeTestTransitionConditions(): TransitionConditions { return { [DataSourceState.Initializing]: { - durationMS: 10000, + durationMS: 0, transition: Transition.Fallback, }, [DataSourceState.Interrupted]: { - durationMS: 10000, + durationMS: 0, transition: Transition.Fallback, }, [DataSourceState.Closed]: { - durationMS: 10000, + durationMS: 0, transition: Transition.Fallback, }, [DataSourceState.Valid]: { - durationMS: 10000, + durationMS: 0, transition: Transition.Fallback, }, }; } +function makeZeroBackoff(): Backoff { + return { + success(_timeStampMs) { + return 0; + }, + fail(_timeStampMs) { + return 0; + }, + }; +} + it('initializer gets basis, switch to syncrhonizer', async () => { const mockInitializer1 = { run: jest @@ -79,8 +91,7 @@ it('initializer gets basis, switch to syncrhonizer', async () => { [makeInitializerFactory(mockInitializer1)], [makeSynchronizerFactory(mockSynchronizer1)], makeTestTransitionConditions(), - 0, - 0, + makeZeroBackoff(), ); let callback; @@ -160,8 +171,7 @@ it('initializer gets basis, switch to synchronizer 1, fallback to synchronizer 2 [makeInitializerFactory(mockInitializer1)], [makeSynchronizerFactory(mockSynchronizer1), makeSynchronizerFactory(mockSynchronizer2)], makeTestTransitionConditions(), - 0, - 0, + makeZeroBackoff(), ); let callback; @@ -225,8 +235,7 @@ it('it reports error when all initializers fail', async () => { [makeInitializerFactory(mockInitializer1), makeInitializerFactory(mockInitializer2)], [], // no synchronizers for this test makeTestTransitionConditions(), - 0, - 0, + makeZeroBackoff(), ); const dataCallback = jest.fn(); @@ -244,14 +253,22 @@ it('it reports error when all initializers fail', async () => { expect(mockInitializer1.run).toHaveBeenCalledTimes(1); expect(mockInitializer2.run).toHaveBeenCalledTimes(1); expect(dataCallback).toHaveBeenCalledTimes(0); - expect(statusCallback).toHaveBeenCalledTimes(3); - expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Interrupted, null); - expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Interrupted, null); + expect(statusCallback).toHaveBeenNthCalledWith( + 1, + DataSourceState.Interrupted, + mockInitializer1Error, + ); + expect(statusCallback).toHaveBeenNthCalledWith( + 2, + DataSourceState.Interrupted, + mockInitializer2Error, + ); expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Closed, { name: 'ExhaustedDataSources', message: 'CompositeDataSource has exhausted all configured datasources (2 initializers, 0 synchronizers).', }); + expect(statusCallback).toHaveBeenCalledTimes(3); }); it('it can be stopped when in thrashing synchronizer fallback loop', async () => { @@ -269,6 +286,7 @@ it('it can be stopped when in thrashing synchronizer fallback loop', async () => stop: jest.fn(), }; + const mockSynchronizer1Error = { name: 'Error', message: 'I am error...man!' }; const mockSynchronizer1 = { run: jest .fn() @@ -277,7 +295,7 @@ it('it can be stopped when in thrashing synchronizer fallback loop', async () => _dataCallback: (basis: boolean, data: Data) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { - _statusCallback(DataSourceState.Closed, { name: 'Error', message: 'I am error...man!' }); // error that will lead to fallback + _statusCallback(DataSourceState.Closed, mockSynchronizer1Error); // error that will lead to fallback }, ), stop: jest.fn(), @@ -287,18 +305,15 @@ it('it can be stopped when in thrashing synchronizer fallback loop', async () => [makeInitializerFactory(mockInitializer1)], [makeSynchronizerFactory(mockSynchronizer1)], // will continuously fallback onto itself makeTestTransitionConditions(), - 0, - 0, + makeZeroBackoff(), ); const dataCallback = jest.fn(); let statusCallback; await new Promise((resolve) => { - let statusCount = 0; - statusCallback = jest.fn((_1: DataSourceState, _2: any) => { - statusCount += 1; - if (statusCount >= 5) { - resolve(); + statusCallback = jest.fn((state: DataSourceState, _: any) => { + if (state === DataSourceState.Interrupted) { + resolve(); // waiting interruption due to sync error } }); @@ -308,18 +323,21 @@ it('it can be stopped when in thrashing synchronizer fallback loop', async () => expect(mockInitializer1.run).toHaveBeenCalled(); expect(mockSynchronizer1.run).toHaveBeenCalled(); expect(dataCallback).toHaveBeenNthCalledWith(1, true, { key: 'init1' }); - - // wait for trashing to occur - await new Promise((f) => { - setTimeout(f, 1); - }).then(() => underTest.stop()); + console.log(`About to call stop on composite source.`); + underTest.stop(); + console.log(`Got past stop`); // wait for stop to take effect before checking status is closed await new Promise((f) => { - setTimeout(f, 1); + setTimeout(f, 100); }); - expect(statusCallback).toHaveBeenLastCalledWith(DataSourceState.Closed, null); + expect(statusCallback).toHaveBeenNthCalledWith( + 1, + DataSourceState.Interrupted, + mockSynchronizer1Error, + ); + expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Closed, undefined); }); it('it can be stopped and restarted', async () => { @@ -357,14 +375,14 @@ it('it can be stopped and restarted', async () => { [makeInitializerFactory(mockInitializer1)], [makeSynchronizerFactory(mockSynchronizer1)], makeTestTransitionConditions(), - 0, - 0, + makeZeroBackoff(), ); let callback1; await new Promise((resolve) => { callback1 = jest.fn((_: boolean, data: Data) => { if (data === mockSynchronizer1Data) { + console.log(`About to call stop`); underTest.stop(); resolve(); } @@ -378,6 +396,11 @@ it('it can be stopped and restarted', async () => { expect(mockSynchronizer1.run).toHaveBeenCalledTimes(1); expect(callback1).toHaveBeenCalledTimes(2); + // wait a moment for pending awaits to resolve the stop request + await new Promise((f) => { + setTimeout(f, 1); + }); + let callback2; await new Promise((resolve) => { callback2 = jest.fn((_: boolean, data: Data) => { @@ -392,11 +415,16 @@ it('it can be stopped and restarted', async () => { // check that second run triggers underlying data sources again expect(mockInitializer1.run).toHaveBeenCalledTimes(2); expect(mockSynchronizer1.run).toHaveBeenCalledTimes(2); - expect(callback2).toHaveBeenCalledTimes(4); + expect(callback2).toHaveBeenCalledTimes(2); }); it('it is well behaved with no initializers and no synchronizers configured', async () => { - const underTest = new CompositeDataSource([], [], makeTestTransitionConditions(), 0, 0); + const underTest = new CompositeDataSource( + [], + [], + makeTestTransitionConditions(), + makeZeroBackoff(), + ); let statusCallback; await new Promise((resolve) => { @@ -433,8 +461,7 @@ it('it is well behaved with an initializer and no synchronizers configured', asy [makeInitializerFactory(mockInitializer1)], [], makeTestTransitionConditions(), - 0, - 0, + makeZeroBackoff(), ); let statusCallback; diff --git a/packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts b/packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts index 73e71d6c9e..50da6160ba 100644 --- a/packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts +++ b/packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts @@ -1,5 +1,5 @@ /* eslint-disable no-await-in-loop */ -import Backoff from '../../../datasource/Backoff'; +import { Backoff } from '../../../datasource/Backoff'; import { CallbackHandler } from './CallbackHandler'; import { Data, @@ -31,6 +31,11 @@ export enum Transition { * Transition to the first data source of the same kind. */ Recover, + + /** + * Transition to idle and reset + */ + Stop, } /** @@ -57,11 +62,10 @@ export class CompositeDataSource implements DataSource { private _initPhaseActive: boolean = true; private _currentPosition: number = 0; - private _backoff: Backoff; private _stopped: boolean = true; - private _externalStopPromise: Promise; - private _externalStopResolve?: (value: TransitionRequest) => void; + private _externalTransitionPromise: Promise; + private _externalTransitionResolve?: (value: TransitionRequest) => void; /** * @param _initializers factories to create {@link DataSystemInitializer}s, in priority order. @@ -71,15 +75,13 @@ export class CompositeDataSource implements DataSource { private readonly _initializers: InitializerFactory[], private readonly _synchronizers: SynchronizerFactory[], private readonly _transitionConditions: TransitionConditions, - initialRetryDelayMillis: number, - retryResetIntervalMillis: number, + private readonly _backoff: Backoff, ) { - this._externalStopPromise = new Promise((tr) => { - this._externalStopResolve = tr; + this._externalTransitionPromise = new Promise((tr) => { + this._externalTransitionResolve = tr; }); this._initPhaseActive = true; this._currentPosition = 0; - this._backoff = new Backoff(initialRetryDelayMillis, retryResetIntervalMillis); } async run( @@ -95,87 +97,99 @@ export class CompositeDataSource implements DataSource { let lastTransition: Transition = Transition.None; // eslint-disable-next-line no-constant-condition while (true) { - if (this._stopped) { - // report we are closed, no error as this was due to stop breaking the loop - statusCallback(DataSourceState.Closed, null); - break; - } - - const current: DataSource | undefined = this._pickDataSource(lastTransition); - if (current === undefined) { - statusCallback(DataSourceState.Closed, { - name: 'ExhaustedDataSources', - message: `CompositeDataSource has exhausted all configured datasources (${this._initializers.length} initializers, ${this._synchronizers.length} synchronizers).`, - }); - break; - } - + const currentDS: DataSource | undefined = this._pickDataSource(lastTransition); const internalTransitionPromise = new Promise((transitionResolve) => { - // these local variables are used for handling automatic transition related to data source status (ex: recovering to primary after - // secondary has been valid for N many minutes) - let lastState: DataSourceState | undefined; - let cancelScheduledTransition: (() => void) | undefined; + if (currentDS) { + // these local variables are used for handling automatic transition related to data source status (ex: recovering to primary after + // secondary has been valid for N many seconds) + let lastState: DataSourceState | undefined; + let cancelScheduledTransition: (() => void) | undefined; - const callbackHandler = new CallbackHandler( - (basis: boolean, data: Data) => { - dataCallback(basis, data); - if (basis && this._initPhaseActive) { - // transition to sync if we get basis during init - callbackHandler.disable(); - cancelScheduledTransition?.(); - transitionResolve({ transition: Transition.SwitchToSync }); - } - }, - (state: DataSourceState, err?: any) => { - // When we get a status update, we want to fallback if it is an error. We also want to schedule a transition for some - // time in the future if this status remains for some duration (ex: Recover to primary synchronizer after the secondary - // synchronizer has been Valid for some time). These scheduled transitions are configurable in the constructor. + // this callback handler can be disabled and ensures only one transition request occurs + const callbackHandler = new CallbackHandler( + (basis: boolean, data: Data) => { + this._backoff.success(Date.now()); + dataCallback(basis, data); + if (basis && this._initPhaseActive) { + // transition to sync if we get basis during init + callbackHandler.disable(); + cancelScheduledTransition?.(); + transitionResolve({ transition: Transition.SwitchToSync }); + } + }, + (state: DataSourceState, err?: any) => { + // When we get a status update, we want to fallback if it is an error. We also want to schedule a transition for some + // time in the future if this status remains for some duration (ex: Recover to primary synchronizer after the secondary + // synchronizer has been Valid for some time). These scheduled transitions are configurable in the constructor. - if (err || state === DataSourceState.Closed) { - callbackHandler.disable(); - statusCallback(DataSourceState.Interrupted, null); // underlying errors or closed states are masked as interrupted while we transition - cancelScheduledTransition?.(); - transitionResolve({ transition: Transition.Fallback, err }); // unrecoverable error has occurred, so fallback - } else { - if (state !== lastState) { - lastState = state; - cancelScheduledTransition?.(); // cancel previously scheduled status transition if one was scheduled - const excludeRecovery = this._currentPosition === 0; // primary source cannot recover to itself, so exclude it - const condition = this._lookupTransitionCondition(state, excludeRecovery); - if (condition) { - const { promise, cancel } = this._cancellableDelay(condition.durationMS); - cancelScheduledTransition = cancel; - promise.then(() => { - callbackHandler.disable(); - transitionResolve({ transition: condition.transition }); - }); - } else { - // this data source state does not have a transition condition, so don't schedule any transition + if (err || state === DataSourceState.Closed) { + callbackHandler.disable(); + statusCallback(DataSourceState.Interrupted, err); // underlying errors or closed states are masked as interrupted while we transition + cancelScheduledTransition?.(); + transitionResolve({ transition: Transition.Fallback, err }); // unrecoverable error has occurred, so fallback + } else { + statusCallback(state, null); // report the status upward + if (state !== lastState) { + lastState = state; + cancelScheduledTransition?.(); // cancel previously scheduled status transition if one was scheduled + const excludeRecovery = this._currentPosition === 0; // primary source cannot recover to itself, so exclude it + const condition = this._lookupTransitionCondition(state, excludeRecovery); + if (condition) { + const { promise, cancel } = this._cancellableDelay(condition.durationMS); + cancelScheduledTransition = cancel; + promise.then(() => { + callbackHandler.disable(); + transitionResolve({ transition: condition.transition }); + }); + } else { + // this data source state does not have a transition condition, so don't schedule any transition + } } } - statusCallback(state, null); // report the status upward - } - }, - ); - current.run(callbackHandler.dataHanlder, callbackHandler.statusHandler); + }, + ); + currentDS.run(callbackHandler.dataHanlder, callbackHandler.statusHandler); + } else { + // we don't have a data source to use! + transitionResolve({ + transition: Transition.Stop, + err: { + name: 'ExhaustedDataSources', + message: `CompositeDataSource has exhausted all configured datasources (${this._initializers.length} initializers, ${this._synchronizers.length} synchronizers).`, + }, + }); + } }); // await transition triggered by internal data source or an external stop request - const transitionRequest = await Promise.race([ + let transitionRequest = await Promise.race([ internalTransitionPromise, - this._externalStopPromise, + this._externalTransitionPromise, ]); - // if the transition was due to an error, throttle the transition - if (transitionRequest.err) { - const delay = this._backoff.fail(); - await new Promise((resolve) => { + // stop the underlying datasource before transitioning to next state + currentDS?.stop(); + + if (transitionRequest.err && transitionRequest.transition !== Transition.Stop) { + // if the transition was due to an error, throttle the transition + const delay = this._backoff.fail(Date.now()); + const delayedTransition = new Promise((resolve) => { setTimeout(resolve, delay); - }); + }).then(() => transitionRequest); + + // race the delayed transition and external transition requests to be responsive + transitionRequest = await Promise.race([ + delayedTransition, + this._externalTransitionPromise, + ]); + } + + if (transitionRequest.transition === Transition.Stop) { + // exit the loop + statusCallback(DataSourceState.Closed, transitionRequest.err); + break; } - // stop the underlying datasource before transitioning to next state - current.stop(); lastTransition = transitionRequest.transition; } @@ -184,15 +198,15 @@ export class CompositeDataSource implements DataSource { } async stop() { - this._stopped = true; - this._externalStopResolve?.({ transition: Transition.None }); // the value here doesn't matter, just needs to break the loop's await + this._externalTransitionResolve?.({ transition: Transition.Stop }); } private _reset() { + this._stopped = true; this._initPhaseActive = true; this._currentPosition = 0; - this._externalStopPromise = new Promise((tr) => { - this._externalStopResolve = tr; + this._externalTransitionPromise = new Promise((tr) => { + this._externalTransitionResolve = tr; }); // intentionally not resetting the backoff to avoid a code path that could circumvent throttling } diff --git a/packages/shared/common/src/api/subsystem/DataSystem/index.ts b/packages/shared/common/src/api/subsystem/DataSystem/index.ts index 8cedfee2a4..d49e3438fd 100644 --- a/packages/shared/common/src/api/subsystem/DataSystem/index.ts +++ b/packages/shared/common/src/api/subsystem/DataSystem/index.ts @@ -1,3 +1,7 @@ -export { DataSystemInitializer, InitializerFactory } from './DataSystemInitializer'; -export { DataSystemSynchronizer, SynchronizerFactory } from './DataSystemSynchronizer'; +export { + DataSystemInitializer, + DataSystemSynchronizer, + InitializerFactory, + SynchronizerFactory, +} from './DataSource'; export { CompositeDataSource } from './CompositeDataSource'; diff --git a/packages/shared/common/src/datasource/Backoff.ts b/packages/shared/common/src/datasource/Backoff.ts index ce0e931ee5..6a0a99c052 100644 --- a/packages/shared/common/src/datasource/Backoff.ts +++ b/packages/shared/common/src/datasource/Backoff.ts @@ -1,6 +1,11 @@ const MAX_RETRY_DELAY = 30 * 1000; // Maximum retry delay 30 seconds. const JITTER_RATIO = 0.5; // Delay should be 50%-100% of calculated time. +export interface Backoff { + success(timeStampMs: number): void; + fail(timeStampMs: number): number; +} + /** * Implements exponential backoff and jitter. This class tracks successful connections and failures * and produces a retry delay. @@ -11,7 +16,7 @@ const JITTER_RATIO = 0.5; // Delay should be 50%-100% of calculated time. * initialRetryDelayMillis and capping at MAX_RETRY_DELAY. If RESET_INTERVAL has elapsed after a * success, without an intervening faulure, then the backoff is reset to initialRetryDelayMillis. */ -export default class Backoff { +export class DefaultBackoff { private _retryCount: number = 0; private _activeSince?: number; private _initialRetryDelayMillis: number; diff --git a/packages/shared/common/src/datasource/index.ts b/packages/shared/common/src/datasource/index.ts index fe4b250c5e..e727947dce 100644 --- a/packages/shared/common/src/datasource/index.ts +++ b/packages/shared/common/src/datasource/index.ts @@ -1,3 +1,4 @@ +import { Backoff, DefaultBackoff } from './Backoff'; import { DataSourceErrorKind } from './DataSourceErrorKinds'; import { LDFileDataSourceError, @@ -7,6 +8,8 @@ import { } from './errors'; export { + Backoff, + DefaultBackoff, DataSourceErrorKind, LDFileDataSourceError, LDPollingError, diff --git a/packages/shared/common/src/index.ts b/packages/shared/common/src/index.ts index ae3f9daf08..7df308f674 100644 --- a/packages/shared/common/src/index.ts +++ b/packages/shared/common/src/index.ts @@ -2,7 +2,9 @@ import AttributeReference from './AttributeReference'; import Context from './Context'; import ContextFilter from './ContextFilter'; import { + Backoff, DataSourceErrorKind, + DefaultBackoff, LDFileDataSourceError, LDPollingError, LDStreamingError, @@ -23,6 +25,8 @@ export { Context, ContextFilter, DataSourceErrorKind, + Backoff, + DefaultBackoff, LDPollingError, LDStreamingError, StreamingErrorHandler, From eeaa3d10cc4267e2d799ecc02014b6c552facb4f Mon Sep 17 00:00:00 2001 From: Todd Anderson Date: Mon, 24 Feb 2025 15:29:49 -0600 Subject: [PATCH 04/10] removing accidental source include --- packages/sdk/browser/tsconfig.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sdk/browser/tsconfig.json b/packages/sdk/browser/tsconfig.json index 82f7f65361..7306c5b0c6 100644 --- a/packages/sdk/browser/tsconfig.json +++ b/packages/sdk/browser/tsconfig.json @@ -18,7 +18,7 @@ "types": ["node", "jest"], "allowJs": true }, - "include": ["src", "../../shared/common/src/datasource/Backoff.ts"], + "include": ["src"], "exclude": [ "vite.config.ts", "__tests__", From 6d0a08be685fd42e1543dfed10275a822fe72053 Mon Sep 17 00:00:00 2001 From: Todd Anderson Date: Tue, 4 Mar 2025 15:19:10 -0600 Subject: [PATCH 05/10] Fixing review comments. --- .../src/platform/DefaultBrowserEventSource.ts | 2 +- .../__tests__/datasource/Backoff.test.ts | 2 +- .../DataSystem/CompositeDataSource.test.ts | 17 ++--- .../subsystem/DataSystem/CallbackHandler.ts | 8 +-- .../DataSystem/CompositeDataSource.ts | 71 +++++++------------ .../api/subsystem/DataSystem/DataSource.ts | 7 +- .../shared/common/src/datasource/Backoff.ts | 4 +- 7 files changed, 45 insertions(+), 66 deletions(-) diff --git a/packages/sdk/browser/src/platform/DefaultBrowserEventSource.ts b/packages/sdk/browser/src/platform/DefaultBrowserEventSource.ts index 742f1f1654..d753931aa8 100644 --- a/packages/sdk/browser/src/platform/DefaultBrowserEventSource.ts +++ b/packages/sdk/browser/src/platform/DefaultBrowserEventSource.ts @@ -1,11 +1,11 @@ import { + DefaultBackoff, EventListener, EventName, EventSourceInitDict, HttpErrorResponse, EventSource as LDEventSource, } from '@launchdarkly/js-client-sdk-common'; -import { DefaultBackoff } from '@launchdarkly/js-sdk-common'; /** * Implementation Notes: diff --git a/packages/shared/common/__tests__/datasource/Backoff.test.ts b/packages/shared/common/__tests__/datasource/Backoff.test.ts index c4a0e25abf..77b09ce0fa 100644 --- a/packages/shared/common/__tests__/datasource/Backoff.test.ts +++ b/packages/shared/common/__tests__/datasource/Backoff.test.ts @@ -1,4 +1,4 @@ -import DefaultBackoff from '../../src/datasource/Backoff'; +import { DefaultBackoff } from '../../src/datasource/Backoff'; const noJitter = (): number => 0; const maxJitter = (): number => 1; diff --git a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts index 70d917879e..4f9ad66fe1 100644 --- a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts +++ b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts @@ -11,7 +11,7 @@ import { InitializerFactory, SynchronizerFactory, } from '../../../src/api/subsystem/DataSystem/DataSource'; -import DefaultBackoff, { Backoff } from '../../../src/datasource/Backoff'; +import { Backoff } from '../../../src/datasource/Backoff'; function makeInitializerFactory(internal: DataSystemInitializer): InitializerFactory { return { @@ -29,29 +29,29 @@ function makeTestTransitionConditions(): TransitionConditions { return { [DataSourceState.Initializing]: { durationMS: 0, - transition: Transition.Fallback, + transition: 'fallback', }, [DataSourceState.Interrupted]: { durationMS: 0, - transition: Transition.Fallback, + transition: 'fallback', }, [DataSourceState.Closed]: { durationMS: 0, - transition: Transition.Fallback, + transition: 'fallback', }, [DataSourceState.Valid]: { durationMS: 0, - transition: Transition.Fallback, + transition: 'fallback', }, }; } function makeZeroBackoff(): Backoff { return { - success(_timeStampMs) { + success() { return 0; }, - fail(_timeStampMs) { + fail() { return 0; }, }; @@ -323,9 +323,7 @@ it('it can be stopped when in thrashing synchronizer fallback loop', async () => expect(mockInitializer1.run).toHaveBeenCalled(); expect(mockSynchronizer1.run).toHaveBeenCalled(); expect(dataCallback).toHaveBeenNthCalledWith(1, true, { key: 'init1' }); - console.log(`About to call stop on composite source.`); underTest.stop(); - console.log(`Got past stop`); // wait for stop to take effect before checking status is closed await new Promise((f) => { @@ -382,7 +380,6 @@ it('it can be stopped and restarted', async () => { await new Promise((resolve) => { callback1 = jest.fn((_: boolean, data: Data) => { if (data === mockSynchronizer1Data) { - console.log(`About to call stop`); underTest.stop(); resolve(); } diff --git a/packages/shared/common/src/api/subsystem/DataSystem/CallbackHandler.ts b/packages/shared/common/src/api/subsystem/DataSystem/CallbackHandler.ts index 7ad446ddcd..cbd7f94337 100644 --- a/packages/shared/common/src/api/subsystem/DataSystem/CallbackHandler.ts +++ b/packages/shared/common/src/api/subsystem/DataSystem/CallbackHandler.ts @@ -17,7 +17,7 @@ export class CallbackHandler { this._disabled = true; } - dataHanlder = async (basis: boolean, data: Data) => { + async dataHanlder(basis: boolean, data: Data) { if (this._disabled) { return; } @@ -25,13 +25,13 @@ export class CallbackHandler { // TODO: SDK-1044 track selector for future synchronizer to use // report data up this._dataCallback(basis, data); - }; + } - statusHandler = async (status: DataSourceState, err?: any) => { + async statusHandler(status: DataSourceState, err?: any) { if (this._disabled) { return; } this._statusCallback(status, err); - }; + } } diff --git a/packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts b/packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts index 50da6160ba..3e2c0f7b27 100644 --- a/packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts +++ b/packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts @@ -9,34 +9,16 @@ import { SynchronizerFactory, } from './DataSource'; +// TODO: SDK-858, specify these constants when CompositeDataSource is used. +// eslint-disable-next-line @typescript-eslint/no-unused-vars +const DEFAULT_FALLBACK_TIME_MS = 2 * 60 * 1000; +// eslint-disable-next-line @typescript-eslint/no-unused-vars +const DEFAULT_RECOVERY_TIME_MS = 5 * 60 * 1000; + /** * Represents a transition between data sources. */ -export enum Transition { - /** - * A no-op transition. - */ - None, - /** - * Transition from current data source to the first synchronizer. - */ - SwitchToSync, - - /** - * Transition to the next data source of the same kind. - */ - Fallback, - - /** - * Transition to the first data source of the same kind. - */ - Recover, - - /** - * Transition to idle and reset - */ - Stop, -} +export type Transition = 'none' | 'switchToSync' | 'fallback' | 'recover' | 'stop'; /** * Given a {@link DataSourceState}, how long to wait before transitioning. @@ -57,8 +39,6 @@ interface TransitionRequest { export class CompositeDataSource implements DataSource { // TODO: SDK-856 async notification if initializer takes too long // TODO: SDK-1044 utilize selector from initializers - private readonly _defaultFallbackTimeMs = 2 * 60 * 1000; - private readonly _defaultRecoveryTimeMs = 5 * 60 * 1000; private _initPhaseActive: boolean = true; private _currentPosition: number = 0; @@ -77,8 +57,8 @@ export class CompositeDataSource implements DataSource { private readonly _transitionConditions: TransitionConditions, private readonly _backoff: Backoff, ) { - this._externalTransitionPromise = new Promise((tr) => { - this._externalTransitionResolve = tr; + this._externalTransitionPromise = new Promise((resolveTransition) => { + this._externalTransitionResolve = resolveTransition; }); this._initPhaseActive = true; this._currentPosition = 0; @@ -94,7 +74,7 @@ export class CompositeDataSource implements DataSource { } this._stopped = false; - let lastTransition: Transition = Transition.None; + let lastTransition: Transition = 'none'; // eslint-disable-next-line no-constant-condition while (true) { const currentDS: DataSource | undefined = this._pickDataSource(lastTransition); @@ -108,13 +88,13 @@ export class CompositeDataSource implements DataSource { // this callback handler can be disabled and ensures only one transition request occurs const callbackHandler = new CallbackHandler( (basis: boolean, data: Data) => { - this._backoff.success(Date.now()); + this._backoff.success(); dataCallback(basis, data); if (basis && this._initPhaseActive) { // transition to sync if we get basis during init callbackHandler.disable(); cancelScheduledTransition?.(); - transitionResolve({ transition: Transition.SwitchToSync }); + transitionResolve({ transition: 'switchToSync' }); } }, (state: DataSourceState, err?: any) => { @@ -126,7 +106,7 @@ export class CompositeDataSource implements DataSource { callbackHandler.disable(); statusCallback(DataSourceState.Interrupted, err); // underlying errors or closed states are masked as interrupted while we transition cancelScheduledTransition?.(); - transitionResolve({ transition: Transition.Fallback, err }); // unrecoverable error has occurred, so fallback + transitionResolve({ transition: 'fallback', err }); // unrecoverable error has occurred, so fallback } else { statusCallback(state, null); // report the status upward if (state !== lastState) { @@ -148,11 +128,14 @@ export class CompositeDataSource implements DataSource { } }, ); - currentDS.run(callbackHandler.dataHanlder, callbackHandler.statusHandler); + currentDS.run( + (basis, data) => callbackHandler.dataHanlder(basis, data), + (status, err) => callbackHandler.statusHandler(status, err), + ); } else { // we don't have a data source to use! transitionResolve({ - transition: Transition.Stop, + transition: 'stop', err: { name: 'ExhaustedDataSources', message: `CompositeDataSource has exhausted all configured datasources (${this._initializers.length} initializers, ${this._synchronizers.length} synchronizers).`, @@ -170,9 +153,9 @@ export class CompositeDataSource implements DataSource { // stop the underlying datasource before transitioning to next state currentDS?.stop(); - if (transitionRequest.err && transitionRequest.transition !== Transition.Stop) { + if (transitionRequest.err && transitionRequest.transition !== 'stop') { // if the transition was due to an error, throttle the transition - const delay = this._backoff.fail(Date.now()); + const delay = this._backoff.fail(); const delayedTransition = new Promise((resolve) => { setTimeout(resolve, delay); }).then(() => transitionRequest); @@ -184,7 +167,7 @@ export class CompositeDataSource implements DataSource { ]); } - if (transitionRequest.transition === Transition.Stop) { + if (transitionRequest.transition === 'stop') { // exit the loop statusCallback(DataSourceState.Closed, transitionRequest.err); break; @@ -198,7 +181,7 @@ export class CompositeDataSource implements DataSource { } async stop() { - this._externalTransitionResolve?.({ transition: Transition.Stop }); + this._externalTransitionResolve?.({ transition: 'stop' }); } private _reset() { @@ -213,17 +196,17 @@ export class CompositeDataSource implements DataSource { private _pickDataSource(transition: Transition | undefined): DataSource | undefined { switch (transition) { - case Transition.SwitchToSync: + case 'switchToSync': this._initPhaseActive = false; // one way toggle to false, unless this class is reset() this._currentPosition = 0; break; - case Transition.Fallback: + case 'fallback': this._currentPosition += 1; break; - case Transition.Recover: + case 'recover': this._currentPosition = 0; break; - case Transition.None: + case 'none': default: // don't do anything in this case break; @@ -262,7 +245,7 @@ export class CompositeDataSource implements DataSource { const condition = this._transitionConditions[state]; // exclude recovery can happen for certain initializers/synchronizers (ex: the primary synchronizer shouldn't recover to itself) - if (!condition || (excludeRecover && condition.transition === Transition.Recover)) { + if (!condition || (excludeRecover && condition.transition === 'recover')) { return undefined; } diff --git a/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts b/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts index 352d668f6b..80885a401e 100644 --- a/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts +++ b/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts @@ -11,8 +11,9 @@ export enum DataSourceState { export interface DataSource { /** * May be called any number of times, if already started, has no effect - * @param cb may be invoked many times - * @returns + * @param dataCallback that will be called when data arrives, may be called multiple times. + * @param statusCallback that will be called when data source state changes or an unrecoverable error + * has been encountered. */ run( dataCallback: (basis: boolean, data: Data) => void, @@ -21,8 +22,6 @@ export interface DataSource { /** * May be called any number of times, if already stopped, has no effect. - * @param cb - * @returns */ stop(): void; } diff --git a/packages/shared/common/src/datasource/Backoff.ts b/packages/shared/common/src/datasource/Backoff.ts index 6a0a99c052..8b2a7f8be9 100644 --- a/packages/shared/common/src/datasource/Backoff.ts +++ b/packages/shared/common/src/datasource/Backoff.ts @@ -2,8 +2,8 @@ const MAX_RETRY_DELAY = 30 * 1000; // Maximum retry delay 30 seconds. const JITTER_RATIO = 0.5; // Delay should be 50%-100% of calculated time. export interface Backoff { - success(timeStampMs: number): void; - fail(timeStampMs: number): number; + success(): void; + fail(): number; } /** From ea9845da3948fea042cfd786f648f2efe4954ef8 Mon Sep 17 00:00:00 2001 From: Todd Anderson Date: Fri, 14 Mar 2025 17:09:36 -0500 Subject: [PATCH 06/10] Addressing review comments --- .../DataSystem/CompositeDataSource.test.ts | 15 +++++++------ .../subsystem/DataSystem/CallbackHandler.ts | 2 +- .../DataSystem/CompositeDataSource.ts | 21 +++++++++++-------- 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts index 4f9ad66fe1..3415ece2fe 100644 --- a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts +++ b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts @@ -1,6 +1,5 @@ import { CompositeDataSource, - Transition, TransitionConditions, } from '../../../src/api/subsystem/DataSystem/CompositeDataSource'; import { @@ -57,7 +56,7 @@ function makeZeroBackoff(): Backoff { }; } -it('initializer gets basis, switch to syncrhonizer', async () => { +it('handles initializer getting basis, switching to syncrhonizer', async () => { const mockInitializer1 = { run: jest .fn() @@ -112,7 +111,7 @@ it('initializer gets basis, switch to syncrhonizer', async () => { expect(callback).toHaveBeenNthCalledWith(2, false, { key: 'sync1' }); }); -it('initializer gets basis, switch to synchronizer 1, fallback to synchronizer 2, recover to synchronizer 1', async () => { +it('handles initializer getting basis, switches to synchronizer 1, falls back to synchronizer 2, recovers to synchronizer 1', async () => { const mockInitializer1: DataSystemInitializer = { run: jest .fn() @@ -194,7 +193,7 @@ it('initializer gets basis, switch to synchronizer 1, fallback to synchronizer 2 expect(callback).toHaveBeenNthCalledWith(3, false, { key: 'sync1' }); // sync2 recovers back to sync1 }); -it('it reports error when all initializers fail', async () => { +it('reports error when all initializers fail', async () => { const mockInitializer1Error = { name: 'Error', message: 'I am initializer1 error!', @@ -271,7 +270,7 @@ it('it reports error when all initializers fail', async () => { expect(statusCallback).toHaveBeenCalledTimes(3); }); -it('it can be stopped when in thrashing synchronizer fallback loop', async () => { +it('can be stopped when in thrashing synchronizer fallback loop', async () => { const mockInitializer1 = { run: jest .fn() @@ -338,7 +337,7 @@ it('it can be stopped when in thrashing synchronizer fallback loop', async () => expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Closed, undefined); }); -it('it can be stopped and restarted', async () => { +it('can be stopped and restarted', async () => { const mockInitializer1Data = { key: 'init1' }; const mockInitializer1 = { run: jest @@ -415,7 +414,7 @@ it('it can be stopped and restarted', async () => { expect(callback2).toHaveBeenCalledTimes(2); }); -it('it is well behaved with no initializers and no synchronizers configured', async () => { +it('is well behaved with no initializers and no synchronizers configured', async () => { const underTest = new CompositeDataSource( [], [], @@ -439,7 +438,7 @@ it('it is well behaved with no initializers and no synchronizers configured', as }); }); -it('it is well behaved with an initializer and no synchronizers configured', async () => { +it('is well behaved with an initializer and no synchronizers configured', async () => { const mockInitializer1 = { run: jest .fn() diff --git a/packages/shared/common/src/api/subsystem/DataSystem/CallbackHandler.ts b/packages/shared/common/src/api/subsystem/DataSystem/CallbackHandler.ts index cbd7f94337..12ec52ce79 100644 --- a/packages/shared/common/src/api/subsystem/DataSystem/CallbackHandler.ts +++ b/packages/shared/common/src/api/subsystem/DataSystem/CallbackHandler.ts @@ -17,7 +17,7 @@ export class CallbackHandler { this._disabled = true; } - async dataHanlder(basis: boolean, data: Data) { + async dataHandler(basis: boolean, data: Data) { if (this._disabled) { return; } diff --git a/packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts b/packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts index 3e2c0f7b27..4b0d19a5b5 100644 --- a/packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts +++ b/packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts @@ -1,5 +1,6 @@ /* eslint-disable no-await-in-loop */ import { Backoff } from '../../../datasource/Backoff'; +import { LDLogger } from '../../logging'; import { CallbackHandler } from './CallbackHandler'; import { Data, @@ -46,6 +47,7 @@ export class CompositeDataSource implements DataSource { private _stopped: boolean = true; private _externalTransitionPromise: Promise; private _externalTransitionResolve?: (value: TransitionRequest) => void; + private _cancelTokens: (() => void)[] = []; /** * @param _initializers factories to create {@link DataSystemInitializer}s, in priority order. @@ -56,6 +58,7 @@ export class CompositeDataSource implements DataSource { private readonly _synchronizers: SynchronizerFactory[], private readonly _transitionConditions: TransitionConditions, private readonly _backoff: Backoff, + private readonly _logger?: LDLogger, ) { this._externalTransitionPromise = new Promise((resolveTransition) => { this._externalTransitionResolve = resolveTransition; @@ -70,6 +73,7 @@ export class CompositeDataSource implements DataSource { ): Promise { if (!this._stopped) { // don't allow multiple simultaneous runs + this._logger?.info('CompositeDataSource already running. Ignoring call to start.'); return; } this._stopped = false; @@ -116,6 +120,7 @@ export class CompositeDataSource implements DataSource { const condition = this._lookupTransitionCondition(state, excludeRecovery); if (condition) { const { promise, cancel } = this._cancellableDelay(condition.durationMS); + this._cancelTokens.push(cancel); cancelScheduledTransition = cancel; promise.then(() => { callbackHandler.disable(); @@ -129,7 +134,7 @@ export class CompositeDataSource implements DataSource { }, ); currentDS.run( - (basis, data) => callbackHandler.dataHanlder(basis, data), + (basis, data) => callbackHandler.dataHandler(basis, data), (status, err) => callbackHandler.statusHandler(status, err), ); } else { @@ -156,9 +161,9 @@ export class CompositeDataSource implements DataSource { if (transitionRequest.err && transitionRequest.transition !== 'stop') { // if the transition was due to an error, throttle the transition const delay = this._backoff.fail(); - const delayedTransition = new Promise((resolve) => { - setTimeout(resolve, delay); - }).then(() => transitionRequest); + const { promise, cancel } = this._cancellableDelay(delay); + this._cancelTokens.push(cancel); + const delayedTransition = promise.then(() => transitionRequest); // race the delayed transition and external transition requests to be responsive transitionRequest = await Promise.race([ @@ -170,6 +175,7 @@ export class CompositeDataSource implements DataSource { if (transitionRequest.transition === 'stop') { // exit the loop statusCallback(DataSourceState.Closed, transitionRequest.err); + lastTransition = transitionRequest.transition; break; } @@ -181,6 +187,7 @@ export class CompositeDataSource implements DataSource { } async stop() { + this._cancelTokens.forEach((cancel) => cancel()); this._externalTransitionResolve?.({ transition: 'stop' }); } @@ -254,19 +261,15 @@ export class CompositeDataSource implements DataSource { private _cancellableDelay = (delayMS: number) => { let timeout: ReturnType | undefined; - let reject: ((reason?: any) => void) | undefined; - const promise = new Promise((res, rej) => { + const promise = new Promise((res, _) => { timeout = setTimeout(res, delayMS); - reject = rej; }); return { promise, cancel() { if (timeout) { clearTimeout(timeout); - reject?.(); timeout = undefined; - reject = undefined; } }, }; From 33c873e2b0b46f073bf8ed741d06b94bbfdd836f Mon Sep 17 00:00:00 2001 From: Todd Anderson Date: Fri, 14 Mar 2025 17:33:23 -0500 Subject: [PATCH 07/10] Fixing cancellation token issue --- .../DataSystem/CompositeDataSource.ts | 26 ++++++++++++++----- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts b/packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts index 4b0d19a5b5..d77c3916a3 100644 --- a/packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts +++ b/packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts @@ -87,7 +87,7 @@ export class CompositeDataSource implements DataSource { // these local variables are used for handling automatic transition related to data source status (ex: recovering to primary after // secondary has been valid for N many seconds) let lastState: DataSourceState | undefined; - let cancelScheduledTransition: (() => void) | undefined; + let cancelScheduledTransition: () => void = () => {}; // this callback handler can be disabled and ensures only one transition request occurs const callbackHandler = new CallbackHandler( @@ -97,7 +97,7 @@ export class CompositeDataSource implements DataSource { if (basis && this._initPhaseActive) { // transition to sync if we get basis during init callbackHandler.disable(); - cancelScheduledTransition?.(); + this._consumeCancelToken(cancelScheduledTransition); transitionResolve({ transition: 'switchToSync' }); } }, @@ -109,19 +109,19 @@ export class CompositeDataSource implements DataSource { if (err || state === DataSourceState.Closed) { callbackHandler.disable(); statusCallback(DataSourceState.Interrupted, err); // underlying errors or closed states are masked as interrupted while we transition - cancelScheduledTransition?.(); + this._consumeCancelToken(cancelScheduledTransition); transitionResolve({ transition: 'fallback', err }); // unrecoverable error has occurred, so fallback } else { statusCallback(state, null); // report the status upward if (state !== lastState) { lastState = state; - cancelScheduledTransition?.(); // cancel previously scheduled status transition if one was scheduled + this._consumeCancelToken(cancelScheduledTransition); // cancel previously scheduled status transition if one was scheduled const excludeRecovery = this._currentPosition === 0; // primary source cannot recover to itself, so exclude it const condition = this._lookupTransitionCondition(state, excludeRecovery); if (condition) { const { promise, cancel } = this._cancellableDelay(condition.durationMS); - this._cancelTokens.push(cancel); cancelScheduledTransition = cancel; + this._cancelTokens.push(cancelScheduledTransition); promise.then(() => { callbackHandler.disable(); transitionResolve({ transition: condition.transition }); @@ -161,8 +161,8 @@ export class CompositeDataSource implements DataSource { if (transitionRequest.err && transitionRequest.transition !== 'stop') { // if the transition was due to an error, throttle the transition const delay = this._backoff.fail(); - const { promise, cancel } = this._cancellableDelay(delay); - this._cancelTokens.push(cancel); + const { promise, cancel: cancelDelay } = this._cancellableDelay(delay); + this._cancelTokens.push(cancelDelay); const delayedTransition = promise.then(() => transitionRequest); // race the delayed transition and external transition requests to be responsive @@ -170,6 +170,9 @@ export class CompositeDataSource implements DataSource { delayedTransition, this._externalTransitionPromise, ]); + + // consume the delay cancel token (even if it resolved, need to stop tracking its token) + this._consumeCancelToken(cancelDelay); } if (transitionRequest.transition === 'stop') { @@ -188,6 +191,7 @@ export class CompositeDataSource implements DataSource { async stop() { this._cancelTokens.forEach((cancel) => cancel()); + this._cancelTokens = []; this._externalTransitionResolve?.({ transition: 'stop' }); } @@ -274,4 +278,12 @@ export class CompositeDataSource implements DataSource { }, }; }; + + private _consumeCancelToken(cancel: () => void) { + cancel(); + const index = this._cancelTokens.indexOf(cancel, 0); + if (index > -1) { + this._cancelTokens.splice(index, 1); + } + } } From fa634f5783a48b8fd032f446de7a557ada4ae6d2 Mon Sep 17 00:00:00 2001 From: Todd Anderson Date: Mon, 24 Mar 2025 10:56:32 -0500 Subject: [PATCH 08/10] Additional changes after working on contract test integration --- package.json | 2 +- .../DataSystem/CompositeDataSource.test.ts | 179 +++++++++++------- .../subsystem/DataSystem/CallbackHandler.ts | 6 +- .../api/subsystem/DataSystem/DataSource.ts | 22 +-- .../src/api/subsystem/DataSystem/index.ts | 7 +- .../shared/common/src/api/subsystem/index.ts | 14 ++ .../CompositeDataSource.ts | 63 +++--- .../shared/common/src/datasource/index.ts | 2 + packages/shared/common/src/index.ts | 2 + 9 files changed, 184 insertions(+), 113 deletions(-) rename packages/shared/common/src/{api/subsystem/DataSystem => datasource}/CompositeDataSource.ts (86%) diff --git a/package.json b/package.json index b7b0bdc24b..53163bd25c 100644 --- a/package.json +++ b/package.json @@ -43,7 +43,7 @@ "test": "echo Please run tests for individual packages.", "coverage": "npm run test -- --coverage", "contract-test-service": "npm --prefix contract-tests install && npm --prefix contract-tests start", - "contract-test-harness": "curl -s https://raw.githubusercontent.com/launchdarkly/sdk-test-harness/master/downloader/run.sh \\ | VERSION=v2 PARAMS=\"-url http://localhost:8000 -debug -stop-service-at-end $TEST_HARNESS_PARAMS\" sh", + "contract-test-harness": "curl -s https://raw.githubusercontent.com/launchdarkly/sdk-test-harness/master/downloader/run.sh \\ | VERSION=v2 PARAMS=\"-url http://localhost:8000 -debug --skip-from=./contract-tests/testharness-suppressions.txt -stop-service-at-end $TEST_HARNESS_PARAMS\" sh", "contract-tests": "npm run contract-test-service & npm run contract-test-harness", "prettier": "npx prettier --write \"**/*.{js,ts,tsx,json,yaml,yml,md}\" --log-level warn", "check": "yarn && yarn prettier && yarn lint && tsc && yarn build" diff --git a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts index 3415ece2fe..5b69a0d40f 100644 --- a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts +++ b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts @@ -1,27 +1,22 @@ import { - CompositeDataSource, - TransitionConditions, -} from '../../../src/api/subsystem/DataSystem/CompositeDataSource'; -import { - Data, DataSourceState, DataSystemInitializer, DataSystemSynchronizer, - InitializerFactory, - SynchronizerFactory, + LDInitializerFactory, + LDSynchronizerFactory, } from '../../../src/api/subsystem/DataSystem/DataSource'; import { Backoff } from '../../../src/datasource/Backoff'; +import { + CompositeDataSource, + TransitionConditions, +} from '../../../src/datasource/CompositeDataSource'; -function makeInitializerFactory(internal: DataSystemInitializer): InitializerFactory { - return { - create: () => internal, - }; +function makeInitializerFactory(internal: DataSystemInitializer): LDInitializerFactory { + return () => internal; } -function makeSynchronizerFactory(internal: DataSystemSynchronizer): SynchronizerFactory { - return { - create: () => internal, - }; +function makeSynchronizerFactory(internal: DataSystemSynchronizer): LDSynchronizerFactory { + return () => internal; } function makeTestTransitionConditions(): TransitionConditions { @@ -58,11 +53,11 @@ function makeZeroBackoff(): Backoff { it('handles initializer getting basis, switching to syncrhonizer', async () => { const mockInitializer1 = { - run: jest + start: jest .fn() .mockImplementation( ( - _dataCallback: (basis: boolean, data: Data) => void, + _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { _dataCallback(true, { key: 'init1' }); @@ -73,11 +68,11 @@ it('handles initializer getting basis, switching to syncrhonizer', async () => { const mockSynchronizer1Data = { key: 'sync1' }; const mockSynchronizer1 = { - run: jest + start: jest .fn() .mockImplementation( ( - _dataCallback: (basis: boolean, data: Data) => void, + _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { _dataCallback(false, mockSynchronizer1Data); @@ -89,23 +84,24 @@ it('handles initializer getting basis, switching to syncrhonizer', async () => { const underTest = new CompositeDataSource( [makeInitializerFactory(mockInitializer1)], [makeSynchronizerFactory(mockSynchronizer1)], + undefined, makeTestTransitionConditions(), makeZeroBackoff(), ); let callback; await new Promise((resolve) => { - callback = jest.fn((_: boolean, data: Data) => { + callback = jest.fn((_: boolean, data: any) => { if (data === mockSynchronizer1Data) { resolve(); } }); - underTest.run(callback, jest.fn()); + underTest.start(callback, jest.fn()); }); - expect(mockInitializer1.run).toHaveBeenCalledTimes(1); - expect(mockSynchronizer1.run).toHaveBeenCalledTimes(1); + expect(mockInitializer1.start).toHaveBeenCalledTimes(1); + expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1); expect(callback).toHaveBeenCalledTimes(2); expect(callback).toHaveBeenNthCalledWith(1, true, { key: 'init1' }); expect(callback).toHaveBeenNthCalledWith(2, false, { key: 'sync1' }); @@ -113,11 +109,11 @@ it('handles initializer getting basis, switching to syncrhonizer', async () => { it('handles initializer getting basis, switches to synchronizer 1, falls back to synchronizer 2, recovers to synchronizer 1', async () => { const mockInitializer1: DataSystemInitializer = { - run: jest + start: jest .fn() .mockImplementation( ( - _dataCallback: (basis: boolean, data: Data) => void, + _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { _dataCallback(true, { key: 'init1' }); @@ -129,11 +125,11 @@ it('handles initializer getting basis, switches to synchronizer 1, falls back to let sync1RunCount = 0; const mockSynchronizer1Data = { key: 'sync1' }; const mockSynchronizer1 = { - run: jest + start: jest .fn() .mockImplementation( ( - _dataCallback: (basis: boolean, data: Data) => void, + _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { if (sync1RunCount === 0) { @@ -142,7 +138,7 @@ it('handles initializer getting basis, switches to synchronizer 1, falls back to message: 'I am error...man!', }); // error that will lead to fallback } else { - _dataCallback(false, mockSynchronizer1Data); // second run will lead to data + _dataCallback(false, mockSynchronizer1Data); // second start will lead to data } sync1RunCount += 1; }, @@ -152,11 +148,11 @@ it('handles initializer getting basis, switches to synchronizer 1, falls back to const mockSynchronizer2Data = { key: 'sync2' }; const mockSynchronizer2 = { - run: jest + start: jest .fn() .mockImplementation( ( - _dataCallback: (basis: boolean, data: Data) => void, + _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { _dataCallback(false, mockSynchronizer2Data); @@ -169,24 +165,25 @@ it('handles initializer getting basis, switches to synchronizer 1, falls back to const underTest = new CompositeDataSource( [makeInitializerFactory(mockInitializer1)], [makeSynchronizerFactory(mockSynchronizer1), makeSynchronizerFactory(mockSynchronizer2)], + undefined, makeTestTransitionConditions(), makeZeroBackoff(), ); let callback; await new Promise((resolve) => { - callback = jest.fn((_: boolean, data: Data) => { + callback = jest.fn((_: boolean, data: any) => { if (data === mockSynchronizer1Data) { resolve(); } }); - underTest.run(callback, jest.fn()); + underTest.start(callback, jest.fn()); }); - expect(mockInitializer1.run).toHaveBeenCalledTimes(1); - expect(mockSynchronizer1.run).toHaveBeenCalledTimes(2); - expect(mockSynchronizer2.run).toHaveBeenCalledTimes(1); + expect(mockInitializer1.start).toHaveBeenCalledTimes(1); + expect(mockSynchronizer1.start).toHaveBeenCalledTimes(2); + expect(mockSynchronizer2.start).toHaveBeenCalledTimes(1); expect(callback).toHaveBeenCalledTimes(3); expect(callback).toHaveBeenNthCalledWith(1, true, { key: 'init1' }); expect(callback).toHaveBeenNthCalledWith(2, false, { key: 'sync2' }); // sync1 errors and fallsback @@ -199,11 +196,11 @@ it('reports error when all initializers fail', async () => { message: 'I am initializer1 error!', }; const mockInitializer1: DataSystemInitializer = { - run: jest + start: jest .fn() .mockImplementation( ( - _dataCallback: (basis: boolean, data: Data) => void, + _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { _statusCallback(DataSourceState.Closed, mockInitializer1Error); @@ -217,11 +214,11 @@ it('reports error when all initializers fail', async () => { message: 'I am initializer2 error!', }; const mockInitializer2: DataSystemInitializer = { - run: jest + start: jest .fn() .mockImplementation( ( - _dataCallback: (basis: boolean, data: Data) => void, + _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { _statusCallback(DataSourceState.Closed, mockInitializer2Error); @@ -233,6 +230,7 @@ it('reports error when all initializers fail', async () => { const underTest = new CompositeDataSource( [makeInitializerFactory(mockInitializer1), makeInitializerFactory(mockInitializer2)], [], // no synchronizers for this test + undefined, makeTestTransitionConditions(), makeZeroBackoff(), ); @@ -246,11 +244,11 @@ it('reports error when all initializers fail', async () => { } }); - underTest.run(dataCallback, statusCallback); + underTest.start(dataCallback, statusCallback); }); - expect(mockInitializer1.run).toHaveBeenCalledTimes(1); - expect(mockInitializer2.run).toHaveBeenCalledTimes(1); + expect(mockInitializer1.start).toHaveBeenCalledTimes(1); + expect(mockInitializer2.start).toHaveBeenCalledTimes(1); expect(dataCallback).toHaveBeenCalledTimes(0); expect(statusCallback).toHaveBeenNthCalledWith( 1, @@ -272,11 +270,11 @@ it('reports error when all initializers fail', async () => { it('can be stopped when in thrashing synchronizer fallback loop', async () => { const mockInitializer1 = { - run: jest + start: jest .fn() .mockImplementation( ( - _dataCallback: (basis: boolean, data: Data) => void, + _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { _dataCallback(true, { key: 'init1' }); @@ -287,11 +285,11 @@ it('can be stopped when in thrashing synchronizer fallback loop', async () => { const mockSynchronizer1Error = { name: 'Error', message: 'I am error...man!' }; const mockSynchronizer1 = { - run: jest + start: jest .fn() .mockImplementation( ( - _dataCallback: (basis: boolean, data: Data) => void, + _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { _statusCallback(DataSourceState.Closed, mockSynchronizer1Error); // error that will lead to fallback @@ -303,6 +301,7 @@ it('can be stopped when in thrashing synchronizer fallback loop', async () => { const underTest = new CompositeDataSource( [makeInitializerFactory(mockInitializer1)], [makeSynchronizerFactory(mockSynchronizer1)], // will continuously fallback onto itself + undefined, makeTestTransitionConditions(), makeZeroBackoff(), ); @@ -316,11 +315,11 @@ it('can be stopped when in thrashing synchronizer fallback loop', async () => { } }); - underTest.run(dataCallback, statusCallback); + underTest.start(dataCallback, statusCallback); }); - expect(mockInitializer1.run).toHaveBeenCalled(); - expect(mockSynchronizer1.run).toHaveBeenCalled(); + expect(mockInitializer1.start).toHaveBeenCalled(); + expect(mockSynchronizer1.start).toHaveBeenCalled(); expect(dataCallback).toHaveBeenNthCalledWith(1, true, { key: 'init1' }); underTest.stop(); @@ -340,11 +339,11 @@ it('can be stopped when in thrashing synchronizer fallback loop', async () => { it('can be stopped and restarted', async () => { const mockInitializer1Data = { key: 'init1' }; const mockInitializer1 = { - run: jest + start: jest .fn() .mockImplementation( ( - _dataCallback: (basis: boolean, data: Data) => void, + _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { _dataCallback(true, mockInitializer1Data); @@ -355,11 +354,11 @@ it('can be stopped and restarted', async () => { const mockSynchronizer1Data = { key: 'sync1' }; const mockSynchronizer1 = { - run: jest + start: jest .fn() .mockImplementation( ( - _dataCallback: (basis: boolean, data: Data) => void, + _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { _dataCallback(false, mockSynchronizer1Data); @@ -371,25 +370,26 @@ it('can be stopped and restarted', async () => { const underTest = new CompositeDataSource( [makeInitializerFactory(mockInitializer1)], [makeSynchronizerFactory(mockSynchronizer1)], + undefined, makeTestTransitionConditions(), makeZeroBackoff(), ); let callback1; await new Promise((resolve) => { - callback1 = jest.fn((_: boolean, data: Data) => { + callback1 = jest.fn((_: boolean, data: any) => { if (data === mockSynchronizer1Data) { underTest.stop(); resolve(); } }); - // first run - underTest.run(callback1, jest.fn()); + // first start + underTest.start(callback1, jest.fn()); }); - // check first run triggered underlying data sources - expect(mockInitializer1.run).toHaveBeenCalledTimes(1); - expect(mockSynchronizer1.run).toHaveBeenCalledTimes(1); + // check first start triggered underlying data sources + expect(mockInitializer1.start).toHaveBeenCalledTimes(1); + expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1); expect(callback1).toHaveBeenCalledTimes(2); // wait a moment for pending awaits to resolve the stop request @@ -399,18 +399,18 @@ it('can be stopped and restarted', async () => { let callback2; await new Promise((resolve) => { - callback2 = jest.fn((_: boolean, data: Data) => { + callback2 = jest.fn((_: boolean, data: any) => { if (data === mockSynchronizer1Data) { resolve(); } }); - // second run - underTest.run(callback2, jest.fn()); + // second start + underTest.start(callback2, jest.fn()); }); - // check that second run triggers underlying data sources again - expect(mockInitializer1.run).toHaveBeenCalledTimes(2); - expect(mockSynchronizer1.run).toHaveBeenCalledTimes(2); + // check that second start triggers underlying data sources again + expect(mockInitializer1.start).toHaveBeenCalledTimes(2); + expect(mockSynchronizer1.start).toHaveBeenCalledTimes(2); expect(callback2).toHaveBeenCalledTimes(2); }); @@ -418,6 +418,7 @@ it('is well behaved with no initializers and no synchronizers configured', async const underTest = new CompositeDataSource( [], [], + undefined, makeTestTransitionConditions(), makeZeroBackoff(), ); @@ -428,7 +429,7 @@ it('is well behaved with no initializers and no synchronizers configured', async resolve(); }); - underTest.run(jest.fn(), statusCallback); + underTest.start(jest.fn(), statusCallback); }); expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Closed, { @@ -438,13 +439,49 @@ it('is well behaved with no initializers and no synchronizers configured', async }); }); +it('is well behaved with no initializer and synchronizer configured', async () => { + const mockSynchronizer1Data = { key: 'sync1' }; + const mockSynchronizer1 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _dataCallback(false, mockSynchronizer1Data); + }, + ), + stop: jest.fn(), + }; + + const underTest = new CompositeDataSource( + [], + [makeSynchronizerFactory(mockSynchronizer1)], + undefined, + makeTestTransitionConditions(), + makeZeroBackoff(), + ); + + let dataCallback; + await new Promise((resolve) => { + dataCallback = jest.fn(() => { + resolve(); + }); + + underTest.start(dataCallback, jest.fn()); + }); + + expect(dataCallback).toHaveBeenNthCalledWith(1, false, { key: 'sync1' }); +}); + it('is well behaved with an initializer and no synchronizers configured', async () => { const mockInitializer1 = { - run: jest + start: jest .fn() .mockImplementation( ( - _dataCallback: (basis: boolean, data: Data) => void, + _dataCallback: (basis: boolean, data: any) => void, _statusCallback: (status: DataSourceState, err?: any) => void, ) => { _dataCallback(true, { key: 'init1' }); @@ -456,19 +493,23 @@ it('is well behaved with an initializer and no synchronizers configured', async const underTest = new CompositeDataSource( [makeInitializerFactory(mockInitializer1)], [], + undefined, makeTestTransitionConditions(), makeZeroBackoff(), ); + let dataCallback; let statusCallback; await new Promise((resolve) => { + dataCallback = jest.fn(); statusCallback = jest.fn((_1: DataSourceState, _2: any) => { resolve(); }); - underTest.run(jest.fn(), statusCallback); + underTest.start(dataCallback, statusCallback); }); + expect(dataCallback).toHaveBeenNthCalledWith(1, true, { key: 'init1' }); expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Closed, { name: 'ExhaustedDataSources', message: diff --git a/packages/shared/common/src/api/subsystem/DataSystem/CallbackHandler.ts b/packages/shared/common/src/api/subsystem/DataSystem/CallbackHandler.ts index 12ec52ce79..5ae69fb1c6 100644 --- a/packages/shared/common/src/api/subsystem/DataSystem/CallbackHandler.ts +++ b/packages/shared/common/src/api/subsystem/DataSystem/CallbackHandler.ts @@ -1,4 +1,4 @@ -import { Data, DataSourceState } from './DataSource'; +import { DataSourceState } from './DataSource'; /** * Handler that connects the current {@link DataSource} to the {@link CompositeDataSource}. A single @@ -9,7 +9,7 @@ export class CallbackHandler { private _disabled: boolean = false; constructor( - private readonly _dataCallback: (basis: boolean, data: Data) => void, + private readonly _dataCallback: (basis: boolean, data: any) => void, private readonly _statusCallback: (status: DataSourceState, err?: any) => void, ) {} @@ -17,7 +17,7 @@ export class CallbackHandler { this._disabled = true; } - async dataHandler(basis: boolean, data: Data) { + async dataHandler(basis: boolean, data: any) { if (this._disabled) { return; } diff --git a/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts b/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts index 80885a401e..f7a5834761 100644 --- a/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts +++ b/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts @@ -1,10 +1,12 @@ -export interface Data {} - // TODO: refactor client-sdk to use this enum export enum DataSourceState { + // Spinning up to make first connection attempt Initializing, + // Positive confirmation of connection/data receipt Valid, + // Transient issue, automatic retry is expected Interrupted, + // Permanent issue, external intervention required Closed, } @@ -15,8 +17,8 @@ export interface DataSource { * @param statusCallback that will be called when data source state changes or an unrecoverable error * has been encountered. */ - run( - dataCallback: (basis: boolean, data: Data) => void, + start( + dataCallback: (basis: boolean, data: any) => void, statusCallback: (status: DataSourceState, err?: any) => void, ): void; @@ -26,6 +28,10 @@ export interface DataSource { stop(): void; } +export type LDInitializerFactory = () => DataSystemInitializer; + +export type LDSynchronizerFactory = () => DataSystemSynchronizer; + /** * A data source that can be used to fetch the basis. */ @@ -35,11 +41,3 @@ export interface DataSystemInitializer extends DataSource {} * A data source that can be used to fetch the basis or ongoing data changes. */ export interface DataSystemSynchronizer extends DataSource {} - -export interface InitializerFactory { - create(): DataSystemInitializer; -} - -export interface SynchronizerFactory { - create(): DataSystemSynchronizer; -} diff --git a/packages/shared/common/src/api/subsystem/DataSystem/index.ts b/packages/shared/common/src/api/subsystem/DataSystem/index.ts index d49e3438fd..3ad2737e2d 100644 --- a/packages/shared/common/src/api/subsystem/DataSystem/index.ts +++ b/packages/shared/common/src/api/subsystem/DataSystem/index.ts @@ -1,7 +1,8 @@ export { + DataSource, + DataSourceState, DataSystemInitializer, DataSystemSynchronizer, - InitializerFactory, - SynchronizerFactory, + LDInitializerFactory, + LDSynchronizerFactory, } from './DataSource'; -export { CompositeDataSource } from './CompositeDataSource'; diff --git a/packages/shared/common/src/api/subsystem/index.ts b/packages/shared/common/src/api/subsystem/index.ts index 000f60f686..7dc2af969b 100644 --- a/packages/shared/common/src/api/subsystem/index.ts +++ b/packages/shared/common/src/api/subsystem/index.ts @@ -1,9 +1,23 @@ +import { + DataSource, + DataSourceState, + DataSystemInitializer, + DataSystemSynchronizer, + LDInitializerFactory, + LDSynchronizerFactory, +} from './DataSystem'; import LDContextDeduplicator from './LDContextDeduplicator'; import LDEventProcessor from './LDEventProcessor'; import LDEventSender, { LDDeliveryStatus, LDEventSenderResult, LDEventType } from './LDEventSender'; import { LDStreamProcessor } from './LDStreamProcessor'; export { + DataSource, + DataSourceState, + DataSystemInitializer, + DataSystemSynchronizer, + LDInitializerFactory, + LDSynchronizerFactory, LDEventProcessor, LDContextDeduplicator, LDEventSender, diff --git a/packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts b/packages/shared/common/src/datasource/CompositeDataSource.ts similarity index 86% rename from packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts rename to packages/shared/common/src/datasource/CompositeDataSource.ts index d77c3916a3..e68dc6f7dd 100644 --- a/packages/shared/common/src/api/subsystem/DataSystem/CompositeDataSource.ts +++ b/packages/shared/common/src/datasource/CompositeDataSource.ts @@ -1,19 +1,15 @@ /* eslint-disable no-await-in-loop */ -import { Backoff } from '../../../datasource/Backoff'; -import { LDLogger } from '../../logging'; -import { CallbackHandler } from './CallbackHandler'; +import { LDLogger } from '../api/logging'; +import { CallbackHandler } from '../api/subsystem/DataSystem/CallbackHandler'; import { - Data, DataSource, DataSourceState, - InitializerFactory, - SynchronizerFactory, -} from './DataSource'; + LDInitializerFactory, + LDSynchronizerFactory, +} from '../api/subsystem/DataSystem/DataSource'; +import { Backoff, DefaultBackoff } from './Backoff'; -// TODO: SDK-858, specify these constants when CompositeDataSource is used. -// eslint-disable-next-line @typescript-eslint/no-unused-vars const DEFAULT_FALLBACK_TIME_MS = 2 * 60 * 1000; -// eslint-disable-next-line @typescript-eslint/no-unused-vars const DEFAULT_RECOVERY_TIME_MS = 5 * 60 * 1000; /** @@ -41,8 +37,8 @@ export class CompositeDataSource implements DataSource { // TODO: SDK-856 async notification if initializer takes too long // TODO: SDK-1044 utilize selector from initializers - private _initPhaseActive: boolean = true; - private _currentPosition: number = 0; + private _initPhaseActive: boolean; + private _currentPosition: number; private _stopped: boolean = true; private _externalTransitionPromise: Promise; @@ -54,21 +50,33 @@ export class CompositeDataSource implements DataSource { * @param _synchronizers factories to create {@link DataSystemSynchronizer}s, in priority order. */ constructor( - private readonly _initializers: InitializerFactory[], - private readonly _synchronizers: SynchronizerFactory[], - private readonly _transitionConditions: TransitionConditions, - private readonly _backoff: Backoff, + private readonly _initializers: LDInitializerFactory[], + private readonly _synchronizers: LDSynchronizerFactory[], private readonly _logger?: LDLogger, + private readonly _transitionConditions: TransitionConditions = { + [DataSourceState.Valid]: { + durationMS: DEFAULT_RECOVERY_TIME_MS, + transition: 'recover', + }, + [DataSourceState.Interrupted]: { + durationMS: DEFAULT_FALLBACK_TIME_MS, + transition: 'fallback', + }, + }, + private readonly _backoff: Backoff = new DefaultBackoff( + 1000, // TODO SDK-1137: handle blacklisting perpetually failing sources + 30000, + ), ) { this._externalTransitionPromise = new Promise((resolveTransition) => { this._externalTransitionResolve = resolveTransition; }); - this._initPhaseActive = true; + this._initPhaseActive = _initializers.length > 0; // init phase if we have initializers this._currentPosition = 0; } - async run( - dataCallback: (basis: boolean, data: Data) => void, + async start( + dataCallback: (basis: boolean, data: any) => void, statusCallback: (status: DataSourceState, err?: any) => void, ): Promise { if (!this._stopped) { @@ -78,6 +86,9 @@ export class CompositeDataSource implements DataSource { } this._stopped = false; + this._logger?.debug( + `CompositeDataSource starting with (${this._initializers.length} initializers, ${this._synchronizers.length} synchronizers).`, + ); let lastTransition: Transition = 'none'; // eslint-disable-next-line no-constant-condition while (true) { @@ -91,7 +102,7 @@ export class CompositeDataSource implements DataSource { // this callback handler can be disabled and ensures only one transition request occurs const callbackHandler = new CallbackHandler( - (basis: boolean, data: Data) => { + (basis: boolean, data: any) => { this._backoff.success(); dataCallback(basis, data); if (basis && this._initPhaseActive) { @@ -105,7 +116,9 @@ export class CompositeDataSource implements DataSource { // When we get a status update, we want to fallback if it is an error. We also want to schedule a transition for some // time in the future if this status remains for some duration (ex: Recover to primary synchronizer after the secondary // synchronizer has been Valid for some time). These scheduled transitions are configurable in the constructor. - + this._logger?.debug( + `CompositeDataSource received state ${state} from underlying data source.`, + ); if (err || state === DataSourceState.Closed) { callbackHandler.disable(); statusCallback(DataSourceState.Interrupted, err); // underlying errors or closed states are masked as interrupted while we transition @@ -133,7 +146,7 @@ export class CompositeDataSource implements DataSource { } }, ); - currentDS.run( + currentDS.start( (basis, data) => callbackHandler.dataHandler(basis, data), (status, err) => callbackHandler.statusHandler(status, err), ); @@ -197,7 +210,7 @@ export class CompositeDataSource implements DataSource { private _reset() { this._stopped = true; - this._initPhaseActive = true; + this._initPhaseActive = this._initializers.length > 0; // init phase if we have initializers; this._currentPosition = 0; this._externalTransitionPromise = new Promise((tr) => { this._externalTransitionResolve = tr; @@ -229,7 +242,7 @@ export class CompositeDataSource implements DataSource { return undefined; } - return this._initializers[this._currentPosition].create(); + return this._initializers[this._currentPosition](); } // getting here indicates we are using a synchronizer @@ -242,7 +255,7 @@ export class CompositeDataSource implements DataSource { // this is only possible if no synchronizers were provided return undefined; } - return this._synchronizers[this._currentPosition].create(); + return this._synchronizers[this._currentPosition](); } /** diff --git a/packages/shared/common/src/datasource/index.ts b/packages/shared/common/src/datasource/index.ts index e727947dce..237da787e0 100644 --- a/packages/shared/common/src/datasource/index.ts +++ b/packages/shared/common/src/datasource/index.ts @@ -1,4 +1,5 @@ import { Backoff, DefaultBackoff } from './Backoff'; +import { CompositeDataSource } from './CompositeDataSource'; import { DataSourceErrorKind } from './DataSourceErrorKinds'; import { LDFileDataSourceError, @@ -9,6 +10,7 @@ import { export { Backoff, + CompositeDataSource, DefaultBackoff, DataSourceErrorKind, LDFileDataSourceError, diff --git a/packages/shared/common/src/index.ts b/packages/shared/common/src/index.ts index 7df308f674..060ed1bf5a 100644 --- a/packages/shared/common/src/index.ts +++ b/packages/shared/common/src/index.ts @@ -3,6 +3,7 @@ import Context from './Context'; import ContextFilter from './ContextFilter'; import { Backoff, + CompositeDataSource, DataSourceErrorKind, DefaultBackoff, LDFileDataSourceError, @@ -24,6 +25,7 @@ export { AttributeReference, Context, ContextFilter, + CompositeDataSource, DataSourceErrorKind, Backoff, DefaultBackoff, From bd1ac1dbe5596885bade65cc86963cc1a5d2b65a Mon Sep 17 00:00:00 2001 From: Todd Anderson Date: Fri, 28 Mar 2025 10:44:26 -0500 Subject: [PATCH 09/10] fixing delayed transitions not cleaning up their own cancel tokens --- .../DataSystem/CompositeDataSource.test.ts | 71 +++++++++++++++++++ .../src/datasource/CompositeDataSource.ts | 14 ++-- 2 files changed, 80 insertions(+), 5 deletions(-) diff --git a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts index 5b69a0d40f..a83025808f 100644 --- a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts +++ b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts @@ -516,3 +516,74 @@ it('is well behaved with an initializer and no synchronizers configured', async 'CompositeDataSource has exhausted all configured datasources (1 initializers, 0 synchronizers).', }); }); + +it('consumes cancellation tokens correctly', async () => { + const mockInitializer1 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _dataCallback(true, { key: 'init1' }); + }, + ), + stop: jest.fn(), + }; + + const mockSynchronizer1 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Interrupted); // report interrupted to schedule automatic transition and create cancellation token + }, + ), + stop: jest.fn(), + }; + + const underTest = new CompositeDataSource( + [makeInitializerFactory(mockInitializer1)], + [makeSynchronizerFactory(mockSynchronizer1)], + undefined, + { + // pass in transition condition of 0 so that it will thrash, generating cancellation tokens repeatedly + [DataSourceState.Interrupted]: { + durationMS: 100, + transition: 'fallback', + }, + }, + makeZeroBackoff(), + ); + + let dataCallback; + let statusCallback; + let interruptedCount = 0; + await new Promise((resolve) => { + dataCallback = jest.fn(); + statusCallback = jest.fn((_1: DataSourceState, _2: any) => { + interruptedCount += 1; + if (interruptedCount > 10) { + // let it thrash for N iterations + resolve(); + } + }); + + underTest.start(dataCallback, statusCallback); + }); + + // @ts-ignore + // eslint-disable-next-line no-underscore-dangle + expect(underTest._cancelTokens.length).toEqual(1); + + underTest.stop(); + + // @ts-ignore + // eslint-disable-next-line no-underscore-dangle + expect(underTest._cancelTokens.length).toEqual(0); +}); diff --git a/packages/shared/common/src/datasource/CompositeDataSource.ts b/packages/shared/common/src/datasource/CompositeDataSource.ts index e68dc6f7dd..443a18075f 100644 --- a/packages/shared/common/src/datasource/CompositeDataSource.ts +++ b/packages/shared/common/src/datasource/CompositeDataSource.ts @@ -135,10 +135,12 @@ export class CompositeDataSource implements DataSource { const { promise, cancel } = this._cancellableDelay(condition.durationMS); cancelScheduledTransition = cancel; this._cancelTokens.push(cancelScheduledTransition); - promise.then(() => { - callbackHandler.disable(); - transitionResolve({ transition: condition.transition }); - }); + promise + .then(() => this._consumeCancelToken(cancel)) + .then(() => { + callbackHandler.disable(); + transitionResolve({ transition: condition.transition }); + }); } else { // this data source state does not have a transition condition, so don't schedule any transition } @@ -176,7 +178,9 @@ export class CompositeDataSource implements DataSource { const delay = this._backoff.fail(); const { promise, cancel: cancelDelay } = this._cancellableDelay(delay); this._cancelTokens.push(cancelDelay); - const delayedTransition = promise.then(() => transitionRequest); + const delayedTransition = promise + .then(() => this._consumeCancelToken(cancelDelay)) + .then(() => transitionRequest); // race the delayed transition and external transition requests to be responsive transitionRequest = await Promise.race([ From 90340c127558e15f7672041633263b981ed37e72 Mon Sep 17 00:00:00 2001 From: Todd Anderson Date: Fri, 28 Mar 2025 10:48:38 -0500 Subject: [PATCH 10/10] adjusting syntax --- .../src/datasource/CompositeDataSource.ts | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/packages/shared/common/src/datasource/CompositeDataSource.ts b/packages/shared/common/src/datasource/CompositeDataSource.ts index 443a18075f..d831463124 100644 --- a/packages/shared/common/src/datasource/CompositeDataSource.ts +++ b/packages/shared/common/src/datasource/CompositeDataSource.ts @@ -135,12 +135,11 @@ export class CompositeDataSource implements DataSource { const { promise, cancel } = this._cancellableDelay(condition.durationMS); cancelScheduledTransition = cancel; this._cancelTokens.push(cancelScheduledTransition); - promise - .then(() => this._consumeCancelToken(cancel)) - .then(() => { - callbackHandler.disable(); - transitionResolve({ transition: condition.transition }); - }); + promise.then(() => { + this._consumeCancelToken(cancel); + callbackHandler.disable(); + transitionResolve({ transition: condition.transition }); + }); } else { // this data source state does not have a transition condition, so don't schedule any transition } @@ -178,9 +177,10 @@ export class CompositeDataSource implements DataSource { const delay = this._backoff.fail(); const { promise, cancel: cancelDelay } = this._cancellableDelay(delay); this._cancelTokens.push(cancelDelay); - const delayedTransition = promise - .then(() => this._consumeCancelToken(cancelDelay)) - .then(() => transitionRequest); + const delayedTransition = promise.then(() => { + this._consumeCancelToken(cancelDelay); + return transitionRequest; + }); // race the delayed transition and external transition requests to be responsive transitionRequest = await Promise.race([