From ecd7a4a058d7f36ceb4fd4195e140955884d2f2d Mon Sep 17 00:00:00 2001 From: Raju Ahmed Date: Fri, 3 Oct 2025 23:45:41 +0600 Subject: [PATCH 1/2] [FSSDK-11898] serialize concurrent cmab service calls The cmab service caches the results of a cmab prediction retrieve from the server and returns it for subsequent call. This ensures a consistent value is returned for getDecision() within the cache ttl. However, when there is no cached value, if there is concurrent calls to gertDecision() for same userId and ruleId combination, all of these will cause a call to the server and may potentially return different values. The solution is to run concurrent calls for same userId and ruleId combinations one after another. To achieve this, we put each (userId, ruleId) combination in one of the predefined bucktes by hashing the (userId, ruleId) combination and serialize all calls for that particular hash % (num_buckets). --- .../cmab/cmab_service.spec.ts | 91 +++++++- .../decision_service/cmab/cmab_service.ts | 24 +++ lib/utils/executor/serial_runner.spec.ts | 195 ++++++++++++++++++ lib/utils/executor/serial_runner.ts | 36 ++++ 4 files changed, 345 insertions(+), 1 deletion(-) create mode 100644 lib/utils/executor/serial_runner.spec.ts create mode 100644 lib/utils/executor/serial_runner.ts diff --git a/lib/core/decision_service/cmab/cmab_service.spec.ts b/lib/core/decision_service/cmab/cmab_service.spec.ts index dce84f6e1..38ee205e4 100644 --- a/lib/core/decision_service/cmab/cmab_service.spec.ts +++ b/lib/core/decision_service/cmab/cmab_service.spec.ts @@ -1,4 +1,20 @@ -import { describe, it, expect, vi, Mocked, Mock, MockInstance, beforeEach, afterEach } from 'vitest'; +/** + * Copyright 2025, Optimizely + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { describe, it, expect, vi } from 'vitest'; import { DefaultCmabService } from './cmab_service'; import { getMockSyncCache } from '../../../tests/mock/mock_cache'; @@ -6,6 +22,8 @@ import { ProjectConfig } from '../../../project_config/project_config'; import { OptimizelyDecideOption, UserAttributes } from '../../../shared_types'; import OptimizelyUserContext from '../../../optimizely_user_context'; import { validate as uuidValidate } from 'uuid'; +import { resolvablePromise } from '../../../utils/promise/resolvablePromise'; +import { exhaustMicrotasks } from '../../../tests/testUtils'; const mockProjectConfig = (): ProjectConfig => ({ experimentIdMap: { @@ -418,4 +436,75 @@ describe('DefaultCmabService', () => { expect(mockCmabClient.fetchDecision).toHaveBeenCalledTimes(2); }); + + it('should serialize concurrent calls to getDecision with the same userId and ruleId', async () => { + const nCall = 10; + let currentVar = 123; + const fetchPromises = Array.from({ length: nCall }, () => resolvablePromise()); + + let callCount = 0; + const mockCmabClient = { + fetchDecision: vi.fn().mockImplementation(async () => { + const variation = `${currentVar++}`; + await fetchPromises[callCount++]; + return variation; + }), + }; + + const cmabService = new DefaultCmabService({ + cmabCache: getMockSyncCache(), + cmabClient: mockCmabClient, + }); + + const projectConfig = mockProjectConfig(); + const userContext = mockUserContext('user123', {}); + + const resultPromises = []; + for (let i = 0; i < nCall; i++) { + resultPromises.push(cmabService.getDecision(projectConfig, userContext, '1234', {})); + } + + await exhaustMicrotasks(); + + expect(mockCmabClient.fetchDecision).toHaveBeenCalledTimes(1); + + for(let i = 0; i < nCall; i++) { + fetchPromises[i].resolve(''); + await exhaustMicrotasks(); + const result = await resultPromises[i]; + expect(result.variationId).toBe('123'); + expect(mockCmabClient.fetchDecision).toHaveBeenCalledTimes(1); + } + }); + + it('should not serialize calls to getDecision with different userId or ruleId', async () => { + let currentVar = 123; + const mockCmabClient = { + fetchDecision: vi.fn().mockImplementation(() => Promise.resolve(`${currentVar++}`)), + }; + + const cmabService = new DefaultCmabService({ + cmabCache: getMockSyncCache(), + cmabClient: mockCmabClient, + }); + + const projectConfig = mockProjectConfig(); + const userContext1 = mockUserContext('user123', {}); + const userContext2 = mockUserContext('user456', {}); + + const resultPromises = []; + resultPromises.push(cmabService.getDecision(projectConfig, userContext1, '1234', {})); + resultPromises.push(cmabService.getDecision(projectConfig, userContext1, '5678', {})); + resultPromises.push(cmabService.getDecision(projectConfig, userContext2, '1234', {})); + resultPromises.push(cmabService.getDecision(projectConfig, userContext2, '5678', {})); + + await exhaustMicrotasks(); + + expect(mockCmabClient.fetchDecision).toHaveBeenCalledTimes(4); + + for(let i = 0; i < resultPromises.length; i++) { + const result = await resultPromises[i]; + expect(result.variationId).toBe(`${123 + i}`); + } + }); }); diff --git a/lib/core/decision_service/cmab/cmab_service.ts b/lib/core/decision_service/cmab/cmab_service.ts index 094e10bbb..cd3ab99ea 100644 --- a/lib/core/decision_service/cmab/cmab_service.ts +++ b/lib/core/decision_service/cmab/cmab_service.ts @@ -23,6 +23,7 @@ import { CmabClient } from "./cmab_client"; import { v4 as uuidV4 } from 'uuid'; import murmurhash from "murmurhash"; import { DecideOptionsMap } from ".."; +import { SerialRunner } from "../../../utils/executor/serial_runner"; export type CmabDecision = { variationId: string, @@ -57,10 +58,15 @@ export type CmabServiceOptions = { cmabClient: CmabClient; } +const SERIALIZER_BUCKETS = 1000; + export class DefaultCmabService implements CmabService { private cmabCache: CacheWithRemove; private cmabClient: CmabClient; private logger?: LoggerFacade; + private serializers: SerialRunner[] = Array.from( + { length: SERIALIZER_BUCKETS }, () => new SerialRunner() + ); constructor(options: CmabServiceOptions) { this.cmabCache = options.cmabCache; @@ -68,11 +74,29 @@ export class DefaultCmabService implements CmabService { this.logger = options.logger; } + private getSerializerIndex(userId: string, experimentId: string): number { + const key = this.getCacheKey(userId, experimentId); + const hash = murmurhash.v3(key); + return Math.abs(hash) % SERIALIZER_BUCKETS; + } + async getDecision( projectConfig: ProjectConfig, userContext: IOptimizelyUserContext, ruleId: string, options: DecideOptionsMap, + ): Promise { + const serializerIndex = this.getSerializerIndex(userContext.getUserId(), ruleId); + return this.serializers[serializerIndex].run(() => + this.getDecisionInternal(projectConfig, userContext, ruleId, options) + ); + } + + private async getDecisionInternal( + projectConfig: ProjectConfig, + userContext: IOptimizelyUserContext, + ruleId: string, + options: DecideOptionsMap, ): Promise { const filteredAttributes = this.filterAttributes(projectConfig, userContext, ruleId); diff --git a/lib/utils/executor/serial_runner.spec.ts b/lib/utils/executor/serial_runner.spec.ts new file mode 100644 index 000000000..51bc6eece --- /dev/null +++ b/lib/utils/executor/serial_runner.spec.ts @@ -0,0 +1,195 @@ +/** + * Copyright 2025, Optimizely + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { describe, it, expect, vi, beforeEach } from 'vitest'; + +import { SerialRunner } from './serial_runner'; +import { resolvablePromise } from '../promise/resolvablePromise'; +import { exhaustMicrotasks } from '../../tests/testUtils'; +import { Maybe } from '../type'; + +describe('SerialRunner', () => { + let serialRunner: SerialRunner; + + beforeEach(() => { + serialRunner = new SerialRunner(); + }); + + it('should return result from a single async function', async () => { + const fn = () => Promise.resolve('result'); + + const result = await serialRunner.run(fn); + + expect(result).toBe('result'); + }); + + it('should reject with same error when the passed function rejects', async () => { + const error = new Error('test error'); + const fn = () => Promise.reject(error); + + await expect(serialRunner.run(fn)).rejects.toThrow(error); + }); + + it('should execute multiple async functions in order', async () => { + // events to track execution order + // begin_1 means call 1 started + // end_1 means call 1 ended ... + const events: string[] = []; + + const nCall = 10; + + const promises = Array.from({ length: nCall }, () => resolvablePromise()); + const getFn = (i: number) => { + return async (): Promise => { + events.push(`begin_${i}`); + await promises[i]; + events.push(`end_${i}`); + return i; + } + } + + const resultPromises = []; + for (let i = 0; i < nCall; i++) { + resultPromises.push(serialRunner.run(getFn(i))); + } + + await exhaustMicrotasks(); + + const expectedEvents = ['begin_0']; + + expect(events).toEqual(expectedEvents); + + for(let i = 0; i < nCall - 1; i++) { + promises[i].resolve(''); + await exhaustMicrotasks(); + + expectedEvents.push(`end_${i}`); + expectedEvents.push(`begin_${i+1}`); + + expect(events).toEqual(expectedEvents); + } + + promises[nCall - 1].resolve(''); + await exhaustMicrotasks(); + + expectedEvents.push(`end_${nCall - 1}`); + expect(events).toEqual(expectedEvents); + + for(let i = 0; i < nCall; i++) { + await expect(resultPromises[i]).resolves.toBe(i); + } + }); + + it('should continue execution even if one function throws an error', async () => { + const events: string[] = []; + + const nCall = 5; + const err = [false, true, false, true, true]; + + const promises = Array.from({ length: nCall }, () => resolvablePromise()); + + const getFn = (i: number) => { + return async (): Promise => { + events.push(`begin_${i}`); + let err = false; + try { + await promises[i]; + } catch(e) { + err = true; + } + + events.push(`end_${i}`); + if (err) { + throw new Error(`error_${i}`); + } + return i; + } + } + + const resultPromises = []; + for (let i = 0; i < nCall; i++) { + resultPromises.push(serialRunner.run(getFn(i))); + } + + await exhaustMicrotasks(); + + const expectedEvents = ['begin_0']; + + expect(events).toEqual(expectedEvents); + + const endFn = (i: number) => { + if (err[i]) { + promises[i].reject(new Error('error')); + } else { + promises[i].resolve(''); + } + } + + for(let i = 0; i < nCall - 1; i++) { + endFn(i); + + await exhaustMicrotasks(); + + expectedEvents.push(`end_${i}`); + expectedEvents.push(`begin_${i+1}`); + + expect(events).toEqual(expectedEvents); + } + + endFn(nCall - 1); + await exhaustMicrotasks(); + + expectedEvents.push(`end_${nCall - 1}`); + expect(events).toEqual(expectedEvents); + + for(let i = 0; i < nCall; i++) { + if (err[i]) { + await expect(resultPromises[i]).rejects.toThrow(`error_${i}`); + } else { + await expect(resultPromises[i]).resolves.toBe(i); + } + } + }); + + it('should handle functions that return different types', async () => { + const numberFn = () => Promise.resolve(42); + const stringFn = () => Promise.resolve('hello'); + const objectFn = () => Promise.resolve({ key: 'value' }); + const arrayFn = () => Promise.resolve([1, 2, 3]); + const booleanFn = () => Promise.resolve(true); + const nullFn = () => Promise.resolve(null); + const undefinedFn = () => Promise.resolve(undefined); + + const results = await Promise.all([ + serialRunner.run(numberFn), + serialRunner.run(stringFn), + serialRunner.run(objectFn), + serialRunner.run(arrayFn), + serialRunner.run(booleanFn), + serialRunner.run(nullFn), + serialRunner.run(undefinedFn), + ]); + + expect(results).toEqual([42, 'hello', { key: 'value' }, [1, 2, 3], true, null, undefined]); + }); + + it('should handle empty function that returns undefined', async () => { + const emptyFn = () => Promise.resolve(undefined); + + const result = await serialRunner.run(emptyFn); + + expect(result).toBeUndefined(); + }); +}); \ No newline at end of file diff --git a/lib/utils/executor/serial_runner.ts b/lib/utils/executor/serial_runner.ts new file mode 100644 index 000000000..243cae0b1 --- /dev/null +++ b/lib/utils/executor/serial_runner.ts @@ -0,0 +1,36 @@ +/** + * Copyright 2025, Optimizely + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { AsyncProducer } from "../type"; + +class SerialRunner { + private waitPromise: Promise = Promise.resolve(); + + // each call to serialize adds a new function to the end of the promise chain + // the function is called when the previous promise resolves + // if the function throws, the error is caught and ignored to allow the chain to continue + // the result of the function is returned as a promise + // if multiple calls to serialize are made, they will be executed in order + // even if some of them throw errors + + run(fn: AsyncProducer): Promise { + const resultPromise = this.waitPromise.then(fn); + this.waitPromise = resultPromise.catch(() => {}); + return resultPromise; + } +} + +export { SerialRunner }; From 5777ab3159ee8b738758c6cde2aa7f0bd488241d Mon Sep 17 00:00:00 2001 From: Raju Ahmed Date: Tue, 7 Oct 2025 00:31:32 +0600 Subject: [PATCH 2/2] update --- lib/utils/executor/serial_runner.spec.ts | 156 ++++++++--------------- 1 file changed, 52 insertions(+), 104 deletions(-) diff --git a/lib/utils/executor/serial_runner.spec.ts b/lib/utils/executor/serial_runner.spec.ts index 51bc6eece..6456735a9 100644 --- a/lib/utils/executor/serial_runner.spec.ts +++ b/lib/utils/executor/serial_runner.spec.ts @@ -13,12 +13,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { describe, it, expect, beforeEach } from 'vitest'; import { SerialRunner } from './serial_runner'; import { resolvablePromise } from '../promise/resolvablePromise'; import { exhaustMicrotasks } from '../../tests/testUtils'; -import { Maybe } from '../type'; describe('SerialRunner', () => { let serialRunner: SerialRunner; @@ -42,125 +41,74 @@ describe('SerialRunner', () => { await expect(serialRunner.run(fn)).rejects.toThrow(error); }); - it('should execute multiple async functions in order', async () => { - // events to track execution order - // begin_1 means call 1 started - // end_1 means call 1 ended ... - const events: string[] = []; - - const nCall = 10; - - const promises = Array.from({ length: nCall }, () => resolvablePromise()); - const getFn = (i: number) => { - return async (): Promise => { - events.push(`begin_${i}`); - await promises[i]; - events.push(`end_${i}`); - return i; - } - } - - const resultPromises = []; - for (let i = 0; i < nCall; i++) { - resultPromises.push(serialRunner.run(getFn(i))); - } + it('should execute multiple async functions in order', async () => { + const executionOrder: number[] = []; + const promises = [resolvablePromise(), resolvablePromise(), resolvablePromise()]; - await exhaustMicrotasks(); - - const expectedEvents = ['begin_0']; + const createTask = (id: number) => async () => { + executionOrder.push(id); + await promises[id]; + return id; + }; - expect(events).toEqual(expectedEvents); + const results = [serialRunner.run(createTask(0)), serialRunner.run(createTask(1)), serialRunner.run(createTask(2))]; - for(let i = 0; i < nCall - 1; i++) { - promises[i].resolve(''); - await exhaustMicrotasks(); + // only first task should have started + await exhaustMicrotasks(); + expect(executionOrder).toEqual([0]); - expectedEvents.push(`end_${i}`); - expectedEvents.push(`begin_${i+1}`); - - expect(events).toEqual(expectedEvents); - } + // Resolve first task - second should start + promises[0].resolve(''); + await exhaustMicrotasks(); + expect(executionOrder).toEqual([0, 1]); - promises[nCall - 1].resolve(''); + // Resolve second task - third should start + promises[1].resolve(''); await exhaustMicrotasks(); - - expectedEvents.push(`end_${nCall - 1}`); - expect(events).toEqual(expectedEvents); + expect(executionOrder).toEqual([0, 1, 2]); + + // Resolve third task - all done + promises[2].resolve(''); - for(let i = 0; i < nCall; i++) { - await expect(resultPromises[i]).resolves.toBe(i); - } + // Verify all results are correct + expect(await results[0]).toBe(0); + expect(await results[1]).toBe(1); + expect(await results[2]).toBe(2); }); it('should continue execution even if one function throws an error', async () => { - const events: string[] = []; - - const nCall = 5; - const err = [false, true, false, true, true]; - - const promises = Array.from({ length: nCall }, () => resolvablePromise()); - - const getFn = (i: number) => { - return async (): Promise => { - events.push(`begin_${i}`); - let err = false; - try { - await promises[i]; - } catch(e) { - err = true; - } - - events.push(`end_${i}`); - if (err) { - throw new Error(`error_${i}`); - } - return i; - } - } - - const resultPromises = []; - for (let i = 0; i < nCall; i++) { - resultPromises.push(serialRunner.run(getFn(i))); - } + const executionOrder: number[] = []; + const promises = [resolvablePromise(), resolvablePromise(), resolvablePromise()]; - await exhaustMicrotasks(); - - const expectedEvents = ['begin_0']; + const createTask = (id: number) => async () => { + executionOrder.push(id); + await promises[id]; + return id; + }; - expect(events).toEqual(expectedEvents); + const results = [serialRunner.run(createTask(0)), serialRunner.run(createTask(1)), serialRunner.run(createTask(2))]; - const endFn = (i: number) => { - if (err[i]) { - promises[i].reject(new Error('error')); - } else { - promises[i].resolve(''); - } - } + // only first task should have started + await exhaustMicrotasks(); + expect(executionOrder).toEqual([0]); - for(let i = 0; i < nCall - 1; i++) { - endFn(i); + // reject first task - second should still start + promises[0].reject(new Error('first error')); + await exhaustMicrotasks(); + expect(executionOrder).toEqual([0, 1]); - await exhaustMicrotasks(); + // reject second task - third should still start + promises[1].reject(new Error('second error')); + await exhaustMicrotasks(); + expect(executionOrder).toEqual([0, 1, 2]); - expectedEvents.push(`end_${i}`); - expectedEvents.push(`begin_${i+1}`); - - expect(events).toEqual(expectedEvents); - } + // Resolve third task - all done + promises[2].resolve(''); - endFn(nCall - 1); - await exhaustMicrotasks(); - - expectedEvents.push(`end_${nCall - 1}`); - expect(events).toEqual(expectedEvents); - - for(let i = 0; i < nCall; i++) { - if (err[i]) { - await expect(resultPromises[i]).rejects.toThrow(`error_${i}`); - } else { - await expect(resultPromises[i]).resolves.toBe(i); - } - } + // Verify results - first and third succeed, second fails + await expect(results[0]).rejects.toThrow('first error'); + await expect(results[1]).rejects.toThrow('second error'); + await expect(results[2]).resolves.toBe(2); }); it('should handle functions that return different types', async () => {