From bc7310a090a6dc701a97193d4ef9e57b37601ffd Mon Sep 17 00:00:00 2001 From: Matt Carroll Date: Wed, 7 Aug 2019 09:21:13 -0700 Subject: [PATCH] Remove plugin interfaces from event processor --- .../__tests__/v1EventProcessor.spec.ts | 333 +----------------- .../event-processor/src/eventProcessor.ts | 91 +---- 2 files changed, 18 insertions(+), 406 deletions(-) diff --git a/packages/event-processor/__tests__/v1EventProcessor.spec.ts b/packages/event-processor/__tests__/v1EventProcessor.spec.ts index 639617274..5484e15e4 100644 --- a/packages/event-processor/__tests__/v1EventProcessor.spec.ts +++ b/packages/event-processor/__tests__/v1EventProcessor.spec.ts @@ -24,14 +24,6 @@ import { import { EventProcessor } from '../src/eventProcessor' import { buildImpressionEventV1, makeBatchedEventV1 } from '../src/v1/buildEventV1' -function sleep(time = 0): Promise { - return new Promise(resolve => { - setTimeout(() => { - resolve() - }, time) - }) -} - function createImpressionEvent() { return { type: 'impression' as 'impression', @@ -163,7 +155,7 @@ describe('LogTierV1EventProcessor', () => { }) const impressionEvent = createImpressionEvent() - processor.process(impressionEvent, testProjectConfig) + processor.process(impressionEvent) processor.stop().then(() => { done() @@ -192,7 +184,7 @@ describe('LogTierV1EventProcessor', () => { }) const impressionEvent = createImpressionEvent() - processor.process(impressionEvent, testProjectConfig) + processor.process(impressionEvent) processor.stop().then(() => { done() @@ -222,8 +214,8 @@ describe('LogTierV1EventProcessor', () => { const impressionEvent1 = createImpressionEvent() const impressionEvent2 = createImpressionEvent() impressionEvent2.context.revision = '2' - processor.process(impressionEvent1, testProjectConfig) - processor.process(impressionEvent2, testProjectConfig) + processor.process(impressionEvent1) + processor.process(impressionEvent2) processor.stop().then(() => { expect(dispatchStub).toBeCalledTimes(2) @@ -249,7 +241,7 @@ describe('LogTierV1EventProcessor', () => { it('should immediately flush events as they are processed', () => { const impressionEvent = createImpressionEvent() - processor.process(impressionEvent, testProjectConfig) + processor.process(impressionEvent) expect(dispatchStub).toHaveBeenCalledTimes(1) expect(dispatchStub).toHaveBeenCalledWith({ @@ -280,12 +272,12 @@ describe('LogTierV1EventProcessor', () => { const impressionEvent2 = createImpressionEvent() const impressionEvent3 = createImpressionEvent() - processor.process(impressionEvent1, testProjectConfig) - processor.process(impressionEvent2, testProjectConfig) + processor.process(impressionEvent1) + processor.process(impressionEvent2) expect(dispatchStub).toHaveBeenCalledTimes(0) - processor.process(impressionEvent3, testProjectConfig) + processor.process(impressionEvent3) expect(dispatchStub).toHaveBeenCalledTimes(1) expect(dispatchStub).toHaveBeenCalledWith({ @@ -306,12 +298,12 @@ describe('LogTierV1EventProcessor', () => { impressionEvent2.context.revision = '2' - processor.process(impressionEvent1, testProjectConfig) - processor.process(impressionEvent2, testProjectConfig) + processor.process(impressionEvent1) + processor.process(impressionEvent2) expect(dispatchStub).toHaveBeenCalledTimes(0) - processor.process(conversionEvent, testProjectConfig) + processor.process(conversionEvent) expect(dispatchStub).toHaveBeenCalledTimes(2) expect(dispatchStub).toHaveBeenCalledWith({ @@ -330,7 +322,7 @@ describe('LogTierV1EventProcessor', () => { it('should flush the queue when the flush interval happens', () => { const impressionEvent1 = createImpressionEvent() - processor.process(impressionEvent1, testProjectConfig) + processor.process(impressionEvent1) expect(dispatchStub).toHaveBeenCalledTimes(0) @@ -343,307 +335,10 @@ describe('LogTierV1EventProcessor', () => { params: makeBatchedEventV1([impressionEvent1]), }) - processor.process(createImpressionEvent(), testProjectConfig) - processor.process(createImpressionEvent(), testProjectConfig) + processor.process(createImpressionEvent()) + processor.process(createImpressionEvent()) // flushing should reset queue, at this point only has two events expect(dispatchStub).toHaveBeenCalledTimes(1) }) }) - - describe('plugins', () => { - let processor: EventProcessor - - describe('transformers', () => { - beforeEach(() => { - jest.useRealTimers() - }) - - afterEach(() => { - processor.stop() - }) - - it('should should invoke the transformer with the event and projectConfig', async () => { - const transformer = jest.fn() - processor = new LogTierV1EventProcessor({ - transformers: [ - async (event, projectConfig) => { - transformer(event, projectConfig) - }, - ], - dispatcher: stubDispatcher, - maxQueueSize: 1, - }) - processor.start() - - const impressionEvent = createImpressionEvent() - processor.process(impressionEvent, testProjectConfig) - - // sleep to let async functions run - await sleep(0) - - expect(transformer).toHaveBeenCalledTimes(1) - expect(transformer).toHaveBeenCalledWith(impressionEvent, testProjectConfig) - }) - - it('should allow augmentation of the Event', async () => { - processor = new LogTierV1EventProcessor({ - transformers: [ - async (event, projectConfig) => { - event.uuid = 'new uuid' - }, - ], - dispatcher: stubDispatcher, - maxQueueSize: 1, - }) - processor.start() - - const impressionEvent = createImpressionEvent() - processor.process( - // spread here for dereference - { - ...impressionEvent, - }, - testProjectConfig, - ) - - // sleep to let async functions run - await sleep(0) - - const modifiedEvent = { - ...impressionEvent, - uuid: 'new uuid', - } - expect(dispatchStub).toHaveBeenCalledTimes(1) - expect(dispatchStub).toHaveBeenCalledWith({ - url: 'https://logx.optimizely.com/v1/events', - httpVerb: 'POST', - params: makeBatchedEventV1([modifiedEvent]), - }) - }) - - it('should continue with the event if a transformer throws an error', async () => { - processor = new LogTierV1EventProcessor({ - transformers: [ - async (event, projectConfig) => { - throw new Error('transformer error') - }, - ], - dispatcher: stubDispatcher, - maxQueueSize: 1, - }) - processor.start() - - const impressionEvent = createImpressionEvent() - processor.process(impressionEvent, testProjectConfig) - - // sleep to let async functions run - await sleep(0) - - expect(dispatchStub).toHaveBeenCalledTimes(1) - expect(dispatchStub).toHaveBeenCalledWith({ - url: 'https://logx.optimizely.com/v1/events', - httpVerb: 'POST', - params: makeBatchedEventV1([impressionEvent]), - }) - }) - }) - - describe('interceptors', () => { - beforeEach(() => { - jest.useRealTimers() - }) - - afterEach(() => { - processor.stop() - }) - - it('should should invoke the interceptor with the event and projectConfig', async () => { - const interceptor = jest.fn() - processor = new LogTierV1EventProcessor({ - interceptors: [ - async (event, projectConfig) => { - interceptor(event, projectConfig) - return true - }, - ], - dispatcher: stubDispatcher, - maxQueueSize: 1, - }) - processor.start() - - const impressionEvent = createImpressionEvent() - processor.process(impressionEvent, testProjectConfig) - - // sleep to let async functions run - await sleep(0) - - expect(interceptor).toHaveBeenCalledTimes(1) - expect(interceptor).toHaveBeenCalledWith(impressionEvent, testProjectConfig) - }) - - it('should continue with the event if a interceptor throws an error', async () => { - processor = new LogTierV1EventProcessor({ - interceptors: [ - async (event, projectConfig) => { - throw new Error('interceptor error') - }, - ], - dispatcher: stubDispatcher, - maxQueueSize: 1, - }) - processor.start() - - const impressionEvent = createImpressionEvent() - processor.process(impressionEvent, testProjectConfig) - - // sleep to let async functions run - await sleep(0) - - expect(dispatchStub).toHaveBeenCalledTimes(1) - expect(dispatchStub).toHaveBeenCalledWith({ - url: 'https://logx.optimizely.com/v1/events', - httpVerb: 'POST', - params: makeBatchedEventV1([impressionEvent]), - }) - }) - - it('should drop the event if a interceptor returns false', async () => { - processor = new LogTierV1EventProcessor({ - interceptors: [ - async (event, projectConfig) => { - return false - }, - ], - dispatcher: stubDispatcher, - maxQueueSize: 1, - }) - processor.start() - - const impressionEvent = createImpressionEvent() - processor.process(impressionEvent, testProjectConfig) - - // sleep to let async functions run - await sleep(0) - - expect(dispatchStub).toHaveBeenCalledTimes(0) - }) - }) - - describe('callbacks', () => { - beforeEach(() => { - jest.useRealTimers() - }) - - afterEach(() => { - processor.stop() - }) - - it('should invoke the callback with the result of dispatcher and the event', async () => { - const callback = jest.fn() - processor = new LogTierV1EventProcessor({ - callbacks: [callback], - dispatcher: stubDispatcher, - maxQueueSize: 3, - }) - processor.start() - - const impressionEvent1 = createImpressionEvent() - const impressionEvent2 = createImpressionEvent() - const impressionEvent3 = createImpressionEvent() - processor.process(impressionEvent1, testProjectConfig) - processor.process(impressionEvent2, testProjectConfig) - processor.process(impressionEvent3, testProjectConfig) - - // sleep to let async functions run - await sleep(0) - - expect(callback).toHaveBeenCalledTimes(3) - expect(callback).toHaveBeenCalledWith({ - event: impressionEvent1, - result: true, - }) - expect(callback).toHaveBeenCalledWith({ - event: impressionEvent2, - result: true, - }) - expect(callback).toHaveBeenCalledWith({ - event: impressionEvent3, - result: true, - }) - }) - - it('should invoke the callback with result = false event if the dispatcher doesnt provide statusCode', async () => { - const callback = jest.fn() - - stubDispatcher = { - dispatchEvent(event: EventV1Request, callback: EventDispatcherCallback): void { - dispatchStub(event) - // @ts-ignore - callback() - }, - } - processor = new LogTierV1EventProcessor({ - callbacks: [callback], - dispatcher: stubDispatcher, - maxQueueSize: 3, - }) - processor.start() - - const impressionEvent1 = createImpressionEvent() - const impressionEvent2 = createImpressionEvent() - const impressionEvent3 = createImpressionEvent() - processor.process(impressionEvent1, testProjectConfig) - processor.process(impressionEvent2, testProjectConfig) - processor.process(impressionEvent3, testProjectConfig) - - // sleep to let async functions run - await sleep(0) - - expect(callback).toHaveBeenCalledTimes(3) - expect(callback).toHaveBeenCalledWith({ - event: impressionEvent1, - result: false, - }) - expect(callback).toHaveBeenCalledWith({ - event: impressionEvent2, - result: false, - }) - expect(callback).toHaveBeenCalledWith({ - event: impressionEvent3, - result: false, - }) - }) - - it('should return result == false when the dispatcher returns a non 200 response', async () => { - const callback = jest.fn() - const dispatcher: EventDispatcher = { - dispatchEvent(event: EventV1Request, callback: EventDispatcherCallback): void { - dispatchStub(event) - callback({ - statusCode: 400, - }) - }, - } - - processor = new LogTierV1EventProcessor({ - callbacks: [callback], - dispatcher, - maxQueueSize: 1, - }) - processor.start() - - const impressionEvent1 = createImpressionEvent() - processor.process(impressionEvent1, testProjectConfig) - - // sleep to let async functions run - await sleep(0) - - expect(callback).toHaveBeenCalledTimes(1) - expect(callback).toHaveBeenCalledWith({ - event: impressionEvent1, - result: false, - }) - }) - }) - }) }) diff --git a/packages/event-processor/src/eventProcessor.ts b/packages/event-processor/src/eventProcessor.ts index 33ebf4e4f..62f8b4597 100644 --- a/packages/event-processor/src/eventProcessor.ts +++ b/packages/event-processor/src/eventProcessor.ts @@ -19,7 +19,6 @@ import { ConversionEvent, ImpressionEvent } from './events' import { EventDispatcher, EventV1Request, - EventDispatcherResponse, } from './eventDispatcher' import { EventQueue, DefaultEventQueue, SingleEventQueue } from './eventQueue' import { getLogger } from '@optimizely/js-sdk-logging' @@ -30,45 +29,21 @@ export type ProcessableEvents = ConversionEvent | ImpressionEvent export type EventDispatchResult = { result: boolean; event: ProcessableEvents } -export type EventCallback = (result: EventDispatchResult) => void - -export type EventTransformer = ( - event: ProcessableEvents, - // TODO change this to ProjectConfig when js-sdk-models is available - projectConfig: any, -) => Promise - -export type EventInterceptor = ( - event: ProcessableEvents, - // TODO change this to ProjectConfig when js-sdk-models is available - projectConfig: any, -) => Promise - export interface EventProcessor extends Managed { - // TODO change this to ProjectConfig when js-sdk-models is available - process(event: ProcessableEvents, projectConfig: any): void + process(event: ProcessableEvents): void } const MIN_FLUSH_INTERVAL = 100 export abstract class AbstractEventProcessor implements EventProcessor { - protected transformers: EventTransformer[] - protected interceptors: EventInterceptor[] - protected callbacks: EventCallback[] protected dispatcher: EventDispatcher protected queue: EventQueue constructor({ dispatcher, - transformers = [], - interceptors = [], - callbacks = [], flushInterval = 30000, maxQueueSize = 3000, }: { dispatcher: EventDispatcher - transformers?: EventTransformer[] - interceptors?: EventInterceptor[] - callbacks?: EventCallback[] flushInterval?: number maxQueueSize?: number }) { @@ -86,10 +61,6 @@ export abstract class AbstractEventProcessor implements EventProcessor { sink: buffer => this.drainQueue(buffer), }) } - - this.transformers = transformers - this.interceptors = interceptors - this.callbacks = callbacks } drainQueue(buffer: ProcessableEvents[]): Promise { @@ -98,19 +69,8 @@ export abstract class AbstractEventProcessor implements EventProcessor { const promises = this.groupEvents(buffer).map(eventGroup => { const formattedEvent = this.formatEvents(eventGroup) - return new Promise((resolve, reject) => { - this.dispatcher.dispatchEvent(formattedEvent, response => { - // loop through every event in the group and run the callback handler - // with result - eventGroup.forEach(event => { - this.callbacks.forEach(handler => { - handler({ - result: isResponseSuccess(response), - event, - }) - }) - }) - + return new Promise((resolve) => { + this.dispatcher.dispatchEvent(formattedEvent, () => { resolve() }) }) @@ -119,33 +79,7 @@ export abstract class AbstractEventProcessor implements EventProcessor { return Promise.all(promises) } - // TODO change this to ProjectConfig when js-sdk-models is available - async process(event: ProcessableEvents, projectConfig: any): Promise { - // loop and apply all transformers - for (let transformer of this.transformers) { - try { - await transformer(event, projectConfig) - } catch (ex) { - // swallow error and move on - logger.error('eventTransformer threw error', ex.message, ex) - } - } - Object.freeze(event) - - // loop and apply all interceptors - for (let interceptor of this.interceptors) { - let result - try { - result = await interceptor(event, projectConfig) - } catch (ex) { - // swallow and continue - logger.error('eventInterceptor threw error', ex.message, ex) - } - if (result === false) { - return - } - } - + process(event: ProcessableEvents): void { this.queue.enqueue(event) } @@ -167,20 +101,3 @@ export abstract class AbstractEventProcessor implements EventProcessor { protected abstract formatEvents(events: ProcessableEvents[]): EventV1Request } - -function isResponseSuccess(response: EventDispatcherResponse): boolean { - try { - let statusCode: number - if ('statusCode' in response) { - statusCode = response.statusCode - } else if ('status' in response) { - statusCode = response.status - } else { - return false - } - - return statusCode >= 200 && statusCode < 300 - } catch (e) { - return false - } -}