Skip to content

Commit

Permalink
fix(inputs.socket_listener): fix tracking of unix sockets
Browse files Browse the repository at this point in the history
This fixes an issue where the code would lose track of unix sockets. The remote end of a unix socket does not have a unique address representation, thus multiple entries may overwrite each other in the map. This changes the map to key off the net.Conn object itself, basically using the map as a set.

Fixes influxdata#13058
  • Loading branch information
phemmer committed Apr 7, 2023
1 parent 35edd18 commit 218d0f3
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 5 deletions.
12 changes: 12 additions & 0 deletions plugins/inputs/socket_listener/socket_listener_test.go
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net"
"os"
"path/filepath"
Expand All @@ -15,6 +16,7 @@ import (
"time"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/influxdata/telegraf"
Expand Down Expand Up @@ -181,6 +183,16 @@ func TestSocketListener(t *testing.T) {
}, time.Second, 100*time.Millisecond, "did not receive metrics (%d)", acc.NMetrics())
actual := acc.GetTelegrafMetrics()
testutil.RequireMetricsEqual(t, expected, actual, testutil.SortMetrics())

plugin.Stop()

if _, ok := plugin.listener.(*streamListener); ok {
// Verify that plugin.Stop() closed the client's connection
_ = client.SetReadDeadline(time.Now().Add(time.Second))
buf := []byte{1}
_, err = client.Read(buf)
assert.Equal(t, err, io.EOF)
}
})
}
}
Expand Down
10 changes: 5 additions & 5 deletions plugins/inputs/socket_listener/stream_listener.go
Expand Up @@ -32,7 +32,7 @@ type streamListener struct {
Log telegraf.Logger

listener net.Listener
connections map[string]net.Conn
connections map[net.Conn]struct{}
path string

wg sync.WaitGroup
Expand Down Expand Up @@ -123,7 +123,7 @@ func (l *streamListener) setupConnection(conn net.Conn) error {

// Store the connection mapped to its address
l.Lock()
l.connections[addr] = conn
l.connections[conn] = struct{}{}
l.Unlock()

return nil
Expand All @@ -134,7 +134,7 @@ func (l *streamListener) closeConnection(conn net.Conn) {
if err := conn.Close(); err != nil {
l.Log.Errorf("Cannot close connection to %q: %v", addr, err)
}
delete(l.connections, addr)
delete(l.connections, conn)
}

func (l *streamListener) addr() net.Addr {
Expand All @@ -147,7 +147,7 @@ func (l *streamListener) close() error {
}

l.Lock()
for _, conn := range l.connections {
for conn := range l.connections {
l.closeConnection(conn)
}
l.Unlock()
Expand All @@ -164,7 +164,7 @@ func (l *streamListener) close() error {
}

func (l *streamListener) listen(acc telegraf.Accumulator) {
l.connections = make(map[string]net.Conn)
l.connections = make(map[net.Conn]struct{})

l.wg.Add(1)
defer l.wg.Done()
Expand Down

0 comments on commit 218d0f3

Please sign in to comment.