Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/client_factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ export type OptimizelyFactoryConfig = Config & {
requestHandler: RequestHandler;
}

export const getOptimizelyInstance = (config: OptimizelyFactoryConfig): Client => {
export const getOptimizelyInstance = (config: OptimizelyFactoryConfig): Optimizely => {
const {
clientEngine,
clientVersion,
Expand Down
86 changes: 79 additions & 7 deletions lib/event_processor/batch_event_processor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -875,7 +875,7 @@ describe('BatchEventProcessor', async () => {
});

describe('retryFailedEvents', () => {
it('should disptach only failed events from the store and not dispatch queued events', async () => {
it('should dispatch only failed events from the store and not dispatch queued events', async () => {
const eventDispatcher = getMockDispatcher();
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
mockDispatch.mockResolvedValue({});
Expand Down Expand Up @@ -921,7 +921,7 @@ describe('BatchEventProcessor', async () => {
]));
});

it('should disptach only failed events from the store and not dispatch events that are being dispatched', async () => {
it('should dispatch only failed events from the store and not dispatch events that are being dispatched', async () => {
const eventDispatcher = getMockDispatcher();
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
const mockResult1 = resolvablePromise();
Expand Down Expand Up @@ -977,7 +977,7 @@ describe('BatchEventProcessor', async () => {
]));
});

it('should disptach events in correct batch size and separate events with differnt contexts in separate batch', async () => {
it('should dispatch events in correct batch size and separate events with differnt contexts in separate batch', async () => {
const eventDispatcher = getMockDispatcher();
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
mockDispatch.mockResolvedValue({});
Expand Down Expand Up @@ -1023,7 +1023,7 @@ describe('BatchEventProcessor', async () => {
});

describe('when failedEventRepeater is fired', () => {
it('should disptach only failed events from the store and not dispatch queued events', async () => {
it('should dispatch only failed events from the store and not dispatch queued events', async () => {
const eventDispatcher = getMockDispatcher();
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
mockDispatch.mockResolvedValue({});
Expand Down Expand Up @@ -1071,7 +1071,7 @@ describe('BatchEventProcessor', async () => {
]));
});

it('should disptach only failed events from the store and not dispatch events that are being dispatched', async () => {
it('should dispatch only failed events from the store and not dispatch events that are being dispatched', async () => {
const eventDispatcher = getMockDispatcher();
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
const mockResult1 = resolvablePromise();
Expand Down Expand Up @@ -1129,7 +1129,7 @@ describe('BatchEventProcessor', async () => {
]));
});

it('should disptach events in correct batch size and separate events with differnt contexts in separate batch', async () => {
it('should dispatch events in correct batch size and separate events with differnt contexts in separate batch', async () => {
const eventDispatcher = getMockDispatcher();
const mockDispatch: MockInstance<typeof eventDispatcher.dispatchEvent> = eventDispatcher.dispatchEvent;
mockDispatch.mockResolvedValue({});
Expand Down Expand Up @@ -1277,7 +1277,7 @@ describe('BatchEventProcessor', async () => {
expect(failedEventRepeater.stop).toHaveBeenCalledOnce();
});

it('should disptach the events in queue using the closing dispatcher if available', async () => {
it('should dispatch the events in queue using the closing dispatcher if available', async () => {
const eventDispatcher = getMockDispatcher();
const closingEventDispatcher = getMockDispatcher();
closingEventDispatcher.dispatchEvent.mockResolvedValue({});
Expand Down Expand Up @@ -1408,4 +1408,76 @@ describe('BatchEventProcessor', async () => {
await expect(processor.onTerminated()).resolves.not.toThrow();
});
});

describe('flushImmediately', () => {
it('should dispatch the events in queue using the closing dispatcher if available', async () => {
const eventDispatcher = getMockDispatcher();
const closingEventDispatcher = getMockDispatcher();
closingEventDispatcher.dispatchEvent.mockResolvedValue({});

const dispatchRepeater = getMockRepeater();
const failedEventRepeater = getMockRepeater();

const processor = new BatchEventProcessor({
eventDispatcher,
closingEventDispatcher,
dispatchRepeater,
failedEventRepeater,
batchSize: 100,
});

processor.start();
await processor.onRunning();

const events: ProcessableEvent[] = [];
for(let i = 0; i < 10; i++) {
const event = createImpressionEvent(`id-${i}`);
events.push(event);
await processor.process(event);
}

expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(0);
expect(closingEventDispatcher.dispatchEvent).toHaveBeenCalledTimes(0);

processor.flushImmediately();
expect(closingEventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);
expect(closingEventDispatcher.dispatchEvent).toHaveBeenCalledWith(buildLogEvent(events));

expect(processor.isRunning()).toBe(true);
});


it('should dispatch the events in queue using eventDispatcher if closingEventDispatcher is not available', async () => {
const eventDispatcher = getMockDispatcher();
eventDispatcher.dispatchEvent.mockResolvedValue({});

const dispatchRepeater = getMockRepeater();
const failedEventRepeater = getMockRepeater();

const processor = new BatchEventProcessor({
eventDispatcher,
dispatchRepeater,
failedEventRepeater,
batchSize: 100,
});

processor.start();
await processor.onRunning();

const events: ProcessableEvent[] = [];
for(let i = 0; i < 10; i++) {
const event = createImpressionEvent(`id-${i}`);
events.push(event);
await processor.process(event);
}

expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(0);

processor.flushImmediately();
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledTimes(1);
expect(eventDispatcher.dispatchEvent).toHaveBeenCalledWith(buildLogEvent(events));

expect(processor.isRunning()).toBe(true);
});
});
});
11 changes: 9 additions & 2 deletions lib/event_processor/batch_event_processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,14 +230,14 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
});
}

private async flush(closing = false): Promise<void> {
private async flush(useClosingDispatcher = false): Promise<void> {
const batch = this.createNewBatch();
if (!batch) {
return;
}

this.dispatchRepeater.reset();
this.dispatchBatch(batch, closing);
this.dispatchBatch(batch, useClosingDispatcher);
}

async process(event: ProcessableEvent): Promise<void> {
Expand Down Expand Up @@ -332,6 +332,13 @@ export class BatchEventProcessor extends BaseService implements EventProcessor {
}
}

flushImmediately(): Promise<unknown> {
if (!this.isRunning()) {
return Promise.resolve();
}
return this.flush(true);
}

stop(): void {
if (this.isDone()) {
return;
Expand Down
2 changes: 1 addition & 1 deletion lib/event_processor/event_builder/log_event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ function makeVisitor(data: ImpressionEvent | ConversionEvent): Visitor {

export function buildLogEvent(events: UserEvent[]): LogEvent {
const region = events[0]?.context.region || 'US';
const url = logxEndpoint[region];
const url = logxEndpoint[region] || logxEndpoint['US'];

return {
url,
Expand Down
1 change: 1 addition & 0 deletions lib/event_processor/event_processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ export interface EventProcessor extends Service {
process(event: ProcessableEvent): Promise<unknown>;
onDispatch(handler: Consumer<LogEvent>): Fn;
setLogger(logger: LoggerFacade): void;
flushImmediately(): Promise<unknown>;
}
4 changes: 4 additions & 0 deletions lib/event_processor/forwarding_event_processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,8 @@ export class ForwardingEventProcessor extends BaseService implements EventProces
onDispatch(handler: Consumer<LogEvent>): Fn {
return this.eventEmitter.on('dispatch', handler);
}

flushImmediately(): Promise<unknown> {
return Promise.resolve();
}
}
2 changes: 1 addition & 1 deletion lib/index.browser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export const createInstance = function(config: Config): Client {
window.addEventListener(
unloadEvent,
() => {
client.close();
client.flushImmediately();
},
);
}
Expand Down
41 changes: 41 additions & 0 deletions lib/odp/event_manager/odp_event_manager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1010,4 +1010,45 @@ describe('DefaultOdpEventManager', () => {
await expect(odpEventManager.onTerminated()).resolves.not.toThrow();
expect(odpEventManager.getState()).toBe(ServiceState.Terminated);
});

it('should flush the queue when flushImmediately() is called in running state', async () => {
const repeater = getMockRepeater();

const apiManager = getMockApiManager();
apiManager.sendEvents.mockResolvedValue({ statusCode: 200 });

const odpEventManager = new DefaultOdpEventManager({
repeater: repeater,
apiManager: apiManager,
batchSize: 30,
retryConfig: {
maxRetries: 3,
backoffProvider: vi.fn(),
},
});

odpEventManager.updateConfig({
integrated: true,
odpConfig: config,
});

odpEventManager.start();
await expect(odpEventManager.onRunning()).resolves.not.toThrow();

const events: OdpEvent[] = [];
for(let i = 0; i < 10; i++) {
events.push(makeEvent(i));
odpEventManager.sendEvent(events[i]);
}

await exhaustMicrotasks();
expect(apiManager.sendEvents).not.toHaveBeenCalled();

odpEventManager.flushImmediately();
await exhaustMicrotasks();

expect(apiManager.sendEvents).toHaveBeenCalledTimes(1);
expect(apiManager.sendEvents).toHaveBeenCalledWith(config, events);
expect(odpEventManager.isRunning()).toBe(true);
});
});
8 changes: 8 additions & 0 deletions lib/odp/event_manager/odp_event_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ export interface OdpEventManager extends Service {
updateConfig(odpIntegrationConfig: OdpIntegrationConfig): void;
sendEvent(event: OdpEvent): void;
setLogger(logger: LoggerFacade): void;
flushImmediately(): Promise<unknown>;
}

export type RetryConfig = {
Expand Down Expand Up @@ -160,6 +161,13 @@ export class DefaultOdpEventManager extends BaseService implements OdpEventManag
this.startPromise.resolve();
}

flushImmediately(): Promise<unknown> {
if (!this.isRunning()) {
return Promise.resolve();
}
return this.flush();
}

stop(): void {
if (this.isDone()) {
return;
Expand Down
24 changes: 24 additions & 0 deletions lib/odp/odp_manager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ const getMockOdpEventManager = () => {
sendEvent: vi.fn(),
makeDisposable: vi.fn(),
setLogger: vi.fn(),
flushImmediately: vi.fn(),
};
};

Expand Down Expand Up @@ -780,6 +781,29 @@ describe('DefaultOdpManager', () => {
odpManager.makeDisposable();

expect(eventManager.makeDisposable).toHaveBeenCalled();
});

it('should call flushImmediately() on eventManager when flushImmediately() is called on odpManager', async () => {
const eventManager = getMockOdpEventManager();
eventManager.onRunning.mockResolvedValue({});
const segmentManager = getMockOdpSegmentManager();

eventManager.flushImmediately.mockResolvedValue({});

const odpManager = new DefaultOdpManager({
segmentManager,
eventManager,
});

odpManager.updateConfig({ integrated: true, odpConfig: config });
odpManager.start();

await odpManager.onRunning();

odpManager.flushImmediately();

expect(eventManager.flushImmediately).toHaveBeenCalledOnce();
expect(odpManager.isRunning()).toBe(true);
})
});

8 changes: 8 additions & 0 deletions lib/odp/odp_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export interface OdpManager extends Service {
setClientInfo(clientEngine: string, clientVersion: string): void;
setVuid(vuid: string): void;
setLogger(logger: LoggerFacade): void;
flushImmediately(): Promise<unknown>;
}

export type OdpManagerConfig = {
Expand Down Expand Up @@ -145,6 +146,13 @@ export class DefaultOdpManager extends BaseService implements OdpManager {
this.stopPromise.reject(error);
}

flushImmediately(): Promise<unknown> {
if (!this.isRunning()) {
return Promise.resolve();
}
return this.eventManager.flushImmediately();
}

stop(): void {
if (this.isDone()) {
return;
Expand Down
33 changes: 33 additions & 0 deletions lib/optimizely/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -872,4 +872,37 @@ describe('Optimizely', () => {
});
});
});

it('should flush eventProcessor and odpManager on flushImmediately()', async () => {
const projectConfigManager = getMockProjectConfigManager({
initConfig: createProjectConfig(testData.getTestProjectConfig()),
});

const eventProcessor = getForwardingEventProcessor(eventDispatcher);
const odpManager = extractOdpManager(createOdpManager({}));

const optimizely = new Optimizely({
clientEngine: 'node-sdk',
projectConfigManager,
jsonSchemaValidator,
logger,
eventProcessor,
odpManager,
disposable: true,
cmabService: {} as any
});

odpManager?.updateConfig({ integrated: false });
await optimizely.onReady();

const eventProcessorFlushSpy = vi.spyOn(eventProcessor, 'flushImmediately').mockResolvedValue(Promise.resolve());
const odpManagerFlushSpy = vi.spyOn(odpManager!, 'flushImmediately').mockResolvedValue(Promise.resolve());

await optimizely.flushImmediately();

expect(eventProcessorFlushSpy).toHaveBeenCalled();
expect(odpManagerFlushSpy).toHaveBeenCalled();

expect(optimizely.isRunning()).toBe(true);
});
});
Loading
Loading