Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement optional ring buffer for container logs #28762

Merged
merged 2 commits into from Feb 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 11 additions & 0 deletions api/types/container/host_config.go
Expand Up @@ -223,6 +223,17 @@ func (rp *RestartPolicy) IsSame(tp *RestartPolicy) bool {
return rp.Name == tp.Name && rp.MaximumRetryCount == tp.MaximumRetryCount
}

// LogMode is a type to define the available modes for logging
// These modes affect how logs are handled when log messages start piling up.
type LogMode string

// Available logging modes
const (
LogModeUnset = ""
LogModeBlocking LogMode = "blocking"
LogModeNonBlock LogMode = "non-blocking"
)

// LogConfig represents the logging configuration of the container.
type LogConfig struct {
Type string
Expand Down
21 changes: 19 additions & 2 deletions container/container.go
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/docker/docker/runconfig"
"github.com/docker/docker/volume"
"github.com/docker/go-connections/nat"
"github.com/docker/go-units"
"github.com/docker/libnetwork"
"github.com/docker/libnetwork/netlabel"
"github.com/docker/libnetwork/options"
Expand Down Expand Up @@ -316,7 +317,7 @@ func (container *Container) CheckpointDir() string {
// StartLogger starts a new logger driver for the container.
func (container *Container) StartLogger() (logger.Logger, error) {
cfg := container.HostConfig.LogConfig
c, err := logger.GetLogDriver(cfg.Type)
initDriver, err := logger.GetLogDriver(cfg.Type)
if err != nil {
return nil, fmt.Errorf("failed to get logging factory: %v", err)
}
Expand All @@ -341,7 +342,23 @@ func (container *Container) StartLogger() (logger.Logger, error) {
return nil, err
}
}
return c(info)

l, err := initDriver(info)
if err != nil {
return nil, err
}

if containertypes.LogMode(cfg.Config["mode"]) == containertypes.LogModeNonBlock {
bufferSize := int64(-1)
if s, exists := cfg.Config["max-buffer-size"]; exists {
bufferSize, err = units.RAMInBytes(s)
if err != nil {
return nil, err
}
}
l = logger.NewRingLogger(l, info, bufferSize)
}
return l, nil
}

// GetProcessLabel returns the process label for the container.
Expand Down
4 changes: 2 additions & 2 deletions daemon/logger/awslogs/cloudwatchlogs.go
Expand Up @@ -203,8 +203,7 @@ func (l *logStream) Log(msg *logger.Message) error {
l.lock.RLock()
defer l.lock.RUnlock()
if !l.closed {
// buffer up the data, making sure to copy the Line data
l.messages <- logger.CopyMessage(msg)
l.messages <- msg
}
return nil
}
Expand Down Expand Up @@ -347,6 +346,7 @@ func (l *logStream) collectBatch() {
})
bytes += (lineBytes + perEventBytes)
}
logger.PutMessage(msg)
}
}
}
Expand Down
16 changes: 10 additions & 6 deletions daemon/logger/copier.go
Expand Up @@ -47,7 +47,6 @@ func (c *Copier) copySrc(name string, src io.Reader) {
buf := make([]byte, bufSize)
n := 0
eof := false
msg := &Message{Source: name}

for {
select {
Expand Down Expand Up @@ -77,14 +76,16 @@ func (c *Copier) copySrc(name string, src io.Reader) {
}
// Break up the data that we've buffered up into lines, and log each in turn.
p := 0
for q := bytes.Index(buf[p:n], []byte{'\n'}); q >= 0; q = bytes.Index(buf[p:n], []byte{'\n'}) {
msg.Line = buf[p : p+q]
msg.Timestamp = time.Now().UTC()
msg.Partial = false
for q := bytes.IndexByte(buf[p:n], '\n'); q >= 0; q = bytes.IndexByte(buf[p:n], '\n') {
select {
case <-c.closed:
return
default:
msg := NewMessage()
msg.Source = name
msg.Timestamp = time.Now().UTC()
msg.Line = append(msg.Line, buf[p:p+q]...)

if logErr := c.dst.Log(msg); logErr != nil {
logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
}
Expand All @@ -96,9 +97,12 @@ func (c *Copier) copySrc(name string, src io.Reader) {
// noting that it's a partial log line.
if eof || (p == 0 && n == len(buf)) {
if p < n {
msg.Line = buf[p:n]
msg := NewMessage()
msg.Source = name
msg.Timestamp = time.Now().UTC()
msg.Line = append(msg.Line, buf[p:n]...)
msg.Partial = true

if logErr := c.dst.Log(msg); logErr != nil {
logrus.Errorf("Failed to log msg %q for logger %s: %s", msg.Line, c.dst.Name(), logErr)
}
Expand Down
2 changes: 1 addition & 1 deletion daemon/logger/copier_test.go
Expand Up @@ -208,7 +208,7 @@ func TestCopierSlow(t *testing.T) {
type BenchmarkLoggerDummy struct {
}

func (l *BenchmarkLoggerDummy) Log(m *Message) error { return nil }
func (l *BenchmarkLoggerDummy) Log(m *Message) error { PutMessage(m); return nil }

func (l *BenchmarkLoggerDummy) Close() error { return nil }

Expand Down
4 changes: 3 additions & 1 deletion daemon/logger/etwlogs/etwlogs_windows.go
Expand Up @@ -76,7 +76,9 @@ func (etwLogger *etwLogs) Log(msg *logger.Message) error {
logrus.Error(errorMessage)
return errors.New(errorMessage)
}
return callEventWriteString(createLogMessage(etwLogger, msg))
m := createLogMessage(etwLogger, msg)
logger.PutMessage(msg)
return callEventWriteString(m)
}

// Close closes the logger by unregistering the ETW provider.
Expand Down
33 changes: 32 additions & 1 deletion daemon/logger/factory.go
Expand Up @@ -3,6 +3,10 @@ package logger
import (
"fmt"
"sync"

containertypes "github.com/docker/docker/api/types/container"
units "github.com/docker/go-units"
"github.com/pkg/errors"
)

// Creator builds a logging driver instance with given context.
Expand Down Expand Up @@ -85,20 +89,47 @@ func GetLogDriver(name string) (Creator, error) {
return factory.get(name)
}

var builtInLogOpts = map[string]bool{
"mode": true,
"max-buffer-size": true,
}

// ValidateLogOpts checks the options for the given log driver. The
// options supported are specific to the LogDriver implementation.
func ValidateLogOpts(name string, cfg map[string]string) error {
if name == "none" {
return nil
}

switch containertypes.LogMode(cfg["mode"]) {
case containertypes.LogModeBlocking, containertypes.LogModeNonBlock, containertypes.LogModeUnset:
default:
return fmt.Errorf("logger: logging mode not supported: %s", cfg["mode"])
}

if s, ok := cfg["max-buffer-size"]; ok {
if containertypes.LogMode(cfg["mode"]) != containertypes.LogModeNonBlock {
return fmt.Errorf("logger: max-buffer-size option is only supported with 'mode=%s'", containertypes.LogModeNonBlock)
}
if _, err := units.RAMInBytes(s); err != nil {
return errors.Wrap(err, "error parsing option max-buffer-size")
}
}

if !factory.driverRegistered(name) {
return fmt.Errorf("logger: no log driver named '%s' is registered", name)
}

filteredOpts := make(map[string]string, len(builtInLogOpts))
for k, v := range cfg {
if !builtInLogOpts[k] {
filteredOpts[k] = v
}
}

validator := factory.getLogOptValidator(name)
if validator != nil {
return validator(cfg)
return validator(filteredOpts)
}
return nil
}
5 changes: 4 additions & 1 deletion daemon/logger/fluentd/fluentd.go
Expand Up @@ -151,9 +151,12 @@ func (f *fluentd) Log(msg *logger.Message) error {
for k, v := range f.extra {
data[k] = v
}

ts := msg.Timestamp
logger.PutMessage(msg)
// fluent-logger-golang buffers logs from failures and disconnections,
// and these are transferred again automatically.
return f.writer.PostWithTime(f.tag, msg.Timestamp, data)
return f.writer.PostWithTime(f.tag, ts, data)
}

func (f *fluentd) Close() error {
Expand Down
8 changes: 6 additions & 2 deletions daemon/logger/gcplogs/gcplogging.go
Expand Up @@ -194,12 +194,16 @@ func ValidateLogOpts(cfg map[string]string) error {
}

func (l *gcplogs) Log(m *logger.Message) error {
data := string(m.Line)
ts := m.Timestamp
logger.PutMessage(m)

l.logger.Log(logging.Entry{
Timestamp: m.Timestamp,
Timestamp: ts,
Payload: &dockerLogEntry{
Instance: l.instance,
Container: l.container,
Data: string(m.Line),
Data: data,
},
})
return nil
Expand Down
1 change: 1 addition & 0 deletions daemon/logger/gelf/gelf.go
Expand Up @@ -133,6 +133,7 @@ func (s *gelfLogger) Log(msg *logger.Message) error {
Level: level,
RawExtra: s.rawExtra,
}
logger.PutMessage(msg)

if err := s.writer.WriteMessage(&m); err != nil {
return fmt.Errorf("gelf: cannot send GELF message: %v", err)
Expand Down
8 changes: 6 additions & 2 deletions daemon/logger/journald/journald.go
Expand Up @@ -105,10 +105,14 @@ func (s *journald) Log(msg *logger.Message) error {
if msg.Partial {
vars["CONTAINER_PARTIAL_MESSAGE"] = "true"
}

line := string(msg.Line)
logger.PutMessage(msg)

if msg.Source == "stderr" {
return journal.Send(string(msg.Line), journal.PriErr, vars)
return journal.Send(line, journal.PriErr, vars)
}
return journal.Send(string(msg.Line), journal.PriInfo, vars)
return journal.Send(line, journal.PriInfo, vars)
}

func (s *journald) Name() string {
Expand Down
1 change: 1 addition & 0 deletions daemon/logger/jsonfilelog/jsonfilelog.go
Expand Up @@ -100,6 +100,7 @@ func (l *JSONFileLogger) Log(msg *logger.Message) error {
Created: timestamp,
RawAttrs: l.extra,
}).MarshalJSONBuf(l.buf)
logger.PutMessage(msg)
if err != nil {
l.mu.Unlock()
return err
Expand Down
4 changes: 3 additions & 1 deletion daemon/logger/logentries/logentries.go
Expand Up @@ -61,7 +61,9 @@ func (f *logentries) Log(msg *logger.Message) error {
for k, v := range f.extra {
data[k] = v
}
f.writer.Println(f.tag, msg.Timestamp, data)
ts := msg.Timestamp
logger.PutMessage(msg)
f.writer.Println(f.tag, ts, data)
return nil
}

Expand Down
39 changes: 23 additions & 16 deletions daemon/logger/logger.go
Expand Up @@ -26,9 +26,24 @@ const (
logWatcherBufferSize = 4096
)

var messagePool = &sync.Pool{New: func() interface{} { return &Message{Line: make([]byte, 0, 256)} }}

// NewMessage returns a new message from the message sync.Pool
func NewMessage() *Message {
return messagePool.Get().(*Message)
}

// PutMessage puts the specified message back n the message pool.
// The message fields are reset before putting into the pool.
func PutMessage(msg *Message) {
msg.reset()
messagePool.Put(msg)
}

// Message is datastructure that represents piece of output produced by some
// container. The Line member is a slice of an array whose contents can be
// changed after a log driver's Log() method returns.
// Any changes made to this struct must also be updated in the `reset` function
type Message struct {
Line []byte
Source string
Expand All @@ -37,22 +52,14 @@ type Message struct {
Partial bool
}

// CopyMessage creates a copy of the passed-in Message which will remain
// unchanged if the original is changed. Log drivers which buffer Messages
// rather than dispatching them during their Log() method should use this
// function to obtain a Message whose Line member's contents won't change.
func CopyMessage(msg *Message) *Message {
m := new(Message)
m.Line = make([]byte, len(msg.Line))
copy(m.Line, msg.Line)
m.Source = msg.Source
m.Timestamp = msg.Timestamp
m.Partial = msg.Partial
m.Attrs = make(LogAttributes)
for k, v := range msg.Attrs {
m.Attrs[k] = v
}
return m
// reset sets the message back to default values
// This is used when putting a message back into the message pool.
// Any changes to the `Message` struct should be reflected here.
func (m *Message) reset() {
m.Line = m.Line[:0]
m.Source = ""
m.Attrs = nil
m.Partial = false
}

// LogAttributes is used to hold the extra attributes available in the log message
Expand Down
26 changes: 0 additions & 26 deletions daemon/logger/logger_test.go

This file was deleted.