Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for fields and protocol lookups in port_name. #8157

Merged
merged 6 commits into from Oct 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 13 additions & 3 deletions plugins/processors/port_name/README.md
@@ -1,8 +1,10 @@
# Port Name Lookup Processor Plugin

Use the `port_name` processor to convert a tag containing a well-known port number to the registered service name.
Use the `port_name` processor to convert a tag or field containing a well-known port number to the registered service name.

Tag can contain a number ("80") or number and protocol separated by slash ("443/tcp"). If protocol is not provided it defaults to tcp but can be changed with the default_protocol setting.
Tag or field can contain a number ("80") or number and protocol separated by slash ("443/tcp"). If protocol is not provided it defaults to tcp but can be changed with the default_protocol setting. An additional tag or field can be specified for the protocol.

If the source was found in tag, the service name will be added as a tag. If the source was found in a field, the service name will also be a field.

Telegraf minimum version: Telegraf 1.15.0

Expand All @@ -12,12 +14,20 @@ Telegraf minimum version: Telegraf 1.15.0
[[processors.port_name]]
## Name of tag holding the port number
# tag = "port"
## Or name of the field holding the port number
# field = "port"

## Name of output tag where service name will be added
## Name of output tag or field (depending on the source) where service name will be added
# dest = "service"

## Default tcp or udp
# default_protocol = "tcp"

## Tag containing the protocol (tcp or udp, case-insensitive)
# protocol_tag = "proto"

## Field containing the protocol (tcp or udp, case-insensitive)
# protocol_field = "proto"
```

### Example
Expand Down
76 changes: 68 additions & 8 deletions plugins/processors/port_name/port_name.go
Expand Up @@ -15,12 +15,20 @@ var sampleConfig = `
[[processors.port_name]]
## Name of tag holding the port number
# tag = "port"
## Or name of the field holding the port number
# field = "port"

## Name of output tag where service name will be added
## Name of output tag or field (depending on the source) where service name will be added
# dest = "service"

## Default tcp or udp
# default_protocol = "tcp"

## Tag containing the protocol (tcp or udp, case-insensitive)
# protocol_tag = "proto"

