diff --git a/.circleci/config.yml b/.circleci/config.yml index c1480f5..722fdf9 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -2,7 +2,7 @@ version: 2 jobs: build: docker: - - image: segment/golang:1.9 + - image: segment/golang:latest working_directory: /go/src/github.com/segmentio/ecs-logs steps: - checkout diff --git a/Dockerfile b/Dockerfile index 7f2ff73..6a0a52f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ # We need a go compiler that's based on an image with libsystemd-dev installed, # segment/golang give us just that. -FROM segment/golang:1.9 +FROM segment/golang:latest # Copy the ecs-logs sources so they can be built within the container. COPY . /go/src/github.com/segmentio/ecs-logs diff --git a/lib/syslog/writer.go b/lib/syslog/writer.go index f2a4566..91cce9b 100644 --- a/lib/syslog/writer.go +++ b/lib/syslog/writer.go @@ -11,7 +11,6 @@ import ( "os" "strconv" "strings" - "sync" "text/template" "time" @@ -87,7 +86,7 @@ func DialWriter(config WriterConfig) (w lib.Writer, err error) { connect: for _, n := range netopts { for _, a := range addropts { - if backend, err = cachedDialWriter(n, a, config.TLS, config.SocksProxy); err == nil { + if backend, err = dialWriter(n, a, config.TLS, config.SocksProxy); err == nil { break connect } } @@ -99,7 +98,6 @@ connect: w = newWriter(writerConfig{ backend: backend, - config: config, template: config.Template, timeFormat: config.TimeFormat, tag: config.Tag, @@ -109,7 +107,6 @@ connect: type writerConfig struct { backend io.Writer - config WriterConfig template string timeFormat string tag string @@ -153,36 +150,20 @@ func newWriterTemplate(format string) *template.Template { type writer struct { writerConfig - sync.Mutex buf bytes.Buffer tpl *template.Template out func(*writer, message) error flush func() error } -// For this backend, Close() will not actually close the socket. This is a -// temporary hack to allow the connection to remain open for reuse between -// multiple caller write() operations. It's not ever necessary to close the -// socket during the normal operation of ecs-logs since we don't allow runtime -// reconfiguration of destinations yet. This is not ideal, but a more elegant -// fix would involve updating the API and every destination and is significantly -// more work than we can afford right now. --MSF 2017-10-02 func (w *writer) Close() (err error) { - return -} - -// This is the real close function. -func (w *writer) reallyClose() (err error) { if c, ok := w.backend.(io.Closer); ok { err = c.Close() - writerCache.Delete(writerCacheKey(w.config.Network, w.config.Address, w.config.SocksProxy)) } return } func (w *writer) WriteMessageBatch(batch lib.MessageBatch) (err error) { - w.Lock() - defer w.Unlock() for _, msg := range batch { if err = w.write(msg); err != nil { return @@ -192,8 +173,6 @@ func (w *writer) WriteMessageBatch(batch lib.MessageBatch) (err error) { } func (w *writer) WriteMessage(msg lib.Message) (err error) { - w.Lock() - defer w.Unlock() if err = w.write(msg); err == nil { err = w.flush() } @@ -226,11 +205,7 @@ func (w *writer) write(msg lib.Message) (err error) { } m.MSG = msg.Event.String() - err = w.out(w, m) - if err != nil { - w.reallyClose() - } - return + return w.out(w, m) } func (w *writer) directWrite(m message) (err error) { @@ -269,34 +244,6 @@ func (c bufferedConn) Close() error { return c.conn.Close() } func (c bufferedConn) Flush() error { return c.buf.Flush() } func (c bufferedConn) Write(b []byte) (int, error) { return c.buf.Write(b) } -var dialMutex = sync.Map{} -var writerCache = sync.Map{} - -func writerCacheKey(network string, address string, socksProxy string) string { - // We don't currently include the TLS configuration in the cache key. For - // now, this is probably OK as the TLS configuration is unlikely to change. - return network + ";" + address + ";" + socksProxy -} - -func cachedDialWriter(network string, address string, config *tls.Config, socksProxy string) (w io.Writer, err error) { - key := writerCacheKey(network, address, socksProxy) - - res, _ := dialMutex.LoadOrStore(key, &sync.Mutex{}) - mu := res.(*sync.Mutex) - mu.Lock() - defer mu.Unlock() - - // Fast path: return if already in the cache - if w, ok := writerCache.Load(key); ok { - return w.(io.Writer), nil - } - w, err = dialWriter(network, address, config, socksProxy) - if err != nil { - writerCache.Store(key, w) - } - return w, err -} - func dialWriter(network string, address string, config *tls.Config, socksProxy string) (w io.Writer, err error) { var conn, rawConn net.Conn var dial func(string, string) (net.Conn, error)