From 443f251cf596006fe4cb621dcab955f7da0a2a00 Mon Sep 17 00:00:00 2001 From: Samuel Karp Date: Tue, 19 Jul 2016 11:32:12 -0700 Subject: [PATCH] awslogs: Record log line insert order for sorting Fixes https://github.com/docker/docker/issues/24775 Signed-off-by: Samuel Karp --- daemon/logger/awslogs/cloudwatchlogs.go | 45 ++++++++++++++------ daemon/logger/awslogs/cloudwatchlogs_test.go | 33 ++++++++------ 2 files changed, 53 insertions(+), 25 deletions(-) diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go index 8f59b27855b06..796fe963808a3 100644 --- a/daemon/logger/awslogs/cloudwatchlogs.go +++ b/daemon/logger/awslogs/cloudwatchlogs.go @@ -64,7 +64,11 @@ type regionFinder interface { Region() (string, error) } -type byTimestamp []*cloudwatchlogs.InputLogEvent +type wrappedEvent struct { + inputLogEvent *cloudwatchlogs.InputLogEvent + insertOrder int +} +type byTimestamp []wrappedEvent // init registers the awslogs driver func init() { @@ -229,7 +233,7 @@ var newTicker = func(freq time.Duration) *time.Ticker { // calculations. func (l *logStream) collectBatch() { timer := newTicker(batchPublishFrequency) - var events []*cloudwatchlogs.InputLogEvent + var events []wrappedEvent bytes := 0 for { select { @@ -258,9 +262,12 @@ func (l *logStream) collectBatch() { events = events[:0] bytes = 0 } - events = append(events, &cloudwatchlogs.InputLogEvent{ - Message: aws.String(string(line)), - Timestamp: aws.Int64(msg.Timestamp.UnixNano() / int64(time.Millisecond)), + events = append(events, wrappedEvent{ + inputLogEvent: &cloudwatchlogs.InputLogEvent{ + Message: aws.String(string(line)), + Timestamp: aws.Int64(msg.Timestamp.UnixNano() / int64(time.Millisecond)), + }, + insertOrder: len(events), }) bytes += (lineBytes + perEventBytes) } @@ -271,14 +278,17 @@ func (l *logStream) collectBatch() { // publishBatch calls PutLogEvents for a given set of InputLogEvents, // accounting for sequencing requirements (each request must reference the // sequence token returned by the previous request). -func (l *logStream) publishBatch(events []*cloudwatchlogs.InputLogEvent) { +func (l *logStream) publishBatch(events []wrappedEvent) { if len(events) == 0 { return } + // events in a batch must be sorted by timestamp + // see http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html sort.Sort(byTimestamp(events)) + cwEvents := unwrapEvents(events) - nextSequenceToken, err := l.putLogEvents(events, l.sequenceToken) + nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken) if err != nil { if awsErr, ok := err.(awserr.Error); ok { @@ -297,7 +307,7 @@ func (l *logStream) publishBatch(events []*cloudwatchlogs.InputLogEvent) { // sequence code is bad, grab the correct one and retry parts := strings.Split(awsErr.Message(), " ") token := parts[len(parts)-1] - nextSequenceToken, err = l.putLogEvents(events, &token) + nextSequenceToken, err = l.putLogEvents(cwEvents, &token) } } } @@ -360,11 +370,14 @@ func (slice byTimestamp) Len() int { // required by the sort.Interface interface. func (slice byTimestamp) Less(i, j int) bool { iTimestamp, jTimestamp := int64(0), int64(0) - if slice != nil && slice[i].Timestamp != nil { - iTimestamp = *slice[i].Timestamp + if slice != nil && slice[i].inputLogEvent.Timestamp != nil { + iTimestamp = *slice[i].inputLogEvent.Timestamp + } + if slice != nil && slice[j].inputLogEvent.Timestamp != nil { + jTimestamp = *slice[j].inputLogEvent.Timestamp } - if slice != nil && slice[j].Timestamp != nil { - jTimestamp = *slice[j].Timestamp + if iTimestamp == jTimestamp { + return slice[i].insertOrder < slice[j].insertOrder } return iTimestamp < jTimestamp } @@ -374,3 +387,11 @@ func (slice byTimestamp) Less(i, j int) bool { func (slice byTimestamp) Swap(i, j int) { slice[i], slice[j] = slice[j], slice[i] } + +func unwrapEvents(events []wrappedEvent) []*cloudwatchlogs.InputLogEvent { + cwEvents := []*cloudwatchlogs.InputLogEvent{} + for _, input := range events { + cwEvents = append(cwEvents, input.inputLogEvent) + } + return cwEvents +} diff --git a/daemon/logger/awslogs/cloudwatchlogs_test.go b/daemon/logger/awslogs/cloudwatchlogs_test.go index f6a0d7590cccb..8d0d9f4149c6d 100644 --- a/daemon/logger/awslogs/cloudwatchlogs_test.go +++ b/daemon/logger/awslogs/cloudwatchlogs_test.go @@ -150,10 +150,11 @@ func TestPublishBatchSuccess(t *testing.T) { NextSequenceToken: aws.String(nextSequenceToken), }, } - - events := []*cloudwatchlogs.InputLogEvent{ + events := []wrappedEvent{ { - Message: aws.String(logline), + inputLogEvent: &cloudwatchlogs.InputLogEvent{ + Message: aws.String(logline), + }, }, } @@ -177,7 +178,7 @@ func TestPublishBatchSuccess(t *testing.T) { if len(argument.LogEvents) != 1 { t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents)) } - if argument.LogEvents[0] != events[0] { + if argument.LogEvents[0] != events[0].inputLogEvent { t.Error("Expected event to equal input") } } @@ -194,9 +195,11 @@ func TestPublishBatchError(t *testing.T) { errorResult: errors.New("Error!"), } - events := []*cloudwatchlogs.InputLogEvent{ + events := []wrappedEvent{ { - Message: aws.String(logline), + inputLogEvent: &cloudwatchlogs.InputLogEvent{ + Message: aws.String(logline), + }, }, } @@ -226,9 +229,11 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) { }, } - events := []*cloudwatchlogs.InputLogEvent{ + events := []wrappedEvent{ { - Message: aws.String(logline), + inputLogEvent: &cloudwatchlogs.InputLogEvent{ + Message: aws.String(logline), + }, }, } @@ -253,7 +258,7 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) { if len(argument.LogEvents) != 1 { t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents)) } - if argument.LogEvents[0] != events[0] { + if argument.LogEvents[0] != events[0].inputLogEvent { t.Error("Expected event to equal input") } @@ -270,7 +275,7 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) { if len(argument.LogEvents) != 1 { t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents)) } - if argument.LogEvents[0] != events[0] { + if argument.LogEvents[0] != events[0].inputLogEvent { t.Error("Expected event to equal input") } } @@ -287,9 +292,11 @@ func TestPublishBatchAlreadyAccepted(t *testing.T) { errorResult: awserr.New(dataAlreadyAcceptedCode, "use token token", nil), } - events := []*cloudwatchlogs.InputLogEvent{ + events := []wrappedEvent{ { - Message: aws.String(logline), + inputLogEvent: &cloudwatchlogs.InputLogEvent{ + Message: aws.String(logline), + }, }, } @@ -314,7 +321,7 @@ func TestPublishBatchAlreadyAccepted(t *testing.T) { if len(argument.LogEvents) != 1 { t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents)) } - if argument.LogEvents[0] != events[0] { + if argument.LogEvents[0] != events[0].inputLogEvent { t.Error("Expected event to equal input") } }