Skip to content

Commit

Permalink
Added durability to batched sink
Browse files Browse the repository at this point in the history
  • Loading branch information
Wedvich committed Feb 1, 2017
1 parent e74edde commit 23ff5c8
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 18 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -289,10 +289,11 @@ class MySink extends BatchedSink {
}
```

The `options` object is optional, but can be used to modify the batching thresholds.
The `options` object is optional, but can be used to modify the batching thresholds or add durability to the sink.
It supports the following properties:

|Key|Description|Default|
|---|---|---|
|`durableStore`|An instance implementing the [Web Storage API](https://developer.mozilla.org/en-US/docs/Web/API/Web_Storage_API) interface (such as `localStorage` in the browser, or [node-localstorage](https://github.com/lmaccherone/node-localstorage) for Node.js applications). If this is set, it will be used as an intermediate store for events until they have been successfully flushed through the pipeline.|`null`|
|`maxSize`|The maximum number of events in a single batch. The sink will be flushed immediately when this limit is hit.|`100`|
|`period`|The interval for autmoatic flushing of batches, in seconds.|`10`|
35 changes: 33 additions & 2 deletions dist/structured-log.es6.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion dist/structured-log.es6.js.map

Large diffs are not rendered by default.

35 changes: 33 additions & 2 deletions dist/structured-log.js

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion dist/structured-log.js.map

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "structured-log",
"version": "0.1.0",
"version": "0.2.0",
"description": "A structured logging framework for JavaScript, inspired by Serilog.",
"main": "dist/structured-log.js",
"jsnext:main": "dist/structured-log.es6.js",
Expand Down
50 changes: 44 additions & 6 deletions src/batchedSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { Sink } from './sink';
import { MessageTemplate } from './messageTemplate';

export interface BatchedSinkOptions {

/**
* Maximum number of events to be sent in a single batch.
*/
Expand All @@ -13,38 +12,65 @@ export interface BatchedSinkOptions {
* Number of seconds to wait between checking for batches.
*/
period?: number;

/**
* {Storage} instance to be used for durable storage of log events.
*/
durableStore?: Storage;
}

const defaultBatchedSinkOptions: BatchedSinkOptions = {
export const defaultBatchedSinkOptions: BatchedSinkOptions = {
maxSize: 100,
period: 10
period: 5,
durableStore: null
};

export class BatchedSink implements Sink {
protected durableStorageKey: string = 'structured-log-batched-sink-durable-cache';

protected options: BatchedSinkOptions;
protected innerSink: Sink;
protected batchedEvents: LogEvent[];
private batchTimeout;
private batchKey;

constructor(innerSink?: Sink, options?: BatchedSinkOptions) {
this.innerSink = innerSink || null;
this.options = {
this.options = {
...defaultBatchedSinkOptions,
...(options || {})
};
this.batchedEvents = [];
this.cycleBatch();
if (this.options.durableStore) {
let initialBatch = [];
for (const key in this.options.durableStore) {
if (key.indexOf(this.durableStorageKey) === 0) {
const storedEvents = JSON.parse(this.options.durableStore.getItem(key))
.map(e => {
e.messageTemplate = new MessageTemplate(e.messageTemplate.raw);
return e;
});
initialBatch = initialBatch.concat(storedEvents);
this.options.durableStore.removeItem(key);
}
}
this.emit(initialBatch);
}
}

emit(events: LogEvent[]) {
if (this.batchedEvents.length + events.length <= this.options.maxSize) {
this.batchedEvents.push(...events);
this.storeEvents();
} else {
let cursor = this.options.maxSize - this.batchedEvents.length;
this.batchedEvents.push(...events.slice(0, cursor));
this.storeEvents();
while (cursor < events.length) {
this.cycleBatch();
this.batchedEvents.push(...events.slice(cursor, cursor = cursor + this.options.maxSize));
this.storeEvents();
}
}

Expand All @@ -57,7 +83,7 @@ export class BatchedSink implements Sink {
return corePromise instanceof Promise ? corePromise : Promise.resolve();
}

protected emitCore(events: LogEvent[]) {
protected emitCore(events: LogEvent[]): any {
return this.innerSink.emit(events);
}

Expand All @@ -68,11 +94,23 @@ export class BatchedSink implements Sink {
protected cycleBatch() {
clearTimeout(this.batchTimeout);
if (this.batchedEvents.length) {
this.emitCore(this.batchedEvents.slice(0));
const emitPromise = this.emitCore(this.batchedEvents.slice(0));
if (this.options.durableStore) {
const previousBatchKey = this.batchKey;
(emitPromise instanceof Promise ? emitPromise : Promise.resolve())
.then(() => this.options.durableStore.removeItem(previousBatchKey));
}
this.batchedEvents.length = 0;
}
this.batchKey = `${this.durableStorageKey}-${new Date().getTime()}`;
if (!isNaN(this.options.period) && this.options.period > 0) {
this.batchTimeout = setTimeout(() => this.cycleBatch(), this.options.period * 1000);
}
}

private storeEvents() {
if (this.options.durableStore) {
this.options.durableStore.setItem(this.batchKey, JSON.stringify(this.batchedEvents));
}
}
}
70 changes: 66 additions & 4 deletions test/batchedSink.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@ import * as TypeMoq from 'typemoq';
import { Logger } from '../src/logger';
import { LogEvent, LogEventLevel } from '../src/logEvent';
import { MessageTemplate } from '../src/messageTemplate';
import { BatchedSink } from '../src/batchedSink';
import { ConcreteSink } from './helpers';
import { BatchedSink, defaultBatchedSinkOptions } from '../src/batchedSink';
import { ConcreteSink, ConcreteStorage } from './helpers';

describe('BatchedSink', () => {
describe('constructor()', () => {
it('uses the default options if no options are passed', () => {
const innerSink = TypeMoq.Mock.ofType(ConcreteSink);
const batchedSink = new BatchedSink(innerSink.object);

expect(batchedSink).to.have.deep.property('options.maxSize', 100);
expect(batchedSink).to.have.deep.property('options.period', 10);
expect(batchedSink).to.have.deep.property('options.maxSize', defaultBatchedSinkOptions.maxSize);
expect(batchedSink).to.have.deep.property('options.period', defaultBatchedSinkOptions.period);
});

it('uses the passed options', () => {
Expand Down Expand Up @@ -182,4 +182,66 @@ describe('BatchedSink', () => {
});
});
});

describe('when it has a durable store', () => {
it('flushes any previously stored events on creation', () => {
const durableStore = new ConcreteStorage();
const initialEvents = [
new LogEvent('', LogEventLevel.information, new MessageTemplate('Test 1')),
new LogEvent('', LogEventLevel.information, new MessageTemplate('Test 2'))
];
durableStore.setItem('structured-log-batched-sink-durable-cache-123', JSON.stringify(initialEvents));

const emittedBatches = [];
const innerSink = TypeMoq.Mock.ofType(ConcreteSink);
innerSink.setup(m => m.emit(TypeMoq.It.isAny())).callback(batch => emittedBatches.push(batch));
innerSink.setup(m => m.flush()).returns(() => Promise.resolve());
const batchedSink = new BatchedSink(innerSink.object, {
period: 0,
maxSize: 2,
durableStore: <Storage> durableStore
});

return batchedSink.flush().then(() => {
expect(emittedBatches[0][0]).to.have.deep.property('messageTemplate.raw', 'Test 1');
expect(emittedBatches[0][1]).to.have.deep.property('messageTemplate.raw', 'Test 2');
});
});

it('stores events in a durable storage', () => {
const durableStore = new ConcreteStorage();
const batchedSink = new BatchedSink(null, {
period: 0,
maxSize: 3,
durableStore: <Storage> durableStore
});

batchedSink.emit([
new LogEvent('', LogEventLevel.information, new MessageTemplate('Test 1')),
new LogEvent('', LogEventLevel.information, new MessageTemplate('Test 2'))
]);

expect(durableStore.length).to.equal(1);
});

it('cleans up the stored events once they have been shipped', () => {
const durableStore = new ConcreteStorage();
const innerSink = TypeMoq.Mock.ofType(ConcreteSink);
innerSink.setup(m => m.flush()).returns(() => Promise.resolve());
const batchedSink = new BatchedSink(innerSink.object, {
period: 0,
maxSize: 2,
durableStore: <Storage> durableStore
});

batchedSink.emit([
new LogEvent('', LogEventLevel.information, new MessageTemplate('Test 1')),
new LogEvent('', LogEventLevel.information, new MessageTemplate('Test 2'))
]);

return batchedSink.flush().then(() => {
expect(durableStore.length).to.equal(0);
});
});
});
});
21 changes: 21 additions & 0 deletions test/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,24 @@ export class ConcreteConsoleProxy implements ConsoleProxy {
debug(message?: any, ...properties: any[]) { }
log(message?: any, ...properties: any[]) { }
}

export class ConcreteStorage {
length: number = 0;

getItem(key: string): string {
return this[key] || null;
}

setItem(key: string, data: string) {
if (!this.getItem(key)) {
++this.length;
}
this[key] = data;
}

removeItem(key: string) {
this[key] = null;
delete this[key];
--this.length;
}
}

0 comments on commit 23ff5c8

Please sign in to comment.