Improve logging of long log lines
This change updates how we handle long lines of output from the
container. The previous logic used a bufio reader to read entire lines
of output from the container through an intermediate BytesPipe, so a
container that emitted data without newlines could cause dockerd to
consume an unbounded amount of memory while it tried to collect a whole
line of output.

To avoid that, we replace the bufio reader with our own buffering scheme
that handles log lines up to 16k in length, breaking up anything longer
than that into multiple chunks.  (If we did not need to note this
detail properly at the end of output, we could simply switch from
ReadBytes() to ReadLine() instead.)  We add a field ("Partial") to the
log message structure to flag when we pass data to the log driver that
did not end with a newline.
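
A driver or reader that wants whole lines back can reassemble them by
accumulating Line data until it sees a message with Partial set to
false. A minimal sketch of that pattern (the helper is illustrative,
not part of this change):

	// joinPartials gathers the pieces of one logical line: chunks keep
	// arriving with Partial == true until the final piece, which has
	// Partial == false. Illustrative helper only.
	func joinPartials(msgs <-chan *logger.Message) []byte {
		var line []byte
		for msg := range msgs {
			line = append(line, msg.Line...)
			if !msg.Partial {
				break
			}
		}
		return line
	}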

The Line member of Message structures that we pass to log drivers is now
a slice into data which can be overwritten between calls to the log
driver's Log() method, so drivers which batch up Messages before
processing them need to take additional care: we add a function
(logger.CopyMessage()) that can be used to create a deep copy of a
Message structure, and modify the awslogs driver to use it.
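
The awslogs change shows the pattern: any driver that queues messages
for a background goroutine must copy before queueing, roughly like this
(driver name illustrative):

	// Log enqueues a message for a background goroutine to process.
	// msg.Line may be overwritten once Log returns, so we enqueue a
	// deep copy rather than the original message.
	func (d *batchingDriver) Log(msg *logger.Message) error {
		d.messages <- logger.CopyMessage(msg)
		return nil
	}

Sending msg itself would race with the copier reusing its buffer.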

We update the jsonfile log driver to append a "\n" to the data that it
logs to disk only when the Partial flag is false (it previously did so
unconditionally), to make its "logs" output correctly reproduce the data
as we received it.
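
For example (illustrative records, fields abbreviated), a long line
that reaches the driver as two chunks is stored as two JSON records,
only the second of which ends in a newline:

	{"log":"first 16k of the line...","stream":"stdout","time":"..."}
	{"log":"...the rest of the line\n","stream":"stdout","time":"..."}

Concatenating the "log" values then reproduces the original stream.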

Likewise, we modify the journald log driver to add a data field with
value CONTAINER_PARTIAL_MESSAGE=true to entries when the Partial flag is
true, and update its "logs" reader to refrain from appending a "\n" to
the data that it retrieves when it sees this field/value pair (it, too,
previously appended the newline unconditionally).
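
A partial chunk's journal entry would then carry something like this
(illustrative, abbreviated field dump):

	MESSAGE=first 16k of the line...
	CONTAINER_PARTIAL_MESSAGE=true

The final chunk's entry has no CONTAINER_PARTIAL_MESSAGE field, so the
reader appends "\n" to it alone.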

fix http://code.huawei.com/docker/docker/issues/324
upstream issue: moby#18057
fix DTS2017062307255
cherry-pick from: moby#22982
conflicts:
	daemon/logger/copier.go
	daemon/logger/journald/read.go
	daemon/logger/logger.go

Signed-off-by: Nalin Dahyabhai <nalin@redhat.com> (github: nalind)
Signed-off-by: Lei Jitang <leijitang@huawei.com>
nalind authored and coolljt0725 committed Jun 23, 2017
1 parent b71c76b commit 92ec75c
Showing 6 changed files with 113 additions and 24 deletions.
3 changes: 2 additions & 1 deletion daemon/logger/awslogs/cloudwatchlogs.go
@@ -166,7 +166,8 @@ func (l *logStream) Log(msg *logger.Message) error {
 	l.lock.RLock()
 	defer l.lock.RUnlock()
 	if !l.closed {
-		l.messages <- msg
+		// buffer up the data, making sure to copy the Line data
+		l.messages <- logger.CopyMessage(msg)
 	}
 	return nil
 }
82 changes: 67 additions & 15 deletions daemon/logger/copier.go
@@ -1,7 +1,6 @@
 package logger
 
 import (
-	"bufio"
 	"bytes"
 	"io"
 	"sync"
@@ -10,6 +9,11 @@ import (
 	"github.com/Sirupsen/logrus"
 )
 
+const (
+	bufSize  = 16 * 1024
+	readSize = 2 * 1024
+)
+
 // Copier can copy logs from specified sources to Logger and attach
 // ContainerID and Timestamp.
 // Writes are concurrent, so you need implement some sync in your logger
@@ -43,30 +47,78 @@ func (c *Copier) Run() {
 
 func (c *Copier) copySrc(name string, src io.Reader) {
 	defer c.copyJobs.Done()
-	reader := bufio.NewReader(src)
+	buf := make([]byte, bufSize)
+	n := 0
+	eof := false
+	msg := &Message{Source: name}
 
 	for {
 		select {
 		case <-c.closed:
 			return
 		default:
-			line, err := reader.ReadBytes('\n')
-			line = bytes.TrimSuffix(line, []byte{'\n'})
-
-			// ReadBytes can return full or partial output even when it failed.
-			// e.g. it can return a full entry and EOF.
-			if err == nil || len(line) > 0 {
-				if logErr := c.dst.Log(&Message{ContainerID: c.cid, Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil {
-					logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr)
-				}
-			}
-
-			if err != nil {
-				if err != io.EOF {
-					logrus.Errorf("Error scanning log stream: %s", err)
-				}
-				return
-			}
+			// Work out how much more data we are okay with reading this time.
+			upto := n + readSize
+			if upto > cap(buf) {
+				upto = cap(buf)
+			}
+			// Try to read that data.
+			if upto > n {
+				read, err := src.Read(buf[n:upto])
+				if err != nil {
+					if err != io.EOF {
+						logrus.Errorf("Error scanning log stream: %s", err)
+						return
+					}
+					eof = true
+				}
+				n += read
+			}
+			// If we have no data to log, and there's no more coming, we're done.
+			if n == 0 && eof {
+				return
+			}
+			// Break up the data that we've buffered up into lines, and log each in turn.
+			p := 0
+			for q := bytes.Index(buf[p:n], []byte{'\n'}); q >= 0; q = bytes.Index(buf[p:n], []byte{'\n'}) {
+				msg.Line = buf[p : p+q]
+				msg.Timestamp = time.Now().UTC()
+				msg.Partial = false
+				msg.ContainerID = c.cid
+				select {
+				case <-c.closed:
+					return
+				default:
+					if logErr := c.dst.Log(msg); logErr != nil {
+						logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
+					}
+				}
+				p += q + 1
+			}
+			// If there's no more coming, or the buffer is full but
+			// has no newlines, log whatever we haven't logged yet,
+			// noting that it's a partial log line.
+			if eof || (p == 0 && n == len(buf)) {
+				if p < n {
+					msg.Line = buf[p:n]
+					msg.ContainerID = c.cid
+					msg.Timestamp = time.Now().UTC()
+					msg.Partial = true
+					if logErr := c.dst.Log(msg); logErr != nil {
+						logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
+					}
+					p = 0
+					n = 0
+				}
+				if eof {
+					return
+				}
+			}
+			// Move any unlogged data to the front of the buffer in preparation for another read.
+			if p > 0 {
+				copy(buf[0:], buf[p:n])
+				n -= p
+			}
 		}
 	}
 }
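
As a worked example of the buffering scheme above (sizes assumed for
illustration): a 40 KiB burst that ends in a single newline leaves
copySrc as three messages:

	msg 1: len(Line) == 16384, Partial == true   (buffer filled, no newline)
	msg 2: len(Line) == 16384, Partial == true   (buffer filled again)
	msg 3: len(Line) == 8192,  Partial == false  (terminating newline seen)
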
11 changes: 9 additions & 2 deletions daemon/logger/journald/journald.go
@@ -84,10 +84,17 @@ func validateLogOpt(cfg map[string]string) error {
 }
 
 func (s *journald) Log(msg *logger.Message) error {
+	vars := map[string]string{}
+	for k, v := range s.vars {
+		vars[k] = v
+	}
+	if msg.Partial {
+		vars["CONTAINER_PARTIAL_MESSAGE"] = "true"
+	}
 	if msg.Source == "stderr" {
-		return journal.Send(string(msg.Line), journal.PriErr, s.vars)
+		return journal.Send(string(msg.Line), journal.PriErr, vars)
 	}
-	return journal.Send(string(msg.Line), journal.PriInfo, s.vars)
+	return journal.Send(string(msg.Line), journal.PriInfo, vars)
 }
 
 func (s *journald) Name() string {
15 changes: 11 additions & 4 deletions daemon/logger/journald/read.go
@@ -12,11 +12,15 @@ package journald
 // #include <time.h>
 // #include <unistd.h>
 //
-//static int get_message(sd_journal *j, const char **msg, size_t *length)
+//static int get_message(sd_journal *j, const char **msg, size_t *length, int *partial)
 //{
 //	int rc;
+//	size_t plength;
 //	*msg = NULL;
 //	*length = 0;
+//	plength = strlen("CONTAINER_PARTIAL_MESSAGE=true");
+//	rc = sd_journal_get_data(j, "CONTAINER_PARTIAL_MESSAGE", (const void **) msg, length);
+//	*partial = ((rc == 0) && (*length == plength) && (memcmp(*msg, "CONTAINER_PARTIAL_MESSAGE=true", plength) == 0));
 //	rc = sd_journal_get_data(j, "MESSAGE", (const void **) msg, length);
 //	if (rc == 0) {
 //		if (*length > 8) {
@@ -119,7 +123,7 @@ func (s *journald) drainJournal(logWatcher *logger.LogWatcher, config logger.Rea
 	var msg, cursor *C.char
 	var length C.size_t
 	var stamp C.uint64_t
-	var priority C.int
+	var priority, partial C.int
 
 	// Walk the journal from here forward until we run out of new entries.
 drain:
@@ -135,15 +139,18 @@
 		}
 	}
 	// Read and send the logged message, if there is one to read.
-	i := C.get_message(j, &msg, &length)
+	i := C.get_message(j, &msg, &length, &partial)
 	if i != -C.ENOENT && i != -C.EADDRNOTAVAIL {
 		// Read the entry's timestamp.
 		if C.sd_journal_get_realtime_usec(j, &stamp) != 0 {
 			break
 		}
 		// Set up the time and text of the entry.
 		timestamp := time.Unix(int64(stamp)/1000000, (int64(stamp)%1000000)*1000)
-		line := append(C.GoBytes(unsafe.Pointer(msg), C.int(length)), "\n"...)
+		line := C.GoBytes(unsafe.Pointer(msg), C.int(length))
+		if partial == 0 {
+			line = append(line, "\n"...)
+		}
 		// Recover the stream name by mapping
 		// from the journal priority back to
 		// the stream that we would have
6 changes: 5 additions & 1 deletion daemon/logger/jsonfilelog/jsonfilelog.go
@@ -90,8 +90,12 @@ func (l *JSONFileLogger) Log(msg *logger.Message) error {
 		return err
 	}
 	l.mu.Lock()
+	logline := msg.Line
+	if !msg.Partial {
+		logline = append(msg.Line, '\n')
+	}
 	err = (&jsonlog.JSONLogs{
-		Log:      append(msg.Line, '\n'),
+		Log:      logline,
 		Stream:   msg.Source,
 		Created:  timestamp,
 		RawAttrs: l.extra,
20 changes: 19 additions & 1 deletion daemon/logger/logger.go
@@ -23,12 +23,30 @@ const (
 	logWatcherBufferSize = 4096
 )
 
-// Message is datastructure that represents record from some container.
+// Message is datastructure that represents piece of output produced by some
+// container. The Line member is a slice of an array whose contents can be
+// changed after a log driver's Log() method returns.
 type Message struct {
 	ContainerID string
 	Line        []byte
 	Source      string
 	Timestamp   time.Time
+	Partial     bool
 }
 
+// CopyMessage creates a copy of the passed-in Message which will remain
+// unchanged if the original is changed. Log drivers which buffer Messages
+// rather than dispatching them during their Log() method should use this
+// function to obtain a Message whose Line member's contents won't change.
+func CopyMessage(msg *Message) *Message {
+	m := new(Message)
+	m.Line = make([]byte, len(msg.Line))
+	copy(m.Line, msg.Line)
+	m.ContainerID = msg.ContainerID
+	m.Source = msg.Source
+	m.Timestamp = msg.Timestamp
+	m.Partial = msg.Partial
+	return m
+}
+
 // Logger is the interface for docker logging drivers.
