Skip to content

Commit

Permalink
Merge pull request #26088 from akirakoyasu/patch-fluent-unixsocket
Browse files Browse the repository at this point in the history
Proposal: unix-sockets support in Fluentd logging driver
  • Loading branch information
thaJeztah committed Nov 9, 2016
2 parents c025049 + cb176c8 commit 806f09b
Showing 1 changed file with 63 additions and 14 deletions.
77 changes: 63 additions & 14 deletions daemon/logger/fluentd/fluentd.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@ import (
"fmt"
"math"
"net"
"net/url"
"strconv"
"strings"
"time"

"github.com/Sirupsen/logrus"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/daemon/logger/loggerutils"
"github.com/docker/docker/pkg/urlutil"
"github.com/docker/go-units"
"github.com/fluent/fluent-logger-golang/fluent"
"github.com/pkg/errors"
)

type fluentd struct {
Expand All @@ -25,9 +28,17 @@ type fluentd struct {
extra map[string]string
}

type location struct {
protocol string
host string
port int
path string
}

const (
name = "fluentd"

defaultProtocol = "tcp"
defaultHost = "127.0.0.1"
defaultPort = 24224
defaultBufferLimit = 1024 * 1024
Expand Down Expand Up @@ -57,7 +68,7 @@ func init() {
// the context. The supported context configuration variable is
// fluentd-address.
func New(ctx logger.Context) (logger.Logger, error) {
host, port, err := parseAddress(ctx.Config[addressKey])
loc, err := parseAddress(ctx.Config[addressKey])
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -104,12 +115,14 @@ func New(ctx logger.Context) (logger.Logger, error) {
}

fluentConfig := fluent.Config{
FluentPort: port,
FluentHost: host,
BufferLimit: bufferLimit,
RetryWait: retryWait,
MaxRetry: maxRetries,
AsyncConnect: asyncConnect,
FluentPort: loc.port,
FluentHost: loc.host,
FluentNetwork: loc.protocol,
FluentSocketPath: loc.path,
BufferLimit: bufferLimit,
RetryWait: retryWait,
MaxRetry: maxRetries,
AsyncConnect: asyncConnect,
}

logrus.WithField("container", ctx.ContainerID).WithField("config", fluentConfig).
Expand Down Expand Up @@ -169,29 +182,65 @@ func ValidateLogOpt(cfg map[string]string) error {
}
}

if _, _, err := parseAddress(cfg["fluentd-address"]); err != nil {
if _, err := parseAddress(cfg["fluentd-address"]); err != nil {
return err
}

return nil
}

func parseAddress(address string) (string, int, error) {
func parseAddress(address string) (*location, error) {
if address == "" {
return defaultHost, defaultPort, nil
return &location{
protocol: defaultProtocol,
host: defaultHost,
port: defaultPort,
path: "",
}, nil
}

protocol := defaultProtocol
givenAddress := address
if urlutil.IsTransportURL(address) {
url, err := url.Parse(address)
if err != nil {
return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress)
}
// unix and unixgram socket
if url.Scheme == "unix" || url.Scheme == "unixgram" {
return &location{
protocol: url.Scheme,
host: "",
port: 0,
path: url.Path,
}, nil
}
// tcp|udp
protocol = url.Scheme
address = url.Host
}

host, port, err := net.SplitHostPort(address)
if err != nil {
if !strings.Contains(err.Error(), "missing port in address") {
return "", 0, fmt.Errorf("invalid fluentd-address %s: %s", address, err)
return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress)
}
return host, defaultPort, nil
return &location{
protocol: protocol,
host: host,
port: defaultPort,
path: "",
}, nil
}

portnum, err := strconv.Atoi(port)
if err != nil {
return "", 0, fmt.Errorf("invalid fluentd-address %s: %s", address, err)
return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress)
}
return host, portnum, nil
return &location{
protocol: protocol,
host: host,
port: portnum,
path: "",
}, nil
}

0 comments on commit 806f09b

Please sign in to comment.