diff --git a/package.json b/package.json index b7b0bdc24b..53163bd25c 100644 --- a/package.json +++ b/package.json @@ -43,7 +43,7 @@ "test": "echo Please run tests for individual packages.", "coverage": "npm run test -- --coverage", "contract-test-service": "npm --prefix contract-tests install && npm --prefix contract-tests start", - "contract-test-harness": "curl -s https://raw.githubusercontent.com/launchdarkly/sdk-test-harness/master/downloader/run.sh \\ | VERSION=v2 PARAMS=\"-url http://localhost:8000 -debug -stop-service-at-end $TEST_HARNESS_PARAMS\" sh", + "contract-test-harness": "curl -s https://raw.githubusercontent.com/launchdarkly/sdk-test-harness/master/downloader/run.sh \\ | VERSION=v2 PARAMS=\"-url http://localhost:8000 -debug --skip-from=./contract-tests/testharness-suppressions.txt -stop-service-at-end $TEST_HARNESS_PARAMS\" sh", "contract-tests": "npm run contract-test-service & npm run contract-test-harness", "prettier": "npx prettier --write \"**/*.{js,ts,tsx,json,yaml,yml,md}\" --log-level warn", "check": "yarn && yarn prettier && yarn lint && tsc && yarn build" diff --git a/packages/sdk/browser/src/platform/DefaultBrowserEventSource.ts b/packages/sdk/browser/src/platform/DefaultBrowserEventSource.ts index 3ecdeb3a12..d753931aa8 100644 --- a/packages/sdk/browser/src/platform/DefaultBrowserEventSource.ts +++ b/packages/sdk/browser/src/platform/DefaultBrowserEventSource.ts @@ -1,4 +1,5 @@ import { + DefaultBackoff, EventListener, EventName, EventSourceInitDict, @@ -6,8 +7,6 @@ import { EventSource as LDEventSource, } from '@launchdarkly/js-client-sdk-common'; -import Backoff from './Backoff'; - /** * Implementation Notes: * @@ -22,7 +21,7 @@ import Backoff from './Backoff'; */ export default class DefaultBrowserEventSource implements LDEventSource { private _es?: EventSource; - private _backoff: Backoff; + private _backoff: DefaultBackoff; private _errorFilter: (err: HttpErrorResponse) => boolean; // The type of the handle can be platform specific and we treat is opaquely. @@ -34,7 +33,10 @@ export default class DefaultBrowserEventSource implements LDEventSource { private readonly _url: string, options: EventSourceInitDict, ) { - this._backoff = new Backoff(options.initialRetryDelayMillis, options.retryResetIntervalMillis); + this._backoff = new DefaultBackoff( + options.initialRetryDelayMillis, + options.retryResetIntervalMillis, + ); this._errorFilter = options.errorFilter; this._openConnection(); } diff --git a/packages/sdk/browser/__tests__/platform/Backoff.test.ts b/packages/shared/common/__tests__/datasource/Backoff.test.ts similarity index 79% rename from packages/sdk/browser/__tests__/platform/Backoff.test.ts rename to packages/shared/common/__tests__/datasource/Backoff.test.ts index 49cdd901cb..77b09ce0fa 100644 --- a/packages/sdk/browser/__tests__/platform/Backoff.test.ts +++ b/packages/shared/common/__tests__/datasource/Backoff.test.ts @@ -1,29 +1,29 @@ -import Backoff from '../../src/platform/Backoff'; +import { DefaultBackoff } from '../../src/datasource/Backoff'; const noJitter = (): number => 0; const maxJitter = (): number => 1; const defaultResetInterval = 60 * 1000; it.each([1, 1000, 5000])('has the correct starting delay', (initialDelay) => { - const backoff = new Backoff(initialDelay, defaultResetInterval, noJitter); + const backoff = new DefaultBackoff(initialDelay, defaultResetInterval, noJitter); expect(backoff.fail()).toEqual(initialDelay); }); it.each([1, 1000, 5000])('doubles delay on consecutive failures', (initialDelay) => { - const backoff = new Backoff(initialDelay, defaultResetInterval, noJitter); + const backoff = new DefaultBackoff(initialDelay, defaultResetInterval, noJitter); expect(backoff.fail()).toEqual(initialDelay); expect(backoff.fail()).toEqual(initialDelay * 2); expect(backoff.fail()).toEqual(initialDelay * 4); }); it('stops increasing delay when the max backoff is encountered', () => { - const backoff = new Backoff(5000, defaultResetInterval, noJitter); + const backoff = new DefaultBackoff(5000, defaultResetInterval, noJitter); expect(backoff.fail()).toEqual(5000); expect(backoff.fail()).toEqual(10000); expect(backoff.fail()).toEqual(20000); expect(backoff.fail()).toEqual(30000); - const backoff2 = new Backoff(1000, defaultResetInterval, noJitter); + const backoff2 = new DefaultBackoff(1000, defaultResetInterval, noJitter); expect(backoff2.fail()).toEqual(1000); expect(backoff2.fail()).toEqual(2000); expect(backoff2.fail()).toEqual(4000); @@ -33,12 +33,12 @@ it('stops increasing delay when the max backoff is encountered', () => { }); it('handles an initial retry delay longer than the maximum retry delay', () => { - const backoff = new Backoff(40000, defaultResetInterval, noJitter); + const backoff = new DefaultBackoff(40000, defaultResetInterval, noJitter); expect(backoff.fail()).toEqual(30000); }); it('jitters the backoff value', () => { - const backoff = new Backoff(1000, defaultResetInterval, maxJitter); + const backoff = new DefaultBackoff(1000, defaultResetInterval, maxJitter); expect(backoff.fail()).toEqual(500); expect(backoff.fail()).toEqual(1000); expect(backoff.fail()).toEqual(2000); @@ -51,7 +51,7 @@ it.each([10 * 1000, 60 * 1000])( 'resets the delay when the last successful connection was connected greater than the retry reset interval', (retryResetInterval) => { let time = 1000; - const backoff = new Backoff(1000, retryResetInterval, noJitter); + const backoff = new DefaultBackoff(1000, retryResetInterval, noJitter); expect(backoff.fail(time)).toEqual(1000); time += 1; backoff.success(time); @@ -69,7 +69,7 @@ it.each([10 * 1000, 60 * 1000])( it.each([10 * 1000, 60 * 1000])( 'does not reset the delay when the connection did not persist longer than the retry reset interval', (retryResetInterval) => { - const backoff = new Backoff(1000, retryResetInterval, noJitter); + const backoff = new DefaultBackoff(1000, retryResetInterval, noJitter); let time = 1000; expect(backoff.fail(time)).toEqual(1000); diff --git a/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts new file mode 100644 index 0000000000..a83025808f --- /dev/null +++ b/packages/shared/common/__tests__/subsystem/DataSystem/CompositeDataSource.test.ts @@ -0,0 +1,589 @@ +import { + DataSourceState, + DataSystemInitializer, + DataSystemSynchronizer, + LDInitializerFactory, + LDSynchronizerFactory, +} from '../../../src/api/subsystem/DataSystem/DataSource'; +import { Backoff } from '../../../src/datasource/Backoff'; +import { + CompositeDataSource, + TransitionConditions, +} from '../../../src/datasource/CompositeDataSource'; + +function makeInitializerFactory(internal: DataSystemInitializer): LDInitializerFactory { + return () => internal; +} + +function makeSynchronizerFactory(internal: DataSystemSynchronizer): LDSynchronizerFactory { + return () => internal; +} + +function makeTestTransitionConditions(): TransitionConditions { + return { + [DataSourceState.Initializing]: { + durationMS: 0, + transition: 'fallback', + }, + [DataSourceState.Interrupted]: { + durationMS: 0, + transition: 'fallback', + }, + [DataSourceState.Closed]: { + durationMS: 0, + transition: 'fallback', + }, + [DataSourceState.Valid]: { + durationMS: 0, + transition: 'fallback', + }, + }; +} + +function makeZeroBackoff(): Backoff { + return { + success() { + return 0; + }, + fail() { + return 0; + }, + }; +} + +it('handles initializer getting basis, switching to syncrhonizer', async () => { + const mockInitializer1 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _dataCallback(true, { key: 'init1' }); + }, + ), + stop: jest.fn(), + }; + + const mockSynchronizer1Data = { key: 'sync1' }; + const mockSynchronizer1 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _dataCallback(false, mockSynchronizer1Data); + }, + ), + stop: jest.fn(), + }; + + const underTest = new CompositeDataSource( + [makeInitializerFactory(mockInitializer1)], + [makeSynchronizerFactory(mockSynchronizer1)], + undefined, + makeTestTransitionConditions(), + makeZeroBackoff(), + ); + + let callback; + await new Promise((resolve) => { + callback = jest.fn((_: boolean, data: any) => { + if (data === mockSynchronizer1Data) { + resolve(); + } + }); + + underTest.start(callback, jest.fn()); + }); + + expect(mockInitializer1.start).toHaveBeenCalledTimes(1); + expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1); + expect(callback).toHaveBeenCalledTimes(2); + expect(callback).toHaveBeenNthCalledWith(1, true, { key: 'init1' }); + expect(callback).toHaveBeenNthCalledWith(2, false, { key: 'sync1' }); +}); + +it('handles initializer getting basis, switches to synchronizer 1, falls back to synchronizer 2, recovers to synchronizer 1', async () => { + const mockInitializer1: DataSystemInitializer = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _dataCallback(true, { key: 'init1' }); + }, + ), + stop: jest.fn(), + }; + + let sync1RunCount = 0; + const mockSynchronizer1Data = { key: 'sync1' }; + const mockSynchronizer1 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + if (sync1RunCount === 0) { + _statusCallback(DataSourceState.Closed, { + name: 'Error', + message: 'I am error...man!', + }); // error that will lead to fallback + } else { + _dataCallback(false, mockSynchronizer1Data); // second start will lead to data + } + sync1RunCount += 1; + }, + ), + stop: jest.fn(), + }; + + const mockSynchronizer2Data = { key: 'sync2' }; + const mockSynchronizer2 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _dataCallback(false, mockSynchronizer2Data); + _statusCallback(DataSourceState.Valid, null); // this should lead to recovery + }, + ), + stop: jest.fn(), + }; + + const underTest = new CompositeDataSource( + [makeInitializerFactory(mockInitializer1)], + [makeSynchronizerFactory(mockSynchronizer1), makeSynchronizerFactory(mockSynchronizer2)], + undefined, + makeTestTransitionConditions(), + makeZeroBackoff(), + ); + + let callback; + await new Promise((resolve) => { + callback = jest.fn((_: boolean, data: any) => { + if (data === mockSynchronizer1Data) { + resolve(); + } + }); + + underTest.start(callback, jest.fn()); + }); + + expect(mockInitializer1.start).toHaveBeenCalledTimes(1); + expect(mockSynchronizer1.start).toHaveBeenCalledTimes(2); + expect(mockSynchronizer2.start).toHaveBeenCalledTimes(1); + expect(callback).toHaveBeenCalledTimes(3); + expect(callback).toHaveBeenNthCalledWith(1, true, { key: 'init1' }); + expect(callback).toHaveBeenNthCalledWith(2, false, { key: 'sync2' }); // sync1 errors and fallsback + expect(callback).toHaveBeenNthCalledWith(3, false, { key: 'sync1' }); // sync2 recovers back to sync1 +}); + +it('reports error when all initializers fail', async () => { + const mockInitializer1Error = { + name: 'Error', + message: 'I am initializer1 error!', + }; + const mockInitializer1: DataSystemInitializer = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Closed, mockInitializer1Error); + }, + ), + stop: jest.fn(), + }; + + const mockInitializer2Error = { + name: 'Error', + message: 'I am initializer2 error!', + }; + const mockInitializer2: DataSystemInitializer = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Closed, mockInitializer2Error); + }, + ), + stop: jest.fn(), + }; + + const underTest = new CompositeDataSource( + [makeInitializerFactory(mockInitializer1), makeInitializerFactory(mockInitializer2)], + [], // no synchronizers for this test + undefined, + makeTestTransitionConditions(), + makeZeroBackoff(), + ); + + const dataCallback = jest.fn(); + let statusCallback; + await new Promise((resolve) => { + statusCallback = jest.fn((_: DataSourceState, err?: any) => { + if (err?.name === 'ExhaustedDataSources') { + resolve(); + } + }); + + underTest.start(dataCallback, statusCallback); + }); + + expect(mockInitializer1.start).toHaveBeenCalledTimes(1); + expect(mockInitializer2.start).toHaveBeenCalledTimes(1); + expect(dataCallback).toHaveBeenCalledTimes(0); + expect(statusCallback).toHaveBeenNthCalledWith( + 1, + DataSourceState.Interrupted, + mockInitializer1Error, + ); + expect(statusCallback).toHaveBeenNthCalledWith( + 2, + DataSourceState.Interrupted, + mockInitializer2Error, + ); + expect(statusCallback).toHaveBeenNthCalledWith(3, DataSourceState.Closed, { + name: 'ExhaustedDataSources', + message: + 'CompositeDataSource has exhausted all configured datasources (2 initializers, 0 synchronizers).', + }); + expect(statusCallback).toHaveBeenCalledTimes(3); +}); + +it('can be stopped when in thrashing synchronizer fallback loop', async () => { + const mockInitializer1 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _dataCallback(true, { key: 'init1' }); + }, + ), + stop: jest.fn(), + }; + + const mockSynchronizer1Error = { name: 'Error', message: 'I am error...man!' }; + const mockSynchronizer1 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Closed, mockSynchronizer1Error); // error that will lead to fallback + }, + ), + stop: jest.fn(), + }; + + const underTest = new CompositeDataSource( + [makeInitializerFactory(mockInitializer1)], + [makeSynchronizerFactory(mockSynchronizer1)], // will continuously fallback onto itself + undefined, + makeTestTransitionConditions(), + makeZeroBackoff(), + ); + + const dataCallback = jest.fn(); + let statusCallback; + await new Promise((resolve) => { + statusCallback = jest.fn((state: DataSourceState, _: any) => { + if (state === DataSourceState.Interrupted) { + resolve(); // waiting interruption due to sync error + } + }); + + underTest.start(dataCallback, statusCallback); + }); + + expect(mockInitializer1.start).toHaveBeenCalled(); + expect(mockSynchronizer1.start).toHaveBeenCalled(); + expect(dataCallback).toHaveBeenNthCalledWith(1, true, { key: 'init1' }); + underTest.stop(); + + // wait for stop to take effect before checking status is closed + await new Promise((f) => { + setTimeout(f, 100); + }); + + expect(statusCallback).toHaveBeenNthCalledWith( + 1, + DataSourceState.Interrupted, + mockSynchronizer1Error, + ); + expect(statusCallback).toHaveBeenNthCalledWith(2, DataSourceState.Closed, undefined); +}); + +it('can be stopped and restarted', async () => { + const mockInitializer1Data = { key: 'init1' }; + const mockInitializer1 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _dataCallback(true, mockInitializer1Data); + }, + ), + stop: jest.fn(), + }; + + const mockSynchronizer1Data = { key: 'sync1' }; + const mockSynchronizer1 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _dataCallback(false, mockSynchronizer1Data); + }, + ), + stop: jest.fn(), + }; + + const underTest = new CompositeDataSource( + [makeInitializerFactory(mockInitializer1)], + [makeSynchronizerFactory(mockSynchronizer1)], + undefined, + makeTestTransitionConditions(), + makeZeroBackoff(), + ); + + let callback1; + await new Promise((resolve) => { + callback1 = jest.fn((_: boolean, data: any) => { + if (data === mockSynchronizer1Data) { + underTest.stop(); + resolve(); + } + }); + // first start + underTest.start(callback1, jest.fn()); + }); + + // check first start triggered underlying data sources + expect(mockInitializer1.start).toHaveBeenCalledTimes(1); + expect(mockSynchronizer1.start).toHaveBeenCalledTimes(1); + expect(callback1).toHaveBeenCalledTimes(2); + + // wait a moment for pending awaits to resolve the stop request + await new Promise((f) => { + setTimeout(f, 1); + }); + + let callback2; + await new Promise((resolve) => { + callback2 = jest.fn((_: boolean, data: any) => { + if (data === mockSynchronizer1Data) { + resolve(); + } + }); + // second start + underTest.start(callback2, jest.fn()); + }); + + // check that second start triggers underlying data sources again + expect(mockInitializer1.start).toHaveBeenCalledTimes(2); + expect(mockSynchronizer1.start).toHaveBeenCalledTimes(2); + expect(callback2).toHaveBeenCalledTimes(2); +}); + +it('is well behaved with no initializers and no synchronizers configured', async () => { + const underTest = new CompositeDataSource( + [], + [], + undefined, + makeTestTransitionConditions(), + makeZeroBackoff(), + ); + + let statusCallback; + await new Promise((resolve) => { + statusCallback = jest.fn((_1: DataSourceState, _2: any) => { + resolve(); + }); + + underTest.start(jest.fn(), statusCallback); + }); + + expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Closed, { + name: 'ExhaustedDataSources', + message: + 'CompositeDataSource has exhausted all configured datasources (0 initializers, 0 synchronizers).', + }); +}); + +it('is well behaved with no initializer and synchronizer configured', async () => { + const mockSynchronizer1Data = { key: 'sync1' }; + const mockSynchronizer1 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _dataCallback(false, mockSynchronizer1Data); + }, + ), + stop: jest.fn(), + }; + + const underTest = new CompositeDataSource( + [], + [makeSynchronizerFactory(mockSynchronizer1)], + undefined, + makeTestTransitionConditions(), + makeZeroBackoff(), + ); + + let dataCallback; + await new Promise((resolve) => { + dataCallback = jest.fn(() => { + resolve(); + }); + + underTest.start(dataCallback, jest.fn()); + }); + + expect(dataCallback).toHaveBeenNthCalledWith(1, false, { key: 'sync1' }); +}); + +it('is well behaved with an initializer and no synchronizers configured', async () => { + const mockInitializer1 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _dataCallback(true, { key: 'init1' }); + }, + ), + stop: jest.fn(), + }; + + const underTest = new CompositeDataSource( + [makeInitializerFactory(mockInitializer1)], + [], + undefined, + makeTestTransitionConditions(), + makeZeroBackoff(), + ); + + let dataCallback; + let statusCallback; + await new Promise((resolve) => { + dataCallback = jest.fn(); + statusCallback = jest.fn((_1: DataSourceState, _2: any) => { + resolve(); + }); + + underTest.start(dataCallback, statusCallback); + }); + + expect(dataCallback).toHaveBeenNthCalledWith(1, true, { key: 'init1' }); + expect(statusCallback).toHaveBeenNthCalledWith(1, DataSourceState.Closed, { + name: 'ExhaustedDataSources', + message: + 'CompositeDataSource has exhausted all configured datasources (1 initializers, 0 synchronizers).', + }); +}); + +it('consumes cancellation tokens correctly', async () => { + const mockInitializer1 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _dataCallback(true, { key: 'init1' }); + }, + ), + stop: jest.fn(), + }; + + const mockSynchronizer1 = { + start: jest + .fn() + .mockImplementation( + ( + _dataCallback: (basis: boolean, data: any) => void, + _statusCallback: (status: DataSourceState, err?: any) => void, + ) => { + _statusCallback(DataSourceState.Initializing); + _statusCallback(DataSourceState.Interrupted); // report interrupted to schedule automatic transition and create cancellation token + }, + ), + stop: jest.fn(), + }; + + const underTest = new CompositeDataSource( + [makeInitializerFactory(mockInitializer1)], + [makeSynchronizerFactory(mockSynchronizer1)], + undefined, + { + // pass in transition condition of 0 so that it will thrash, generating cancellation tokens repeatedly + [DataSourceState.Interrupted]: { + durationMS: 100, + transition: 'fallback', + }, + }, + makeZeroBackoff(), + ); + + let dataCallback; + let statusCallback; + let interruptedCount = 0; + await new Promise((resolve) => { + dataCallback = jest.fn(); + statusCallback = jest.fn((_1: DataSourceState, _2: any) => { + interruptedCount += 1; + if (interruptedCount > 10) { + // let it thrash for N iterations + resolve(); + } + }); + + underTest.start(dataCallback, statusCallback); + }); + + // @ts-ignore + // eslint-disable-next-line no-underscore-dangle + expect(underTest._cancelTokens.length).toEqual(1); + + underTest.stop(); + + // @ts-ignore + // eslint-disable-next-line no-underscore-dangle + expect(underTest._cancelTokens.length).toEqual(0); +}); diff --git a/packages/shared/common/src/api/subsystem/DataSystem/CallbackHandler.ts b/packages/shared/common/src/api/subsystem/DataSystem/CallbackHandler.ts new file mode 100644 index 0000000000..5ae69fb1c6 --- /dev/null +++ b/packages/shared/common/src/api/subsystem/DataSystem/CallbackHandler.ts @@ -0,0 +1,37 @@ +import { DataSourceState } from './DataSource'; + +/** + * Handler that connects the current {@link DataSource} to the {@link CompositeDataSource}. A single + * {@link CallbackHandler} should only be given to one {@link DataSource}. Use {@link disable()} to + * prevent additional callback events. + */ +export class CallbackHandler { + private _disabled: boolean = false; + + constructor( + private readonly _dataCallback: (basis: boolean, data: any) => void, + private readonly _statusCallback: (status: DataSourceState, err?: any) => void, + ) {} + + disable() { + this._disabled = true; + } + + async dataHandler(basis: boolean, data: any) { + if (this._disabled) { + return; + } + + // TODO: SDK-1044 track selector for future synchronizer to use + // report data up + this._dataCallback(basis, data); + } + + async statusHandler(status: DataSourceState, err?: any) { + if (this._disabled) { + return; + } + + this._statusCallback(status, err); + } +} diff --git a/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts b/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts new file mode 100644 index 0000000000..f7a5834761 --- /dev/null +++ b/packages/shared/common/src/api/subsystem/DataSystem/DataSource.ts @@ -0,0 +1,43 @@ +// TODO: refactor client-sdk to use this enum +export enum DataSourceState { + // Spinning up to make first connection attempt + Initializing, + // Positive confirmation of connection/data receipt + Valid, + // Transient issue, automatic retry is expected + Interrupted, + // Permanent issue, external intervention required + Closed, +} + +export interface DataSource { + /** + * May be called any number of times, if already started, has no effect + * @param dataCallback that will be called when data arrives, may be called multiple times. + * @param statusCallback that will be called when data source state changes or an unrecoverable error + * has been encountered. + */ + start( + dataCallback: (basis: boolean, data: any) => void, + statusCallback: (status: DataSourceState, err?: any) => void, + ): void; + + /** + * May be called any number of times, if already stopped, has no effect. + */ + stop(): void; +} + +export type LDInitializerFactory = () => DataSystemInitializer; + +export type LDSynchronizerFactory = () => DataSystemSynchronizer; + +/** + * A data source that can be used to fetch the basis. + */ +export interface DataSystemInitializer extends DataSource {} + +/** + * A data source that can be used to fetch the basis or ongoing data changes. + */ +export interface DataSystemSynchronizer extends DataSource {} diff --git a/packages/shared/common/src/api/subsystem/DataSystem/index.ts b/packages/shared/common/src/api/subsystem/DataSystem/index.ts new file mode 100644 index 0000000000..3ad2737e2d --- /dev/null +++ b/packages/shared/common/src/api/subsystem/DataSystem/index.ts @@ -0,0 +1,8 @@ +export { + DataSource, + DataSourceState, + DataSystemInitializer, + DataSystemSynchronizer, + LDInitializerFactory, + LDSynchronizerFactory, +} from './DataSource'; diff --git a/packages/shared/common/src/api/subsystem/index.ts b/packages/shared/common/src/api/subsystem/index.ts index 000f60f686..7dc2af969b 100644 --- a/packages/shared/common/src/api/subsystem/index.ts +++ b/packages/shared/common/src/api/subsystem/index.ts @@ -1,9 +1,23 @@ +import { + DataSource, + DataSourceState, + DataSystemInitializer, + DataSystemSynchronizer, + LDInitializerFactory, + LDSynchronizerFactory, +} from './DataSystem'; import LDContextDeduplicator from './LDContextDeduplicator'; import LDEventProcessor from './LDEventProcessor'; import LDEventSender, { LDDeliveryStatus, LDEventSenderResult, LDEventType } from './LDEventSender'; import { LDStreamProcessor } from './LDStreamProcessor'; export { + DataSource, + DataSourceState, + DataSystemInitializer, + DataSystemSynchronizer, + LDInitializerFactory, + LDSynchronizerFactory, LDEventProcessor, LDContextDeduplicator, LDEventSender, diff --git a/packages/sdk/browser/src/platform/Backoff.ts b/packages/shared/common/src/datasource/Backoff.ts similarity index 96% rename from packages/sdk/browser/src/platform/Backoff.ts rename to packages/shared/common/src/datasource/Backoff.ts index ce0e931ee5..8b2a7f8be9 100644 --- a/packages/sdk/browser/src/platform/Backoff.ts +++ b/packages/shared/common/src/datasource/Backoff.ts @@ -1,6 +1,11 @@ const MAX_RETRY_DELAY = 30 * 1000; // Maximum retry delay 30 seconds. const JITTER_RATIO = 0.5; // Delay should be 50%-100% of calculated time. +export interface Backoff { + success(): void; + fail(): number; +} + /** * Implements exponential backoff and jitter. This class tracks successful connections and failures * and produces a retry delay. @@ -11,7 +16,7 @@ const JITTER_RATIO = 0.5; // Delay should be 50%-100% of calculated time. * initialRetryDelayMillis and capping at MAX_RETRY_DELAY. If RESET_INTERVAL has elapsed after a * success, without an intervening faulure, then the backoff is reset to initialRetryDelayMillis. */ -export default class Backoff { +export class DefaultBackoff { private _retryCount: number = 0; private _activeSince?: number; private _initialRetryDelayMillis: number; diff --git a/packages/shared/common/src/datasource/CompositeDataSource.ts b/packages/shared/common/src/datasource/CompositeDataSource.ts new file mode 100644 index 0000000000..d831463124 --- /dev/null +++ b/packages/shared/common/src/datasource/CompositeDataSource.ts @@ -0,0 +1,306 @@ +/* eslint-disable no-await-in-loop */ +import { LDLogger } from '../api/logging'; +import { CallbackHandler } from '../api/subsystem/DataSystem/CallbackHandler'; +import { + DataSource, + DataSourceState, + LDInitializerFactory, + LDSynchronizerFactory, +} from '../api/subsystem/DataSystem/DataSource'; +import { Backoff, DefaultBackoff } from './Backoff'; + +const DEFAULT_FALLBACK_TIME_MS = 2 * 60 * 1000; +const DEFAULT_RECOVERY_TIME_MS = 5 * 60 * 1000; + +/** + * Represents a transition between data sources. + */ +export type Transition = 'none' | 'switchToSync' | 'fallback' | 'recover' | 'stop'; + +/** + * Given a {@link DataSourceState}, how long to wait before transitioning. + */ +export type TransitionConditions = { + [k in DataSourceState]?: { durationMS: number; transition: Transition }; +}; + +interface TransitionRequest { + transition: Transition; + err?: Error; +} + +/** + * The {@link CompositeDataSource} can combine a number of {@link DataSystemInitializer}s and {@link DataSystemSynchronizer}s + * into a single {@link DataSource}, implementing fallback and recovery logic internally to choose where data is sourced from. + */ +export class CompositeDataSource implements DataSource { + // TODO: SDK-856 async notification if initializer takes too long + // TODO: SDK-1044 utilize selector from initializers + + private _initPhaseActive: boolean; + private _currentPosition: number; + + private _stopped: boolean = true; + private _externalTransitionPromise: Promise; + private _externalTransitionResolve?: (value: TransitionRequest) => void; + private _cancelTokens: (() => void)[] = []; + + /** + * @param _initializers factories to create {@link DataSystemInitializer}s, in priority order. + * @param _synchronizers factories to create {@link DataSystemSynchronizer}s, in priority order. + */ + constructor( + private readonly _initializers: LDInitializerFactory[], + private readonly _synchronizers: LDSynchronizerFactory[], + private readonly _logger?: LDLogger, + private readonly _transitionConditions: TransitionConditions = { + [DataSourceState.Valid]: { + durationMS: DEFAULT_RECOVERY_TIME_MS, + transition: 'recover', + }, + [DataSourceState.Interrupted]: { + durationMS: DEFAULT_FALLBACK_TIME_MS, + transition: 'fallback', + }, + }, + private readonly _backoff: Backoff = new DefaultBackoff( + 1000, // TODO SDK-1137: handle blacklisting perpetually failing sources + 30000, + ), + ) { + this._externalTransitionPromise = new Promise((resolveTransition) => { + this._externalTransitionResolve = resolveTransition; + }); + this._initPhaseActive = _initializers.length > 0; // init phase if we have initializers + this._currentPosition = 0; + } + + async start( + dataCallback: (basis: boolean, data: any) => void, + statusCallback: (status: DataSourceState, err?: any) => void, + ): Promise { + if (!this._stopped) { + // don't allow multiple simultaneous runs + this._logger?.info('CompositeDataSource already running. Ignoring call to start.'); + return; + } + this._stopped = false; + + this._logger?.debug( + `CompositeDataSource starting with (${this._initializers.length} initializers, ${this._synchronizers.length} synchronizers).`, + ); + let lastTransition: Transition = 'none'; + // eslint-disable-next-line no-constant-condition + while (true) { + const currentDS: DataSource | undefined = this._pickDataSource(lastTransition); + const internalTransitionPromise = new Promise((transitionResolve) => { + if (currentDS) { + // these local variables are used for handling automatic transition related to data source status (ex: recovering to primary after + // secondary has been valid for N many seconds) + let lastState: DataSourceState | undefined; + let cancelScheduledTransition: () => void = () => {}; + + // this callback handler can be disabled and ensures only one transition request occurs + const callbackHandler = new CallbackHandler( + (basis: boolean, data: any) => { + this._backoff.success(); + dataCallback(basis, data); + if (basis && this._initPhaseActive) { + // transition to sync if we get basis during init + callbackHandler.disable(); + this._consumeCancelToken(cancelScheduledTransition); + transitionResolve({ transition: 'switchToSync' }); + } + }, + (state: DataSourceState, err?: any) => { + // When we get a status update, we want to fallback if it is an error. We also want to schedule a transition for some + // time in the future if this status remains for some duration (ex: Recover to primary synchronizer after the secondary + // synchronizer has been Valid for some time). These scheduled transitions are configurable in the constructor. + this._logger?.debug( + `CompositeDataSource received state ${state} from underlying data source.`, + ); + if (err || state === DataSourceState.Closed) { + callbackHandler.disable(); + statusCallback(DataSourceState.Interrupted, err); // underlying errors or closed states are masked as interrupted while we transition + this._consumeCancelToken(cancelScheduledTransition); + transitionResolve({ transition: 'fallback', err }); // unrecoverable error has occurred, so fallback + } else { + statusCallback(state, null); // report the status upward + if (state !== lastState) { + lastState = state; + this._consumeCancelToken(cancelScheduledTransition); // cancel previously scheduled status transition if one was scheduled + const excludeRecovery = this._currentPosition === 0; // primary source cannot recover to itself, so exclude it + const condition = this._lookupTransitionCondition(state, excludeRecovery); + if (condition) { + const { promise, cancel } = this._cancellableDelay(condition.durationMS); + cancelScheduledTransition = cancel; + this._cancelTokens.push(cancelScheduledTransition); + promise.then(() => { + this._consumeCancelToken(cancel); + callbackHandler.disable(); + transitionResolve({ transition: condition.transition }); + }); + } else { + // this data source state does not have a transition condition, so don't schedule any transition + } + } + } + }, + ); + currentDS.start( + (basis, data) => callbackHandler.dataHandler(basis, data), + (status, err) => callbackHandler.statusHandler(status, err), + ); + } else { + // we don't have a data source to use! + transitionResolve({ + transition: 'stop', + err: { + name: 'ExhaustedDataSources', + message: `CompositeDataSource has exhausted all configured datasources (${this._initializers.length} initializers, ${this._synchronizers.length} synchronizers).`, + }, + }); + } + }); + + // await transition triggered by internal data source or an external stop request + let transitionRequest = await Promise.race([ + internalTransitionPromise, + this._externalTransitionPromise, + ]); + + // stop the underlying datasource before transitioning to next state + currentDS?.stop(); + + if (transitionRequest.err && transitionRequest.transition !== 'stop') { + // if the transition was due to an error, throttle the transition + const delay = this._backoff.fail(); + const { promise, cancel: cancelDelay } = this._cancellableDelay(delay); + this._cancelTokens.push(cancelDelay); + const delayedTransition = promise.then(() => { + this._consumeCancelToken(cancelDelay); + return transitionRequest; + }); + + // race the delayed transition and external transition requests to be responsive + transitionRequest = await Promise.race([ + delayedTransition, + this._externalTransitionPromise, + ]); + + // consume the delay cancel token (even if it resolved, need to stop tracking its token) + this._consumeCancelToken(cancelDelay); + } + + if (transitionRequest.transition === 'stop') { + // exit the loop + statusCallback(DataSourceState.Closed, transitionRequest.err); + lastTransition = transitionRequest.transition; + break; + } + + lastTransition = transitionRequest.transition; + } + + // reset so that run can be called again in the future + this._reset(); + } + + async stop() { + this._cancelTokens.forEach((cancel) => cancel()); + this._cancelTokens = []; + this._externalTransitionResolve?.({ transition: 'stop' }); + } + + private _reset() { + this._stopped = true; + this._initPhaseActive = this._initializers.length > 0; // init phase if we have initializers; + this._currentPosition = 0; + this._externalTransitionPromise = new Promise((tr) => { + this._externalTransitionResolve = tr; + }); + // intentionally not resetting the backoff to avoid a code path that could circumvent throttling + } + + private _pickDataSource(transition: Transition | undefined): DataSource | undefined { + switch (transition) { + case 'switchToSync': + this._initPhaseActive = false; // one way toggle to false, unless this class is reset() + this._currentPosition = 0; + break; + case 'fallback': + this._currentPosition += 1; + break; + case 'recover': + this._currentPosition = 0; + break; + case 'none': + default: + // don't do anything in this case + break; + } + + if (this._initPhaseActive) { + // We don't loop back through initializers, so if outside range of initializers, instead return undefined. + if (this._currentPosition > this._initializers.length - 1) { + return undefined; + } + + return this._initializers[this._currentPosition](); + } + // getting here indicates we are using a synchronizer + + // if no synchronizers, return undefined + if (this._synchronizers.length <= 0) { + return undefined; + } + this._currentPosition %= this._synchronizers.length; // modulate position to loop back to start if necessary + if (this._currentPosition > this._synchronizers.length - 1) { + // this is only possible if no synchronizers were provided + return undefined; + } + return this._synchronizers[this._currentPosition](); + } + + /** + * @returns the transition condition for the provided data source state or undefined + * if there is no transition condition + */ + private _lookupTransitionCondition( + state: DataSourceState, + excludeRecover: boolean, + ): { durationMS: number; transition: Transition } | undefined { + const condition = this._transitionConditions[state]; + + // exclude recovery can happen for certain initializers/synchronizers (ex: the primary synchronizer shouldn't recover to itself) + if (!condition || (excludeRecover && condition.transition === 'recover')) { + return undefined; + } + + return condition; + } + + private _cancellableDelay = (delayMS: number) => { + let timeout: ReturnType | undefined; + const promise = new Promise((res, _) => { + timeout = setTimeout(res, delayMS); + }); + return { + promise, + cancel() { + if (timeout) { + clearTimeout(timeout); + timeout = undefined; + } + }, + }; + }; + + private _consumeCancelToken(cancel: () => void) { + cancel(); + const index = this._cancelTokens.indexOf(cancel, 0); + if (index > -1) { + this._cancelTokens.splice(index, 1); + } + } +} diff --git a/packages/shared/common/src/datasource/index.ts b/packages/shared/common/src/datasource/index.ts index fe4b250c5e..237da787e0 100644 --- a/packages/shared/common/src/datasource/index.ts +++ b/packages/shared/common/src/datasource/index.ts @@ -1,3 +1,5 @@ +import { Backoff, DefaultBackoff } from './Backoff'; +import { CompositeDataSource } from './CompositeDataSource'; import { DataSourceErrorKind } from './DataSourceErrorKinds'; import { LDFileDataSourceError, @@ -7,6 +9,9 @@ import { } from './errors'; export { + Backoff, + CompositeDataSource, + DefaultBackoff, DataSourceErrorKind, LDFileDataSourceError, LDPollingError, diff --git a/packages/shared/common/src/index.ts b/packages/shared/common/src/index.ts index ae3f9daf08..060ed1bf5a 100644 --- a/packages/shared/common/src/index.ts +++ b/packages/shared/common/src/index.ts @@ -2,7 +2,10 @@ import AttributeReference from './AttributeReference'; import Context from './Context'; import ContextFilter from './ContextFilter'; import { + Backoff, + CompositeDataSource, DataSourceErrorKind, + DefaultBackoff, LDFileDataSourceError, LDPollingError, LDStreamingError, @@ -22,7 +25,10 @@ export { AttributeReference, Context, ContextFilter, + CompositeDataSource, DataSourceErrorKind, + Backoff, + DefaultBackoff, LDPollingError, LDStreamingError, StreamingErrorHandler, diff --git a/packages/shared/sdk-server/src/options/Configuration.ts b/packages/shared/sdk-server/src/options/Configuration.ts index 77baf0de39..fe14a43f53 100644 --- a/packages/shared/sdk-server/src/options/Configuration.ts +++ b/packages/shared/sdk-server/src/options/Configuration.ts @@ -247,10 +247,6 @@ export default class Configuration { this.pollInterval = validatedOptions.pollInterval; this.proxyOptions = validatedOptions.proxyOptions; - this.offline = validatedOptions.offline; - this.stream = validatedOptions.stream; - this.streamInitialReconnectDelay = validatedOptions.streamInitialReconnectDelay; - this.useLdd = validatedOptions.useLdd; this.sendEvents = validatedOptions.sendEvents; this.allAttributesPrivate = validatedOptions.allAttributesPrivate; this.privateAttributes = validatedOptions.privateAttributes; @@ -264,6 +260,11 @@ export default class Configuration { this.tags = new ApplicationTags(validatedOptions); this.diagnosticRecordingInterval = validatedOptions.diagnosticRecordingInterval; + this.offline = validatedOptions.offline; + this.stream = validatedOptions.stream; + this.streamInitialReconnectDelay = validatedOptions.streamInitialReconnectDelay; + this.useLdd = validatedOptions.useLdd; + if (TypeValidators.Function.is(validatedOptions.updateProcessor)) { // @ts-ignore this.updateProcessorFactory = validatedOptions.updateProcessor;