diff --git a/packages/sdk/browser/__tests__/BrowserDataManager.test.ts b/packages/sdk/browser/__tests__/BrowserDataManager.test.ts index 8266e600e9..36b58b56bf 100644 --- a/packages/sdk/browser/__tests__/BrowserDataManager.test.ts +++ b/packages/sdk/browser/__tests__/BrowserDataManager.test.ts @@ -144,18 +144,24 @@ describe('given a BrowserDataManager with mocked dependencies', () => { browserConfig, () => ({ pathGet(encoding: Encoding, _plainContextString: string): string { - return `/msdk/evalx/contexts/${base64UrlEncode(_plainContextString, encoding)}`; + return `/path/get/${base64UrlEncode(_plainContextString, encoding)}`; }, - pathReport(_encoding: Encoding, _plainContextString: string): string { - return `/msdk/evalx/context`; + pathReport(encoding: Encoding, _plainContextString: string): string { + return `/path/report/${base64UrlEncode(_plainContextString, encoding)}`; + }, + pathPing(encoding: Encoding, _plainContextString: string): string { + return `/path/ping/${base64UrlEncode(_plainContextString, encoding)}`; }, }), () => ({ pathGet(encoding: Encoding, _plainContextString: string): string { - return `/meval/${base64UrlEncode(_plainContextString, encoding)}`; + return `/path/get/${base64UrlEncode(_plainContextString, encoding)}`; + }, + pathReport(encoding: Encoding, _plainContextString: string): string { + return `/path/report/${base64UrlEncode(_plainContextString, encoding)}`; }, - pathReport(_encoding: Encoding, _plainContextString: string): string { - return `/meval`; + pathPing(encoding: Encoding, _plainContextString: string): string { + return `/path/ping/${base64UrlEncode(_plainContextString, encoding)}`; }, }), baseHeaders, @@ -177,18 +183,24 @@ describe('given a BrowserDataManager with mocked dependencies', () => { validateOptions({ streaming: true }, logger), () => ({ pathGet(encoding: Encoding, _plainContextString: string): string { - return `/msdk/evalx/contexts/${base64UrlEncode(_plainContextString, encoding)}`; + return `/path/get/${base64UrlEncode(_plainContextString, encoding)}`; }, - pathReport(_encoding: Encoding, _plainContextString: string): string { - return `/msdk/evalx/context`; + pathReport(encoding: Encoding, _plainContextString: string): string { + return `/path/report/${base64UrlEncode(_plainContextString, encoding)}`; + }, + pathPing(encoding: Encoding, _plainContextString: string): string { + return `/path/ping/${base64UrlEncode(_plainContextString, encoding)}`; }, }), () => ({ pathGet(encoding: Encoding, _plainContextString: string): string { - return `/meval/${base64UrlEncode(_plainContextString, encoding)}`; + return `/path/get/${base64UrlEncode(_plainContextString, encoding)}`; + }, + pathReport(encoding: Encoding, _plainContextString: string): string { + return `/path/report/${base64UrlEncode(_plainContextString, encoding)}`; }, - pathReport(_encoding: Encoding, _plainContextString: string): string { - return `/meval`; + pathPing(encoding: Encoding, _plainContextString: string): string { + return `/path/ping/${base64UrlEncode(_plainContextString, encoding)}`; }, }), baseHeaders, @@ -215,18 +227,24 @@ describe('given a BrowserDataManager with mocked dependencies', () => { validateOptions({ streaming: true }, logger), () => ({ pathGet(encoding: Encoding, _plainContextString: string): string { - return `/msdk/evalx/contexts/${base64UrlEncode(_plainContextString, encoding)}`; + return `/path/get/${base64UrlEncode(_plainContextString, encoding)}`; }, - pathReport(_encoding: Encoding, _plainContextString: string): string { - return `/msdk/evalx/context`; + pathReport(encoding: Encoding, _plainContextString: string): string { + return `/path/report/${base64UrlEncode(_plainContextString, encoding)}`; + }, + pathPing(encoding: Encoding, _plainContextString: string): string { + return `/path/ping/${base64UrlEncode(_plainContextString, encoding)}`; }, }), () => ({ pathGet(encoding: Encoding, _plainContextString: string): string { - return `/meval/${base64UrlEncode(_plainContextString, encoding)}`; + return `/path/get/${base64UrlEncode(_plainContextString, encoding)}`; + }, + pathReport(encoding: Encoding, _plainContextString: string): string { + return `/path/report/${base64UrlEncode(_plainContextString, encoding)}`; }, - pathReport(_encoding: Encoding, _plainContextString: string): string { - return `/meval`; + pathPing(encoding: Encoding, _plainContextString: string): string { + return `/path/ping/${base64UrlEncode(_plainContextString, encoding)}`; }, }), baseHeaders, @@ -242,7 +260,7 @@ describe('given a BrowserDataManager with mocked dependencies', () => { await dataManager.identify(identifyResolve, identifyReject, context, identifyOptions); expect(platform.requests.createEventSource).toHaveBeenCalledWith( - '/meval/eyJraW5kIjoidXNlciIsImtleSI6InRlc3QtdXNlciJ9?h=potato&withReasons=true', + '/path/get/eyJraW5kIjoidXNlciIsImtleSI6InRlc3QtdXNlciJ9?h=potato&withReasons=true', expect.anything(), ); }); @@ -256,18 +274,24 @@ describe('given a BrowserDataManager with mocked dependencies', () => { validateOptions({ streaming: false }, logger), () => ({ pathGet(encoding: Encoding, _plainContextString: string): string { - return `/msdk/evalx/contexts/${base64UrlEncode(_plainContextString, encoding)}`; + return `/path/get/${base64UrlEncode(_plainContextString, encoding)}`; }, - pathReport(_encoding: Encoding, _plainContextString: string): string { - return `/msdk/evalx/context`; + pathReport(encoding: Encoding, _plainContextString: string): string { + return `/path/report/${base64UrlEncode(_plainContextString, encoding)}`; + }, + pathPing(encoding: Encoding, _plainContextString: string): string { + return `/path/ping/${base64UrlEncode(_plainContextString, encoding)}`; }, }), () => ({ pathGet(encoding: Encoding, _plainContextString: string): string { - return `/meval/${base64UrlEncode(_plainContextString, encoding)}`; + return `/path/get/${base64UrlEncode(_plainContextString, encoding)}`; + }, + pathReport(encoding: Encoding, _plainContextString: string): string { + return `/path/report/${base64UrlEncode(_plainContextString, encoding)}`; }, - pathReport(_encoding: Encoding, _plainContextString: string): string { - return `/meval`; + pathPing(encoding: Encoding, _plainContextString: string): string { + return `/path/ping/${base64UrlEncode(_plainContextString, encoding)}`; }, }), baseHeaders, @@ -283,7 +307,7 @@ describe('given a BrowserDataManager with mocked dependencies', () => { await dataManager.identify(identifyResolve, identifyReject, context, identifyOptions); expect(platform.requests.fetch).toHaveBeenCalledWith( - '/msdk/evalx/contexts/eyJraW5kIjoidXNlciIsImtleSI6InRlc3QtdXNlciJ9?withReasons=true&h=potato', + '/path/get/eyJraW5kIjoidXNlciIsImtleSI6InRlc3QtdXNlciJ9?withReasons=true&h=potato', expect.anything(), ); }); diff --git a/packages/sdk/browser/src/BrowserClient.ts b/packages/sdk/browser/src/BrowserClient.ts index 454b01f8d6..892ed688d1 100644 --- a/packages/sdk/browser/src/BrowserClient.ts +++ b/packages/sdk/browser/src/BrowserClient.ts @@ -140,6 +140,11 @@ export class BrowserClient extends LDClientImpl implements LDClient { pathReport(_encoding: Encoding, _plainContextString: string): string { return `/sdk/evalx/${clientSideId}/context`; }, + pathPing(_encoding: Encoding, _plainContextString: string): string { + // Note: if you are seeing this error, it is a coding error. This DataSourcePaths implementation is for polling endpoints. /ping is not currently + // used in a polling situation. It is probably the case that this was called by streaming logic erroneously. + throw new Error('Ping for polling unsupported.'); + }, }), () => ({ pathGet(encoding: Encoding, _plainContextString: string): string { @@ -148,6 +153,9 @@ export class BrowserClient extends LDClientImpl implements LDClient { pathReport(_encoding: Encoding, _plainContextString: string): string { return `/eval/${clientSideId}`; }, + pathPing(_encoding: Encoding, _plainContextString: string): string { + return `/ping/${clientSideId}`; + }, }), baseHeaders, emitter, diff --git a/packages/sdk/browser/src/BrowserDataManager.ts b/packages/sdk/browser/src/BrowserDataManager.ts index 4a00d769d9..00f777f5e5 100644 --- a/packages/sdk/browser/src/BrowserDataManager.ts +++ b/packages/sdk/browser/src/BrowserDataManager.ts @@ -6,13 +6,12 @@ import { DataSourcePaths, DataSourceState, FlagManager, - getPollingUri, internal, LDEmitter, LDHeaders, LDIdentifyOptions, + makeRequestor, Platform, - Requestor, } from '@launchdarkly/js-client-sdk-common'; import { readFlagsFromBootstrap } from './bootstrap'; @@ -92,23 +91,35 @@ export default class BrowserDataManager extends BaseDataManager { if (await this.flagManager.loadCached(context)) { this._debugLog('Identify - Flags loaded from cache. Continuing to initialize via a poll.'); } - const plainContextString = JSON.stringify(Context.toLDContext(context)); - const requestor = this._getRequestor(plainContextString); - await this._finishIdentifyFromPoll(requestor, context, identifyResolve, identifyReject); - } + await this._finishIdentifyFromPoll(context, identifyResolve, identifyReject); + } this._updateStreamingState(); } private async _finishIdentifyFromPoll( - requestor: Requestor, context: Context, identifyResolve: () => void, identifyReject: (err: Error) => void, ) { try { this.dataSourceStatusManager.requestStateUpdate(DataSourceState.Initializing); - const payload = await requestor.requestPayload(); + + const plainContextString = JSON.stringify(Context.toLDContext(context)); + const pollingRequestor = makeRequestor( + plainContextString, + this.config.serviceEndpoints, + this.getPollingPaths(), + this.platform.requests, + this.platform.encoding!, + this.baseHeaders, + [], + this.config.withReasons, + this.config.useReport, + this._secureModeHash, + ); + + const payload = await pollingRequestor.requestPayload(); try { const listeners = this.createStreamListeners(context, identifyResolve); const putListener = listeners.get('put'); @@ -196,35 +207,29 @@ export default class BrowserDataManager extends BaseDataManager { const rawContext = Context.toLDContext(context)!; this.updateProcessor?.close(); - this.createStreamingProcessor(rawContext, context, identifyResolve, identifyReject); - this.updateProcessor!.start(); - } - - private _getRequestor(plainContextString: string): Requestor { - const paths = this.getPollingPaths(); - const path = this.config.useReport - ? paths.pathReport(this.platform.encoding!, plainContextString) - : paths.pathGet(this.platform.encoding!, plainContextString); - - const parameters: { key: string; value: string }[] = []; - if (this.config.withReasons) { - parameters.push({ key: 'withReasons', value: 'true' }); - } - if (this._secureModeHash) { - parameters.push({ key: 'h', value: this._secureModeHash }); - } + const plainContextString = JSON.stringify(Context.toLDContext(context)); + const pollingRequestor = makeRequestor( + plainContextString, + this.config.serviceEndpoints, + this.getPollingPaths(), + this.platform.requests, + this.platform.encoding!, + this.baseHeaders, + [], + this.config.withReasons, + this.config.useReport, + this._secureModeHash, + ); - const headers: { [key: string]: string } = { ...this.baseHeaders }; - let body; - let method = 'GET'; - if (this.config.useReport) { - method = 'REPORT'; - headers['content-type'] = 'application/json'; - body = plainContextString; // context is in body for REPORT - } + this.createStreamingProcessor( + rawContext, + context, + pollingRequestor, + identifyResolve, + identifyReject, + ); - const uri = getPollingUri(this.config.serviceEndpoints, path, parameters); - return new Requestor(this.platform.requests, uri, headers, method, body); + this.updateProcessor!.start(); } } diff --git a/packages/sdk/react-native/__tests__/MobileDataManager.test.ts b/packages/sdk/react-native/__tests__/MobileDataManager.test.ts index 9fdb9fa02f..27ac0de488 100644 --- a/packages/sdk/react-native/__tests__/MobileDataManager.test.ts +++ b/packages/sdk/react-native/__tests__/MobileDataManager.test.ts @@ -133,6 +133,11 @@ describe('given a MobileDataManager with mocked dependencies', () => { pathReport(_encoding: Encoding, _plainContextString: string): string { return `/msdk/evalx/context`; }, + pathPing(_encoding: Encoding, _plainContextString: string): string { + // Note: if you are seeing this error, it is a coding error. This DataSourcePaths implementation is for polling endpoints. /ping is not currently + // used in a polling situation. It is probably the case that this was called by streaming logic erroneously. + throw new Error('Ping for polling unsupported.'); + }, }), () => ({ pathGet(encoding: Encoding, _plainContextString: string): string { @@ -141,6 +146,9 @@ describe('given a MobileDataManager with mocked dependencies', () => { pathReport(_encoding: Encoding, _plainContextString: string): string { return `/meval`; }, + pathPing(_encoding: Encoding, _plainContextString: string): string { + return `/mping`; + }, }), baseHeaders, emitter, diff --git a/packages/sdk/react-native/src/MobileDataManager.ts b/packages/sdk/react-native/src/MobileDataManager.ts index 5eecd2c811..bf50fdea00 100644 --- a/packages/sdk/react-native/src/MobileDataManager.ts +++ b/packages/sdk/react-native/src/MobileDataManager.ts @@ -9,6 +9,7 @@ import { LDEmitter, LDHeaders, LDIdentifyOptions, + makeRequestor, Platform, } from '@launchdarkly/js-client-sdk-common'; @@ -95,13 +96,38 @@ export default class MobileDataManager extends BaseDataManager { ) { const rawContext = Context.toLDContext(context)!; + const plainContextString = JSON.stringify(Context.toLDContext(context)); + const requestor = makeRequestor( + plainContextString, + this.config.serviceEndpoints, + this.getPollingPaths(), + this.platform.requests, + this.platform.encoding!, + this.baseHeaders, + [], + this.config.useReport, + this.config.withReasons, + ); + this.updateProcessor?.close(); switch (this.connectionMode) { case 'streaming': - this.createStreamingProcessor(rawContext, context, identifyResolve, identifyReject); + this.createStreamingProcessor( + rawContext, + context, + requestor, + identifyResolve, + identifyReject, + ); break; case 'polling': - this.createPollingProcessor(rawContext, context, identifyResolve, identifyReject); + this.createPollingProcessor( + rawContext, + context, + requestor, + identifyResolve, + identifyReject, + ); break; default: break; diff --git a/packages/sdk/react-native/src/ReactNativeLDClient.ts b/packages/sdk/react-native/src/ReactNativeLDClient.ts index 7e46abc2f3..4fcd1b6770 100644 --- a/packages/sdk/react-native/src/ReactNativeLDClient.ts +++ b/packages/sdk/react-native/src/ReactNativeLDClient.ts @@ -89,6 +89,11 @@ export default class ReactNativeLDClient extends LDClientImpl { pathReport(_encoding: Encoding, _plainContextString: string): string { return `/msdk/evalx/context`; }, + pathPing(_encoding: Encoding, _plainContextString: string): string { + // Note: if you are seeing this error, it is a coding error. This DataSourcePaths implementation is for polling endpoints. /ping is not currently + // used in a polling situation. It is probably the case that this was called by streaming logic erroneously. + throw new Error('Ping for polling unsupported.'); + }, }), () => ({ pathGet(encoding: Encoding, _plainContextString: string): string { @@ -97,6 +102,9 @@ export default class ReactNativeLDClient extends LDClientImpl { pathReport(_encoding: Encoding, _plainContextString: string): string { return `/meval`; }, + pathPing(_encoding: Encoding, _plainContextString: string): string { + return `/mping`; + }, }), baseHeaders, emitter, diff --git a/packages/shared/sdk-client/__tests__/LDClientImpl.test.ts b/packages/shared/sdk-client/__tests__/LDClientImpl.test.ts index cdbe32cb2d..b760e94104 100644 --- a/packages/shared/sdk-client/__tests__/LDClientImpl.test.ts +++ b/packages/shared/sdk-client/__tests__/LDClientImpl.test.ts @@ -273,8 +273,9 @@ describe('sdk-client object', () => { const carContext: LDContext = { kind: 'car', key: 'test-car' }; await expect(ldc.identify(carContext)).rejects.toThrow('test-error'); - expect(logger.error).toHaveBeenCalledTimes(1); - expect(logger.error).toHaveBeenCalledWith(expect.stringMatching(/^error:.*test-error/)); + expect(logger.error).toHaveBeenCalledTimes(2); + expect(logger.error).toHaveBeenNthCalledWith(1, expect.stringMatching(/^error:.*test-error/)); + expect(logger.error).toHaveBeenNthCalledWith(2, expect.stringContaining('Received error 404')); }); test('identify change and error listeners', async () => { diff --git a/packages/shared/sdk-client/__tests__/TestDataManager.ts b/packages/shared/sdk-client/__tests__/TestDataManager.ts index b7d09e410a..25ccbd4a1a 100644 --- a/packages/shared/sdk-client/__tests__/TestDataManager.ts +++ b/packages/shared/sdk-client/__tests__/TestDataManager.ts @@ -10,9 +10,10 @@ import { import { LDIdentifyOptions } from '../src/api'; import { Configuration } from '../src/configuration/Configuration'; import { BaseDataManager, DataManagerFactory } from '../src/DataManager'; +import { DataSourcePaths } from '../src/datasource/DataSourceConfig'; +import { makeRequestor } from '../src/datasource/Requestor'; import { FlagManager } from '../src/flag-manager/FlagManager'; import LDEmitter from '../src/LDEmitter'; -import { DataSourcePaths } from '../src/streaming/DataSourceConfig'; export default class TestDataManager extends BaseDataManager { constructor( @@ -75,7 +76,18 @@ export default class TestDataManager extends BaseDataManager { this.updateProcessor?.close(); - this.createStreamingProcessor(rawContext, context, identifyResolve, identifyReject); + const requestor = makeRequestor( + JSON.stringify(Context.toLDContext(context)), + this.config.serviceEndpoints, + this.getPollingPaths(), + this.platform.requests, + this.platform.encoding!, + this.baseHeaders, + [], + this.config.useReport, + this.config.withReasons, + ); + this.createStreamingProcessor(rawContext, context, requestor, identifyResolve, identifyReject); this.updateProcessor!.start(); } @@ -107,6 +119,9 @@ export function makeTestDataManagerFactory( pathReport(_encoding: Encoding, _plainContextString: string): string { return `/msdk/evalx/context`; }, + pathPing(_encoding: Encoding, _plainContextString: string): string { + return `/mping`; + }, }), () => ({ pathGet(_encoding: Encoding, _plainContextString: string): string { @@ -115,6 +130,9 @@ export function makeTestDataManagerFactory( pathReport(_encoding: Encoding, _plainContextString: string): string { return '/stream/path/report'; }, + pathPing(_encoding: Encoding, _plainContextString: string): string { + return '/stream/path/ping'; + }, }), baseHeaders, emitter, diff --git a/packages/shared/sdk-client/__tests__/polling/PollingProcessor.test.ts b/packages/shared/sdk-client/__tests__/polling/PollingProcessor.test.ts index 52ea9f188c..ab9956d7df 100644 --- a/packages/shared/sdk-client/__tests__/polling/PollingProcessor.test.ts +++ b/packages/shared/sdk-client/__tests__/polling/PollingProcessor.test.ts @@ -5,12 +5,15 @@ import { EventSource, EventSourceCapabilities, EventSourceInitDict, + LDHeaders, Requests, Response, + ServiceEndpoints, } from '@launchdarkly/js-sdk-common'; +import Requestor, { makeRequestor } from '../../src/datasource/Requestor'; import PollingProcessor from '../../src/polling/PollingProcessor'; -import { PollingDataSourceConfig } from '../../src/streaming'; +import { DataSourcePaths } from '../../src/streaming'; function mockResponse(value: string, statusCode: number) { const response: Response = { @@ -55,12 +58,6 @@ function makeRequests(): Requests { }; } -function makeEncoding(): Encoding { - return { - btoa: jest.fn(), - }; -} - const serviceEndpoints = { events: 'mockEventsEndpoint', polling: 'mockPollingEndpoint', @@ -71,39 +68,51 @@ const serviceEndpoints = { payloadFilterKey: 'testPayloadFilterKey', }; -function makeConfig( - pollInterval: number, - withReasons: boolean, - useReport: boolean, - queryParameters?: { key: string; value: string }[], -): PollingDataSourceConfig { - return { - credential: 'the-sdk-key', - serviceEndpoints, - paths: { +function makeTestRequestor(options: { + requests: Requests; + plainContextString?: string; + serviceEndpoints?: ServiceEndpoints; + paths?: DataSourcePaths; + encoding?: Encoding; + baseHeaders?: LDHeaders; + baseQueryParams?: { key: string; value: string }[]; + useReport?: boolean; + withReasons?: boolean; + secureModeHash?: string; +}): Requestor { + return makeRequestor( + options.plainContextString ?? 'mockContextString', + options.serviceEndpoints ?? serviceEndpoints, + options.paths ?? { pathGet(_encoding: Encoding, _plainContextString: string): string { return '/poll/path/get'; }, pathReport(_encoding: Encoding, _plainContextString: string): string { return '/poll/path/report'; }, + pathPing(_encoding: Encoding, _plainContextString: string): string { + return '/poll/path/ping'; + }, }, - baseHeaders: {}, - withReasons, - useReport, - pollInterval, - queryParameters, - }; + options.requests, + options.encoding ?? { + btoa: jest.fn(), + }, + options.baseHeaders, + options.baseQueryParams, + options.withReasons ?? true, + options.useReport ?? false, + ); } it('makes no requests until it is started', () => { const requests = makeRequests(); // eslint-disable-next-line no-new new PollingProcessor( - 'mockContextString', - makeConfig(1, true, false), - requests, - makeEncoding(), + makeTestRequestor({ + requests, + }), + 1, (_flags) => {}, (_error) => {}, ); @@ -115,13 +124,14 @@ it('includes custom query parameters when specified', () => { const requests = makeRequests(); const polling = new PollingProcessor( - 'mockContextString', - makeConfig(1, true, false, [ - { key: 'custom', value: 'value' }, - { key: 'custom2', value: 'value2' }, - ]), - requests, - makeEncoding(), + makeTestRequestor({ + requests, + baseQueryParams: [ + { key: 'custom', value: 'value' }, + { key: 'custom2', value: 'value2' }, + ], + }), + 1, (_flags) => {}, (_error) => {}, ); @@ -138,10 +148,10 @@ it('works without any custom query parameters', () => { const requests = makeRequests(); const polling = new PollingProcessor( - 'mockContextString', - makeConfig(1, true, false), - requests, - makeEncoding(), + makeTestRequestor({ + requests, + }), + 1, (_flags) => {}, (_error) => {}, ); @@ -158,10 +168,10 @@ it('polls immediately when started', () => { const requests = makeRequests(); const polling = new PollingProcessor( - 'mockContextString', - makeConfig(1, true, false), - requests, - makeEncoding(), + makeTestRequestor({ + requests, + }), + 1, (_flags) => {}, (_error) => {}, ); @@ -177,10 +187,10 @@ it('calls callback on success', async () => { const errorCallback = jest.fn(); const polling = new PollingProcessor( - 'mockContextString', - makeConfig(1000, true, false), - requests, - makeEncoding(), + makeTestRequestor({ + requests, + }), + 1000, dataCallback, errorCallback, ); @@ -197,10 +207,10 @@ it('polls repeatedly', async () => { requests.fetch = mockFetch('{ "flagA": true }', 200); const polling = new PollingProcessor( - 'mockContextString', - makeConfig(0.1, true, false), - requests, - makeEncoding(), + makeTestRequestor({ + requests, + }), + 0.1, dataCallback, errorCallback, ); @@ -234,10 +244,10 @@ it('stops polling when stopped', (done) => { const errorCallback = jest.fn(); const polling = new PollingProcessor( - 'mockContextString', - makeConfig(0.01, true, false), - requests, - makeEncoding(), + makeTestRequestor({ + requests, + }), + 0.01, dataCallback, errorCallback, ); @@ -253,18 +263,15 @@ it('stops polling when stopped', (done) => { it('includes the correct headers on requests', () => { const requests = makeRequests(); - - const config = makeConfig(1, true, false); - config.baseHeaders = { - authorization: 'the-sdk-key', - 'user-agent': 'AnSDK/42', - }; - const polling = new PollingProcessor( - 'mockContextString', - config, - requests, - makeEncoding(), + makeTestRequestor({ + requests, + baseHeaders: { + authorization: 'the-sdk-key', + 'user-agent': 'AnSDK/42', + }, + }), + 1, (_flags) => {}, (_error) => {}, ); @@ -286,10 +293,10 @@ it('defaults to using the "GET" method', () => { const requests = makeRequests(); const polling = new PollingProcessor( - 'mockContextString', - makeConfig(1000, true, false), - requests, - makeEncoding(), + makeTestRequestor({ + requests, + }), + 1000, (_flags) => {}, (_error) => {}, ); @@ -309,10 +316,11 @@ it('can be configured to use the "REPORT" method', () => { const requests = makeRequests(); const polling = new PollingProcessor( - 'mockContextString', - makeConfig(1000, true, true), - requests, - makeEncoding(), + makeTestRequestor({ + requests, + useReport: true, + }), + 1000, (_flags) => {}, (_error) => {}, ); @@ -343,10 +351,10 @@ it('continues polling after receiving bad JSON', async () => { }; const polling = new PollingProcessor( - 'mockContextString', - makeConfig(0.1, true, false), - requests, - makeEncoding(), + makeTestRequestor({ + requests, + }), + 0.1, dataCallback, errorCallback, logger, @@ -376,10 +384,10 @@ it('continues polling after an exception thrown during a request', async () => { }; const polling = new PollingProcessor( - 'mockContextString', - makeConfig(0.1, true, false), - requests, - makeEncoding(), + makeTestRequestor({ + requests, + }), + 0.1, dataCallback, errorCallback, logger, @@ -412,10 +420,10 @@ it('can handle recoverable http errors', async () => { }; const polling = new PollingProcessor( - 'mockContextString', - makeConfig(0.1, true, false), - requests, - makeEncoding(), + makeTestRequestor({ + requests, + }), + 0.1, dataCallback, errorCallback, logger, @@ -444,10 +452,10 @@ it('stops polling on unrecoverable error codes', (done) => { }; const polling = new PollingProcessor( - 'mockContextString', - makeConfig(0.01, true, false), - requests, - makeEncoding(), + makeTestRequestor({ + requests, + }), + 0.01, dataCallback, errorCallback, logger, diff --git a/packages/shared/sdk-client/__tests__/streaming/StreamingProcessor.test.ts b/packages/shared/sdk-client/__tests__/streaming/StreamingProcessor.test.ts index 532a7d4cdb..72f5334ac9 100644 --- a/packages/shared/sdk-client/__tests__/streaming/StreamingProcessor.test.ts +++ b/packages/shared/sdk-client/__tests__/streaming/StreamingProcessor.test.ts @@ -5,13 +5,21 @@ import { EventName, Info, internal, + LDHeaders, LDLogger, LDStreamingError, Platform, ProcessStreamResponse, + Requests, + ServiceEndpoints, } from '@launchdarkly/js-sdk-common'; -import { StreamingDataSourceConfig, StreamingProcessor } from '../../src/streaming'; +import Requestor, { makeRequestor } from '../../src/datasource/Requestor'; +import { + DataSourcePaths, + StreamingDataSourceConfig, + StreamingProcessor, +} from '../../src/streaming'; import { createBasicPlatform } from '../createBasicPlatform'; let logger: LDLogger; @@ -28,15 +36,17 @@ const serviceEndpoints = { const dateNowString = '2023-08-10'; const sdkKey = 'my-sdk-key'; -const event = { - data: { - flags: { - flagkey: { key: 'flagkey', version: 1 }, - }, - segments: { - segkey: { key: 'segkey', version: 2 }, - }, + +const flagData = { + flags: { + flagkey: { key: 'flagkey', version: 1 }, }, + segments: { + segkey: { key: 'segkey', version: 2 }, + }, +}; +const event = { + data: flagData, }; let basicPlatform: Platform; @@ -57,6 +67,9 @@ function getStreamingDataSourceConfig( pathReport(_encoding: Encoding, _plainContextString: string): string { return '/stream/path/report'; }, + pathPing(_encoding: Encoding, _plainContextString: string): string { + return '/stream/path/ping'; + }, }, baseHeaders: { authorization: 'my-sdk-key', @@ -88,6 +101,43 @@ const createMockEventSource = (streamUri: string = '', options: any = {}) => ({ close: jest.fn(), }); +function makeTestRequestor(options: { + requests: Requests; + plainContextString?: string; + serviceEndpoints?: ServiceEndpoints; + paths?: DataSourcePaths; + encoding?: Encoding; + baseHeaders?: LDHeaders; + baseQueryParams?: { key: string; value: string }[]; + useReport?: boolean; + withReasons?: boolean; + secureModeHash?: string; +}): Requestor { + return makeRequestor( + options.plainContextString ?? 'mockContextString', + options.serviceEndpoints ?? serviceEndpoints, + options.paths ?? { + pathGet(_encoding: Encoding, _plainContextString: string): string { + return '/polling/path/get'; + }, + pathReport(_encoding: Encoding, _plainContextString: string): string { + return '/polling/path/report'; + }, + pathPing(_encoding: Encoding, _plainContextString: string): string { + return '/polling/path/ping'; + }, + }, + options.requests, + options.encoding ?? { + btoa: jest.fn(), + }, + options.baseHeaders, + options.baseQueryParams, + options.withReasons ?? true, + options.useReport ?? false, + ); +} + describe('given a stream processor', () => { let info: Info; let streamingProcessor: StreamingProcessor; @@ -97,6 +147,7 @@ describe('given a stream processor', () => { let mockListener: ProcessStreamResponse; let mockErrorHandler: jest.Mock; let simulatePutEvent: (e?: any) => void; + let simulatePingEvent: () => void; let simulateError: (e: { status: number; message: string }) => boolean; beforeAll(() => { @@ -123,9 +174,13 @@ describe('given a stream processor', () => { headers: true, customMethod: true, })), + fetch: jest.fn(), } as any; simulatePutEvent = (e: any = event) => { - mockEventSource.addEventListener.mock.calls[0][1](e); + mockEventSource.addEventListener.mock.calls[0][1](e); // put listener is at position 0 + }; + simulatePingEvent = () => { + mockEventSource.addEventListener.mock.calls[2][1](); // ping listener is at position 2 }; simulateError = (e: { status: number; message: string }): boolean => mockEventSource.options.errorFilter(e); @@ -139,28 +194,29 @@ describe('given a stream processor', () => { listeners.set('patch', mockListener); diagnosticsManager = new internal.DiagnosticsManager(sdkKey, basicPlatform, {}); + }); + + afterEach(() => { + streamingProcessor.close(); + jest.resetAllMocks(); + }); + it('uses expected uri and eventSource init args', () => { streamingProcessor = new StreamingProcessor( 'mockContextString', getStreamingDataSourceConfig(), listeners, basicPlatform.requests, basicPlatform.encoding!, + makeTestRequestor({ + requests: basicPlatform.requests, + }), diagnosticsManager, mockErrorHandler, logger, ); - - jest.spyOn(streamingProcessor, 'stop'); streamingProcessor.start(); - }); - afterEach(() => { - streamingProcessor.close(); - jest.resetAllMocks(); - }); - - it('uses expected uri and eventSource init args', () => { expect(basicPlatform.requests.createEventSource).toBeCalledWith( `${serviceEndpoints.streaming}/stream/path/get?filter=testPayloadFilterKey`, { @@ -180,6 +236,9 @@ describe('given a stream processor', () => { listeners, basicPlatform.requests, basicPlatform.encoding!, + makeTestRequestor({ + requests: basicPlatform.requests, + }), diagnosticsManager, mockErrorHandler, ); @@ -204,6 +263,9 @@ describe('given a stream processor', () => { listeners, basicPlatform.requests, basicPlatform.encoding!, + makeTestRequestor({ + requests: basicPlatform.requests, + }), diagnosticsManager, mockErrorHandler, ); @@ -230,6 +292,9 @@ describe('given a stream processor', () => { listeners, basicPlatform.requests, basicPlatform.encoding!, + makeTestRequestor({ + requests: basicPlatform.requests, + }), diagnosticsManager, mockErrorHandler, ); @@ -248,6 +313,21 @@ describe('given a stream processor', () => { }); it('adds listeners', () => { + streamingProcessor = new StreamingProcessor( + 'mockContextString', + getStreamingDataSourceConfig(), + listeners, + basicPlatform.requests, + basicPlatform.encoding!, + makeTestRequestor({ + requests: basicPlatform.requests, + }), + diagnosticsManager, + mockErrorHandler, + logger, + ); + streamingProcessor.start(); + expect(mockEventSource.addEventListener).toHaveBeenNthCalledWith( 1, 'put', @@ -261,6 +341,21 @@ describe('given a stream processor', () => { }); it('executes listeners', () => { + streamingProcessor = new StreamingProcessor( + 'mockContextString', + getStreamingDataSourceConfig(), + listeners, + basicPlatform.requests, + basicPlatform.encoding!, + makeTestRequestor({ + requests: basicPlatform.requests, + }), + diagnosticsManager, + mockErrorHandler, + logger, + ); + streamingProcessor.start(); + simulatePutEvent(); const patchHandler = mockEventSource.addEventListener.mock.calls[1][1]; patchHandler(event); @@ -270,6 +365,21 @@ describe('given a stream processor', () => { }); it('passes error to callback if json data is malformed', async () => { + streamingProcessor = new StreamingProcessor( + 'mockContextString', + getStreamingDataSourceConfig(), + listeners, + basicPlatform.requests, + basicPlatform.encoding!, + makeTestRequestor({ + requests: basicPlatform.requests, + }), + diagnosticsManager, + mockErrorHandler, + logger, + ); + streamingProcessor.start(); + (mockListener.deserializeData as jest.Mock).mockReturnValue(false); simulatePutEvent(); @@ -279,6 +389,21 @@ describe('given a stream processor', () => { }); it('calls error handler if event.data prop is missing', async () => { + streamingProcessor = new StreamingProcessor( + 'mockContextString', + getStreamingDataSourceConfig(), + listeners, + basicPlatform.requests, + basicPlatform.encoding!, + makeTestRequestor({ + requests: basicPlatform.requests, + }), + diagnosticsManager, + mockErrorHandler, + logger, + ); + streamingProcessor.start(); + simulatePutEvent({ flags: {} }); expect(mockListener.deserializeData).not.toBeCalled(); @@ -287,6 +412,22 @@ describe('given a stream processor', () => { }); it('closes and stops', async () => { + streamingProcessor = new StreamingProcessor( + 'mockContextString', + getStreamingDataSourceConfig(), + listeners, + basicPlatform.requests, + basicPlatform.encoding!, + makeTestRequestor({ + requests: basicPlatform.requests, + }), + diagnosticsManager, + mockErrorHandler, + logger, + ); + + jest.spyOn(streamingProcessor, 'stop'); + streamingProcessor.start(); streamingProcessor.close(); expect(streamingProcessor.stop).toBeCalled(); @@ -296,6 +437,21 @@ describe('given a stream processor', () => { }); it('creates a stream init event', async () => { + streamingProcessor = new StreamingProcessor( + 'mockContextString', + getStreamingDataSourceConfig(), + listeners, + basicPlatform.requests, + basicPlatform.encoding!, + makeTestRequestor({ + requests: basicPlatform.requests, + }), + diagnosticsManager, + mockErrorHandler, + logger, + ); + streamingProcessor.start(); + const startTime = Date.now(); simulatePutEvent(); @@ -309,6 +465,21 @@ describe('given a stream processor', () => { describe.each([400, 408, 429, 500, 503])('given recoverable http errors', (status) => { it(`continues retrying after error: ${status}`, () => { + streamingProcessor = new StreamingProcessor( + 'mockContextString', + getStreamingDataSourceConfig(), + listeners, + basicPlatform.requests, + basicPlatform.encoding!, + makeTestRequestor({ + requests: basicPlatform.requests, + }), + diagnosticsManager, + mockErrorHandler, + logger, + ); + streamingProcessor.start(); + const startTime = Date.now(); const testError = { status, message: 'retry. recoverable.' }; const willRetry = simulateError(testError); @@ -330,6 +501,21 @@ describe('given a stream processor', () => { describe.each([401, 403])('given irrecoverable http errors', (status) => { it(`stops retrying after error: ${status}`, () => { + streamingProcessor = new StreamingProcessor( + 'mockContextString', + getStreamingDataSourceConfig(), + listeners, + basicPlatform.requests, + basicPlatform.encoding!, + makeTestRequestor({ + requests: basicPlatform.requests, + }), + diagnosticsManager, + mockErrorHandler, + logger, + ); + streamingProcessor.start(); + const startTime = Date.now(); const testError = { status, message: 'stopping. irrecoverable.' }; const willRetry = simulateError(testError); @@ -350,6 +536,50 @@ describe('given a stream processor', () => { expect(si.durationMillis).toBeGreaterThanOrEqual(0); }); }); + + it('it uses ping stream and polling when use REPORT and eventsource lacks custom method support', async () => { + basicPlatform.requests.getEventSourceCapabilities = jest.fn(() => ({ + readTimeout: true, + headers: true, + customMethod: false, // simulating event source does not support REPORT + })); + + basicPlatform.requests.fetch = jest.fn().mockResolvedValue({ + headers: jest.doMock, + status: 200, + text: jest.fn().mockResolvedValue(JSON.stringify(flagData)), + }); + + streamingProcessor = new StreamingProcessor( + 'mockContextString', + getStreamingDataSourceConfig(true, true), // use report to true + listeners, + basicPlatform.requests, + basicPlatform.encoding!, + makeTestRequestor({ + requests: basicPlatform.requests, + useReport: true, + }), + diagnosticsManager, + mockErrorHandler, + ); + streamingProcessor.start(); + + simulatePingEvent(); + + expect(basicPlatform.requests.createEventSource).toHaveBeenLastCalledWith( + `${serviceEndpoints.streaming}/stream/path/ping?withReasons=true&filter=testPayloadFilterKey`, + expect.anything(), + ); + + expect(basicPlatform.requests.fetch).toHaveBeenCalledWith( + '/polling/path/report?withReasons=true&filter=testPayloadFilterKey', + expect.objectContaining({ + method: 'REPORT', + body: 'mockContextString', + }), + ); + }); }); it('includes custom query parameters', () => { @@ -381,6 +611,9 @@ it('includes custom query parameters', () => { listeners, basicPlatform.requests, basicPlatform.encoding!, + makeTestRequestor({ + requests: basicPlatform.requests, + }), diagnosticsManager, () => {}, logger, diff --git a/packages/shared/sdk-client/src/DataManager.ts b/packages/shared/sdk-client/src/DataManager.ts index bf5524280c..14b9e866a8 100644 --- a/packages/shared/sdk-client/src/DataManager.ts +++ b/packages/shared/sdk-client/src/DataManager.ts @@ -16,6 +16,7 @@ import { Configuration } from './configuration/Configuration'; import DataSourceEventHandler from './datasource/DataSourceEventHandler'; import { DataSourceState } from './datasource/DataSourceStatus'; import DataSourceStatusManager from './datasource/DataSourceStatusManager'; +import Requestor from './datasource/Requestor'; import { FlagManager } from './flag-manager/FlagManager'; import LDEmitter from './LDEmitter'; import PollingProcessor from './polling/PollingProcessor'; @@ -107,23 +108,13 @@ export abstract class BaseDataManager implements DataManager { protected createPollingProcessor( context: LDContext, checkedContext: Context, + requestor: Requestor, identifyResolve?: () => void, identifyReject?: (err: Error) => void, ) { const processor = new PollingProcessor( - JSON.stringify(context), - { - credential: this.credential, - serviceEndpoints: this.config.serviceEndpoints, - paths: this.getPollingPaths(), - baseHeaders: this.baseHeaders, - pollInterval: this.config.pollInterval, - withReasons: this.config.withReasons, - useReport: this.config.useReport, - queryParameters: this._connectionParams?.queryParameters, - }, - this.platform.requests, - this.platform.encoding!, + requestor, + this.config.pollInterval, async (flags) => { await this._dataSourceEventHandler.handlePut(checkedContext, flags); identifyResolve?.(); @@ -133,6 +124,7 @@ export abstract class BaseDataManager implements DataManager { this._dataSourceEventHandler.handlePollingError(err); identifyReject?.(err); }, + this.logger, ); this.updateProcessor = this._decorateProcessorWithStatusReporting( @@ -144,6 +136,7 @@ export abstract class BaseDataManager implements DataManager { protected createStreamingProcessor( context: LDContext, checkedContext: Context, + pollingRequestor: Requestor, identifyResolve?: () => void, identifyReject?: (err: Error) => void, ) { @@ -162,12 +155,14 @@ export abstract class BaseDataManager implements DataManager { this.createStreamListeners(checkedContext, identifyResolve), this.platform.requests, this.platform.encoding!, + pollingRequestor, this.diagnosticsManager, (e) => { this.emitter.emit('error', context, e); this._dataSourceEventHandler.handleStreamingError(e); identifyReject?.(e); }, + this.logger, ); this.updateProcessor = this._decorateProcessorWithStatusReporting( diff --git a/packages/shared/sdk-client/src/streaming/DataSourceConfig.ts b/packages/shared/sdk-client/src/datasource/DataSourceConfig.ts similarity index 73% rename from packages/shared/sdk-client/src/streaming/DataSourceConfig.ts rename to packages/shared/sdk-client/src/datasource/DataSourceConfig.ts index 01fc6f9038..9abe66d50e 100644 --- a/packages/shared/sdk-client/src/streaming/DataSourceConfig.ts +++ b/packages/shared/sdk-client/src/datasource/DataSourceConfig.ts @@ -19,6 +19,10 @@ export interface StreamingDataSourceConfig extends DataSourceConfig { } export interface DataSourcePaths { + // Returns the path to get flag data via GET request pathGet(encoding: Encoding, plainContextString: string): string; + // Returns the path to get flag data via REPORT request pathReport(encoding: Encoding, plainContextString: string): string; + // Returns the path to get ping stream notifications when flag data changes + pathPing(encoding: Encoding, plainContextString: string): string; } diff --git a/packages/shared/sdk-client/src/polling/Requestor.ts b/packages/shared/sdk-client/src/datasource/Requestor.ts similarity index 50% rename from packages/shared/sdk-client/src/polling/Requestor.ts rename to packages/shared/sdk-client/src/datasource/Requestor.ts index 449e202cf5..2e88837bf4 100644 --- a/packages/shared/sdk-client/src/polling/Requestor.ts +++ b/packages/shared/sdk-client/src/datasource/Requestor.ts @@ -1,5 +1,14 @@ // eslint-disable-next-line max-classes-per-file -import { HttpErrorResponse, Requests } from '@launchdarkly/js-sdk-common'; +import { + Encoding, + getPollingUri, + HttpErrorResponse, + LDHeaders, + Requests, + ServiceEndpoints, +} from '@launchdarkly/js-sdk-common'; + +import { DataSourcePaths } from './DataSourceConfig'; function isOk(status: number) { return status >= 200 && status <= 299; @@ -47,3 +56,41 @@ export default class Requestor { throw new LDRequestError(`Unexpected status code: ${status}`, status); } } + +export function makeRequestor( + plainContextString: string, + serviceEndpoints: ServiceEndpoints, + paths: DataSourcePaths, + requests: Requests, + encoding: Encoding, + baseHeaders?: LDHeaders, + baseQueryParams?: { key: string; value: string }[], + withReasons?: boolean, + useReport?: boolean, + secureModeHash?: string, +) { + let body; + let method = 'GET'; + const headers: { [key: string]: string } = { ...baseHeaders }; + + if (useReport) { + method = 'REPORT'; + headers['content-type'] = 'application/json'; + body = plainContextString; // context is in body for REPORT + } + + const path = useReport + ? paths.pathReport(encoding, plainContextString) + : paths.pathGet(encoding, plainContextString); + + const parameters: { key: string; value: string }[] = [...(baseQueryParams ?? [])]; + if (withReasons) { + parameters.push({ key: 'withReasons', value: 'true' }); + } + if (secureModeHash) { + parameters.push({ key: 'h', value: secureModeHash }); + } + + const uri = getPollingUri(serviceEndpoints, path, parameters); + return new Requestor(requests, uri, headers, method, body); +} diff --git a/packages/shared/sdk-client/src/index.ts b/packages/shared/sdk-client/src/index.ts index fb356a9449..29f65ac129 100644 --- a/packages/shared/sdk-client/src/index.ts +++ b/packages/shared/sdk-client/src/index.ts @@ -1,9 +1,9 @@ import { LDClientInternalOptions } from './configuration/Configuration'; import DataSourceStatus, { DataSourceState } from './datasource/DataSourceStatus'; import DataSourceStatusErrorInfo from './datasource/DataSourceStatusErrorInfo'; +import Requestor, { makeRequestor } from './datasource/Requestor'; import LDClientImpl from './LDClientImpl'; import LDEmitter, { EventName } from './LDEmitter'; -import Requestor from './polling/Requestor'; export * from '@launchdarkly/js-sdk-common'; @@ -40,7 +40,7 @@ export type { Flag } from './types'; export { DataSourcePaths } from './streaming'; export { BaseDataManager } from './DataManager'; -export { Requestor }; +export { makeRequestor, Requestor }; export { DataSourceStatus, diff --git a/packages/shared/sdk-client/src/polling/PollingProcessor.ts b/packages/shared/sdk-client/src/polling/PollingProcessor.ts index a37a81121f..9366e6d319 100644 --- a/packages/shared/sdk-client/src/polling/PollingProcessor.ts +++ b/packages/shared/sdk-client/src/polling/PollingProcessor.ts @@ -1,19 +1,15 @@ import { DataSourceErrorKind, - Encoding, - getPollingUri, httpErrorMessage, HttpErrorResponse, isHttpRecoverable, LDLogger, LDPollingError, - Requests, subsystem, } from '@launchdarkly/js-sdk-common'; -import { PollingDataSourceConfig } from '../streaming/DataSourceConfig'; +import Requestor, { LDRequestError } from '../datasource/Requestor'; import { Flags } from '../types'; -import Requestor, { LDRequestError } from './Requestor'; export type PollingErrorHandler = (err: LDPollingError) => void; @@ -23,46 +19,15 @@ export type PollingErrorHandler = (err: LDPollingError) => void; export default class PollingProcessor implements subsystem.LDStreamProcessor { private _stopped = false; - private _pollInterval: number; - private _timeoutHandle: any; - private _requestor: Requestor; - constructor( - private readonly _plainContextString: string, - private readonly _dataSourceConfig: PollingDataSourceConfig, - requests: Requests, - encoding: Encoding, + private readonly _requestor: Requestor, + private readonly _pollIntervalSeconds: number, private readonly _dataHandler: (flags: Flags) => void, private readonly _errorHandler?: PollingErrorHandler, private readonly _logger?: LDLogger, - ) { - const path = _dataSourceConfig.useReport - ? _dataSourceConfig.paths.pathReport(encoding, _plainContextString) - : _dataSourceConfig.paths.pathGet(encoding, _plainContextString); - - const parameters: { key: string; value: string }[] = [ - ...(_dataSourceConfig.queryParameters ?? []), - ]; - if (this._dataSourceConfig.withReasons) { - parameters.push({ key: 'withReasons', value: 'true' }); - } - - const uri = getPollingUri(_dataSourceConfig.serviceEndpoints, path, parameters); - this._pollInterval = _dataSourceConfig.pollInterval; - - let method = 'GET'; - const headers: { [key: string]: string } = { ..._dataSourceConfig.baseHeaders }; - let body; - if (_dataSourceConfig.useReport) { - method = 'REPORT'; - headers['content-type'] = 'application/json'; - body = _plainContextString; // context is in body for REPORT - } - - this._requestor = new Requestor(requests, uri, headers, method, body); - } + ) {} private async _poll() { if (this._stopped) { @@ -115,7 +80,7 @@ export default class PollingProcessor implements subsystem.LDStreamProcessor { } const elapsed = Date.now() - startTime; - const sleepFor = Math.max(this._pollInterval * 1000 - elapsed, 0); + const sleepFor = Math.max(this._pollIntervalSeconds * 1000 - elapsed, 0); this._logger?.debug('Elapsed: %d ms, sleeping for %d ms', elapsed, sleepFor); diff --git a/packages/shared/sdk-client/src/streaming/StreamingProcessor.ts b/packages/shared/sdk-client/src/streaming/StreamingProcessor.ts index f12c375a3b..f6aad31afb 100644 --- a/packages/shared/sdk-client/src/streaming/StreamingProcessor.ts +++ b/packages/shared/sdk-client/src/streaming/StreamingProcessor.ts @@ -8,6 +8,7 @@ import { HttpErrorResponse, internal, LDLogger, + LDPollingError, LDStreamingError, ProcessStreamResponse, Requests, @@ -15,7 +16,8 @@ import { subsystem, } from '@launchdarkly/js-sdk-common'; -import { StreamingDataSourceConfig } from './DataSourceConfig'; +import { StreamingDataSourceConfig } from '../datasource/DataSourceConfig'; +import Requestor, { LDRequestError } from '../datasource/Requestor'; const reportJsonError = ( type: string, @@ -43,21 +45,19 @@ class StreamingProcessor implements subsystem.LDStreamProcessor { private readonly _listeners: Map, private readonly _requests: Requests, encoding: Encoding, + private readonly _pollingRequestor: Requestor, private readonly _diagnosticsManager?: internal.DiagnosticsManager, private readonly _errorHandler?: internal.StreamingErrorHandler, private readonly _logger?: LDLogger, ) { - // TODO: SC-255969 Implement better REPORT fallback logic + let path: string; if (_dataSourceConfig.useReport && !_requests.getEventSourceCapabilities().customMethod) { - _logger?.error( - "Configuration option useReport is true, but platform's EventSource does not support custom HTTP methods. Streaming may not work.", - ); + path = _dataSourceConfig.paths.pathPing(encoding, _plainContextString); + } else { + path = _dataSourceConfig.useReport + ? _dataSourceConfig.paths.pathReport(encoding, _plainContextString) + : _dataSourceConfig.paths.pathGet(encoding, _plainContextString); } - - const path = _dataSourceConfig.useReport - ? _dataSourceConfig.paths.pathReport(encoding, _plainContextString) - : _dataSourceConfig.paths.pathGet(encoding, _plainContextString); - const parameters: { key: string; value: string }[] = [ ...(_dataSourceConfig.queryParameters ?? []), ]; @@ -178,6 +178,41 @@ class StreamingProcessor implements subsystem.LDStreamProcessor { } }); }); + + // here we set up a listener that will poll when ping is received + eventSource.addEventListener('ping', async () => { + this._logger?.debug('Got PING, going to poll LaunchDarkly for feature flag updates'); + try { + const res = await this._pollingRequestor.requestPayload(); + try { + const payload = JSON.parse(res); + try { + // forward the payload on to the PUT listener + this._listeners.get('put')?.processJson(payload); + } catch (err) { + this._logger?.error(`Exception from data handler: ${err}`); + } + } catch { + this._logger?.error('Polling after ping received invalid data'); + this._logger?.debug(`Invalid JSON follows: ${res}`); + this._errorHandler?.( + new LDPollingError( + DataSourceErrorKind.InvalidData, + 'Malformed JSON data in ping polling response', + ), + ); + } + } catch (err) { + const requestError = err as LDRequestError; + this._errorHandler?.( + new LDPollingError( + DataSourceErrorKind.ErrorResponse, + requestError.message, + requestError.status, + ), + ); + } + }); } stop() { diff --git a/packages/shared/sdk-client/src/streaming/index.ts b/packages/shared/sdk-client/src/streaming/index.ts index 172e6c5f7a..cb1074706e 100644 --- a/packages/shared/sdk-client/src/streaming/index.ts +++ b/packages/shared/sdk-client/src/streaming/index.ts @@ -2,7 +2,7 @@ import { DataSourcePaths, PollingDataSourceConfig, StreamingDataSourceConfig, -} from './DataSourceConfig'; +} from '../datasource/DataSourceConfig'; import StreamingProcessor from './StreamingProcessor'; export { DataSourcePaths, PollingDataSourceConfig, StreamingProcessor, StreamingDataSourceConfig };