Improve logging of long log lines #22982

Merged 2 commits on Jul 28, 2016
3 changes: 2 additions & 1 deletion daemon/logger/awslogs/cloudwatchlogs.go
@@ -165,7 +165,8 @@ func (l *logStream) Log(msg *logger.Message) error {
l.lock.RLock()
defer l.lock.RUnlock()
if !l.closed {
l.messages <- msg
// buffer up the data, making sure to copy the Line data
l.messages <- logger.CopyMessage(msg)
}
return nil
}
82 changes: 66 additions & 16 deletions daemon/logger/copier.go
@@ -1,7 +1,6 @@
package logger

import (
"bufio"
"bytes"
"io"
"sync"
@@ -10,8 +9,13 @@ import (
"github.com/Sirupsen/logrus"
)

const (
bufSize = 16 * 1024
readSize = 2 * 1024
)

// Copier can copy logs from specified sources to Logger and attach Timestamp.
// Writes are concurrent, so you need implement some sync in your logger
// Writes are concurrent, so you need implement some sync in your logger.
type Copier struct {
// srcs is map of name -> reader pairs, for example "stdout", "stderr"
srcs map[string]io.Reader
@@ -39,30 +43,76 @@ func (c *Copier) Run() {

func (c *Copier) copySrc(name string, src io.Reader) {
defer c.copyJobs.Done()
reader := bufio.NewReader(src)
buf := make([]byte, bufSize)
n := 0
eof := false
msg := &Message{Source: name}

for {
select {
case <-c.closed:
return
default:
line, err := reader.ReadBytes('\n')
line = bytes.TrimSuffix(line, []byte{'\n'})

// ReadBytes can return full or partial output even when it failed.
// e.g. it can return a full entry and EOF.
if err == nil || len(line) > 0 {
if logErr := c.dst.Log(&Message{Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil {
logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr)
}
// Work out how much more data we are okay with reading this time.
upto := n + readSize
if upto > cap(buf) {
upto = cap(buf)
}

if err != nil {
if err != io.EOF {
logrus.Errorf("Error scanning log stream: %s", err)
// Try to read that data.
if upto > n {
read, err := src.Read(buf[n:upto])
if err != nil {
if err != io.EOF {
logrus.Errorf("Error scanning log stream: %s", err)
return
}
eof = true
}
n += read
}
// If we have no data to log, and there's no more coming, we're done.
if n == 0 && eof {
return
}
// Break up the data that we've buffered up into lines, and log each in turn.
p := 0
for q := bytes.Index(buf[p:n], []byte{'\n'}); q >= 0; q = bytes.Index(buf[p:n], []byte{'\n'}) {
msg.Line = buf[p : p+q]
Contributor:

I'm a little uncomfortable with passing part of a slice to Log. I'm not sure whether any of the current loggers stash messages and send them later, but that would lead to message corruption.

Contributor Author:

We could allocate msg.Line and copy the slice into it every time, at the cost of a hit to speed and memory usage. Some of the log driver internals are complicated enough that I can't say for sure that we don't need to worry about that.

Contributor Author:

Okay, dug into it.

  • awslogs buffers Message structures on a channel and processes multiple Messages in batches in a separate goroutine. It could be tripped up by using slices.
  • etwlogs uses the slice to build a string using fmt.Sprintf, so it'd be fine.
  • fluentd copies the byte slice into a string immediately, so it'd be fine.
  • gcplogs copies the byte slice into a string immediately, so it'd be fine.
  • gelf copies the byte slice into a string immediately, so it'd be fine.
  • journald copies the byte slice into a string immediately, so it'd be fine.
  • jsonfilelog marshals the byte slice into a JSON blob immediately, so it'd be fine.
  • splunk copies the byte slice into a string immediately, so it'd be fine.
  • syslog copies the byte slice into a string immediately, so it'd be fine.

I'm tempted to make awslogs copy the data, and let the rest use slices, because otherwise we're copying the log data twice for most log drivers (once in Copier, and again in the log driver).

Contributor:

I'm okay with this; let's also document it.

Contributor Author:

I've added notes around the definition of Message and the new CopyMessage function, and a very short note in the awslogs driver where it calls CopyMessage(). @LK4D4 is this what you were thinking, or are there other places this should be called out?
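
A minimal sketch of the hazard discussed in this thread, using a hypothetical driver (not one from this PR) that stashes msg.Line instead of copying it:

package logger

import "sync"

// stashingLogger keeps references to msg.Line rather than copying the bytes.
type stashingLogger struct {
	mu    sync.Mutex
	lines [][]byte
}

func (l *stashingLogger) Log(msg *Message) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	// BUG (illustrative): msg.Line aliases the Copier's internal buffer,
	// which later reads overwrite; by the time these bytes are flushed they
	// may hold newer log data. Stashing CopyMessage(msg) instead avoids the
	// corruption.
	l.lines = append(l.lines, msg.Line)
	return nil
}

func (l *stashingLogger) Close() error { return nil }
func (l *stashingLogger) Name() string { return "stashing" }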

msg.Timestamp = time.Now().UTC()
msg.Partial = false
select {
case <-c.closed:
return
default:
if logErr := c.dst.Log(msg); logErr != nil {
logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
}
}
p += q + 1
}
// If there's no more coming, or the buffer is full but
// has no newlines, log whatever we haven't logged yet,
// noting that it's a partial log line.
if eof || (p == 0 && n == len(buf)) {
if p < n {
msg.Line = buf[p:n]
msg.Timestamp = time.Now().UTC()
msg.Partial = true
if logErr := c.dst.Log(msg); logErr != nil {
logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
Member:

A suggestion for avoiding flooding the daemon log with these (potentially very large) error lines:

const maxErrors = 256
errors := 0
// ...
if errors < maxErrors {
    logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
    errors++
    if errors == maxErrors {
        logrus.Warnf("reached the error limit, suppressing further log errors")
    }
}

Contributor Author:

I'm not sure I follow your meaning here. Are you looking at having dockerd stop logging "Failed to log msg..." errors once some number of them have been encountered?

Member:

Yes

Contributor Author:

Ah, I understand, then. I think that belongs in a different PR.

}
p = 0
n = 0
}
if eof {
return
}
}
// Move any unlogged data to the front of the buffer in preparation for another read.
if p > 0 {
copy(buf[0:], buf[p:n])
n -= p
}
}
}
}
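
With bufSize = 16*1024, the loop above emits a 40,000-byte line (plus its newline) as two 16,384-byte messages flagged Partial, then a final 7,232-byte message with Partial unset once the newline is found. A test-style sketch of that behavior, assuming it lives in the logger package next to copier_test.go (recordingLogger and TestLongLineIsSplit are hypothetical names):

package logger

import (
	"bytes"
	"io"
	"sync"
	"testing"
)

// recordingLogger copies every Message it receives, since the Copier reuses
// its read buffer after Log returns.
type recordingLogger struct {
	mu   sync.Mutex
	msgs []*Message
}

func (l *recordingLogger) Log(m *Message) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.msgs = append(l.msgs, CopyMessage(m))
	return nil
}

func (l *recordingLogger) Close() error { return nil }
func (l *recordingLogger) Name() string { return "recording" }

func TestLongLineIsSplit(t *testing.T) {
	dst := &recordingLogger{}
	line := append(bytes.Repeat([]byte{'x'}, 40000), '\n')
	c := NewCopier(map[string]io.Reader{"stdout": bytes.NewReader(line)}, dst)
	c.Run()
	c.Wait()
	// Expected sequence: len=16384 partial=true, len=16384 partial=true,
	// len=7232 partial=false.
	for i, m := range dst.msgs {
		t.Logf("msg %d: len=%d partial=%v", i, len(m.Line), m.Partial)
	}
}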
92 changes: 92 additions & 0 deletions daemon/logger/copier_test.go
@@ -3,7 +3,9 @@ package logger
import (
"bytes"
"encoding/json"
"fmt"
"io"
"os"
"sync"
"testing"
"time"
@@ -116,3 +118,93 @@ func TestCopierSlow(t *testing.T) {
case <-wait:
}
}

type BenchmarkLoggerDummy struct {
}

func (l *BenchmarkLoggerDummy) Log(m *Message) error { return nil }

func (l *BenchmarkLoggerDummy) Close() error { return nil }

func (l *BenchmarkLoggerDummy) Name() string { return "dummy" }

func BenchmarkCopier64(b *testing.B) {
benchmarkCopier(b, 1<<6)
}
func BenchmarkCopier128(b *testing.B) {
benchmarkCopier(b, 1<<7)
}
func BenchmarkCopier256(b *testing.B) {
benchmarkCopier(b, 1<<8)
}
func BenchmarkCopier512(b *testing.B) {
benchmarkCopier(b, 1<<9)
}
func BenchmarkCopier1K(b *testing.B) {
benchmarkCopier(b, 1<<10)
}
func BenchmarkCopier2K(b *testing.B) {
benchmarkCopier(b, 1<<11)
}
func BenchmarkCopier4K(b *testing.B) {
benchmarkCopier(b, 1<<12)
}
func BenchmarkCopier8K(b *testing.B) {
benchmarkCopier(b, 1<<13)
}
func BenchmarkCopier16K(b *testing.B) {
benchmarkCopier(b, 1<<14)
}
func BenchmarkCopier32K(b *testing.B) {
benchmarkCopier(b, 1<<15)
}
func BenchmarkCopier64K(b *testing.B) {
benchmarkCopier(b, 1<<16)
}
func BenchmarkCopier128K(b *testing.B) {
benchmarkCopier(b, 1<<17)
}
func BenchmarkCopier256K(b *testing.B) {
benchmarkCopier(b, 1<<18)
}

func piped(b *testing.B, iterations int, delay time.Duration, buf []byte) io.Reader {
r, w, err := os.Pipe()
if err != nil {
b.Fatal(err)
return nil
}
go func() {
for i := 0; i < iterations; i++ {
time.Sleep(delay)
if n, err := w.Write(buf); err != nil || n != len(buf) {
if err != nil {
b.Fatal(err)
}
b.Fatal(fmt.Errorf("short write"))
}
}
w.Close()
}()
return r
}

func benchmarkCopier(b *testing.B, length int) {
b.StopTimer()
buf := []byte{'A'}
for len(buf) < length {
buf = append(buf, buf...)
}
buf = append(buf[:length-1], []byte{'\n'}...)
b.StartTimer()
for i := 0; i < b.N; i++ {
c := NewCopier(
map[string]io.Reader{
"buffer": piped(b, 10, time.Nanosecond, buf),
},
&BenchmarkLoggerDummy{})
c.Run()
c.Wait()
c.Close()
}
}
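
The benchmarks above double a one-byte buffer up to the target length, terminate it with a newline, and time a full Copier run over a pipe. Assuming a checkout of the repository and a standard Go toolchain, they can be run on their own with:

go test -run '^$' -bench 'BenchmarkCopier' ./daemon/logger/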
11 changes: 9 additions & 2 deletions daemon/logger/journald/journald.go
@@ -84,10 +84,17 @@ func validateLogOpt(cfg map[string]string) error {
}

func (s *journald) Log(msg *logger.Message) error {
vars := map[string]string{}
for k, v := range s.vars {
vars[k] = v
}
if msg.Partial {
vars["CONTAINER_PARTIAL_MESSAGE"] = "true"
}
if msg.Source == "stderr" {
return journal.Send(string(msg.Line), journal.PriErr, s.vars)
return journal.Send(string(msg.Line), journal.PriErr, vars)
}
return journal.Send(string(msg.Line), journal.PriInfo, s.vars)
return journal.Send(string(msg.Line), journal.PriInfo, vars)
}

func (s *journald) Name() string {
15 changes: 11 additions & 4 deletions daemon/logger/journald/read.go
@@ -12,11 +12,15 @@ package journald
// #include <time.h>
// #include <unistd.h>
//
//static int get_message(sd_journal *j, const char **msg, size_t *length)
//static int get_message(sd_journal *j, const char **msg, size_t *length, int *partial)
//{
// int rc;
// size_t plength;
// *msg = NULL;
// *length = 0;
// plength = strlen("CONTAINER_PARTIAL_MESSAGE=true");
// rc = sd_journal_get_data(j, "CONTAINER_PARTIAL_MESSAGE", (const void **) msg, length);
// *partial = ((rc == 0) && (*length == plength) && (memcmp(*msg, "CONTAINER_PARTIAL_MESSAGE=true", plength) == 0));
// rc = sd_journal_get_data(j, "MESSAGE", (const void **) msg, length);
// if (rc == 0) {
// if (*length > 8) {
@@ -167,7 +171,7 @@ func (s *journald) drainJournal(logWatcher *logger.LogWatcher, config logger.Rea
var msg, data, cursor *C.char
var length C.size_t
var stamp C.uint64_t
var priority C.int
var priority, partial C.int

// Walk the journal from here forward until we run out of new entries.
drain:
@@ -183,15 +187,18 @@
}
}
// Read and send the logged message, if there is one to read.
i := C.get_message(j, &msg, &length)
i := C.get_message(j, &msg, &length, &partial)
if i != -C.ENOENT && i != -C.EADDRNOTAVAIL {
// Read the entry's timestamp.
if C.sd_journal_get_realtime_usec(j, &stamp) != 0 {
break
}
// Set up the time and text of the entry.
timestamp := time.Unix(int64(stamp)/1000000, (int64(stamp)%1000000)*1000)
line := append(C.GoBytes(unsafe.Pointer(msg), C.int(length)), "\n"...)
line := C.GoBytes(unsafe.Pointer(msg), C.int(length))
if partial == 0 {
line = append(line, "\n"...)
}
// Recover the stream name by mapping
// from the journal priority back to
// the stream that we would have
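
Since the read path above appends '\n' only to non-partial entries, a consumer reassembles a long line by concatenating consecutive journal entries until one arrives without CONTAINER_PARTIAL_MESSAGE=true. A sketch of that invariant in Go (the entry type and package are hypothetical, standing in for a decoded journal record):

package partial

// entry stands in for one decoded journal record.
type entry struct {
	message []byte
	partial bool // true when CONTAINER_PARTIAL_MESSAGE=true was present
}

// reassemble joins partial entries into complete, newline-terminated lines,
// mirroring what drainJournal's conditional '\n' achieves on the log stream.
func reassemble(entries []entry) [][]byte {
	var lines [][]byte
	var cur []byte
	for _, e := range entries {
		cur = append(cur, e.message...)
		if !e.partial {
			lines = append(lines, append(cur, '\n'))
			cur = nil
		}
	}
	return lines
}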
6 changes: 5 additions & 1 deletion daemon/logger/jsonfilelog/jsonfilelog.go
@@ -90,8 +90,12 @@ func (l *JSONFileLogger) Log(msg *logger.Message) error {
return err
}
l.mu.Lock()
logline := msg.Line
if !msg.Partial {
logline = append(msg.Line, '\n')
}
err = (&jsonlog.JSONLogs{
Log: append(msg.Line, '\n'),
Log: logline,
Stream: msg.Source,
Created: timestamp,
RawAttrs: l.extra,
23 changes: 22 additions & 1 deletion daemon/logger/logger.go
@@ -25,12 +25,33 @@
logWatcherBufferSize = 4096
)

// Message is datastructure that represents record from some container.
// Message is datastructure that represents piece of output produced by some
// container. The Line member is a slice of an array whose contents can be
// changed after a log driver's Log() method returns.
type Message struct {
Line []byte
Source string
Timestamp time.Time
Attrs LogAttributes
Partial bool
}

// CopyMessage creates a copy of the passed-in Message which will remain
// unchanged if the original is changed. Log drivers which buffer Messages
// rather than dispatching them during their Log() method should use this
// function to obtain a Message whose Line member's contents won't change.
func CopyMessage(msg *Message) *Message {
m := new(Message)
m.Line = make([]byte, len(msg.Line))
copy(m.Line, msg.Line)
m.Source = msg.Source
m.Timestamp = msg.Timestamp
m.Partial = msg.Partial
m.Attrs = make(LogAttributes)
for k, v := range msg.Attrs {
m.Attrs[k] = v
}
return m
}

// LogAttributes is used to hold the extra attributes available in the log message
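
As a usage note: any driver that queues messages for asynchronous handling, as awslogs does in the first hunk of this PR, should copy before queuing. A minimal sketch with a hypothetical driver package and type:

package mydriver

import "github.com/docker/docker/daemon/logger"

// batchingLogger sends queued Messages from a separate goroutine, so it must
// not retain references into the Copier's read buffer.
type batchingLogger struct {
	messages chan *logger.Message
}

func (l *batchingLogger) Log(msg *logger.Message) error {
	// Queue a copy: the original msg.Line may be overwritten after Log returns.
	l.messages <- logger.CopyMessage(msg)
	return nil
}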