Skip to content
This repository has been archived by the owner on Oct 5, 2023. It is now read-only.

Commit

Permalink
Revert "syslog: Cache connections"
Browse files Browse the repository at this point in the history
This reverts commit 95faba1.
  • Loading branch information
Michael S. Fischer committed Oct 12, 2017
1 parent c0fea25 commit 99583ae
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 57 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Expand Up @@ -2,7 +2,7 @@ version: 2
jobs:
build:
docker:
- image: segment/golang:1.9
- image: segment/golang:latest
working_directory: /go/src/github.com/segmentio/ecs-logs
steps:
- checkout
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,6 +1,6 @@
# We need a go compiler that's based on an image with libsystemd-dev installed,
# segment/golang give us just that.
FROM segment/golang:1.9
FROM segment/golang:latest

# Copy the ecs-logs sources so they can be built within the container.
COPY . /go/src/github.com/segmentio/ecs-logs
Expand Down
57 changes: 2 additions & 55 deletions lib/syslog/writer.go
Expand Up @@ -11,7 +11,6 @@ import (
"os"
"strconv"
"strings"
"sync"
"text/template"
"time"

Expand Down Expand Up @@ -87,7 +86,7 @@ func DialWriter(config WriterConfig) (w lib.Writer, err error) {
connect:
for _, n := range netopts {
for _, a := range addropts {
if backend, err = cachedDialWriter(n, a, config.TLS, config.SocksProxy); err == nil {
if backend, err = dialWriter(n, a, config.TLS, config.SocksProxy); err == nil {
break connect
}
}
Expand All @@ -99,7 +98,6 @@ connect:

w = newWriter(writerConfig{
backend: backend,
config: config,
template: config.Template,
timeFormat: config.TimeFormat,
tag: config.Tag,
Expand All @@ -109,7 +107,6 @@ connect:

type writerConfig struct {
backend io.Writer
config WriterConfig
template string
timeFormat string
tag string
Expand Down Expand Up @@ -153,36 +150,20 @@ func newWriterTemplate(format string) *template.Template {

type writer struct {
writerConfig
sync.Mutex
buf bytes.Buffer
tpl *template.Template
out func(*writer, message) error
flush func() error
}

// For this backend, Close() will not actually close the socket. This is a
// temporary hack to allow the connection to remain open for reuse between
// multiple caller write() operations. It's not ever necessary to close the
// socket during the normal operation of ecs-logs since we don't allow runtime
// reconfiguration of destinations yet. This is not ideal, but a more elegant
// fix would involve updating the API and every destination and is significantly
// more work than we can afford right now. --MSF 2017-10-02
func (w *writer) Close() (err error) {
return
}

// This is the real close function.
func (w *writer) reallyClose() (err error) {
if c, ok := w.backend.(io.Closer); ok {
err = c.Close()
writerCache.Delete(writerCacheKey(w.config.Network, w.config.Address, w.config.SocksProxy))
}
return
}

func (w *writer) WriteMessageBatch(batch lib.MessageBatch) (err error) {
w.Lock()
defer w.Unlock()
for _, msg := range batch {
if err = w.write(msg); err != nil {
return
Expand All @@ -192,8 +173,6 @@ func (w *writer) WriteMessageBatch(batch lib.MessageBatch) (err error) {
}

func (w *writer) WriteMessage(msg lib.Message) (err error) {
w.Lock()
defer w.Unlock()
if err = w.write(msg); err == nil {
err = w.flush()
}
Expand Down Expand Up @@ -226,11 +205,7 @@ func (w *writer) write(msg lib.Message) (err error) {
}

m.MSG = msg.Event.String()
err = w.out(w, m)
if err != nil {
w.reallyClose()
}
return
return w.out(w, m)
}

func (w *writer) directWrite(m message) (err error) {
Expand Down Expand Up @@ -269,34 +244,6 @@ func (c bufferedConn) Close() error { return c.conn.Close() }
func (c bufferedConn) Flush() error { return c.buf.Flush() }
func (c bufferedConn) Write(b []byte) (int, error) { return c.buf.Write(b) }

var dialMutex = sync.Map{}
var writerCache = sync.Map{}

func writerCacheKey(network string, address string, socksProxy string) string {
// We don't currently include the TLS configuration in the cache key. For
// now, this is probably OK as the TLS configuration is unlikely to change.
return network + ";" + address + ";" + socksProxy
}

func cachedDialWriter(network string, address string, config *tls.Config, socksProxy string) (w io.Writer, err error) {
key := writerCacheKey(network, address, socksProxy)

res, _ := dialMutex.LoadOrStore(key, &sync.Mutex{})
mu := res.(*sync.Mutex)
mu.Lock()
defer mu.Unlock()

// Fast path: return if already in the cache
if w, ok := writerCache.Load(key); ok {
return w.(io.Writer), nil
}
w, err = dialWriter(network, address, config, socksProxy)
if err != nil {
writerCache.Store(key, w)
}
return w, err
}

func dialWriter(network string, address string, config *tls.Config, socksProxy string) (w io.Writer, err error) {
var conn, rawConn net.Conn
var dial func(string, string) (net.Conn, error)
Expand Down

0 comments on commit 99583ae

Please sign in to comment.