Skip to content

Commit

Permalink
[WIP, RFC] new logger: rawfifo
Browse files Browse the repository at this point in the history
Signed-off-by: Akihiro Suda <suda.akihiro@lab.ntt.co.jp>
  • Loading branch information
AkihiroSuda committed Jun 2, 2016
1 parent 287b0a6 commit 55ec631
Show file tree
Hide file tree
Showing 17 changed files with 372 additions and 83 deletions.
4 changes: 2 additions & 2 deletions container/container.go
Expand Up @@ -77,8 +77,8 @@ type CommonContainer struct {
HostConfig *containertypes.HostConfig `json:"-"` // do not serialize the host config in the json, otherwise we'll make the container unportable
ExecCommands *exec.Store `json:"-"`
// logDriver for closing
LogDriver logger.Logger `json:"-"`
LogCopier *logger.Copier `json:"-"`
LogDriver logger.Logger `json:"-"`
LogCopier logger.Copier `json:"-"`
restartManager restartmanager.RestartManager
attachContext *attachContext
}
Expand Down
1 change: 1 addition & 0 deletions contrib/completion/bash/docker
Expand Up @@ -402,6 +402,7 @@ __docker_complete_log_drivers() {
journald
json-file
none
rawfifo
splunk
syslog
" -- "$cur" ) )
Expand Down
4 changes: 2 additions & 2 deletions contrib/completion/zsh/_docker
Expand Up @@ -739,7 +739,7 @@ __docker_subcommand() {
"($help)--ipc=[IPC namespace to use]:IPC namespace: "
"($help)*--link=[Add link to another container]:link:->link"
"($help)*"{-l=,--label=}"[Container metadata]:label: "
"($help)--log-driver=[Default driver for container logs]:Logging driver:(awslogs etwlogs fluentd gcplogs gelf journald json-file none splunk syslog)"
"($help)--log-driver=[Default driver for container logs]:Logging driver:(awslogs etwlogs fluentd gcplogs gelf journald json-file none rawfifo splunk syslog)"
"($help)*--log-opt=[Log driver specific options]:log driver options:__docker_log_options"
"($help)--mac-address=[Container MAC address]:MAC address: "
"($help)--name=[Container name]:name: "
Expand Down Expand Up @@ -882,7 +882,7 @@ __docker_subcommand() {
"($help)--ipv6[Enable IPv6 networking]" \
"($help -l --log-level)"{-l=,--log-level=}"[Logging level]:level:(debug info warn error fatal)" \
"($help)*--label=[Key=value labels]:label: " \
"($help)--log-driver=[Default driver for container logs]:Logging driver:(awslogs etwlogs fluentd gcplogs gelf journald json-file none splunk syslog)" \
"($help)--log-driver=[Default driver for container logs]:Logging driver:(awslogs etwlogs fluentd gcplogs gelf journald json-file none rawfifo splunk syslog)" \
"($help)*--log-opt=[Log driver specific options]:log driver options:__docker_log_options" \
"($help)--max-concurrent-downloads[Set the max concurrent downloads for each pull]" \
"($help)--max-concurrent-uploads[Set the max concurrent uploads for each push]" \
Expand Down
1 change: 1 addition & 0 deletions daemon/logdrivers_linux.go
Expand Up @@ -9,6 +9,7 @@ import (
_ "github.com/docker/docker/daemon/logger/gelf"
_ "github.com/docker/docker/daemon/logger/journald"
_ "github.com/docker/docker/daemon/logger/jsonfilelog"
_ "github.com/docker/docker/daemon/logger/rawfifo"
_ "github.com/docker/docker/daemon/logger/splunk"
_ "github.com/docker/docker/daemon/logger/syslog"
)
84 changes: 12 additions & 72 deletions daemon/logger/copier.go
@@ -1,83 +1,23 @@
package logger

import (
"bufio"
"bytes"
"fmt"
"io"
"sync"
"time"

"github.com/Sirupsen/logrus"
)

// Copier can copy logs from specified sources to Logger and attach
// ContainerID and Timestamp.
// Writes are concurrent, so you need implement some sync in your logger
type Copier struct {
// srcs is map of name -> reader pairs, for example "stdout", "stderr"
srcs map[string]io.Reader
dst Logger
copyJobs sync.WaitGroup
closed chan struct{}
// Copier can copy logs from specified sources to Logger
type Copier interface {
	// Run starts the copy jobs, one goroutine per source stream.
	Run()
	// Wait blocks until all copy jobs started by Run have finished.
	Wait()
	// Close signals the copy jobs to stop; it is safe to call more than once.
	Close()
}

// NewCopier creates a new Copier
func NewCopier(srcs map[string]io.Reader, dst Logger) *Copier {
return &Copier{
srcs: srcs,
dst: dst,
closed: make(chan struct{}),
}
}

// Run starts logs copying
func (c *Copier) Run() {
for src, w := range c.srcs {
c.copyJobs.Add(1)
go c.copySrc(src, w)
}
}

func (c *Copier) copySrc(name string, src io.Reader) {
defer c.copyJobs.Done()
reader := bufio.NewReader(src)

for {
select {
case <-c.closed:
return
default:
line, err := reader.ReadBytes('\n')
line = bytes.TrimSuffix(line, []byte{'\n'})

// ReadBytes can return full or partial output even when it failed.
// e.g. it can return a full entry and EOF.
if err == nil || len(line) > 0 {
if logErr := c.dst.Log(&Message{Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil {
logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr)
}
}

if err != nil {
if err != io.EOF {
logrus.Errorf("Error scanning log stream: %s", err)
}
return
}
}
}
}

// Wait waits until all copying is done
func (c *Copier) Wait() {
c.copyJobs.Wait()
}

// Close closes the copier
func (c *Copier) Close() {
select {
case <-c.closed:
default:
close(c.closed)
// NewCopier creates the Copier implementation matching the given Logger:
// a MessageCopier for line-oriented MessageLoggers, or a RawCopier for
// RawLoggers that consume the byte stream as-is. It returns an error when
// dst implements neither interface. If dst implements both, MessageLogger
// takes precedence.
func NewCopier(srcs map[string]io.Reader, dst Logger) (Copier, error) {
	switch l := dst.(type) {
	case MessageLogger:
		return NewMessageCopier(srcs, l), nil
	case RawLogger:
		return NewRawCopier(srcs, l), nil
	}
	return nil, fmt.Errorf("logger %s implements neither MessageLogger nor RawLogger", dst.Name())
}
10 changes: 8 additions & 2 deletions daemon/logger/copier_test.go
Expand Up @@ -46,12 +46,15 @@ func TestCopier(t *testing.T) {

jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf)}

c := NewCopier(
c, err := NewCopier(
map[string]io.Reader{
"stdout": &stdout,
"stderr": &stderr,
},
jsonLog)
if err != nil {
t.Fatal(err)
}
c.Run()
wait := make(chan struct{})
go func() {
Expand Down Expand Up @@ -101,7 +104,10 @@ func TestCopierSlow(t *testing.T) {
//encoder := &encodeCloser{Encoder: json.NewEncoder(&jsonBuf)}
jsonLog := &TestLoggerJSON{Encoder: json.NewEncoder(&jsonBuf), delay: 100 * time.Millisecond}

c := NewCopier(map[string]io.Reader{"stdout": &stdout}, jsonLog)
c, err := NewCopier(map[string]io.Reader{"stdout": &stdout}, jsonLog)
if err != nil {
t.Fatal(err)
}
c.Run()
wait := make(chan struct{})
go func() {
Expand Down
14 changes: 13 additions & 1 deletion daemon/logger/logger.go
Expand Up @@ -9,6 +9,7 @@ package logger

import (
"errors"
"io"
"sort"
"strings"
"time"
Expand Down Expand Up @@ -59,11 +60,22 @@ func (a LogAttributes) String() string {

// Logger is the interface for docker logging drivers.
type Logger interface {
	// Log records a single log message.
	Log(*Message) error
	// Name returns the name of the logging driver.
	Name() string
	// Close releases any resources held by the driver.
	Close() error
}

// MessageLogger is the interface for standard logging drivers.
// These drivers consume discrete, line-split messages rather than a
// raw byte stream.
type MessageLogger interface {
	Logger
	// Log records a single log message.
	Log(*Message) error
}

// RawLogger is the interface for raw logging drivers.
// These drivers consume the unparsed byte stream of each source.
type RawLogger interface {
	Logger
	// RawWriter returns a writer for the raw stream of the named source
	// (for example "stdout" or "stderr"). The caller is responsible for
	// closing the returned writer.
	RawWriter(string) (io.WriteCloser, error)
}

// ReadConfig is the configuration passed into ReadLogs.
type ReadConfig struct {
Since time.Time
Expand Down
82 changes: 82 additions & 0 deletions daemon/logger/messagecopier.go
@@ -0,0 +1,82 @@
package logger

import (
"bufio"
"bytes"
"io"
"sync"
"time"

"github.com/Sirupsen/logrus"
)

// MessageCopier can copy log messages from specified sources to a Logger
// and attach a Timestamp.
// Writes are concurrent, so you need to implement some sync in your logger.
type MessageCopier struct {
	// srcs is map of name -> reader pairs, for example "stdout", "stderr"
	srcs map[string]io.Reader
	// dst receives one Message per line read from the sources.
	dst MessageLogger
	// copyJobs counts the per-source goroutines started by Run.
	copyJobs sync.WaitGroup
	// closed is closed by Close to signal the copy goroutines to stop.
	closed chan struct{}
}

// NewMessageCopier creates a MessageCopier that reads from the named
// source streams in srcs and forwards each line to dst.
func NewMessageCopier(srcs map[string]io.Reader, dst MessageLogger) *MessageCopier {
	copier := &MessageCopier{
		closed: make(chan struct{}),
		dst:    dst,
		srcs:   srcs,
	}
	return copier
}

// Run starts one copy goroutine per source stream. Use Wait to block
// until they finish, and Close to ask them to stop early.
func (c *MessageCopier) Run() {
	for name, reader := range c.srcs {
		c.copyJobs.Add(1)
		go c.copySrc(name, reader)
	}
}

// copySrc reads src line by line and logs each line as a Message tagged
// with the source name and a UTC timestamp. It returns when the source is
// exhausted, an unrecoverable read error occurs, or the copier is closed.
func (c *MessageCopier) copySrc(name string, src io.Reader) {
	defer c.copyJobs.Done()
	reader := bufio.NewReader(src)

	for {
		// The closed check only happens between reads; a read already in
		// progress is not interrupted.
		select {
		case <-c.closed:
			return
		default:
			line, err := reader.ReadBytes('\n')
			line = bytes.TrimSuffix(line, []byte{'\n'})

			// ReadBytes can return full or partial output even when it failed.
			// e.g. it can return a full entry and EOF.
			if err == nil || len(line) > 0 {
				if logErr := c.dst.Log(&Message{Line: line, Source: name, Timestamp: time.Now().UTC()}); logErr != nil {
					logrus.Errorf("Failed to log msg %q for logger %s: %s", line, c.dst.Name(), logErr)
				}
			}

			if err != nil {
				// io.EOF is normal termination of the stream, so only
				// other errors are reported.
				if err != io.EOF {
					logrus.Errorf("Error scanning log stream: %s", err)
				}
				return
			}
		}
	}
}

// Wait waits until all copying is done, i.e. every goroutine started by
// Run has returned.
func (c *MessageCopier) Wait() {
	c.copyJobs.Wait()
}

// Close closes the copier, signalling the copy goroutines to stop.
// It is safe to call Close multiple times.
func (c *MessageCopier) Close() {
	// Non-blocking receive guards against closing an already-closed channel.
	select {
	case <-c.closed:
	default:
		close(c.closed)
	}
}
73 changes: 73 additions & 0 deletions daemon/logger/rawcopier.go
@@ -0,0 +1,73 @@
package logger

import (
"io"
"sync"

"github.com/Sirupsen/logrus"
)

// RawCopier can copy logs from specified sources, streaming the raw bytes
// of each source into a writer obtained from the destination RawLogger.
type RawCopier struct {
	// srcs is map of name -> reader pairs, for example "stdout", "stderr"
	srcs map[string]io.Reader
	// dst provides one RawWriter per source name.
	dst RawLogger
	// copyJobs counts the per-source goroutines started by Run.
	copyJobs sync.WaitGroup
	// closed is closed by Close to signal the copy goroutines to stop.
	closed chan struct{}
}

// NewRawCopier creates a RawCopier that streams each named source in
// srcs to the destination RawLogger.
func NewRawCopier(srcs map[string]io.Reader, dst RawLogger) *RawCopier {
	copier := &RawCopier{
		closed: make(chan struct{}),
		dst:    dst,
		srcs:   srcs,
	}
	return copier
}

// Run starts one copy goroutine per source stream. Use Wait to block
// until they finish, and Close to ask them to stop early.
func (c *RawCopier) Run() {
	for name, reader := range c.srcs {
		c.copyJobs.Add(1)
		go c.copySrc(name, reader)
	}
}

// copySrc streams raw bytes from src into the writer obtained from the
// destination's RawWriter for the given source name. It returns when the
// source is exhausted, a copy error occurs, or the copier is closed; the
// writer is closed on every return path.
func (c *RawCopier) copySrc(name string, src io.Reader) {
	defer c.copyJobs.Done()
	w, err := c.dst.RawWriter(name)
	if err != nil {
		logrus.Errorf("error while opening RawWriter for %s: %v", name, err)
		return
	}
	// Close the writer on every exit, not only when c.closed fires;
	// previously it leaked on the EOF/error path.
	defer func() {
		if err := w.Close(); err != nil {
			logrus.Errorf("error while closing RawWriter for %s: %v", name, err)
		}
	}()
	// use io.CopyN rather than io.Copy, so that we can catch <-c.closed
	// between bounded chunks
	const bufsz = 64 * 1024
	for {
		select {
		case <-c.closed:
			return
		default:
			if _, err := io.CopyN(w, src, bufsz); err != nil {
				// io.EOF is normal end-of-stream, not a copy failure.
				if err != io.EOF {
					logrus.Errorf("stream copy error: %s: %v", name, err)
				}
				return
			}
		}
	}
}

// Wait waits until all copying is done, i.e. every goroutine started by
// Run has returned.
func (c *RawCopier) Wait() {
	c.copyJobs.Wait()
}

// Close closes the copier, signalling the copy goroutines to stop.
// It is safe to call Close multiple times.
func (c *RawCopier) Close() {
	// Non-blocking receive guards against closing an already-closed channel.
	select {
	case <-c.closed:
	default:
		close(c.closed)
	}
}

0 comments on commit 55ec631

Please sign in to comment.