Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

awslogs: Record log line insert order for sorting #24814

Merged
merged 2 commits into from
Aug 12, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
45 changes: 33 additions & 12 deletions daemon/logger/awslogs/cloudwatchlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ type regionFinder interface {
Region() (string, error)
}

type byTimestamp []*cloudwatchlogs.InputLogEvent
type wrappedEvent struct {
inputLogEvent *cloudwatchlogs.InputLogEvent
insertOrder int
}
type byTimestamp []wrappedEvent

// init registers the awslogs driver
func init() {
Expand Down Expand Up @@ -229,7 +233,7 @@ var newTicker = func(freq time.Duration) *time.Ticker {
// calculations.
func (l *logStream) collectBatch() {
timer := newTicker(batchPublishFrequency)
var events []*cloudwatchlogs.InputLogEvent
var events []wrappedEvent
bytes := 0
for {
select {
Expand Down Expand Up @@ -258,9 +262,12 @@ func (l *logStream) collectBatch() {
events = events[:0]
bytes = 0
}
events = append(events, &cloudwatchlogs.InputLogEvent{
Message: aws.String(string(line)),
Timestamp: aws.Int64(msg.Timestamp.UnixNano() / int64(time.Millisecond)),
events = append(events, wrappedEvent{
inputLogEvent: &cloudwatchlogs.InputLogEvent{
Message: aws.String(string(line)),
Timestamp: aws.Int64(msg.Timestamp.UnixNano() / int64(time.Millisecond)),
},
insertOrder: len(events),
})
bytes += (lineBytes + perEventBytes)
}
Expand All @@ -271,14 +278,17 @@ func (l *logStream) collectBatch() {
// publishBatch calls PutLogEvents for a given set of InputLogEvents,
// accounting for sequencing requirements (each request must reference the
// sequence token returned by the previous request).
func (l *logStream) publishBatch(events []*cloudwatchlogs.InputLogEvent) {
func (l *logStream) publishBatch(events []wrappedEvent) {
if len(events) == 0 {
return
}

// events in a batch must be sorted by timestamp
// see http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
sort.Sort(byTimestamp(events))
cwEvents := unwrapEvents(events)

nextSequenceToken, err := l.putLogEvents(events, l.sequenceToken)
nextSequenceToken, err := l.putLogEvents(cwEvents, l.sequenceToken)

if err != nil {
if awsErr, ok := err.(awserr.Error); ok {
Expand All @@ -297,7 +307,7 @@ func (l *logStream) publishBatch(events []*cloudwatchlogs.InputLogEvent) {
// sequence code is bad, grab the correct one and retry
parts := strings.Split(awsErr.Message(), " ")
token := parts[len(parts)-1]
nextSequenceToken, err = l.putLogEvents(events, &token)
nextSequenceToken, err = l.putLogEvents(cwEvents, &token)
}
}
}
Expand Down Expand Up @@ -360,11 +370,14 @@ func (slice byTimestamp) Len() int {
// required by the sort.Interface interface.
func (slice byTimestamp) Less(i, j int) bool {
iTimestamp, jTimestamp := int64(0), int64(0)
if slice != nil && slice[i].Timestamp != nil {
iTimestamp = *slice[i].Timestamp
if slice != nil && slice[i].inputLogEvent.Timestamp != nil {
iTimestamp = *slice[i].inputLogEvent.Timestamp
}
if slice != nil && slice[j].inputLogEvent.Timestamp != nil {
jTimestamp = *slice[j].inputLogEvent.Timestamp
}
if slice != nil && slice[j].Timestamp != nil {
jTimestamp = *slice[j].Timestamp
if iTimestamp == jTimestamp {
return slice[i].insertOrder < slice[j].insertOrder
}
return iTimestamp < jTimestamp
}
Expand All @@ -374,3 +387,11 @@ func (slice byTimestamp) Less(i, j int) bool {
func (slice byTimestamp) Swap(i, j int) {
slice[i], slice[j] = slice[j], slice[i]
}

func unwrapEvents(events []wrappedEvent) []*cloudwatchlogs.InputLogEvent {
cwEvents := []*cloudwatchlogs.InputLogEvent{}
for _, input := range events {
cwEvents = append(cwEvents, input.inputLogEvent)
}
return cwEvents
}
92 changes: 79 additions & 13 deletions daemon/logger/awslogs/cloudwatchlogs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"net/http"
"reflect"
"runtime"
"strings"
"testing"
Expand Down Expand Up @@ -149,10 +150,11 @@ func TestPublishBatchSuccess(t *testing.T) {
NextSequenceToken: aws.String(nextSequenceToken),
},
}

events := []*cloudwatchlogs.InputLogEvent{
events := []wrappedEvent{
{
Message: aws.String(logline),
inputLogEvent: &cloudwatchlogs.InputLogEvent{
Message: aws.String(logline),
},
},
}

Expand All @@ -176,7 +178,7 @@ func TestPublishBatchSuccess(t *testing.T) {
if len(argument.LogEvents) != 1 {
t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
}
if argument.LogEvents[0] != events[0] {
if argument.LogEvents[0] != events[0].inputLogEvent {
t.Error("Expected event to equal input")
}
}
Expand All @@ -193,9 +195,11 @@ func TestPublishBatchError(t *testing.T) {
errorResult: errors.New("Error!"),
}

events := []*cloudwatchlogs.InputLogEvent{
events := []wrappedEvent{
{
Message: aws.String(logline),
inputLogEvent: &cloudwatchlogs.InputLogEvent{
Message: aws.String(logline),
},
},
}

Expand Down Expand Up @@ -225,9 +229,11 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
},
}

events := []*cloudwatchlogs.InputLogEvent{
events := []wrappedEvent{
{
Message: aws.String(logline),
inputLogEvent: &cloudwatchlogs.InputLogEvent{
Message: aws.String(logline),
},
},
}

Expand All @@ -252,7 +258,7 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
if len(argument.LogEvents) != 1 {
t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
}
if argument.LogEvents[0] != events[0] {
if argument.LogEvents[0] != events[0].inputLogEvent {
t.Error("Expected event to equal input")
}

Expand All @@ -269,7 +275,7 @@ func TestPublishBatchInvalidSeqSuccess(t *testing.T) {
if len(argument.LogEvents) != 1 {
t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
}
if argument.LogEvents[0] != events[0] {
if argument.LogEvents[0] != events[0].inputLogEvent {
t.Error("Expected event to equal input")
}
}
Expand All @@ -286,9 +292,11 @@ func TestPublishBatchAlreadyAccepted(t *testing.T) {
errorResult: awserr.New(dataAlreadyAcceptedCode, "use token token", nil),
}

events := []*cloudwatchlogs.InputLogEvent{
events := []wrappedEvent{
{
Message: aws.String(logline),
inputLogEvent: &cloudwatchlogs.InputLogEvent{
Message: aws.String(logline),
},
},
}

Expand All @@ -313,7 +321,7 @@ func TestPublishBatchAlreadyAccepted(t *testing.T) {
if len(argument.LogEvents) != 1 {
t.Errorf("Expected LogEvents to contain 1 element, but contains %d", len(argument.LogEvents))
}
if argument.LogEvents[0] != events[0] {
if argument.LogEvents[0] != events[0].inputLogEvent {
t.Error("Expected event to equal input")
}
}
Expand Down Expand Up @@ -625,3 +633,61 @@ func TestCollectBatchMaxTotalBytes(t *testing.T) {
t.Errorf("Expected message to be %s but was %s", "B", message[len(message)-1:])
}
}

func TestCollectBatchWithDuplicateTimestamps(t *testing.T) {
mockClient := newMockClient()
stream := &logStream{
client: mockClient,
logGroupName: groupName,
logStreamName: streamName,
sequenceToken: aws.String(sequenceToken),
messages: make(chan *logger.Message),
}
mockClient.putLogEventsResult <- &putLogEventsResult{
successResult: &cloudwatchlogs.PutLogEventsOutput{
NextSequenceToken: aws.String(nextSequenceToken),
},
}
ticks := make(chan time.Time)
newTicker = func(_ time.Duration) *time.Ticker {
return &time.Ticker{
C: ticks,
}
}

go stream.collectBatch()

times := maximumLogEventsPerPut
expectedEvents := []*cloudwatchlogs.InputLogEvent{}
timestamp := time.Now()
for i := 0; i < times; i++ {
line := fmt.Sprintf("%d", i)
if i%2 == 0 {
timestamp.Add(1 * time.Nanosecond)
}
stream.Log(&logger.Message{
Line: []byte(line),
Timestamp: timestamp,
})
expectedEvents = append(expectedEvents, &cloudwatchlogs.InputLogEvent{
Message: aws.String(line),
Timestamp: aws.Int64(timestamp.UnixNano() / int64(time.Millisecond)),
})
}

ticks <- time.Time{}
stream.Close()

argument := <-mockClient.putLogEventsArgument
if argument == nil {
t.Fatal("Expected non-nil PutLogEventsInput")
}
if len(argument.LogEvents) != times {
t.Errorf("Expected LogEvents to contain %d elements, but contains %d", times, len(argument.LogEvents))
}
for i := 0; i < times; i++ {
if !reflect.DeepEqual(*argument.LogEvents[i], *expectedEvents[i]) {
t.Errorf("Expected event to be %v but was %v", *expectedEvents[i], *argument.LogEvents[i])
}
}
}