diff --git a/lib/core/decision_service/cmab/cmab_service.spec.ts b/lib/core/decision_service/cmab/cmab_service.spec.ts index dce84f6e1..38ee205e4 100644 --- a/lib/core/decision_service/cmab/cmab_service.spec.ts +++ b/lib/core/decision_service/cmab/cmab_service.spec.ts @@ -1,4 +1,20 @@ -import { describe, it, expect, vi, Mocked, Mock, MockInstance, beforeEach, afterEach } from 'vitest'; +/** + * Copyright 2025, Optimizely + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { describe, it, expect, vi } from 'vitest'; import { DefaultCmabService } from './cmab_service'; import { getMockSyncCache } from '../../../tests/mock/mock_cache'; @@ -6,6 +22,8 @@ import { ProjectConfig } from '../../../project_config/project_config'; import { OptimizelyDecideOption, UserAttributes } from '../../../shared_types'; import OptimizelyUserContext from '../../../optimizely_user_context'; import { validate as uuidValidate } from 'uuid'; +import { resolvablePromise } from '../../../utils/promise/resolvablePromise'; +import { exhaustMicrotasks } from '../../../tests/testUtils'; const mockProjectConfig = (): ProjectConfig => ({ experimentIdMap: { @@ -418,4 +436,75 @@ describe('DefaultCmabService', () => { expect(mockCmabClient.fetchDecision).toHaveBeenCalledTimes(2); }); + + it('should serialize concurrent calls to getDecision with the same userId and ruleId', async () => { + const nCall = 10; + let currentVar = 123; + const fetchPromises = Array.from({ length: nCall }, () => resolvablePromise()); + + let callCount = 0; + const mockCmabClient = { + fetchDecision: vi.fn().mockImplementation(async () => { + const variation = `${currentVar++}`; + await fetchPromises[callCount++]; + return variation; + }), + }; + + const cmabService = new DefaultCmabService({ + cmabCache: getMockSyncCache(), + cmabClient: mockCmabClient, + }); + + const projectConfig = mockProjectConfig(); + const userContext = mockUserContext('user123', {}); + + const resultPromises = []; + for (let i = 0; i < nCall; i++) { + resultPromises.push(cmabService.getDecision(projectConfig, userContext, '1234', {})); + } + + await exhaustMicrotasks(); + + expect(mockCmabClient.fetchDecision).toHaveBeenCalledTimes(1); + + for(let i = 0; i < nCall; i++) { + fetchPromises[i].resolve(''); + await exhaustMicrotasks(); + const result = await resultPromises[i]; + expect(result.variationId).toBe('123'); + expect(mockCmabClient.fetchDecision).toHaveBeenCalledTimes(1); + } + }); + + it('should not serialize calls to getDecision with different userId or ruleId', async () => { + let currentVar = 123; + const mockCmabClient = { + fetchDecision: vi.fn().mockImplementation(() => Promise.resolve(`${currentVar++}`)), + }; + + const cmabService = new DefaultCmabService({ + cmabCache: getMockSyncCache(), + cmabClient: mockCmabClient, + }); + + const projectConfig = mockProjectConfig(); + const userContext1 = mockUserContext('user123', {}); + const userContext2 = mockUserContext('user456', {}); + + const resultPromises = []; + resultPromises.push(cmabService.getDecision(projectConfig, userContext1, '1234', {})); + resultPromises.push(cmabService.getDecision(projectConfig, userContext1, '5678', {})); + resultPromises.push(cmabService.getDecision(projectConfig, userContext2, '1234', {})); + resultPromises.push(cmabService.getDecision(projectConfig, userContext2, '5678', {})); + + await exhaustMicrotasks(); + + expect(mockCmabClient.fetchDecision).toHaveBeenCalledTimes(4); + + for(let i = 0; i < resultPromises.length; i++) { + const result = await resultPromises[i]; + expect(result.variationId).toBe(`${123 + i}`); + } + }); }); diff --git a/lib/core/decision_service/cmab/cmab_service.ts b/lib/core/decision_service/cmab/cmab_service.ts index 094e10bbb..cd3ab99ea 100644 --- a/lib/core/decision_service/cmab/cmab_service.ts +++ b/lib/core/decision_service/cmab/cmab_service.ts @@ -23,6 +23,7 @@ import { CmabClient } from "./cmab_client"; import { v4 as uuidV4 } from 'uuid'; import murmurhash from "murmurhash"; import { DecideOptionsMap } from ".."; +import { SerialRunner } from "../../../utils/executor/serial_runner"; export type CmabDecision = { variationId: string, @@ -57,10 +58,15 @@ export type CmabServiceOptions = { cmabClient: CmabClient; } +const SERIALIZER_BUCKETS = 1000; + export class DefaultCmabService implements CmabService { private cmabCache: CacheWithRemove; private cmabClient: CmabClient; private logger?: LoggerFacade; + private serializers: SerialRunner[] = Array.from( + { length: SERIALIZER_BUCKETS }, () => new SerialRunner() + ); constructor(options: CmabServiceOptions) { this.cmabCache = options.cmabCache; @@ -68,11 +74,29 @@ export class DefaultCmabService implements CmabService { this.logger = options.logger; } + private getSerializerIndex(userId: string, experimentId: string): number { + const key = this.getCacheKey(userId, experimentId); + const hash = murmurhash.v3(key); + return Math.abs(hash) % SERIALIZER_BUCKETS; + } + async getDecision( projectConfig: ProjectConfig, userContext: IOptimizelyUserContext, ruleId: string, options: DecideOptionsMap, + ): Promise { + const serializerIndex = this.getSerializerIndex(userContext.getUserId(), ruleId); + return this.serializers[serializerIndex].run(() => + this.getDecisionInternal(projectConfig, userContext, ruleId, options) + ); + } + + private async getDecisionInternal( + projectConfig: ProjectConfig, + userContext: IOptimizelyUserContext, + ruleId: string, + options: DecideOptionsMap, ): Promise { const filteredAttributes = this.filterAttributes(projectConfig, userContext, ruleId); diff --git a/lib/utils/executor/serial_runner.spec.ts b/lib/utils/executor/serial_runner.spec.ts new file mode 100644 index 000000000..6456735a9 --- /dev/null +++ b/lib/utils/executor/serial_runner.spec.ts @@ -0,0 +1,143 @@ +/** + * Copyright 2025, Optimizely + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { describe, it, expect, beforeEach } from 'vitest'; + +import { SerialRunner } from './serial_runner'; +import { resolvablePromise } from '../promise/resolvablePromise'; +import { exhaustMicrotasks } from '../../tests/testUtils'; + +describe('SerialRunner', () => { + let serialRunner: SerialRunner; + + beforeEach(() => { + serialRunner = new SerialRunner(); + }); + + it('should return result from a single async function', async () => { + const fn = () => Promise.resolve('result'); + + const result = await serialRunner.run(fn); + + expect(result).toBe('result'); + }); + + it('should reject with same error when the passed function rejects', async () => { + const error = new Error('test error'); + const fn = () => Promise.reject(error); + + await expect(serialRunner.run(fn)).rejects.toThrow(error); + }); + + it('should execute multiple async functions in order', async () => { + const executionOrder: number[] = []; + const promises = [resolvablePromise(), resolvablePromise(), resolvablePromise()]; + + const createTask = (id: number) => async () => { + executionOrder.push(id); + await promises[id]; + return id; + }; + + const results = [serialRunner.run(createTask(0)), serialRunner.run(createTask(1)), serialRunner.run(createTask(2))]; + + // only first task should have started + await exhaustMicrotasks(); + expect(executionOrder).toEqual([0]); + + // Resolve first task - second should start + promises[0].resolve(''); + await exhaustMicrotasks(); + expect(executionOrder).toEqual([0, 1]); + + // Resolve second task - third should start + promises[1].resolve(''); + await exhaustMicrotasks(); + expect(executionOrder).toEqual([0, 1, 2]); + + // Resolve third task - all done + promises[2].resolve(''); + + // Verify all results are correct + expect(await results[0]).toBe(0); + expect(await results[1]).toBe(1); + expect(await results[2]).toBe(2); + }); + + it('should continue execution even if one function throws an error', async () => { + const executionOrder: number[] = []; + const promises = [resolvablePromise(), resolvablePromise(), resolvablePromise()]; + + const createTask = (id: number) => async () => { + executionOrder.push(id); + await promises[id]; + return id; + }; + + const results = [serialRunner.run(createTask(0)), serialRunner.run(createTask(1)), serialRunner.run(createTask(2))]; + + // only first task should have started + await exhaustMicrotasks(); + expect(executionOrder).toEqual([0]); + + // reject first task - second should still start + promises[0].reject(new Error('first error')); + await exhaustMicrotasks(); + expect(executionOrder).toEqual([0, 1]); + + // reject second task - third should still start + promises[1].reject(new Error('second error')); + await exhaustMicrotasks(); + expect(executionOrder).toEqual([0, 1, 2]); + + // Resolve third task - all done + promises[2].resolve(''); + + // Verify results - first and third succeed, second fails + await expect(results[0]).rejects.toThrow('first error'); + await expect(results[1]).rejects.toThrow('second error'); + await expect(results[2]).resolves.toBe(2); + }); + + it('should handle functions that return different types', async () => { + const numberFn = () => Promise.resolve(42); + const stringFn = () => Promise.resolve('hello'); + const objectFn = () => Promise.resolve({ key: 'value' }); + const arrayFn = () => Promise.resolve([1, 2, 3]); + const booleanFn = () => Promise.resolve(true); + const nullFn = () => Promise.resolve(null); + const undefinedFn = () => Promise.resolve(undefined); + + const results = await Promise.all([ + serialRunner.run(numberFn), + serialRunner.run(stringFn), + serialRunner.run(objectFn), + serialRunner.run(arrayFn), + serialRunner.run(booleanFn), + serialRunner.run(nullFn), + serialRunner.run(undefinedFn), + ]); + + expect(results).toEqual([42, 'hello', { key: 'value' }, [1, 2, 3], true, null, undefined]); + }); + + it('should handle empty function that returns undefined', async () => { + const emptyFn = () => Promise.resolve(undefined); + + const result = await serialRunner.run(emptyFn); + + expect(result).toBeUndefined(); + }); +}); \ No newline at end of file diff --git a/lib/utils/executor/serial_runner.ts b/lib/utils/executor/serial_runner.ts new file mode 100644 index 000000000..243cae0b1 --- /dev/null +++ b/lib/utils/executor/serial_runner.ts @@ -0,0 +1,36 @@ +/** + * Copyright 2025, Optimizely + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { AsyncProducer } from "../type"; + +class SerialRunner { + private waitPromise: Promise = Promise.resolve(); + + // each call to serialize adds a new function to the end of the promise chain + // the function is called when the previous promise resolves + // if the function throws, the error is caught and ignored to allow the chain to continue + // the result of the function is returned as a promise + // if multiple calls to serialize are made, they will be executed in order + // even if some of them throw errors + + run(fn: AsyncProducer): Promise { + const resultPromise = this.waitPromise.then(fn); + this.waitPromise = resultPromise.catch(() => {}); + return resultPromise; + } +} + +export { SerialRunner };