Skip to content

Commit

Permalink
Merge pull request #51 from rzhouac/dev
Browse files Browse the repository at this point in the history
[bug] the behavior in memory should be in consistent with durableStore
  • Loading branch information
Wedvich committed Mar 25, 2018
2 parents 796a79e + d2cfece commit 19c3478
Showing 1 changed file with 13 additions and 7 deletions.
20 changes: 13 additions & 7 deletions src/batchedSink.ts
Expand Up @@ -64,7 +64,8 @@ export class BatchedSink implements Sink {
this.batchedEvents.push(...events);
this.storeEvents();
} else {
let cursor = this.options.maxSize - this.batchedEvents.length;
let cursor = this.options.maxSize - this.batchedEvents.length < 0 ? 0 :
this.options.maxSize - this.batchedEvents.length;
this.batchedEvents.push(...events.slice(0, cursor));
this.storeEvents();
while (cursor < events.length) {
Expand Down Expand Up @@ -94,13 +95,18 @@ export class BatchedSink implements Sink {
protected cycleBatch() {
clearTimeout(this.batchTimeout);
if (this.batchedEvents.length) {
const emitPromise = this.emitCore(this.batchedEvents.slice(0));
if (this.options.durableStore) {
const previousBatchKey = this.batchKey;
(emitPromise instanceof Promise ? emitPromise : Promise.resolve())
.then(() => this.options.durableStore.removeItem(previousBatchKey));
}
var processEvents = this.batchedEvents.slice(0);
this.batchedEvents.length = 0;
const emitPromise = this.emitCore(processEvents);
(emitPromise instanceof Promise ? emitPromise : Promise.resolve())
.then(() => {
if(this.options.durableStore){
const previousBatchKey = this.batchKey;
return this.options.durableStore.removeItem(previousBatchKey)
}
}).catch(() => {
this.batchedEvents.unshift(...processEvents)
})
}
this.batchKey = `${this.durableStorageKey}-${new Date().getTime()}`;
if (!isNaN(this.options.period) && this.options.period > 0) {
Expand Down

0 comments on commit 19c3478

Please sign in to comment.