Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libs/providers/flagd/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"current-version": "echo $npm_package_version"
},
"peerDependencies": {
"@grpc/grpc-js": "~1.8.0 || ~1.9.0 || ~1.10.0 || ~1.11.0 || ~1.12.0 || ~1.13.0",
"@grpc/grpc-js": "~1.8.0 || ~1.9.0 || ~1.10.0 || ~1.11.0 || ~1.12.0 || ~1.13.0 || ~1.14.0",
"@openfeature/server-sdk": "^1.17.0"
},
"dependencies": {
Expand Down
13 changes: 13 additions & 0 deletions libs/providers/flagd/src/lib/SyncMetadataHook.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import type { Hook, EvaluationContext, BeforeHookContext, HookHints } from '@openfeature/server-sdk';

export class SyncMetadataHook implements Hook {
enrichedContext: () => EvaluationContext;

constructor(enrichedContext: () => EvaluationContext) {
this.enrichedContext = enrichedContext;
}

public before(hookContext: BeforeHookContext, hookHints?: HookHints): EvaluationContext {
return this.enrichedContext();
}
}
46 changes: 33 additions & 13 deletions libs/providers/flagd/src/lib/configuration.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { Config, FlagdProviderOptions } from './configuration';
import { getConfig } from './configuration';
import { DEFAULT_MAX_CACHE_SIZE } from './constants';
import type { EvaluationContext } from '@openfeature/server-sdk';

describe('Configuration', () => {
const OLD_ENV = process.env;
Expand All @@ -20,6 +21,7 @@ describe('Configuration', () => {
resolverType: 'rpc',
selector: '',
deadlineMs: 500,
contextEnricher: expect.any(Function),
});
});

Expand All @@ -46,22 +48,39 @@ describe('Configuration', () => {
process.env['FLAGD_OFFLINE_FLAG_SOURCE_PATH'] = offlineFlagSourcePath;
process.env['FLAGD_DEFAULT_AUTHORITY'] = defaultAuthority;

expect(getConfig()).toStrictEqual({
host,
port,
tls,
socketPath,
maxCacheSize,
cache,
resolverType,
selector,
offlineFlagSourcePath,
defaultAuthority,
deadlineMs: 500,
});
expect(getConfig()).toEqual(
expect.objectContaining({
host,
port,
tls,
socketPath,
maxCacheSize,
cache,
resolverType,
selector,
offlineFlagSourcePath,
defaultAuthority,
deadlineMs: 500,
}),
);
});

it('should override context enricher', () => {
const contextEnricher = (syncContext: EvaluationContext | null): EvaluationContext => {
return { ...syncContext, extraKey: 'extraValue' };
};

expect(getConfig({ contextEnricher }).contextEnricher({})).toEqual({ extraKey: 'extraValue' });
});

it('should return identity function', () => {
expect(getConfig().contextEnricher({})).toStrictEqual({});
});

it('should use incoming options over defaults and environment variable', () => {
const contextEnricher = (syncContext: EvaluationContext | null): EvaluationContext => {
return { ...syncContext, extraKey: 'extraValue' };
};
const options: FlagdProviderOptions = {
host: 'test',
port: 3000,
Expand All @@ -72,6 +91,7 @@ describe('Configuration', () => {
selector: '',
defaultAuthority: '',
deadlineMs: 500,
contextEnricher: contextEnricher,
};

process.env['FLAGD_HOST'] = 'override';
Expand Down
25 changes: 20 additions & 5 deletions libs/providers/flagd/src/lib/configuration.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { DEFAULT_MAX_CACHE_SIZE } from './constants';
import type { EvaluationContext } from '@openfeature/server-sdk';

export type CacheOption = 'lru' | 'disabled';
export type ResolverType = 'rpc' | 'in-process';
Expand Down Expand Up @@ -83,20 +84,34 @@ export interface Config {
defaultAuthority?: string;
}

export type FlagdProviderOptions = Partial<Config>;
interface FlagdConfig extends Config {
/**
* Function providing an EvaluationContext to mix into every evaluation.
* The syncContext from the SyncFlagsResponse
* (https://buf.build/open-feature/flagd/docs/main:flagd.sync.v1#flagd.sync.v1.SyncFlagsResponse),
* represented as a {@link dev.openfeature.sdk.Structure}, is passed as an argument.
*
* This function runs every time the provider (re)connects, and its result is cached and used in every evaluation.
* By default, the entire sync response (as a JSON Object) is used.
*/
contextEnricher: (syncContext: EvaluationContext | null) => EvaluationContext;
}

export type FlagdProviderOptions = Partial<FlagdConfig>;

const DEFAULT_CONFIG: Omit<Config, 'port' | 'resolverType'> = {
const DEFAULT_CONFIG: Omit<FlagdConfig, 'port' | 'resolverType'> = {
deadlineMs: 500,
host: 'localhost',
tls: false,
selector: '',
cache: 'lru',
maxCacheSize: DEFAULT_MAX_CACHE_SIZE,
contextEnricher: (syncContext: EvaluationContext | null) => syncContext ?? {},
};

const DEFAULT_RPC_CONFIG: Config = { ...DEFAULT_CONFIG, resolverType: 'rpc', port: 8013 };
const DEFAULT_RPC_CONFIG: FlagdConfig = { ...DEFAULT_CONFIG, resolverType: 'rpc', port: 8013 };

const DEFAULT_IN_PROCESS_CONFIG: Config = { ...DEFAULT_CONFIG, resolverType: 'in-process', port: 8015 };
const DEFAULT_IN_PROCESS_CONFIG: FlagdConfig = { ...DEFAULT_CONFIG, resolverType: 'in-process', port: 8015 };

enum ENV_VAR {
FLAGD_HOST = 'FLAGD_HOST',
Expand Down Expand Up @@ -167,7 +182,7 @@ const getEnvVarConfig = (): Partial<Config> => {
};
};

export function getConfig(options: FlagdProviderOptions = {}) {
export function getConfig(options: FlagdProviderOptions = {}): FlagdConfig {
const envVarConfig = getEnvVarConfig();
const defaultConfig =
options.resolverType == 'in-process' || envVarConfig.resolverType == 'in-process'
Expand Down
30 changes: 25 additions & 5 deletions libs/providers/flagd/src/lib/flagd-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@ import { getConfig } from './configuration';
import { GRPCService } from './service/grpc/grpc-service';
import type { Service } from './service/service';
import { InProcessService } from './service/in-process/in-process-service';
import type { Hook } from '@openfeature/server-sdk';
import { SyncMetadataHook } from './SyncMetadataHook';

export class FlagdProvider implements Provider {
metadata = {
name: 'flagd',
};

readonly hooks?: Hook[];
readonly runsOn = 'server';
readonly events = new OpenFeatureEventEmitter();
private syncContext: EvaluationContext | null = null;

private readonly _service: Service;

Expand All @@ -30,11 +34,27 @@ export class FlagdProvider implements Provider {
) {
const config = getConfig(options);

this._service = service
? service
: config.resolverType === 'in-process'
? new InProcessService(config, undefined, logger)
: new GRPCService(config, undefined, logger);
if (service === undefined) {
if (config.resolverType === 'in-process') {
this._service = new InProcessService(config, this.setSyncContext.bind(this), undefined, logger);

if (config?.offlineFlagSourcePath === undefined) {
this.hooks = [new SyncMetadataHook(() => config.contextEnricher(this.getSyncContext()))];
}
} else {
this._service = new GRPCService(config, undefined, logger);
}
} else {
this._service = service;
}
}

setSyncContext(context: EvaluationContext) {
this.syncContext = context;
}

getSyncContext(): EvaluationContext | null {
return this.syncContext;
}

async initialize(): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const disconnectCallback = jest.fn();
const removeAllListeners = jest.fn();
const cancel = jest.fn();
const destroy = jest.fn();
const setSyncContext = jest.fn();

let onDataCallback: (data: SyncFlagsResponse) => void = () => ({});
let onErrorCallback: (err: Error) => void = () => ({});
Expand Down Expand Up @@ -59,11 +60,12 @@ describe('grpc fetch', () => {

it('should handle data sync and emit callbacks', (done) => {
const flagConfiguration = '{"flags":{}}';
const fetch = new GrpcFetch(cfg, serviceMock);
const fetch = new GrpcFetch(cfg, setSyncContext, serviceMock);
fetch
.connect(dataCallback, reconnectCallback, jest.fn(), disconnectCallback)
.then(() => {
try {
expect(setSyncContext).toHaveBeenCalledTimes(0);
expect(dataCallback).toHaveBeenCalledTimes(1);
expect(dataCallback).toHaveBeenCalledWith(flagConfiguration);
expect(changedCallback).toHaveBeenCalledTimes(0);
Expand All @@ -80,14 +82,40 @@ describe('grpc fetch', () => {
onDataCallback({ flagConfiguration });
});

it('should handle SyncContext from SyncFlagsResponse', (done) => {
const initFlagConfig = '{"flags":{}}';
const syncContext = { test: 'example' };

const fetch = new GrpcFetch(cfg, setSyncContext, serviceMock);
fetch
.connect(dataCallback, reconnectCallback, changedCallback, disconnectCallback)
.then(() => {
try {
// Callback assertions
expect(setSyncContext).toHaveBeenCalledTimes(1);
expect(setSyncContext).toHaveBeenCalledWith(syncContext);

done();
} catch (err) {
done(err);
}
})
.catch((err) => {
done(err);
});

// First connection
onDataCallback({ flagConfiguration: initFlagConfig, syncContext: syncContext });
});

it('should handle data sync reconnection', (done) => {
const initFlagConfig = '{"flags":{}}';
const updatedFlagConfig =
'{"flags":{"test":{"state":"ENABLED","variants":{"on":true,"off":false},"defaultVariant":"off"}}}';
const reconnectFlagConfig =
'{"flags":{"test":{"state":"ENABLED","variants":{"on":true,"off":false},"defaultVariant":"on"}}}';

const fetch = new GrpcFetch(cfg, serviceMock);
const fetch = new GrpcFetch(cfg, jest.fn(), serviceMock);
fetch
.connect(dataCallback, reconnectCallback, changedCallback, disconnectCallback)
.then(() => {
Expand Down Expand Up @@ -128,7 +156,7 @@ describe('grpc fetch', () => {
});

it('should handle error and watch channel for reconnect', (done) => {
const fetch = new GrpcFetch(cfg, serviceMock);
const fetch = new GrpcFetch(cfg, jest.fn(), serviceMock);
fetch.connect(jest.fn(), jest.fn(), jest.fn(), disconnectCallback).catch((err) => {
try {
expect(err).toBeInstanceOf(Error);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { ClientReadableStream, ServiceError, ClientOptions } from '@grpc/grpc-js';
import { credentials } from '@grpc/grpc-js';
import type { Logger } from '@openfeature/server-sdk';
import type { EvaluationContext, Logger } from '@openfeature/server-sdk';
import { GeneralError } from '@openfeature/server-sdk';
import type { SyncFlagsRequest, SyncFlagsResponse } from '../../../../proto/ts/flagd/sync/v1/sync';
import { FlagSyncServiceClient } from '../../../../proto/ts/flagd/sync/v1/sync';
Expand All @@ -15,6 +15,7 @@ export class GrpcFetch implements DataFetch {
private readonly _syncClient: FlagSyncServiceClient;
private readonly _request: SyncFlagsRequest;
private _syncStream: ClientReadableStream<SyncFlagsResponse> | undefined;
private readonly _setSyncContext: (syncContext: EvaluationContext) => void;
private _logger: Logger | undefined;
/**
* Initialized will be set to true once the initial connection is successful
Expand All @@ -29,7 +30,12 @@ export class GrpcFetch implements DataFetch {
*/
private _isConnected = false;

constructor(config: Config, syncServiceClient?: FlagSyncServiceClient, logger?: Logger) {
constructor(
config: Config,
setSyncContext: (syncContext: EvaluationContext) => void,
syncServiceClient?: FlagSyncServiceClient,
logger?: Logger,
) {
const { host, port, tls, socketPath, selector, defaultAuthority } = config;
let clientOptions: ClientOptions | undefined;
if (defaultAuthority) {
Expand All @@ -46,6 +52,7 @@ export class GrpcFetch implements DataFetch {
clientOptions,
);

this._setSyncContext = setSyncContext;
this._logger = logger;
this._request = { providerId: '', selector: selector ? selector : '' };
}
Expand Down Expand Up @@ -84,6 +91,9 @@ export class GrpcFetch implements DataFetch {
this._logger?.debug(`Received sync payload`);

try {
if (data.syncContext) {
this._setSyncContext(data.syncContext);
}
const changes = dataCallback(data.flagConfiguration);
if (this._initialized && changes.length > 0) {
changedCallback(changes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ describe('In-process-service', () => {

it('should sync and allow to resolve flags', async () => {
// given
const service = new InProcessService({ deadlineMs: 500, host: '', port: 0, tls: false }, dataFetcher);
const service = new InProcessService({ deadlineMs: 500, host: '', port: 0, tls: false }, jest.fn(), dataFetcher);

// when
await service.connect(jest.fn, jest.fn, jest.fn);
Expand All @@ -31,7 +31,11 @@ describe('In-process-service', () => {
it('should include scope as flag metadata', async () => {
// given
const selector = 'devFlags';
const service = new InProcessService({ deadlineMs: 500, host: '', port: 0, tls: false, selector }, dataFetcher);
const service = new InProcessService(
{ deadlineMs: 500, host: '', port: 0, tls: false, selector },
jest.fn(),
dataFetcher,
);

// when
await service.connect(jest.fn, jest.fn, jest.fn);
Expand All @@ -44,7 +48,11 @@ describe('In-process-service', () => {
it('should not override existing scope in flag metadata', async () => {
// given
const selector = 'devFlags';
const service = new InProcessService({ deadlineMs: 500, host: '', port: 0, tls: false, selector }, dataFetcher);
const service = new InProcessService(
{ deadlineMs: 500, host: '', port: 0, tls: false, selector },
jest.fn(),
dataFetcher,
);

// when
await service.connect(jest.fn, jest.fn, jest.fn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export class InProcessService implements Service {

constructor(
private readonly config: Config,
setSyncContext: (syncContext: EvaluationContext) => void,
dataFetcher?: DataFetch,
logger?: Logger,
) {
Expand All @@ -27,7 +28,7 @@ export class InProcessService implements Service {
? dataFetcher
: config.offlineFlagSourcePath
? new FileFetch(config.offlineFlagSourcePath, logger)
: new GrpcFetch(config, undefined, logger);
: new GrpcFetch(config, setSyncContext, undefined, logger);
}

connect(
Expand Down