From cb176c848e0731f77fa48b4e1a90ae74d1f2deae Mon Sep 17 00:00:00 2001 From: Akira Koyasu Date: Sun, 28 Aug 2016 23:24:56 +0900 Subject: [PATCH] add scheme to fluentd-address Signed-off-by: Akira Koyasu - add scheme to fluentd-address - define a new type `location` - use `errors.Wrapf` --- daemon/logger/fluentd/fluentd.go | 77 ++++++++++++++++++++++++++------ 1 file changed, 63 insertions(+), 14 deletions(-) diff --git a/daemon/logger/fluentd/fluentd.go b/daemon/logger/fluentd/fluentd.go index 31315844d93f4..418d1ddd4b423 100644 --- a/daemon/logger/fluentd/fluentd.go +++ b/daemon/logger/fluentd/fluentd.go @@ -6,6 +6,7 @@ import ( "fmt" "math" "net" + "net/url" "strconv" "strings" "time" @@ -13,8 +14,10 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/docker/daemon/logger" "github.com/docker/docker/daemon/logger/loggerutils" + "github.com/docker/docker/pkg/urlutil" "github.com/docker/go-units" "github.com/fluent/fluent-logger-golang/fluent" + "github.com/pkg/errors" ) type fluentd struct { @@ -25,9 +28,17 @@ type fluentd struct { extra map[string]string } +type location struct { + protocol string + host string + port int + path string +} + const ( name = "fluentd" + defaultProtocol = "tcp" defaultHost = "127.0.0.1" defaultPort = 24224 defaultBufferLimit = 1024 * 1024 @@ -60,7 +71,7 @@ func init() { // the context. The supported context configuration variable is // fluentd-address. func New(ctx logger.Context) (logger.Logger, error) { - host, port, err := parseAddress(ctx.Config[addressKey]) + loc, err := parseAddress(ctx.Config[addressKey]) if err != nil { return nil, err } @@ -107,12 +118,14 @@ func New(ctx logger.Context) (logger.Logger, error) { } fluentConfig := fluent.Config{ - FluentPort: port, - FluentHost: host, - BufferLimit: bufferLimit, - RetryWait: retryWait, - MaxRetry: maxRetries, - AsyncConnect: asyncConnect, + FluentPort: loc.port, + FluentHost: loc.host, + FluentNetwork: loc.protocol, + FluentSocketPath: loc.path, + BufferLimit: bufferLimit, + RetryWait: retryWait, + MaxRetry: maxRetries, + AsyncConnect: asyncConnect, } logrus.WithField("container", ctx.ContainerID).WithField("config", fluentConfig). @@ -172,29 +185,65 @@ func ValidateLogOpt(cfg map[string]string) error { } } - if _, _, err := parseAddress(cfg["fluentd-address"]); err != nil { + if _, err := parseAddress(cfg["fluentd-address"]); err != nil { return err } return nil } -func parseAddress(address string) (string, int, error) { +func parseAddress(address string) (*location, error) { if address == "" { - return defaultHost, defaultPort, nil + return &location{ + protocol: defaultProtocol, + host: defaultHost, + port: defaultPort, + path: "", + }, nil + } + + protocol := defaultProtocol + givenAddress := address + if urlutil.IsTransportURL(address) { + url, err := url.Parse(address) + if err != nil { + return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress) + } + // unix and unixgram socket + if url.Scheme == "unix" || url.Scheme == "unixgram" { + return &location{ + protocol: url.Scheme, + host: "", + port: 0, + path: url.Path, + }, nil + } + // tcp|udp + protocol = url.Scheme + address = url.Host } host, port, err := net.SplitHostPort(address) if err != nil { if !strings.Contains(err.Error(), "missing port in address") { - return "", 0, fmt.Errorf("invalid fluentd-address %s: %s", address, err) + return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress) } - return host, defaultPort, nil + return &location{ + protocol: protocol, + host: host, + port: defaultPort, + path: "", + }, nil } portnum, err := strconv.Atoi(port) if err != nil { - return "", 0, fmt.Errorf("invalid fluentd-address %s: %s", address, err) + return nil, errors.Wrapf(err, "invalid fluentd-address %s", givenAddress) } - return host, portnum, nil + return &location{ + protocol: protocol, + host: host, + port: portnum, + path: "", + }, nil }