diff --git a/README.md b/README.md index a22ce23..41c1c20 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,7 @@ winston.add(CloudWatchTransport, { createLogGroup: true, createLogStream: true, submissionInterval: 2000, + submissionRetryCount: 1, batchSize: 20, awsConfig: { accessKeyId: '...', diff --git a/src/cloudwatch-client.js b/src/cloudwatch-client.js index 95bb706..467fa3d 100644 --- a/src/cloudwatch-client.js +++ b/src/cloudwatch-client.js @@ -15,14 +15,14 @@ export default class CloudWatchClient { this._logStreamName = logStreamName this._options = defaults(options, { awsConfig: null, - maxSequenceTokenAge: -1, formatLog: null, formatLogItem: null, createLogGroup: false, - createLogStream: false + createLogStream: false, + submissionRetryCount: 1 }) this._formatter = new CloudWatchEventFormatter(this._options) - this._sequenceTokenInfo = null + this._sequenceToken = null this._client = new AWS.CloudWatchLogs(this._options.awsConfig) this._initializing = null } @@ -30,9 +30,7 @@ export default class CloudWatchClient { submit (batch) { debug('submit', {batch}) return this._initialize() - .then(() => this._getSequenceToken()) - .then((sequenceToken) => this._putLogEvents(batch, sequenceToken)) - .then(({nextSequenceToken}) => this._storeSequenceToken(nextSequenceToken)) + .then(() => this._doSubmit(batch, 0)) } _initialize () { @@ -67,12 +65,43 @@ export default class CloudWatchClient { } _allowResourceAlreadyExistsException (err) { - if (err.code !== 'ResourceAlreadyExistsException') { - throw err + return (err.code === 'ResourceAlreadyExistsException') + ? Promise.resolve() + : Promise.reject(err) + } + + _doSubmit (batch, retryCount) { + return this._maybeUpdateSequenceToken() + .then(() => this._putLogEventsAndStoreSequenceToken(batch)) + .catch((err) => this._handlePutError(err, batch, retryCount)) + } + + _maybeUpdateSequenceToken () { + return (this._sequenceToken != null) + ? Promise.resolve() + : this._fetchAndStoreSequenceToken() + } + + _handlePutError (err, batch, retryCount) { + if (err.code !== 'InvalidSequenceTokenException') { + return Promise.reject(err) } + if (retryCount >= this._options.submissionRetryCount) { + const error = new Error('Invalid sequence token, will retry') + error.code = 'InvalidSequenceTokenException' + return Promise.reject(error) + } + this._sequenceToken = null + return this._doSubmit(batch, retryCount + 1) + } + + _putLogEventsAndStoreSequenceToken (batch) { + return this._putLogEvents(batch) + .then(({nextSequenceToken}) => this._storeSequenceToken(nextSequenceToken)) } - _putLogEvents (batch, sequenceToken) { + _putLogEvents (batch) { + const sequenceToken = this._sequenceToken debug('putLogEvents', {batch, sequenceToken}) const params = { logGroupName: this._logGroupName, @@ -83,14 +112,6 @@ export default class CloudWatchClient { return this._client.putLogEvents(params).promise() } - _getSequenceToken () { - const now = +new Date() - const isStale = (!this._sequenceTokenInfo || - this._sequenceTokenInfo.date + this._options.maxSequenceTokenAge < now) - return isStale ? this._fetchAndStoreSequenceToken() - : Promise.resolve(this._sequenceTokenInfo.sequenceToken) - } - _fetchAndStoreSequenceToken () { debug('fetchSequenceToken') return this._findLogStream() @@ -99,8 +120,7 @@ export default class CloudWatchClient { _storeSequenceToken (sequenceToken) { debug('storeSequenceToken', {sequenceToken}) - const date = +new Date() - this._sequenceTokenInfo = {sequenceToken, date} + this._sequenceToken = sequenceToken return sequenceToken } diff --git a/src/relay.js b/src/relay.js index 19fc947..05704a4 100644 --- a/src/relay.js +++ b/src/relay.js @@ -65,16 +65,12 @@ export default class Relay extends EventEmitter { _onError (err, batch) { debug('onError', {error: err}) - // Expected errors: - // - DataAlreadyAcceptedException - // Message: "The given batch of log events has already been accepted." - // Action: Assume the request got replayed and remove the batch. - // - InvalidSequenceTokenException - // Message: "The given sequenceToken is invalid." - // Action: Keep the items in the queue and retry next time. if (err.code === 'DataAlreadyAcceptedException') { + // Assume the request got replayed and remove the batch this._queue.remove(batch.length) - } else if (err.code !== 'InvalidSequenceTokenException') { + } else if (err.code === 'InvalidSequenceTokenException') { + // Keep retrying + } else { this.emit('error', err) } } diff --git a/test/unit/cloudwatch-client.spec.js b/test/unit/cloudwatch-client.spec.js index f480f56..23e6f1c 100644 --- a/test/unit/cloudwatch-client.spec.js +++ b/test/unit/cloudwatch-client.spec.js @@ -54,12 +54,21 @@ const createClient = (options) => { clientOptions: null, streamsStrategy: streamsStrategies.default, groupErrorCode: null, - streamErrorCode: false + streamErrorCode: false, + putRejectionCode: null }) const client = new CloudWatchClient(logGroupName, logStreamName, options.clientOptions) + let putPromise + if (options.putRejectionCode != null) { + const err = new Error() + err.code = options.putRejectionCode + putPromise = Promise.reject(err) + } else { + putPromise = Promise.resolve({nextSequenceToken: 'token42'}) + } sinon.stub(client._client, 'putLogEvents') - .returns(withPromise(Promise.resolve({nextSequenceToken: 'token42'}))) + .returns(withPromise(putPromise)) sinon.stub(client._client, 'createLogGroup') .returns(withPromise(options.groupErrorCode ? Promise.reject(createErrorWithCode(options.groupErrorCode)) @@ -103,37 +112,34 @@ describe('CloudWatchClient', () => { ).to.eventually.equal(3) }) - it('rejects if the log stream is not found in a single page', () => { + it('rejects after retrying upon InvalidSequenceTokenException', () => { const client = createClient({ - streamsStrategy: streamsStrategies.notFound + putRejectionCode: 'InvalidSequenceTokenException' }) const batch = createBatch(1) return expect( client.submit(batch) - ).to.be.rejected + ).to.be.rejectedWith('Invalid sequence token, will retry') }) - it('rejects if the log stream is not found in multiple pages', () => { + it('rejects if the log stream is not found in a single page', () => { const client = createClient({ - streamsStrategy: streamsStrategies.pagedNotFound + streamsStrategy: streamsStrategies.notFound }) const batch = createBatch(1) return expect( client.submit(batch) ).to.be.rejected }) - }) - describe('#options.maxSequenceTokenAge', () => { - it('caches the sequence token', () => { + it('rejects if the log stream is not found in multiple pages', () => { const client = createClient({ - clientOptions: {maxSequenceTokenAge: 1000} + streamsStrategy: streamsStrategies.pagedNotFound }) + const batch = createBatch(1) return expect( - client.submit(createBatch(1)) - .then(() => client.submit(createBatch(1))) - .then(() => client._client.describeLogStreams.calledOnce) - ).to.eventually.equal(true) + client.submit(batch) + ).to.be.rejected }) }) diff --git a/test/unit/relay.spec.js b/test/unit/relay.spec.js index 21b7aa0..67a7690 100644 --- a/test/unit/relay.spec.js +++ b/test/unit/relay.spec.js @@ -96,17 +96,5 @@ describe('Relay', () => { await delay(submissionInterval * failures.length * 1.1) expect(spy.callCount).to.equal(0) }) - - it('handles InvalidSequenceTokenException errors by retrying', async () => { - const submissionInterval = 50 - const failures = ['InvalidSequenceTokenException', 'InvalidSequenceTokenException'] - const client = new ClientMock(failures) - const relay = new Relay(client, {submissionInterval}) - relay.start() - relay.submit(createItem()) - expect(client.submitted.length).to.equal(0, 'Submitted too early') - await delay(submissionInterval * (failures.length + 1) * 1.1) - expect(client.submitted.length).to.equal(1, 'Not submitted') - }) }) })