From af439c7a8da878450d0e5583935a1c4208fa92ad Mon Sep 17 00:00:00 2001
From: Nalin Dahyabhai <nalind@redhat.com>
Date: Mon, 6 Jun 2016 11:50:09 -0400
Subject: [PATCH 1/2] Add a benchmark for logger.Copier

Add a benchmark for measuring how the logger.Copier implementation
handles logged lines of sizes ranging from 64 bytes up to 256KB.

Signed-off-by: Nalin Dahyabhai <nalind@redhat.com>
---
 daemon/logger/copier_test.go | 94 ++++++++++++++++++++++++++++++++++++
 1 file changed, 94 insertions(+)

diff --git a/daemon/logger/copier_test.go b/daemon/logger/copier_test.go
index 69225e9a7b69f..e9e9bb45cf684 100644
--- a/daemon/logger/copier_test.go
+++ b/daemon/logger/copier_test.go
@@ -3,7 +3,9 @@ package logger
 import (
 	"bytes"
 	"encoding/json"
+	"fmt"
 	"io"
+	"os"
 	"sync"
 	"testing"
 	"time"
@@ -116,3 +118,95 @@ func TestCopierSlow(t *testing.T) {
 	case <-wait:
 	}
 }
+
+type BenchmarkLoggerDummy struct {
+}
+
+func (l *BenchmarkLoggerDummy) Log(m *Message) error { return nil }
+
+func (l *BenchmarkLoggerDummy) Close() error { return nil }
+
+func (l *BenchmarkLoggerDummy) Name() string { return "dummy" }
+
+func BenchmarkCopier64(b *testing.B) {
+	benchmarkCopier(b, 1<<6)
+}
+func BenchmarkCopier128(b *testing.B) {
+	benchmarkCopier(b, 1<<7)
+}
+func BenchmarkCopier256(b *testing.B) {
+	benchmarkCopier(b, 1<<8)
+}
+func BenchmarkCopier512(b *testing.B) {
+	benchmarkCopier(b, 1<<9)
+}
+func BenchmarkCopier1K(b *testing.B) {
+	benchmarkCopier(b, 1<<10)
+}
+func BenchmarkCopier2K(b *testing.B) {
+	benchmarkCopier(b, 1<<11)
+}
+func BenchmarkCopier4K(b *testing.B) {
+	benchmarkCopier(b, 1<<12)
+}
+func BenchmarkCopier8K(b *testing.B) {
+	benchmarkCopier(b, 1<<13)
+}
+func BenchmarkCopier16K(b *testing.B) {
+	benchmarkCopier(b, 1<<14)
+}
+func BenchmarkCopier32K(b *testing.B) {
+	benchmarkCopier(b, 1<<15)
+}
+func BenchmarkCopier64K(b *testing.B) {
+	benchmarkCopier(b, 1<<16)
+}
+func BenchmarkCopier128K(b *testing.B) {
+	benchmarkCopier(b, 1<<17)
+}
+func BenchmarkCopier256K(b *testing.B) {
+	benchmarkCopier(b, 1<<18)
+}
+
+func piped(b *testing.B, iterations int, delay time.Duration, buf []byte) io.Reader {
+	r, w, err := os.Pipe()
+	if err != nil {
+		b.Fatal(err)
+		return nil
+	}
+	go func() {
+		defer w.Close()
+		for i := 0; i < iterations; i++ {
+			time.Sleep(delay)
+			if n, err := w.Write(buf); err != nil || n != len(buf) {
+				if err == nil {
+					err = fmt.Errorf("short write")
+				}
+				// b.Fatal may only be called from the benchmark goroutine.
+				b.Error(err)
+				return
+			}
+		}
+	}()
+	return r
+}
+
+func benchmarkCopier(b *testing.B, length int) {
+	b.StopTimer()
+	buf := []byte{'A'}
+	for len(buf) < length {
+		buf = append(buf, buf...)
+	}
+	buf = append(buf[:length-1], []byte{'\n'}...)
+	b.StartTimer()
+	for i := 0; i < b.N; i++ {
+		c := NewCopier(
+			map[string]io.Reader{
+				"buffer": piped(b, 10, time.Nanosecond, buf),
+			},
+			&BenchmarkLoggerDummy{})
+		c.Run()
+		c.Wait()
+		c.Close()
+	}
+}

From 513ec73831269947d38a644c278ce3cac36783b2 Mon Sep 17 00:00:00 2001
From: Nalin Dahyabhai <nalind@redhat.com>
Date: Tue, 24 May 2016 14:12:47 -0400
Subject: [PATCH 2/2] Improve logging of long log lines

This change updates how we handle long lines of output from the
container. The previous logic used a bufio reader to read entire lines
of output from the container through an intermediate BytesPipe, which
allowed a container that emitted data without newlines to cause dockerd
to consume an unconstrained amount of memory while it attempted to
collect a whole line of output.

To avoid that, we replace the bufio reader with our own buffering
scheme that handles log lines up to 16k in length, breaking up anything
longer than that into multiple chunks.
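To make the scheme concrete (an illustration, not part of the change
itself): with the 16k buffer, a container that writes a single
40,000-byte line followed by a newline now yields three messages for
the log driver, two 16,384-byte chunks and then a final 7,232-byte
chunk, instead of one unbounded allocation.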
(If we could have dispensed with noting this detail properly at the end
of output, we could simply have switched from using ReadBytes() to
using ReadLine() instead.) We add a field ("Partial") to the log
message structure to flag when we pass data to the log driver that did
not end with a newline.

The Line member of the Message structures that we pass to log drivers
is now a slice into a buffer whose contents can be overwritten between
calls to the log driver's Log() method, so drivers which batch up
Messages before processing them need to take additional care: we add a
function (logger.CopyMessage()) that can be used to create a deep copy
of a Message structure, and modify the awslogs driver to use it.

We update the jsonfile log driver to append a "\n" to the data that it
logs to disk only when the Partial flag is false (it previously did so
unconditionally), so that its "logs" output correctly reproduces the
data as we received it.

Likewise, we modify the journald log driver to add a data field with
value CONTAINER_PARTIAL_MESSAGE=true to entries when the Partial flag
is true, and update its "logs" reader to append a "\n" to the data it
retrieves only when it does not see this field/value pair (it, too,
previously appended one unconditionally).

Signed-off-by: Nalin Dahyabhai <nalind@redhat.com> (github: nalind)
---
 daemon/logger/awslogs/cloudwatchlogs.go  |  3 +-
 daemon/logger/copier.go                  | 82 +++++++++++++++++++-----
 daemon/logger/journald/journald.go       | 11 +++-
 daemon/logger/journald/read.go           | 15 +++--
 daemon/logger/jsonfilelog/jsonfilelog.go |  6 +-
 daemon/logger/logger.go                  | 23 ++++++-
 6 files changed, 115 insertions(+), 25 deletions(-)

diff --git a/daemon/logger/awslogs/cloudwatchlogs.go b/daemon/logger/awslogs/cloudwatchlogs.go
index 78a230fe8d4b0..8f59b27855b06 100644
--- a/daemon/logger/awslogs/cloudwatchlogs.go
+++ b/daemon/logger/awslogs/cloudwatchlogs.go
@@ -165,7 +165,8 @@ func (l *logStream) Log(msg *logger.Message) error {
 	l.lock.RLock()
 	defer l.lock.RUnlock()
 	if !l.closed {
-		l.messages <- msg
+		// buffer up the data, making sure to copy the Line data
+		l.messages <- logger.CopyMessage(msg)
 	}
 	return nil
 }
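The copy matters because the awslogs driver queues Messages on a
channel and uploads them in batches later. As a sketch (the driver name
and fields here are hypothetical, not from this patch), any driver that
holds on to Messages past its Log() call has to do the same:

    // Illustration only: a driver that buffers Messages for later
    // processing must deep-copy them, because msg.Line aliases the
    // copier's internal read buffer after this change.
    type batchingDriver struct {
    	mu    sync.Mutex
    	queue []*logger.Message
    }

    func (d *batchingDriver) Log(msg *logger.Message) error {
    	d.mu.Lock()
    	defer d.mu.Unlock()
    	// Storing msg directly would be unsafe; CopyMessage snapshots
    	// Line and Attrs so the queued entry stays intact.
    	d.queue = append(d.queue, logger.CopyMessage(msg))
    	return nil
    }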
diff --git a/daemon/logger/copier.go b/daemon/logger/copier.go
index 9abb59a176ecd..ecb4ed03b69c5 100644
--- a/daemon/logger/copier.go
+++ b/daemon/logger/copier.go
@@ -1,7 +1,6 @@
 package logger
 
 import (
-	"bufio"
 	"bytes"
 	"io"
 	"sync"
@@ -10,8 +9,13 @@ import (
 	"github.com/Sirupsen/logrus"
 )
 
+const (
+	bufSize  = 16 * 1024
+	readSize = 2 * 1024
+)
+
 // Copier can copy logs from specified sources to Logger and attach Timestamp.
-// Writes are concurrent, so you need implement some sync in your logger
+// Writes are concurrent, so you need to implement some sync in your logger.
 type Copier struct {
 	// srcs is map of name -> reader pairs, for example "stdout", "stderr"
 	srcs map[string]io.Reader
@@ -39,30 +43,76 @@ func (c *Copier) Run() {
 
 func (c *Copier) copySrc(name string, src io.Reader) {
 	defer c.copyJobs.Done()
-	reader := bufio.NewReader(src)
+	buf := make([]byte, bufSize)
+	n := 0
+	eof := false
+	msg := &Message{Source: name}
 
 	for {
 		select {
 		case <-c.closed:
 			return
 		default:
-			line, err := reader.ReadBytes('\n')
-			line = bytes.TrimSuffix(line, []byte{'\n'})
-
-			// ReadBytes can return full or partial output even when it failed.
-			// e.g. it can return a full entry and EOF.
-			if err == nil || len(line) > 0 {
-				if logErr := c.dst.Log(&Message{Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil {
-					logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr)
-				}
+			// Work out how much more data we are okay with reading this time.
+			upto := n + readSize
+			if upto > cap(buf) {
+				upto = cap(buf)
 			}
-
-			if err != nil {
-				if err != io.EOF {
-					logrus.Errorf("Error scanning log stream: %s", err)
+			// Try to read that data.
+			if upto > n {
+				read, err := src.Read(buf[n:upto])
+				if err != nil {
+					if err != io.EOF {
+						logrus.Errorf("Error scanning log stream: %s", err)
+						return
+					}
+					eof = true
 				}
+				n += read
+			}
+			// If we have no data to log, and there's no more coming, we're done.
+			if n == 0 && eof {
 				return
 			}
+			// Break up the data that we've buffered up into lines, and log each in turn.
+			p := 0
+			for q := bytes.Index(buf[p:n], []byte{'\n'}); q >= 0; q = bytes.Index(buf[p:n], []byte{'\n'}) {
+				msg.Line = buf[p : p+q]
+				msg.Timestamp = time.Now().UTC()
+				msg.Partial = false
+				select {
+				case <-c.closed:
+					return
+				default:
+					if logErr := c.dst.Log(msg); logErr != nil {
+						logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
+					}
+				}
+				p += q + 1
+			}
+			// If there's no more coming, or the buffer is full but
+			// has no newlines, log whatever we haven't logged yet,
+			// noting that it's a partial log line.
+			if eof || (p == 0 && n == len(buf)) {
+				if p < n {
+					msg.Line = buf[p:n]
+					msg.Timestamp = time.Now().UTC()
+					msg.Partial = true
+					if logErr := c.dst.Log(msg); logErr != nil {
+						logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
+					}
+					p = 0
+					n = 0
+				}
+				if eof {
+					return
+				}
+			}
+			// Move any unlogged data to the front of the buffer in preparation for another read.
+			if p > 0 {
+				copy(buf[0:], buf[p:n])
+				n -= p
+			}
 		}
	}
 }
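To see the chunking behavior end to end, a test along the following
lines could sit in copier_test.go (capturingLogger and the test are a
sketch for illustration; they are not part of this patch):

    // Records every Message it receives, deep-copying each one because
    // Line aliases the copier's reusable buffer.
    type capturingLogger struct {
    	mu   sync.Mutex
    	msgs []*Message
    }

    func (l *capturingLogger) Log(m *Message) error {
    	l.mu.Lock()
    	defer l.mu.Unlock()
    	l.msgs = append(l.msgs, CopyMessage(m))
    	return nil
    }

    func (l *capturingLogger) Close() error { return nil }

    func (l *capturingLogger) Name() string { return "capturing" }

    func TestCopierLongLine(t *testing.T) {
    	dst := &capturingLogger{}
    	// One line that is 42 bytes longer than the 16k buffer.
    	line := append(bytes.Repeat([]byte{'a'}, bufSize+42), '\n')
    	c := NewCopier(map[string]io.Reader{"stdout": bytes.NewReader(line)}, dst)
    	c.Run()
    	c.Wait()
    	c.Close()
    	// Expect a full 16k chunk flagged Partial, then the 42-byte tail.
    	if len(dst.msgs) != 2 || !dst.msgs[0].Partial || dst.msgs[1].Partial {
    		t.Fatalf("unexpected chunking: got %d messages", len(dst.msgs))
    	}
    	if len(dst.msgs[0].Line) != bufSize || len(dst.msgs[1].Line) != 42 {
    		t.Fatalf("unexpected chunk sizes: %d and %d", len(dst.msgs[0].Line), len(dst.msgs[1].Line))
    	}
    }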
diff --git a/daemon/logger/journald/journald.go b/daemon/logger/journald/journald.go
index 748dd8b24a181..e944116f359d1 100644
--- a/daemon/logger/journald/journald.go
+++ b/daemon/logger/journald/journald.go
@@ -84,10 +84,17 @@ func validateLogOpt(cfg map[string]string) error {
 }
 
 func (s *journald) Log(msg *logger.Message) error {
+	vars := map[string]string{}
+	for k, v := range s.vars {
+		vars[k] = v
+	}
+	if msg.Partial {
+		vars["CONTAINER_PARTIAL_MESSAGE"] = "true"
+	}
 	if msg.Source == "stderr" {
-		return journal.Send(string(msg.Line), journal.PriErr, s.vars)
+		return journal.Send(string(msg.Line), journal.PriErr, vars)
 	}
-	return journal.Send(string(msg.Line), journal.PriInfo, s.vars)
+	return journal.Send(string(msg.Line), journal.PriInfo, vars)
 }
 
 func (s *journald) Name() string {
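Because partial chunks are now tagged in the journal itself, they can
be inspected with journalctl's standard field matching, for example:

    journalctl CONTAINER_PARTIAL_MESSAGE=true -o verbose

Entries without the field are complete lines; the "logs" reader below
relies on exactly that distinction to decide whether to re-append a
trailing newline.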
diff --git a/daemon/logger/journald/read.go b/daemon/logger/journald/read.go
index bc009f61cf4cd..04370fdbc0ba0 100644
--- a/daemon/logger/journald/read.go
+++ b/daemon/logger/journald/read.go
@@ -12,11 +12,15 @@ package journald
 // #include <time.h>
 // #include <unistd.h>
 //
-//static int get_message(sd_journal *j, const char **msg, size_t *length)
+//static int get_message(sd_journal *j, const char **msg, size_t *length, int *partial)
 //{
 //	int rc;
+//	size_t plength;
 //	*msg = NULL;
 //	*length = 0;
+//	plength = strlen("CONTAINER_PARTIAL_MESSAGE=true");
+//	rc = sd_journal_get_data(j, "CONTAINER_PARTIAL_MESSAGE", (const void **) msg, length);
+//	*partial = ((rc == 0) && (*length == plength) && (memcmp(*msg, "CONTAINER_PARTIAL_MESSAGE=true", plength) == 0));
 //	rc = sd_journal_get_data(j, "MESSAGE", (const void **) msg, length);
 //	if (rc == 0) {
 //		if (*length > 8) {
@@ -167,7 +171,7 @@ func (s *journald) drainJournal(logWatcher *logger.LogWatcher, config logger.ReadConfig
 	var msg, data, cursor *C.char
 	var length C.size_t
 	var stamp C.uint64_t
-	var priority C.int
+	var priority, partial C.int
 
 	// Walk the journal from here forward until we run out of new entries.
 drain:
@@ -183,7 +187,7 @@ drain:
 			}
 		}
 		// Read and send the logged message, if there is one to read.
-		i := C.get_message(j, &msg, &length)
+		i := C.get_message(j, &msg, &length, &partial)
 		if i != -C.ENOENT && i != -C.EADDRNOTAVAIL {
 			// Read the entry's timestamp.
 			if C.sd_journal_get_realtime_usec(j, &stamp) != 0 {
@@ -191,7 +195,10 @@ drain:
 			}
 			// Set up the time and text of the entry.
 			timestamp := time.Unix(int64(stamp)/1000000, (int64(stamp)%1000000)*1000)
-			line := append(C.GoBytes(unsafe.Pointer(msg), C.int(length)), "\n"...)
+			line := C.GoBytes(unsafe.Pointer(msg), C.int(length))
+			if partial == 0 {
+				line = append(line, "\n"...)
+			}
 			// Recover the stream name by mapping
 			// from the journal priority back to
 			// the stream that we would have
diff --git a/daemon/logger/jsonfilelog/jsonfilelog.go b/daemon/logger/jsonfilelog/jsonfilelog.go
index 9faa4e02dba79..a429a08a4f22f 100644
--- a/daemon/logger/jsonfilelog/jsonfilelog.go
+++ b/daemon/logger/jsonfilelog/jsonfilelog.go
@@ -90,8 +90,12 @@ func (l *JSONFileLogger) Log(msg *logger.Message) error {
 		return err
 	}
 	l.mu.Lock()
+	logline := msg.Line
+	if !msg.Partial {
+		logline = append(msg.Line, '\n')
+	}
 	err = (&jsonlog.JSONLogs{
-		Log:      append(msg.Line, '\n'),
+		Log:      logline,
 		Stream:   msg.Source,
 		Created:  timestamp,
 		RawAttrs: l.extra,
diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go
index fb8c9a7dee693..2e7b2265b78ff 100644
--- a/daemon/logger/logger.go
+++ b/daemon/logger/logger.go
@@ -25,12 +25,33 @@ const (
 	logWatcherBufferSize = 4096
 )
 
-// Message is datastructure that represents record from some container.
+// Message is a datastructure that represents a piece of output produced by
+// some container. The Line member is a slice of an array whose contents can
+// be changed after a log driver's Log() method returns.
 type Message struct {
 	Line      []byte
 	Source    string
 	Timestamp time.Time
 	Attrs     LogAttributes
+	Partial   bool
+}
+
+// CopyMessage creates a copy of the passed-in Message which will remain
+// unchanged if the original is changed. Log drivers which buffer Messages
+// rather than dispatching them during their Log() method should use this
+// function to obtain a Message whose Line member's contents won't change.
+func CopyMessage(msg *Message) *Message {
+	m := new(Message)
+	m.Line = make([]byte, len(msg.Line))
+	copy(m.Line, msg.Line)
+	m.Source = msg.Source
+	m.Timestamp = msg.Timestamp
+	m.Partial = msg.Partial
+	m.Attrs = make(LogAttributes)
+	for k, v := range msg.Attrs {
+		m.Attrs[k] = v
+	}
+	return m
 }
 
 // LogAttributes is used to hold the extra attributes available in the log message
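With the series applied, the effect on copier throughput can be
measured with the benchmarks from the first patch; from the top of the
tree, something like:

    go test -run NONE -bench Copier ./daemon/logger/

(standard go test flags; the -run NONE pattern skips the unit tests so
that only the benchmarks execute).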