Skip to content

Commit e493501

Browse files
authored
Merge 5777ab3 into f04de07
2 parents f04de07 + 5777ab3 commit e493501

File tree

4 files changed

+293
-1
lines changed

4 files changed

+293
-1
lines changed

lib/core/decision_service/cmab/cmab_service.spec.ts

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,29 @@
1-
import { describe, it, expect, vi, Mocked, Mock, MockInstance, beforeEach, afterEach } from 'vitest';
1+
/**
2+
* Copyright 2025, Optimizely
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import { describe, it, expect, vi } from 'vitest';
218

319
import { DefaultCmabService } from './cmab_service';
420
import { getMockSyncCache } from '../../../tests/mock/mock_cache';
521
import { ProjectConfig } from '../../../project_config/project_config';
622
import { OptimizelyDecideOption, UserAttributes } from '../../../shared_types';
723
import OptimizelyUserContext from '../../../optimizely_user_context';
824
import { validate as uuidValidate } from 'uuid';
25+
import { resolvablePromise } from '../../../utils/promise/resolvablePromise';
26+
import { exhaustMicrotasks } from '../../../tests/testUtils';
927

1028
const mockProjectConfig = (): ProjectConfig => ({
1129
experimentIdMap: {
@@ -418,4 +436,75 @@ describe('DefaultCmabService', () => {
418436

419437
expect(mockCmabClient.fetchDecision).toHaveBeenCalledTimes(2);
420438
});
439+
440+
it('should serialize concurrent calls to getDecision with the same userId and ruleId', async () => {
441+
const nCall = 10;
442+
let currentVar = 123;
443+
const fetchPromises = Array.from({ length: nCall }, () => resolvablePromise());
444+
445+
let callCount = 0;
446+
const mockCmabClient = {
447+
fetchDecision: vi.fn().mockImplementation(async () => {
448+
const variation = `${currentVar++}`;
449+
await fetchPromises[callCount++];
450+
return variation;
451+
}),
452+
};
453+
454+
const cmabService = new DefaultCmabService({
455+
cmabCache: getMockSyncCache(),
456+
cmabClient: mockCmabClient,
457+
});
458+
459+
const projectConfig = mockProjectConfig();
460+
const userContext = mockUserContext('user123', {});
461+
462+
const resultPromises = [];
463+
for (let i = 0; i < nCall; i++) {
464+
resultPromises.push(cmabService.getDecision(projectConfig, userContext, '1234', {}));
465+
}
466+
467+
await exhaustMicrotasks();
468+
469+
expect(mockCmabClient.fetchDecision).toHaveBeenCalledTimes(1);
470+
471+
for(let i = 0; i < nCall; i++) {
472+
fetchPromises[i].resolve('');
473+
await exhaustMicrotasks();
474+
const result = await resultPromises[i];
475+
expect(result.variationId).toBe('123');
476+
expect(mockCmabClient.fetchDecision).toHaveBeenCalledTimes(1);
477+
}
478+
});
479+
480+
it('should not serialize calls to getDecision with different userId or ruleId', async () => {
481+
let currentVar = 123;
482+
const mockCmabClient = {
483+
fetchDecision: vi.fn().mockImplementation(() => Promise.resolve(`${currentVar++}`)),
484+
};
485+
486+
const cmabService = new DefaultCmabService({
487+
cmabCache: getMockSyncCache(),
488+
cmabClient: mockCmabClient,
489+
});
490+
491+
const projectConfig = mockProjectConfig();
492+
const userContext1 = mockUserContext('user123', {});
493+
const userContext2 = mockUserContext('user456', {});
494+
495+
const resultPromises = [];
496+
resultPromises.push(cmabService.getDecision(projectConfig, userContext1, '1234', {}));
497+
resultPromises.push(cmabService.getDecision(projectConfig, userContext1, '5678', {}));
498+
resultPromises.push(cmabService.getDecision(projectConfig, userContext2, '1234', {}));
499+
resultPromises.push(cmabService.getDecision(projectConfig, userContext2, '5678', {}));
500+
501+
await exhaustMicrotasks();
502+
503+
expect(mockCmabClient.fetchDecision).toHaveBeenCalledTimes(4);
504+
505+
for(let i = 0; i < resultPromises.length; i++) {
506+
const result = await resultPromises[i];
507+
expect(result.variationId).toBe(`${123 + i}`);
508+
}
509+
});
421510
});

lib/core/decision_service/cmab/cmab_service.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import { CmabClient } from "./cmab_client";
2323
import { v4 as uuidV4 } from 'uuid';
2424
import murmurhash from "murmurhash";
2525
import { DecideOptionsMap } from "..";
26+
import { SerialRunner } from "../../../utils/executor/serial_runner";
2627

2728
export type CmabDecision = {
2829
variationId: string,
@@ -57,22 +58,45 @@ export type CmabServiceOptions = {
5758
cmabClient: CmabClient;
5859
}
5960

61+
const SERIALIZER_BUCKETS = 1000;
62+
6063
export class DefaultCmabService implements CmabService {
6164
private cmabCache: CacheWithRemove<CmabCacheValue>;
6265
private cmabClient: CmabClient;
6366
private logger?: LoggerFacade;
67+
private serializers: SerialRunner[] = Array.from(
68+
{ length: SERIALIZER_BUCKETS }, () => new SerialRunner()
69+
);
6470

6571
constructor(options: CmabServiceOptions) {
6672
this.cmabCache = options.cmabCache;
6773
this.cmabClient = options.cmabClient;
6874
this.logger = options.logger;
6975
}
7076

77+
private getSerializerIndex(userId: string, experimentId: string): number {
78+
const key = this.getCacheKey(userId, experimentId);
79+
const hash = murmurhash.v3(key);
80+
return Math.abs(hash) % SERIALIZER_BUCKETS;
81+
}
82+
7183
async getDecision(
7284
projectConfig: ProjectConfig,
7385
userContext: IOptimizelyUserContext,
7486
ruleId: string,
7587
options: DecideOptionsMap,
88+
): Promise<CmabDecision> {
89+
const serializerIndex = this.getSerializerIndex(userContext.getUserId(), ruleId);
90+
return this.serializers[serializerIndex].run(() =>
91+
this.getDecisionInternal(projectConfig, userContext, ruleId, options)
92+
);
93+
}
94+
95+
private async getDecisionInternal(
96+
projectConfig: ProjectConfig,
97+
userContext: IOptimizelyUserContext,
98+
ruleId: string,
99+
options: DecideOptionsMap,
76100
): Promise<CmabDecision> {
77101
const filteredAttributes = this.filterAttributes(projectConfig, userContext, ruleId);
78102

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
/**
2+
* Copyright 2025, Optimizely
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
import { describe, it, expect, beforeEach } from 'vitest';
17+
18+
import { SerialRunner } from './serial_runner';
19+
import { resolvablePromise } from '../promise/resolvablePromise';
20+
import { exhaustMicrotasks } from '../../tests/testUtils';
21+
22+
describe('SerialRunner', () => {
23+
let serialRunner: SerialRunner;
24+
25+
beforeEach(() => {
26+
serialRunner = new SerialRunner();
27+
});
28+
29+
it('should return result from a single async function', async () => {
30+
const fn = () => Promise.resolve('result');
31+
32+
const result = await serialRunner.run(fn);
33+
34+
expect(result).toBe('result');
35+
});
36+
37+
it('should reject with same error when the passed function rejects', async () => {
38+
const error = new Error('test error');
39+
const fn = () => Promise.reject(error);
40+
41+
await expect(serialRunner.run(fn)).rejects.toThrow(error);
42+
});
43+
44+
it('should execute multiple async functions in order', async () => {
45+
const executionOrder: number[] = [];
46+
const promises = [resolvablePromise(), resolvablePromise(), resolvablePromise()];
47+
48+
const createTask = (id: number) => async () => {
49+
executionOrder.push(id);
50+
await promises[id];
51+
return id;
52+
};
53+
54+
const results = [serialRunner.run(createTask(0)), serialRunner.run(createTask(1)), serialRunner.run(createTask(2))];
55+
56+
// only first task should have started
57+
await exhaustMicrotasks();
58+
expect(executionOrder).toEqual([0]);
59+
60+
// Resolve first task - second should start
61+
promises[0].resolve('');
62+
await exhaustMicrotasks();
63+
expect(executionOrder).toEqual([0, 1]);
64+
65+
// Resolve second task - third should start
66+
promises[1].resolve('');
67+
await exhaustMicrotasks();
68+
expect(executionOrder).toEqual([0, 1, 2]);
69+
70+
// Resolve third task - all done
71+
promises[2].resolve('');
72+
73+
// Verify all results are correct
74+
expect(await results[0]).toBe(0);
75+
expect(await results[1]).toBe(1);
76+
expect(await results[2]).toBe(2);
77+
});
78+
79+
it('should continue execution even if one function throws an error', async () => {
80+
const executionOrder: number[] = [];
81+
const promises = [resolvablePromise(), resolvablePromise(), resolvablePromise()];
82+
83+
const createTask = (id: number) => async () => {
84+
executionOrder.push(id);
85+
await promises[id];
86+
return id;
87+
};
88+
89+
const results = [serialRunner.run(createTask(0)), serialRunner.run(createTask(1)), serialRunner.run(createTask(2))];
90+
91+
// only first task should have started
92+
await exhaustMicrotasks();
93+
expect(executionOrder).toEqual([0]);
94+
95+
// reject first task - second should still start
96+
promises[0].reject(new Error('first error'));
97+
await exhaustMicrotasks();
98+
expect(executionOrder).toEqual([0, 1]);
99+
100+
// reject second task - third should still start
101+
promises[1].reject(new Error('second error'));
102+
await exhaustMicrotasks();
103+
expect(executionOrder).toEqual([0, 1, 2]);
104+
105+
// Resolve third task - all done
106+
promises[2].resolve('');
107+
108+
// Verify results - first and third succeed, second fails
109+
await expect(results[0]).rejects.toThrow('first error');
110+
await expect(results[1]).rejects.toThrow('second error');
111+
await expect(results[2]).resolves.toBe(2);
112+
});
113+
114+
it('should handle functions that return different types', async () => {
115+
const numberFn = () => Promise.resolve(42);
116+
const stringFn = () => Promise.resolve('hello');
117+
const objectFn = () => Promise.resolve({ key: 'value' });
118+
const arrayFn = () => Promise.resolve([1, 2, 3]);
119+
const booleanFn = () => Promise.resolve(true);
120+
const nullFn = () => Promise.resolve(null);
121+
const undefinedFn = () => Promise.resolve(undefined);
122+
123+
const results = await Promise.all([
124+
serialRunner.run(numberFn),
125+
serialRunner.run(stringFn),
126+
serialRunner.run(objectFn),
127+
serialRunner.run(arrayFn),
128+
serialRunner.run(booleanFn),
129+
serialRunner.run(nullFn),
130+
serialRunner.run(undefinedFn),
131+
]);
132+
133+
expect(results).toEqual([42, 'hello', { key: 'value' }, [1, 2, 3], true, null, undefined]);
134+
});
135+
136+
it('should handle empty function that returns undefined', async () => {
137+
const emptyFn = () => Promise.resolve(undefined);
138+
139+
const result = await serialRunner.run(emptyFn);
140+
141+
expect(result).toBeUndefined();
142+
});
143+
});
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/**
2+
* Copyright 2025, Optimizely
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import { AsyncProducer } from "../type";
18+
19+
class SerialRunner {
20+
private waitPromise: Promise<unknown> = Promise.resolve();
21+
22+
// each call to serialize adds a new function to the end of the promise chain
23+
// the function is called when the previous promise resolves
24+
// if the function throws, the error is caught and ignored to allow the chain to continue
25+
// the result of the function is returned as a promise
26+
// if multiple calls to serialize are made, they will be executed in order
27+
// even if some of them throw errors
28+
29+
run<T>(fn: AsyncProducer<T>): Promise<T> {
30+
const resultPromise = this.waitPromise.then(fn);
31+
this.waitPromise = resultPromise.catch(() => {});
32+
return resultPromise;
33+
}
34+
}
35+
36+
export { SerialRunner };

0 commit comments

Comments
 (0)