Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion packages/event-processor/CHANGELOG.MD
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
## [Unreleased]
Changes that have landed but are not yet released.

### New Features
- In `AbstractEventProcessor`, validate maxQueueSize and flushInterval; ignore & use default values when invalid
- `AbstractEventProcessor` can be constructed with a `notificationCenter`. When `notificationCenter` is provided, it triggers a log event notification after the event is sent to the event dispatcher

### Changed
- Removed transformers, interceptors, and callbacks from `AbstractEventProcessor`

## [0.2.1] - June 6, 2019

- Wrap the `callback` in `try/catch` when implementing a custom `eventDispatcher`. This ensures invoking the `callback` will always cleanup any pending retry tasks.
Expand All @@ -18,4 +25,4 @@ events that did not send successfully due to page navigation

## [0.1.0] - March 1, 2019

Initial release
Initial release
47 changes: 47 additions & 0 deletions packages/event-processor/__tests__/v1EventProcessor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -369,4 +369,51 @@ describe('LogTierV1EventProcessor', () => {
expect(notificationCenter.sendNotifications).toBeCalledWith(NOTIFICATION_TYPES.LOG_EVENT, event)
})
})

describe('invalid flushInterval or maxQueueSize', () => {
it('should ignore a flushInterval of 0 and use the default', () => {
const processor = new LogTierV1EventProcessor({
dispatcher: stubDispatcher,
flushInterval: 0,
maxQueueSize: 10,
})
processor.start()

const impressionEvent1 = createImpressionEvent()
processor.process(impressionEvent1)
expect(dispatchStub).toHaveBeenCalledTimes(0)
jest.advanceTimersByTime(30000)
expect(dispatchStub).toHaveBeenCalledTimes(1)
expect(dispatchStub).toHaveBeenCalledWith({
url: 'https://logx.optimizely.com/v1/events',
httpVerb: 'POST',
params: makeBatchedEventV1([impressionEvent1]),
})
})

it('should ignore a maxQueueSize of 0 and use the default', () => {
const processor = new LogTierV1EventProcessor({
dispatcher: stubDispatcher,
flushInterval: 30000,
maxQueueSize: 0,
})
processor.start()

const impressionEvent1 = createImpressionEvent()
processor.process(impressionEvent1)
expect(dispatchStub).toHaveBeenCalledTimes(0)
const impressionEvents = [impressionEvent1]
for (let i = 0; i < 9; i++) {
const evt = createImpressionEvent()
processor.process(evt)
impressionEvents.push(evt)
}
expect(dispatchStub).toHaveBeenCalledTimes(1)
expect(dispatchStub).toHaveBeenCalledWith({
url: 'https://logx.optimizely.com/v1/events',
httpVerb: 'POST',
params: makeBatchedEventV1(impressionEvents),
})
})
})
})
30 changes: 22 additions & 8 deletions packages/event-processor/src/eventProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
// TODO change this to use Managed from js-sdk-models when available
import { Managed } from './managed'
import { ConversionEvent, ImpressionEvent } from './events'
import {
EventDispatcher,
EventV1Request,
} from './eventDispatcher'
import { EventDispatcher, EventV1Request } from './eventDispatcher'
import { EventQueue, DefaultEventQueue, SingleEventQueue } from './eventQueue'
import { getLogger } from '@optimizely/js-sdk-logging'
import { NOTIFICATION_TYPES, NotificationCenter } from '@optimizely/js-sdk-utils'
Expand All @@ -34,7 +31,9 @@ export interface EventProcessor extends Managed {
process(event: ProcessableEvents): void
}

const MIN_FLUSH_INTERVAL = 100
const DEFAULT_FLUSH_INTERVAL = 30000 // Unit is ms - default flush interval is 30s
const DEFAULT_MAX_QUEUE_SIZE = 10

export abstract class AbstractEventProcessor implements EventProcessor {
protected dispatcher: EventDispatcher
protected queue: EventQueue<ProcessableEvents>
Expand All @@ -53,10 +52,25 @@ export abstract class AbstractEventProcessor implements EventProcessor {
}) {
this.dispatcher = dispatcher

if (flushInterval <= 0) {
logger.warn(
`Invalid flushInterval ${flushInterval}, defaulting to ${DEFAULT_FLUSH_INTERVAL}`,
)
flushInterval = DEFAULT_FLUSH_INTERVAL
}

maxQueueSize = Math.floor(maxQueueSize)
if (maxQueueSize < 1) {
logger.warn(
`Invalid maxQueueSize ${maxQueueSize}, defaulting to ${DEFAULT_MAX_QUEUE_SIZE}`,
)
maxQueueSize = DEFAULT_MAX_QUEUE_SIZE
}

maxQueueSize = Math.max(1, maxQueueSize)
if (maxQueueSize > 1) {
this.queue = new DefaultEventQueue({
flushInterval: Math.max(flushInterval, MIN_FLUSH_INTERVAL),
flushInterval,
maxQueueSize,
sink: buffer => this.drainQueue(buffer),
})
Expand All @@ -74,15 +88,15 @@ export abstract class AbstractEventProcessor implements EventProcessor {
const promises = this.groupEvents(buffer).map(eventGroup => {
const formattedEvent = this.formatEvents(eventGroup)

return new Promise((resolve) => {
return new Promise(resolve => {
this.dispatcher.dispatchEvent(formattedEvent, () => {
resolve()
})

if (this.notificationCenter) {
this.notificationCenter.sendNotifications(
NOTIFICATION_TYPES.LOG_EVENT,
formattedEvent
formattedEvent,
)
}
})
Expand Down