Skip to content

Commit

Permalink
feat: socketstat input plugin (influxdata#3649)
Browse files Browse the repository at this point in the history
  • Loading branch information
sajoupa authored and powersj committed Jan 21, 2022
1 parent 5073adf commit d9780f8
Show file tree
Hide file tree
Showing 9 changed files with 421 additions and 0 deletions.
1 change: 1 addition & 0 deletions plugins/inputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/inputs/snmp_legacy"
_ "github.com/influxdata/telegraf/plugins/inputs/snmp_trap"
_ "github.com/influxdata/telegraf/plugins/inputs/socket_listener"
_ "github.com/influxdata/telegraf/plugins/inputs/socketstat"
_ "github.com/influxdata/telegraf/plugins/inputs/solr"
_ "github.com/influxdata/telegraf/plugins/inputs/sql"
_ "github.com/influxdata/telegraf/plugins/inputs/sqlserver"
Expand Down
55 changes: 55 additions & 0 deletions plugins/inputs/socketstat/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# SocketStat plugin

The socketstat plugin gathers indicators from established connections, using iproute2's `ss` command.

The `ss` command does not require specific privileges.

**WARNING: The output format will produce series with very high cardinality.** You should either store those by an engine which doesn't suffer from it, use a short retention policy or do appropriate filtering.

## Configuration

```toml
[[inputs.socketstat]]
## ss can display information about tcp, udp, raw, unix, packet, dccp and sctp sockets
## Specify here the types you want to gather
socket_types = [ "tcp", "udp" ]
## The default timeout of 1s for ss execution can be overridden here:
# timeout = "1s"
```

## Measurements & Fields

- socketstat
- state (string) (for tcp, dccp and sctp protocols)
- If ss provides it (it depends on the protocol and ss version):
- bytes_acked (integer, bytes)
- bytes_received (integer, bytes)
- segs_out (integer, count)
- segs_in (integer, count)
- data_segs_out (integer, count)
- data_segs_in (integer, count)

## Tags

- All measurements have the following tags:
- proto
- local_addr
- local_port
- remote_addr
- remote_port

## Example Output

### recent ss version (iproute2 4.3.0 here)

```sh
./telegraf --config telegraf.conf --input-filter socketstat --test
> socketstat,host=ubuntu-xenial,local_addr=10.6.231.226,local_port=42716,proto=tcp,remote_addr=192.168.2.21,remote_port=80 bytes_acked=184i,bytes_received=2624519595i,recv_q=4344i,segs_in=1812580i,segs_out=661642i,send_q=0i,state="ESTAB" 1606457205000000000
```

### older ss version (iproute2 3.12.0 here)

```sh
./telegraf --config telegraf.conf --input-filter socketstat --test
> socketstat,host=ubuntu-trusty,local_addr=10.6.231.163,local_port=35890,proto=tcp,remote_addr=192.168.2.21,remote_port=80 recv_q=0i,send_q=0i,state="ESTAB" 1606456977000000000
```
222 changes: 222 additions & 0 deletions plugins/inputs/socketstat/socketstat.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
//go:build !windows
// +build !windows

// iproute2 doesn't exist on Windows

package socketstat

import (
"bufio"
"bytes"
"fmt"
"os/exec"
"regexp"
"strconv"
"strings"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
)

const measurement = "socketstat"

// Socketstat is a telegraf plugin to gather indicators from established connections, using iproute2's `ss` command.
type Socketstat struct {
SocketProto []string `toml:"protocols"`
Timeout config.Duration `toml:"timeout"`
Log telegraf.Logger `toml:"-"`

isNewConnection *regexp.Regexp
validValues *regexp.Regexp
cmdName string
lister socketLister
}

type socketLister func(cmdName string, proto string, timeout config.Duration) (*bytes.Buffer, error)

// Description returns a short description of the plugin
func (ss *Socketstat) Description() string {
return "Gather indicators from established connections, using iproute2's `ss` command."
}

// SampleConfig returns sample configuration options
func (ss *Socketstat) SampleConfig() string {
return `
## ss can display information about tcp, udp, raw, unix, packet, dccp and sctp sockets
## List of protocol types to collect
# protocols = [ "tcp", "udp" ]
## The default timeout of 1s for ss execution can be overridden here:
# timeout = "1s"
`
}

// Gather gathers indicators from established connections
func (ss *Socketstat) Gather(acc telegraf.Accumulator) error {
// best effort : we continue through the protocols even if an error is encountered,
// but we keep track of the last error.
for _, proto := range ss.SocketProto {
out, err := ss.lister(ss.cmdName, proto, ss.Timeout)
if err != nil {
acc.AddError(err)
continue
}
ss.parseAndGather(acc, out, proto)
}
return nil
}

func socketList(cmdName string, proto string, timeout config.Duration) (*bytes.Buffer, error) {
// Run ss for the given protocol, return the output as bytes.Buffer
args := []string{"-in", "--" + proto}
cmd := exec.Command(cmdName, args...)
var out bytes.Buffer
cmd.Stdout = &out
err := internal.RunTimeout(cmd, time.Duration(timeout))
if err != nil {
return &out, fmt.Errorf("error running ss -in --%s: %v", proto, err)
}
return &out, nil
}

func (ss *Socketstat) parseAndGather(acc telegraf.Accumulator, data *bytes.Buffer, proto string) {
scanner := bufio.NewScanner(data)
tags := map[string]string{}
fields := make(map[string]interface{})

// ss output can have blank lines, and/or socket basic info lines and more advanced
// statistics lines, in turns.
// In all non-empty lines, we can have metrics, so we need to group those relevant to
// the same connection.
// To achieve this, we're using the flushData variable which indicates if we should add
// a new measurement or postpone it to a later line.

// The first line is only headers
scanner.Scan()

flushData := false
for scanner.Scan() {
line := scanner.Text()
if line == "" {
continue
}
words := strings.Fields(line)

if ss.isNewConnection.MatchString(line) {
// A line with starting whitespace means metrics about the current connection.
// We should never get 2 consecutive such lines. If we do, log a warning and in
// a best effort, extend the metrics from the 1st line with the metrics of the 2nd
// one, possibly overwriting.
for _, word := range words {
if !ss.validValues.MatchString(word) {
continue
}
// kv will have 2 fields because it matched the regexp
kv := strings.Split(word, ":")
v, err := strconv.ParseUint(kv[1], 10, 64)
if err != nil {
ss.Log.Infof("Couldn't parse metric %q: %v", word, err)
continue
}
fields[kv[0]] = v
}
if !flushData {
ss.Log.Warnf("Found orphaned metrics: %s", words)
ss.Log.Warn("Added them to the last known connection.")
}
acc.AddFields(measurement, fields, tags)
flushData = false
continue
}
// A line with no starting whitespace means we're going to parse a new connection.
// Flush what we gathered about the previous one, if any.
if flushData {
acc.AddFields(measurement, fields, tags)
}

// Delegate the real parsing to getTagsAndState, which manages various
// formats depending on the protocol.
tags, fields = getTagsAndState(proto, words, ss.Log)

// This line containted metrics, so record that.
flushData = true
}
if flushData {
acc.AddFields(measurement, fields, tags)
}
}

func getTagsAndState(proto string, words []string, log telegraf.Logger) (map[string]string, map[string]interface{}) {
tags := map[string]string{
"proto": proto,
}
fields := make(map[string]interface{})
switch proto {
case "udp", "raw":
words = append([]string{"dummy"}, words...)
case "tcp", "dccp", "sctp":
fields["state"] = words[0]
}
switch proto {
case "tcp", "udp", "raw", "dccp", "sctp":
// Local and remote addresses are fields 3 and 4
// Separate addresses and ports with the last ':'
localIndex := strings.LastIndex(words[3], ":")
remoteIndex := strings.LastIndex(words[4], ":")
tags["local_addr"] = words[3][:localIndex]
tags["local_port"] = words[3][localIndex+1:]
tags["remote_addr"] = words[4][:remoteIndex]
tags["remote_port"] = words[4][remoteIndex+1:]
case "unix", "packet":
fields["netid"] = words[0]
tags["local_addr"] = words[4]
tags["local_port"] = words[5]
tags["remote_addr"] = words[6]
tags["remote_port"] = words[7]
}
v, err := strconv.ParseUint(words[1], 10, 64)
if err != nil {
log.Warnf("Couldn't read recv_q in %q: %v", words, err)
} else {
fields["recv_q"] = v
}
v, err = strconv.ParseUint(words[2], 10, 64)
if err != nil {
log.Warnf("Couldn't read send_q in %q: %v", words, err)
} else {
fields["send_q"] = v
}
return tags, fields
}

func (ss *Socketstat) Init() error {
if len(ss.SocketProto) == 0 {
ss.SocketProto = []string{"tcp", "udp"}
}

// Initialize regexps to validate input data
validFields := "(bytes_acked|bytes_received|segs_out|segs_in|data_segs_in|data_segs_out)"
ss.validValues = regexp.MustCompile("^" + validFields + ":[0-9]+$")
ss.isNewConnection = regexp.MustCompile(`^\s+.*$`)

ss.lister = socketList

// Check that ss is installed, get its path.
// Do it last, because in test environments where `ss` might not be available,
// we still want the other Init() actions to be performed.
ssPath, err := exec.LookPath("ss")
if err != nil {
return err
}
ss.cmdName = ssPath

return nil
}

func init() {
inputs.Add("socketstat", func() telegraf.Input {
return &Socketstat{Timeout: config.Duration(time.Second)}
})
}

0 comments on commit d9780f8

Please sign in to comment.