Skip to content

Commit

Permalink
awslogs: Record log line insert order for sorting
Browse files Browse the repository at this point in the history
Fixes #24775

Signed-off-by: Samuel Karp <skarp@amazon.com>
  • Loading branch information
samuelkarp committed Aug 3, 2016
1 parent 5ba6cab commit 443f251
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 25 deletions.
45 changes: 33 additions & 12 deletions daemon/logger/awslogs/cloudwatchlogs.go
Expand Up @@ -64,7 +64,11 @@ type regionFinder interface {
Region() (string, error)
}

type byTimestamp []*cloudwatchlogs.InputLogEvent
type wrappedEvent struct {
inputLogEvent *cloudwatchlogs.InputLogEvent
insertOrder int
}
type byTimestamp []wrappedEvent

// init registers the awslogs driver
func init() {
Expand Down Expand Up @@ -229,7 +233,7 @@ var newTicker = func(freq time.Duration) *time.Ticker {
// calculations.
func (l *logStream) collectBatch() {
timer := newTicker(batchPublishFrequency)
var events []*cloudwatchlogs.InputLogEvent
var events []wrappedEvent
bytes := 0
for {
select {
Expand Down Expand Up @@ -258,9 +262,12 @@ func (l *logStream) collectBatch() {
events = events[:0]
bytes = 0
}
events = append(events, &cloudwatchlogs.InputLogEvent{
Message: aws.String(string(line)),
Timestamp: aws.Int64(msg.Timestamp.UnixNano() / int64(time.Millisecond)),
events = append(events, wrappedEvent{
inputLogEvent: &cloudwatchlogs.InputLogEvent{
Message: aws.String(string(line)),
Timestamp: aws.Int64(msg.Timestamp.UnixNano() / int64(time.Millisecond)),
},
insertOrder: len(events),
})
bytes += (lineBytes + perEventBytes)
}
Expand All @@ -271,14 +278,17 @@ func (l *logStream) collectBatch() {
// publishBatch calls PutLogEvents for a given set of InputLogEvents,
// accounting for sequencing requirements (each request must reference the
// sequence token returned by the previous request).
func (l *logStream) publishBatch(events []*cloudwatchlogs.InputLogEvent) {
func (l *logStream) publishBatch(events []wrappedEvent) {
if len(events) == 0 {
return
}

// events in a batch must be sorted by timestamp
// see http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
sort.Sort(byTimestamp(events))
cwEvents := unwrapEvents(events)

nextSequenceToken, err := l.putLogEvents(events, l.sequenceToken)
nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken)

if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
Expand All @@ -297,7 +307,7 @@ func (l *logStream) publishBatch(events []*cloudwatchlogs.InputLogEvent) {
// sequence code is bad, grab the correct one and retry
parts := strings.Split(awsErr.Message(), " ")
token := parts[len(parts)-1]
nextSequenceToken, err = l.putLogEvents(events, &token)
nextSequenceToken, err = l.putLogEvents(cwEvents, &token)
}
}
}
Expand Down Expand Up @@ -360,11 +370,14 @@ func (slice byTimestamp) Len() int {
// required by the sort.Interface interface.
func (slice byTimestamp) Less(i, j int) bool {
iTimestamp, jTimestamp := int64(0), int64(0)
if slice != nil && slice[i].Timestamp != nil {
iTimestamp = *slice[i].Timestamp
if slice != nil && slice[i].inputLogEvent.Timestamp != nil {
iTimestamp = *slice[i].inputLogEvent.Timestamp
}
if slice != nil && slice[j].inputLogEvent.Timestamp != nil {
jTimestamp = *slice[j].inputLogEvent.Timestamp
}
if slice != nil && slice[j].Timestamp != nil {
jTimestamp = *slice[j].Timestamp
if iTimestamp == jTimestamp {
return slice[i].insertOrder < slice[j].insertOrder
}
return iTimestamp < jTimestamp
}
Expand All @@ -374,3 +387,11 @@ func (slice byTimestamp) Less(i, j int) bool {
func (slice byTimestamp) Swap(i, j int) {
slice[i], slice[j] = slice[j], slice[i]
}

func unwrapEvents(events []wrappedEvent) []*cloudwatchlogs.InputLogEvent {
cwEvents := []*cloudwatchlogs.InputLogEvent{}
for _, input := range events {
cwEvents = append(cwEvents, input.inputLogEvent)
}
return cwEvents
}
33 changes: 20 additions & 13 deletions daemon/logger/awslogs/cloudwatchlogs_test.go
Expand Up @@ -150,10 +150,11 @@ func TestPublishBatchSuccess(t *testing.T) {
NextSequenceToken: aws.String(nextSequenceToken),
},
}

events := []*cloudwatchlogs.InputLogEvent{
events := []wrappedEvent{
{
Message: aws.String(logline),
inputLogEvent: &cloudwatchlogs.InputLogEvent{
Message: aws.String(logline),
},
},
}

Expand All @@ -177,7 +178,7 @@ func TestPublishBatchSuccess(t *testing.T) {
if len(argument.LogEvents) != 1 {
t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
}
if argument.LogEvents[0] != events[0] {
if argument.LogEvents[0] != events[0].inputLogEvent {
t.Error("Expected event to equal input")
}
}
Expand All @@ -194,9 +195,11 @@ func TestPublishBatchError(t *testing.T) {
errorResult: errors.New("Error!"),
}

events := []*cloudwatchlogs.InputLogEvent{
events := []wrappedEvent{
{
Message: aws.String(logline),
inputLogEvent: &cloudwatchlogs.InputLogEvent{
Message: aws.String(logline),
},
},
}

Expand Down Expand Up @@ -226,9 +229,11 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
},
}

events := []*cloudwatchlogs.InputLogEvent{
events := []wrappedEvent{
{
Message: aws.String(logline),
inputLogEvent: &cloudwatchlogs.InputLogEvent{
Message: aws.String(logline),
},
},
}

Expand All @@ -253,7 +258,7 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
if len(argument.LogEvents) != 1 {
t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
}
if argument.LogEvents[0] != events[0] {
if argument.LogEvents[0] != events[0].inputLogEvent {
t.Error("Expected event to equal input")
}

Expand All @@ -270,7 +275,7 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
if len(argument.LogEvents) != 1 {
t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
}
if argument.LogEvents[0] != events[0] {
if argument.LogEvents[0] != events[0].inputLogEvent {
t.Error("Expected event to equal input")
}
}
Expand All @@ -287,9 +292,11 @@ func TestPublishBatchAlreadyAccepted(t *testing.T) {
errorResult: awserr.New(dataAlreadyAcceptedCode, "use token token", nil),
}

events := []*cloudwatchlogs.InputLogEvent{
events := []wrappedEvent{
{
Message: aws.String(logline),
inputLogEvent: &cloudwatchlogs.InputLogEvent{
Message: aws.String(logline),
},
},
}

Expand All @@ -314,7 +321,7 @@ func TestPublishBatchAlreadyAccepted(t *testing.T) {
if len(argument.LogEvents) != 1 {
t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
}
if argument.LogEvents[0] != events[0] {
if argument.LogEvents[0] != events[0].inputLogEvent {
t.Error("Expected event to equal input")
}
}
Expand Down

0 comments on commit 443f251

Please sign in to comment.