Skip to content

Commit

Permalink
Remove undocumented maxSequenceTokenAge; retry upon invalid sequence …
Browse files Browse the repository at this point in the history
…token (see #71)
  • Loading branch information
timdp committed Dec 3, 2017
1 parent 8db73ba commit cd379c2
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 54 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ winston.add(CloudWatchTransport, {
createLogGroup: true,
createLogStream: true,
submissionInterval: 2000,
submissionRetryCount: 1,
batchSize: 20,
awsConfig: {
accessKeyId: '...',
Expand Down
58 changes: 39 additions & 19 deletions src/cloudwatch-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,22 @@ export default class CloudWatchClient {
this._logStreamName = logStreamName
this._options = defaults(options, {
awsConfig: null,
maxSequenceTokenAge: -1,
formatLog: null,
formatLogItem: null,
createLogGroup: false,
createLogStream: false
createLogStream: false,
submissionRetryCount: 1
})
this._formatter = new CloudWatchEventFormatter(this._options)
this._sequenceTokenInfo = null
this._sequenceToken = null
this._client = new AWS.CloudWatchLogs(this._options.awsConfig)
this._initializing = null
}

submit (batch) {
debug('submit', {batch})
return this._initialize()
.then(() => this._getSequenceToken())
.then((sequenceToken) => this._putLogEvents(batch, sequenceToken))
.then(({nextSequenceToken}) => this._storeSequenceToken(nextSequenceToken))
.then(() => this._doSubmit(batch, 0))
}

_initialize () {
Expand Down Expand Up @@ -67,12 +65,43 @@ export default class CloudWatchClient {
}

_allowResourceAlreadyExistsException (err) {
if (err.code !== 'ResourceAlreadyExistsException') {
throw err
return (err.code === 'ResourceAlreadyExistsException')
? Promise.resolve()
: Promise.reject(err)
}

_doSubmit (batch, retryCount) {
return this._maybeUpdateSequenceToken()
.then(() => this._putLogEventsAndStoreSequenceToken(batch))
.catch((err) => this._handlePutError(err, batch, retryCount))
}

_maybeUpdateSequenceToken () {
return (this._sequenceToken != null)
? Promise.resolve()
: this._fetchAndStoreSequenceToken()
}

_handlePutError (err, batch, retryCount) {
if (err.code !== 'InvalidSequenceTokenException') {
return Promise.reject(err)
}
if (retryCount >= this._options.submissionRetryCount) {
const error = new Error('Invalid sequence token, will retry')
error.code = 'InvalidSequenceTokenException'
return Promise.reject(error)
}
this._sequenceToken = null
return this._doSubmit(batch, retryCount + 1)
}

_putLogEventsAndStoreSequenceToken (batch) {
return this._putLogEvents(batch)
.then(({nextSequenceToken}) => this._storeSequenceToken(nextSequenceToken))
}

_putLogEvents (batch, sequenceToken) {
_putLogEvents (batch) {
const sequenceToken = this._sequenceToken
debug('putLogEvents', {batch, sequenceToken})
const params = {
logGroupName: this._logGroupName,
Expand All @@ -83,14 +112,6 @@ export default class CloudWatchClient {
return this._client.putLogEvents(params).promise()
}

_getSequenceToken () {
const now = +new Date()
const isStale = (!this._sequenceTokenInfo ||
this._sequenceTokenInfo.date + this._options.maxSequenceTokenAge < now)
return isStale ? this._fetchAndStoreSequenceToken()
: Promise.resolve(this._sequenceTokenInfo.sequenceToken)
}

_fetchAndStoreSequenceToken () {
debug('fetchSequenceToken')
return this._findLogStream()
Expand All @@ -99,8 +120,7 @@ export default class CloudWatchClient {

_storeSequenceToken (sequenceToken) {
debug('storeSequenceToken', {sequenceToken})
const date = +new Date()
this._sequenceTokenInfo = {sequenceToken, date}
this._sequenceToken = sequenceToken
return sequenceToken
}

Expand Down
12 changes: 4 additions & 8 deletions src/relay.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,12 @@ export default class Relay extends EventEmitter {

_onError (err, batch) {
debug('onError', {error: err})
// Expected errors:
// - DataAlreadyAcceptedException
// Message: "The given batch of log events has already been accepted."
// Action: Assume the request got replayed and remove the batch.
// - InvalidSequenceTokenException
// Message: "The given sequenceToken is invalid."
// Action: Keep the items in the queue and retry next time.
if (err.code === 'DataAlreadyAcceptedException') {
// Assume the request got replayed and remove the batch
this._queue.remove(batch.length)
} else if (err.code !== 'InvalidSequenceTokenException') {
} else if (err.code === 'InvalidSequenceTokenException') {
// Keep retrying
} else {
this.emit('error', err)
}
}
Expand Down
36 changes: 21 additions & 15 deletions test/unit/cloudwatch-client.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,21 @@ const createClient = (options) => {
clientOptions: null,
streamsStrategy: streamsStrategies.default,
groupErrorCode: null,
streamErrorCode: false
streamErrorCode: false,
putRejectionCode: null
})
const client = new CloudWatchClient(logGroupName, logStreamName,
options.clientOptions)
let putPromise
if (options.putRejectionCode != null) {
const err = new Error()
err.code = options.putRejectionCode
putPromise = Promise.reject(err)
} else {
putPromise = Promise.resolve({nextSequenceToken: 'token42'})
}
sinon.stub(client._client, 'putLogEvents')
.returns(withPromise(Promise.resolve({nextSequenceToken: 'token42'})))
.returns(withPromise(putPromise))
sinon.stub(client._client, 'createLogGroup')
.returns(withPromise(options.groupErrorCode
? Promise.reject(createErrorWithCode(options.groupErrorCode))
Expand Down Expand Up @@ -103,37 +112,34 @@ describe('CloudWatchClient', () => {
).to.eventually.equal(3)
})

it('rejects if the log stream is not found in a single page', () => {
it('rejects after retrying upon InvalidSequenceTokenException', () => {
const client = createClient({
streamsStrategy: streamsStrategies.notFound
putRejectionCode: 'InvalidSequenceTokenException'
})
const batch = createBatch(1)
return expect(
client.submit(batch)
).to.be.rejected
).to.be.rejectedWith('Invalid sequence token, will retry')
})

it('rejects if the log stream is not found in multiple pages', () => {
it('rejects if the log stream is not found in a single page', () => {
const client = createClient({
streamsStrategy: streamsStrategies.pagedNotFound
streamsStrategy: streamsStrategies.notFound
})
const batch = createBatch(1)
return expect(
client.submit(batch)
).to.be.rejected
})
})

describe('#options.maxSequenceTokenAge', () => {
it('caches the sequence token', () => {
it('rejects if the log stream is not found in multiple pages', () => {
const client = createClient({
clientOptions: {maxSequenceTokenAge: 1000}
streamsStrategy: streamsStrategies.pagedNotFound
})
const batch = createBatch(1)
return expect(
client.submit(createBatch(1))
.then(() => client.submit(createBatch(1)))
.then(() => client._client.describeLogStreams.calledOnce)
).to.eventually.equal(true)
client.submit(batch)
).to.be.rejected
})
})

Expand Down
12 changes: 0 additions & 12 deletions test/unit/relay.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,17 +96,5 @@ describe('Relay', () => {
await delay(submissionInterval * failures.length * 1.1)
expect(spy.callCount).to.equal(0)
})

it('handles InvalidSequenceTokenException errors by retrying', async () => {
const submissionInterval = 50
const failures = ['InvalidSequenceTokenException', 'InvalidSequenceTokenException']
const client = new ClientMock(failures)
const relay = new Relay(client, {submissionInterval})
relay.start()
relay.submit(createItem())
expect(client.submitted.length).to.equal(0, 'Submitted too early')
await delay(submissionInterval * (failures.length + 1) * 1.1)
expect(client.submitted.length).to.equal(1, 'Not submitted')
})
})
})

0 comments on commit cd379c2

Please sign in to comment.