## Field containing the protocol (tcp or udp, case-insensitive)
# protocol_field = "proto"
`

type sMap map[string]map[int]string // "https" == services["tcp"][443]
Expand All @@ -29,8 +37,11 @@ var services sMap

type PortName struct {
SourceTag string `toml:"tag"`
DestTag string `toml:"dest"`
SourceField string `toml:"field"`
Dest string `toml:"dest"`
DefaultProtocol string `toml:"default_protocol"`
ProtocolTag string `toml:"protocol_tag"`
ProtocolField string `toml:"protocol_field"`

Log telegraf.Logger `toml:"-"`
}
Expand All @@ -40,7 +51,7 @@ func (d *PortName) SampleConfig() string {
}

func (d *PortName) Description() string {
return "Given a tag of a TCP or UDP port number, add a tag of the service name looked up in the system services file"
return "Given a tag/field of a TCP or UDP port number, add a tag/field of the service name looked up in the system services file"
}

func readServicesFile() {
Expand Down Expand Up @@ -97,11 +108,36 @@ func readServices(r io.Reader) sMap {

func (d *PortName) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
for _, m := range metrics {
portProto, ok := m.GetTag(d.SourceTag)
if !ok {
// Nonexistent tag

var portProto string
var fromField bool
a-bali marked this conversation as resolved.
Show resolved Hide resolved

if len(d.SourceTag) > 0 {
if tag, ok := m.GetTag(d.SourceTag); ok {
portProto = string([]byte(tag))
}
}
if len(d.SourceField) > 0 {
if field, ok := m.GetField(d.SourceField); ok {
switch v := field.(type) {
default:
d.Log.Errorf("Unexpected type %t in source field; must be string or int", v)
a-bali marked this conversation as resolved.
Show resolved Hide resolved
continue
case int64:
portProto = strconv.FormatInt(v, 10)
case uint64:
portProto = strconv.FormatUint(v, 10)
case string:
portProto = v
}
fromField = true
}
}

if len(portProto) == 0 {
continue
}

portProtoSlice := strings.SplitN(portProto, "/", 2)
l := len(portProtoSlice)

Expand All @@ -127,6 +163,23 @@ func (d *PortName) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
if l > 1 && len(portProtoSlice[1]) > 0 {
proto = portProtoSlice[1]
}
if len(d.ProtocolTag) > 0 {
if tag, ok := m.GetTag(d.ProtocolTag); ok {
proto = tag
}
}
if len(d.ProtocolField) > 0 {
if field, ok := m.GetField(d.ProtocolField); ok {
switch v := field.(type) {
default:
d.Log.Errorf("Unexpected type %t in protocol field; must be string", v)
continue
case string:
proto = v
}
}
}

proto = strings.ToLower(proto)

protoMap, ok := services[proto]
Expand All @@ -151,7 +204,11 @@ func (d *PortName) Apply(metrics ...telegraf.Metric) []telegraf.Metric {
continue
}

m.AddTag(d.DestTag, service)
if fromField {
m.AddField(d.Dest, service)
} else {
m.AddTag(d.Dest, service)
}
}

return metrics
Expand All @@ -167,8 +224,11 @@ func init() {
processors.Add("port_name", func() telegraf.Processor {
return &PortName{
SourceTag: "port",
DestTag: "service",
SourceField: "port",
Dest: "service",
DefaultProtocol: "tcp",
ProtocolTag: "proto",
ProtocolField: "proto",
}
})
}
107 changes: 100 additions & 7 deletions plugins/processors/port_name/port_name_test.go
Expand Up @@ -28,12 +28,15 @@ func TestFakeServices(t *testing.T) {

func TestTable(t *testing.T) {
var tests = []struct {
name string
tag string
dest string
prot string
input []telegraf.Metric
expected []telegraf.Metric
name string
tag string
field string
dest string
prot string
protField string
protTag string
input []telegraf.Metric
expected []telegraf.Metric
}{
{
name: "ordinary tcp default",
Expand Down Expand Up @@ -239,6 +242,93 @@ func TestTable(t *testing.T) {
),
},
},
{
name: "read from field instead of tag",
field: "foo",
dest: "bar",
prot: "tcp",
input: []telegraf.Metric{
testutil.MustMetric(
"meas",
map[string]string{},
map[string]interface{}{
"foo": "80",
},
time.Unix(0, 0),
),
},
expected: []telegraf.Metric{
testutil.MustMetric(
"meas",
map[string]string{},
map[string]interface{}{
"foo": "80",
"bar": "http",
},
time.Unix(0, 0),
),
},
},
{
name: "read proto from field",
field: "foo",
dest: "bar",
prot: "udp",
protField: "proto",
input: []telegraf.Metric{
testutil.MustMetric(
"meas",
map[string]string{},
map[string]interface{}{
"foo": "80",
"proto": "tcp",
},
time.Unix(0, 0),
),
},
expected: []telegraf.Metric{
testutil.MustMetric(
"meas",
map[string]string{},
map[string]interface{}{
"foo": "80",
"bar": "http",
"proto": "tcp",
},
time.Unix(0, 0),
),
},
},
{
name: "read proto from tag",
tag: "foo",
dest: "bar",
prot: "udp",
protTag: "proto",
input: []telegraf.Metric{
testutil.MustMetric(
"meas",
map[string]string{
"foo": "80",
"proto": "tcp",
},
map[string]interface{}{},
time.Unix(0, 0),
),
},
expected: []telegraf.Metric{
testutil.MustMetric(
"meas",
map[string]string{
"foo": "80",
"bar": "http",
"proto": "tcp",
},
map[string]interface{}{},
time.Unix(0, 0),
),
},
},
}

r := strings.NewReader(fakeServices)
Expand All @@ -248,8 +338,11 @@ func TestTable(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
p := PortName{
SourceTag: tt.tag,
DestTag: tt.dest,
SourceField: tt.field,
Dest: tt.dest,
DefaultProtocol: tt.prot,
ProtocolField: tt.protField,
ProtocolTag: tt.protTag,
Log: testutil.Logger{},
}

Expand Down