Skip to content

Commit

Permalink
Handle concurrency errors (see #31)
Browse files Browse the repository at this point in the history
  • Loading branch information
timdp committed Sep 21, 2016
1 parent e9ccbe0 commit 93891cc
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 14 deletions.
17 changes: 14 additions & 3 deletions src/relay.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ export default class Relay extends EventEmitter {
const num = batch.length
debug(`submit: submitting ${num} item(s)`)
return this._client.submit(batch)
.then(() => this._onSubmitted(num), (err) => this._onError(err))
.then(() => this._onSubmitted(num), (err) => this._onError(err, num))
.then(() => this._scheduleSubmission())
}

Expand All @@ -60,8 +60,19 @@ export default class Relay extends EventEmitter {
this._queue.remove(num)
}

_onError (err) {
_onError (err, num) {
debug('onError', {error: err})
this.emit('error', err)
// Expected errors:
// - DataAlreadyAcceptedException
// Message: "The given batch of log events has already been accepted."
// Action: Assume the request got replayed and remove the batch.
// - InvalidSequenceTokenException
// Message: "The given sequenceToken is invalid."
// Action: Keep the items in the queue and retry next time.
if (err.code === 'DataAlreadyAcceptedException') {
this._queue.remove(num)
} else if (err.code !== 'InvalidSequenceTokenException') {
this.emit('error', err)
}
}
}
17 changes: 10 additions & 7 deletions test/lib/client-mock.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
export default class MockClient {
constructor (failures = 0) {
constructor (failures = []) {
this._submitted = []
this._failures = failures
this._failures = failures.slice()
}

submit (batch) {
this._submitted = this._submitted.concat(batch)
if (this._failures > 0) {
this._failures--
return Promise.reject(new Error('FAIL'))
if (this._failures.length === 0) {
this._submitted = this._submitted.concat(batch)
return Promise.resolve()
} else {
const code = this._failures.shift()
const error = new Error(code)
error.code = code
return Promise.reject(error)
}
return Promise.resolve()
}

get submitted () {
Expand Down
31 changes: 27 additions & 4 deletions test/unit/relay.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,39 @@ describe('Relay', () => {

it('emits an error event', async () => {
const submissionInterval = 50
const failures = 3
const retries = 2
const failures = ['FAIL', 'FAIL', 'FAIL']
const spy = sinon.spy()
const client = new ClientMock(failures)
const relay = new Relay(client, {submissionInterval})
relay.on('error', spy)
relay.start()
relay.submit({})
await delay(submissionInterval * (failures + retries) * 1.1)
expect(spy.callCount).to.equal(failures)
await delay(submissionInterval * failures.length * 1.1)
expect(spy.callCount).to.equal(failures.length)
})

it('silently handles a DataAlreadyAcceptedException error', async () => {
const submissionInterval = 50
const failures = ['DataAlreadyAcceptedException']
const spy = sinon.spy()
const client = new ClientMock(failures)
const relay = new Relay(client, {submissionInterval})
relay.on('error', spy)
relay.start()
relay.submit({})
await delay(submissionInterval * failures.length * 1.1)
expect(spy.callCount).to.equal(0)
})

it('handles InvalidSequenceTokenException errors by retrying', async () => {
const submissionInterval = 50
const failures = ['InvalidSequenceTokenException', 'InvalidSequenceTokenException']
const client = new ClientMock(failures)
const relay = new Relay(client, {submissionInterval})
relay.start()
relay.submit({})
await delay(submissionInterval * (failures.length + 1) * 1.1)
expect(client.submitted.length).to.equal(1)
})
})
})

0 comments on commit 93891cc

Please sign in to comment.