Skip to content

Commit

Permalink
fix(inputs.socket_listener): fix noisy logs on closed connection
Browse files Browse the repository at this point in the history
This suppresses several different logs that may be emitted when the connection is closed (locally or by remote) at various points.

Fixes influxdata#13043
  • Loading branch information
phemmer committed Apr 7, 2023
1 parent 35edd18 commit 160fd34
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 4 deletions.
88 changes: 86 additions & 2 deletions plugins/inputs/socket_listener/socket_listener_test.go
Expand Up @@ -5,16 +5,19 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net"
"os"
"path/filepath"
"runtime"
"sort"
"strings"
"sync"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf"
Expand All @@ -29,6 +32,61 @@ import (

var pki = testutil.NewPKI("../../../testutil/pki")

type TestLogger struct {
T *testing.T
Logs []string
mtx sync.Mutex
}

func (t *TestLogger) logf(format string, args ...any) {
msg := fmt.Sprintf(format, args...)
t.mtx.Lock()
t.Logs = append(t.Logs, msg)
t.mtx.Unlock()
t.T.Logf(format, args...)
}

func (t *TestLogger) log(prefix string, args ...any) {
args = append([]any{prefix}, args...)
msg := fmt.Sprint(args...)
t.mtx.Lock()
t.Logs = append(t.Logs, msg)
t.mtx.Unlock()
t.T.Log(args...)
}

func (t *TestLogger) Errorf(format string, args ...interface{}) {
t.logf("E! "+format, args...)
}

func (t *TestLogger) Error(args ...interface{}) {
t.log("E!", args...)
}

func (t *TestLogger) Debugf(format string, args ...interface{}) {
t.logf("D! "+format, args...)
}

func (t *TestLogger) Debug(args ...interface{}) {
t.log("D!", args...)
}

func (t *TestLogger) Warnf(format string, args ...interface{}) {
t.logf(format, args...)
}

func (t *TestLogger) Warn(args ...interface{}) {
t.log("W!", args...)
}

func (t *TestLogger) Infof(format string, args ...interface{}) {
t.logf("I! "+format, args...)
}

func (t *TestLogger) Info(args ...interface{}) {
t.log("I!", args...)
}

func TestSocketListener(t *testing.T) {
messages := [][]byte{
[]byte("test,foo=bar v=1i 123456789\ntest,foo=baz v=2i 123456790\n"),
Expand Down Expand Up @@ -135,8 +193,9 @@ func TestSocketListener(t *testing.T) {
}

// Setup plugin according to test specification
log := &TestLogger{T: t}
plugin := &SocketListener{
Log: &testutil.Logger{},
Log: log,
ServiceAddress: proto + "://" + serverAddr,
ContentEncoding: tt.encoding,
ReadBufferSize: tt.buffersize,
Expand All @@ -157,9 +216,16 @@ func TestSocketListener(t *testing.T) {
require.NoError(t, plugin.Start(&acc))
defer plugin.Stop()

// Setup the client for submitting data
addr := plugin.listener.addr()

// Create a noop client
// Server is async, so verify no errors at the end.
client, err := createClient(plugin.ServiceAddress, addr, tlsCfg)
assert.NoError(t, err)
_ = client.Close()

// Setup the client for submitting data
client, err = createClient(plugin.ServiceAddress, addr, tlsCfg)
require.NoError(t, err)

// Send the data with the correct encoding
Expand All @@ -181,6 +247,24 @@ func TestSocketListener(t *testing.T) {
}, time.Second, 100*time.Millisecond, "did not receive metrics (%d)", acc.NMetrics())
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics())

plugin.Stop()

if _, ok := plugin.listener.(*streamListener); ok {
// Verify that plugin.Stop() closed the client's connection
_ = client.SetReadDeadline(time.Now().Add(time.Second))
buf := []byte{1}
_, err = client.Read(buf)
assert.Equal(t, err, io.EOF)
}

var msgs []string
for _, msg := range log.Logs {
if msg[:2] == "E!" || msg[:2] == "W!" {
msgs = append(msgs, msg)
}
}
assert.Empty(t, msgs)
})
}
}
Expand Down
8 changes: 6 additions & 2 deletions plugins/inputs/socket_listener/stream_listener.go
Expand Up @@ -5,11 +5,13 @@ import (
"crypto/tls"
"errors"
"fmt"
"io"
"net"
"net/url"
"os"
"strconv"
"sync"
"syscall"
"time"

"github.com/influxdata/telegraf"
Expand Down Expand Up @@ -131,7 +133,7 @@ func (l *streamListener) setupConnection(conn net.Conn) error {

func (l *streamListener) closeConnection(conn net.Conn) {
addr := conn.RemoteAddr().String()
if err := conn.Close(); err != nil {
if err := conn.Close(); err != nil && !errors.Is(err, net.ErrClosed) && !errors.Is(err, syscall.EPIPE) {
l.Log.Errorf("Cannot close connection to %q: %v", addr, err)
}
delete(l.connections, addr)
Expand Down Expand Up @@ -187,7 +189,9 @@ func (l *streamListener) listen(acc telegraf.Accumulator) {
go func(c net.Conn) {
defer wg.Done()
if err := l.read(acc, c); err != nil {
acc.AddError(err)
if !errors.Is(err, io.EOF) && !errors.Is(err, syscall.ECONNRESET) {
acc.AddError(err)
}
}
l.Lock()
l.closeConnection(conn)
Expand Down

0 comments on commit 160fd34

Please sign in to comment.