diff --git a/libs/providers/flagd/package.json b/libs/providers/flagd/package.json index c954452a3..7b54cfa24 100644 --- a/libs/providers/flagd/package.json +++ b/libs/providers/flagd/package.json @@ -7,7 +7,7 @@ "current-version": "echo $npm_package_version" }, "peerDependencies": { - "@grpc/grpc-js": "~1.8.0 || ~1.9.0 || ~1.10.0 || ~1.11.0 || ~1.12.0 || ~1.13.0", + "@grpc/grpc-js": "~1.8.0 || ~1.9.0 || ~1.10.0 || ~1.11.0 || ~1.12.0 || ~1.13.0 || ~1.14.0", "@openfeature/server-sdk": "^1.17.0" }, "dependencies": { diff --git a/libs/providers/flagd/src/lib/SyncMetadataHook.ts b/libs/providers/flagd/src/lib/SyncMetadataHook.ts new file mode 100644 index 000000000..175db2382 --- /dev/null +++ b/libs/providers/flagd/src/lib/SyncMetadataHook.ts @@ -0,0 +1,13 @@ +import type { Hook, EvaluationContext, BeforeHookContext, HookHints } from '@openfeature/server-sdk'; + +export class SyncMetadataHook implements Hook { + enrichedContext: () => EvaluationContext; + + constructor(enrichedContext: () => EvaluationContext) { + this.enrichedContext = enrichedContext; + } + + public before(hookContext: BeforeHookContext, hookHints?: HookHints): EvaluationContext { + return this.enrichedContext(); + } +} diff --git a/libs/providers/flagd/src/lib/configuration.spec.ts b/libs/providers/flagd/src/lib/configuration.spec.ts index c13fe11e8..df63346a5 100644 --- a/libs/providers/flagd/src/lib/configuration.spec.ts +++ b/libs/providers/flagd/src/lib/configuration.spec.ts @@ -1,6 +1,7 @@ import type { Config, FlagdProviderOptions } from './configuration'; import { getConfig } from './configuration'; import { DEFAULT_MAX_CACHE_SIZE } from './constants'; +import type { EvaluationContext } from '@openfeature/server-sdk'; describe('Configuration', () => { const OLD_ENV = process.env; @@ -20,6 +21,7 @@ describe('Configuration', () => { resolverType: 'rpc', selector: '', deadlineMs: 500, + contextEnricher: expect.any(Function), }); }); @@ -46,22 +48,39 @@ describe('Configuration', () => { process.env['FLAGD_OFFLINE_FLAG_SOURCE_PATH'] = offlineFlagSourcePath; process.env['FLAGD_DEFAULT_AUTHORITY'] = defaultAuthority; - expect(getConfig()).toStrictEqual({ - host, - port, - tls, - socketPath, - maxCacheSize, - cache, - resolverType, - selector, - offlineFlagSourcePath, - defaultAuthority, - deadlineMs: 500, - }); + expect(getConfig()).toEqual( + expect.objectContaining({ + host, + port, + tls, + socketPath, + maxCacheSize, + cache, + resolverType, + selector, + offlineFlagSourcePath, + defaultAuthority, + deadlineMs: 500, + }), + ); + }); + + it('should override context enricher', () => { + const contextEnricher = (syncContext: EvaluationContext | null): EvaluationContext => { + return { ...syncContext, extraKey: 'extraValue' }; + }; + + expect(getConfig({ contextEnricher }).contextEnricher({})).toEqual({ extraKey: 'extraValue' }); + }); + + it('should return identity function', () => { + expect(getConfig().contextEnricher({})).toStrictEqual({}); }); it('should use incoming options over defaults and environment variable', () => { + const contextEnricher = (syncContext: EvaluationContext | null): EvaluationContext => { + return { ...syncContext, extraKey: 'extraValue' }; + }; const options: FlagdProviderOptions = { host: 'test', port: 3000, @@ -72,6 +91,7 @@ describe('Configuration', () => { selector: '', defaultAuthority: '', deadlineMs: 500, + contextEnricher: contextEnricher, }; process.env['FLAGD_HOST'] = 'override'; diff --git a/libs/providers/flagd/src/lib/configuration.ts b/libs/providers/flagd/src/lib/configuration.ts index d9076c846..7869aba5e 100644 --- a/libs/providers/flagd/src/lib/configuration.ts +++ b/libs/providers/flagd/src/lib/configuration.ts @@ -1,4 +1,5 @@ import { DEFAULT_MAX_CACHE_SIZE } from './constants'; +import type { EvaluationContext } from '@openfeature/server-sdk'; export type CacheOption = 'lru' | 'disabled'; export type ResolverType = 'rpc' | 'in-process'; @@ -83,20 +84,34 @@ export interface Config { defaultAuthority?: string; } -export type FlagdProviderOptions = Partial; +interface FlagdConfig extends Config { + /** + * Function providing an EvaluationContext to mix into every evaluation. + * The syncContext from the SyncFlagsResponse + * (https://buf.build/open-feature/flagd/docs/main:flagd.sync.v1#flagd.sync.v1.SyncFlagsResponse), + * represented as a {@link dev.openfeature.sdk.Structure}, is passed as an argument. + * + * This function runs every time the provider (re)connects, and its result is cached and used in every evaluation. + * By default, the entire sync response (as a JSON Object) is used. + */ + contextEnricher: (syncContext: EvaluationContext | null) => EvaluationContext; +} + +export type FlagdProviderOptions = Partial; -const DEFAULT_CONFIG: Omit = { +const DEFAULT_CONFIG: Omit = { deadlineMs: 500, host: 'localhost', tls: false, selector: '', cache: 'lru', maxCacheSize: DEFAULT_MAX_CACHE_SIZE, + contextEnricher: (syncContext: EvaluationContext | null) => syncContext ?? {}, }; -const DEFAULT_RPC_CONFIG: Config = { ...DEFAULT_CONFIG, resolverType: 'rpc', port: 8013 }; +const DEFAULT_RPC_CONFIG: FlagdConfig = { ...DEFAULT_CONFIG, resolverType: 'rpc', port: 8013 }; -const DEFAULT_IN_PROCESS_CONFIG: Config = { ...DEFAULT_CONFIG, resolverType: 'in-process', port: 8015 }; +const DEFAULT_IN_PROCESS_CONFIG: FlagdConfig = { ...DEFAULT_CONFIG, resolverType: 'in-process', port: 8015 }; enum ENV_VAR { FLAGD_HOST = 'FLAGD_HOST', @@ -167,7 +182,7 @@ const getEnvVarConfig = (): Partial => { }; }; -export function getConfig(options: FlagdProviderOptions = {}) { +export function getConfig(options: FlagdProviderOptions = {}): FlagdConfig { const envVarConfig = getEnvVarConfig(); const defaultConfig = options.resolverType == 'in-process' || envVarConfig.resolverType == 'in-process' diff --git a/libs/providers/flagd/src/lib/flagd-provider.ts b/libs/providers/flagd/src/lib/flagd-provider.ts index aff6a5e2e..5507e035c 100644 --- a/libs/providers/flagd/src/lib/flagd-provider.ts +++ b/libs/providers/flagd/src/lib/flagd-provider.ts @@ -5,14 +5,18 @@ import { getConfig } from './configuration'; import { GRPCService } from './service/grpc/grpc-service'; import type { Service } from './service/service'; import { InProcessService } from './service/in-process/in-process-service'; +import type { Hook } from '@openfeature/server-sdk'; +import { SyncMetadataHook } from './SyncMetadataHook'; export class FlagdProvider implements Provider { metadata = { name: 'flagd', }; + readonly hooks?: Hook[]; readonly runsOn = 'server'; readonly events = new OpenFeatureEventEmitter(); + private syncContext: EvaluationContext | null = null; private readonly _service: Service; @@ -30,11 +34,27 @@ export class FlagdProvider implements Provider { ) { const config = getConfig(options); - this._service = service - ? service - : config.resolverType === 'in-process' - ? new InProcessService(config, undefined, logger) - : new GRPCService(config, undefined, logger); + if (service === undefined) { + if (config.resolverType === 'in-process') { + this._service = new InProcessService(config, this.setSyncContext.bind(this), undefined, logger); + + if (config?.offlineFlagSourcePath === undefined) { + this.hooks = [new SyncMetadataHook(() => config.contextEnricher(this.getSyncContext()))]; + } + } else { + this._service = new GRPCService(config, undefined, logger); + } + } else { + this._service = service; + } + } + + setSyncContext(context: EvaluationContext) { + this.syncContext = context; + } + + getSyncContext(): EvaluationContext | null { + return this.syncContext; } async initialize(): Promise { diff --git a/libs/providers/flagd/src/lib/service/in-process/grpc/grpc-fetch.spec.ts b/libs/providers/flagd/src/lib/service/in-process/grpc/grpc-fetch.spec.ts index ed9035998..7d28b3a5e 100644 --- a/libs/providers/flagd/src/lib/service/in-process/grpc/grpc-fetch.spec.ts +++ b/libs/providers/flagd/src/lib/service/in-process/grpc/grpc-fetch.spec.ts @@ -18,6 +18,7 @@ const disconnectCallback = jest.fn(); const removeAllListeners = jest.fn(); const cancel = jest.fn(); const destroy = jest.fn(); +const setSyncContext = jest.fn(); let onDataCallback: (data: SyncFlagsResponse) => void = () => ({}); let onErrorCallback: (err: Error) => void = () => ({}); @@ -59,11 +60,12 @@ describe('grpc fetch', () => { it('should handle data sync and emit callbacks', (done) => { const flagConfiguration = '{"flags":{}}'; - const fetch = new GrpcFetch(cfg, serviceMock); + const fetch = new GrpcFetch(cfg, setSyncContext, serviceMock); fetch .connect(dataCallback, reconnectCallback, jest.fn(), disconnectCallback) .then(() => { try { + expect(setSyncContext).toHaveBeenCalledTimes(0); expect(dataCallback).toHaveBeenCalledTimes(1); expect(dataCallback).toHaveBeenCalledWith(flagConfiguration); expect(changedCallback).toHaveBeenCalledTimes(0); @@ -80,6 +82,32 @@ describe('grpc fetch', () => { onDataCallback({ flagConfiguration }); }); + it('should handle SyncContext from SyncFlagsResponse', (done) => { + const initFlagConfig = '{"flags":{}}'; + const syncContext = { test: 'example' }; + + const fetch = new GrpcFetch(cfg, setSyncContext, serviceMock); + fetch + .connect(dataCallback, reconnectCallback, changedCallback, disconnectCallback) + .then(() => { + try { + // Callback assertions + expect(setSyncContext).toHaveBeenCalledTimes(1); + expect(setSyncContext).toHaveBeenCalledWith(syncContext); + + done(); + } catch (err) { + done(err); + } + }) + .catch((err) => { + done(err); + }); + + // First connection + onDataCallback({ flagConfiguration: initFlagConfig, syncContext: syncContext }); + }); + it('should handle data sync reconnection', (done) => { const initFlagConfig = '{"flags":{}}'; const updatedFlagConfig = @@ -87,7 +115,7 @@ describe('grpc fetch', () => { const reconnectFlagConfig = '{"flags":{"test":{"state":"ENABLED","variants":{"on":true,"off":false},"defaultVariant":"on"}}}'; - const fetch = new GrpcFetch(cfg, serviceMock); + const fetch = new GrpcFetch(cfg, jest.fn(), serviceMock); fetch .connect(dataCallback, reconnectCallback, changedCallback, disconnectCallback) .then(() => { @@ -128,7 +156,7 @@ describe('grpc fetch', () => { }); it('should handle error and watch channel for reconnect', (done) => { - const fetch = new GrpcFetch(cfg, serviceMock); + const fetch = new GrpcFetch(cfg, jest.fn(), serviceMock); fetch.connect(jest.fn(), jest.fn(), jest.fn(), disconnectCallback).catch((err) => { try { expect(err).toBeInstanceOf(Error); diff --git a/libs/providers/flagd/src/lib/service/in-process/grpc/grpc-fetch.ts b/libs/providers/flagd/src/lib/service/in-process/grpc/grpc-fetch.ts index 45c374b3b..2d317b591 100644 --- a/libs/providers/flagd/src/lib/service/in-process/grpc/grpc-fetch.ts +++ b/libs/providers/flagd/src/lib/service/in-process/grpc/grpc-fetch.ts @@ -1,6 +1,6 @@ import type { ClientReadableStream, ServiceError, ClientOptions } from '@grpc/grpc-js'; import { credentials } from '@grpc/grpc-js'; -import type { Logger } from '@openfeature/server-sdk'; +import type { EvaluationContext, Logger } from '@openfeature/server-sdk'; import { GeneralError } from '@openfeature/server-sdk'; import type { SyncFlagsRequest, SyncFlagsResponse } from '../../../../proto/ts/flagd/sync/v1/sync'; import { FlagSyncServiceClient } from '../../../../proto/ts/flagd/sync/v1/sync'; @@ -15,6 +15,7 @@ export class GrpcFetch implements DataFetch { private readonly _syncClient: FlagSyncServiceClient; private readonly _request: SyncFlagsRequest; private _syncStream: ClientReadableStream | undefined; + private readonly _setSyncContext: (syncContext: EvaluationContext) => void; private _logger: Logger | undefined; /** * Initialized will be set to true once the initial connection is successful @@ -29,7 +30,12 @@ export class GrpcFetch implements DataFetch { */ private _isConnected = false; - constructor(config: Config, syncServiceClient?: FlagSyncServiceClient, logger?: Logger) { + constructor( + config: Config, + setSyncContext: (syncContext: EvaluationContext) => void, + syncServiceClient?: FlagSyncServiceClient, + logger?: Logger, + ) { const { host, port, tls, socketPath, selector, defaultAuthority } = config; let clientOptions: ClientOptions | undefined; if (defaultAuthority) { @@ -46,6 +52,7 @@ export class GrpcFetch implements DataFetch { clientOptions, ); + this._setSyncContext = setSyncContext; this._logger = logger; this._request = { providerId: '', selector: selector ? selector : '' }; } @@ -84,6 +91,9 @@ export class GrpcFetch implements DataFetch { this._logger?.debug(`Received sync payload`); try { + if (data.syncContext) { + this._setSyncContext(data.syncContext); + } const changes = dataCallback(data.flagConfiguration); if (this._initialized && changes.length > 0) { changedCallback(changes); diff --git a/libs/providers/flagd/src/lib/service/in-process/in-process-service.spec.ts b/libs/providers/flagd/src/lib/service/in-process/in-process-service.spec.ts index 77bbe7eea..02653f0bc 100644 --- a/libs/providers/flagd/src/lib/service/in-process/in-process-service.spec.ts +++ b/libs/providers/flagd/src/lib/service/in-process/in-process-service.spec.ts @@ -13,7 +13,7 @@ describe('In-process-service', () => { it('should sync and allow to resolve flags', async () => { // given - const service = new InProcessService({ deadlineMs: 500, host: '', port: 0, tls: false }, dataFetcher); + const service = new InProcessService({ deadlineMs: 500, host: '', port: 0, tls: false }, jest.fn(), dataFetcher); // when await service.connect(jest.fn, jest.fn, jest.fn); @@ -31,7 +31,11 @@ describe('In-process-service', () => { it('should include scope as flag metadata', async () => { // given const selector = 'devFlags'; - const service = new InProcessService({ deadlineMs: 500, host: '', port: 0, tls: false, selector }, dataFetcher); + const service = new InProcessService( + { deadlineMs: 500, host: '', port: 0, tls: false, selector }, + jest.fn(), + dataFetcher, + ); // when await service.connect(jest.fn, jest.fn, jest.fn); @@ -44,7 +48,11 @@ describe('In-process-service', () => { it('should not override existing scope in flag metadata', async () => { // given const selector = 'devFlags'; - const service = new InProcessService({ deadlineMs: 500, host: '', port: 0, tls: false, selector }, dataFetcher); + const service = new InProcessService( + { deadlineMs: 500, host: '', port: 0, tls: false, selector }, + jest.fn(), + dataFetcher, + ); // when await service.connect(jest.fn, jest.fn, jest.fn); diff --git a/libs/providers/flagd/src/lib/service/in-process/in-process-service.ts b/libs/providers/flagd/src/lib/service/in-process/in-process-service.ts index cdcdb2c96..29e2d1ea0 100644 --- a/libs/providers/flagd/src/lib/service/in-process/in-process-service.ts +++ b/libs/providers/flagd/src/lib/service/in-process/in-process-service.ts @@ -19,6 +19,7 @@ export class InProcessService implements Service { constructor( private readonly config: Config, + setSyncContext: (syncContext: EvaluationContext) => void, dataFetcher?: DataFetch, logger?: Logger, ) { @@ -27,7 +28,7 @@ export class InProcessService implements Service { ? dataFetcher : config.offlineFlagSourcePath ? new FileFetch(config.offlineFlagSourcePath, logger) - : new GrpcFetch(config, undefined, logger); + : new GrpcFetch(config, setSyncContext, undefined, logger); } connect( diff --git a/libs/shared/flagd-core/test-harness b/libs/shared/flagd-core/test-harness index 59c3c3ccf..fdce98780 160000 --- a/libs/shared/flagd-core/test-harness +++ b/libs/shared/flagd-core/test-harness @@ -1 +1 @@ -Subproject commit 59c3c3ccfb018db82281684d231067e332c8103d +Subproject commit fdce98780f5811bd4672fb7f2b56a6be05fc46d2