Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Host Observer #432

Merged
merged 5 commits into from
Jul 28, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/otelcontribcol/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/signalfxexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/splunkhecexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/stackdriverexporter"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/hostobserver"
"github.com/open-telemetry/opentelemetry-collector-contrib/extension/observer/k8sobserver"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sprocessor"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor"
Expand All @@ -50,14 +51,15 @@ import (
)

func components() (component.Factories, error) {
errs := []error{}
var errs []error
factories, err := defaultcomponents.Components()
if err != nil {
return component.Factories{}, err
}

extensions := []component.ExtensionFactory{
k8sobserver.NewFactory(),
&hostobserver.Factory{},
}

for _, ext := range factories.Extensions {
Expand Down
466 changes: 0 additions & 466 deletions exporter/signalfxexporter/go.sum

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions extension/observer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ Currently the only component that uses observers is the [receiver_creator](../..
## Current Observers

* [k8sobserver](k8sobserver/README.md)
* [hostobserver](hostobserver/README.md)
37 changes: 34 additions & 3 deletions extension/observer/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,30 @@ type Pod struct {

// Port is an endpoint that has a target as well as a port.
type Port struct {
Name string
Pod Pod
Port uint16
// Name is the name of the container port.
Name string
// Pod is the k8s pod in which the container is running.
Pod Pod
// Port number of the endpoint.
Port uint16
// Transport is the transport protocol used by the Endpoint. (TCP or UDP).
Transport Transport
}

// HostPort is an endpoint discovered on a host.
type HostPort struct {
// Name of the process associated to Endpoint. If host_observer
// is unable to collect information about process using the
// Port, this value is an empty string.
Name string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we can't collect it should we just make name be empty string? Why set it to id?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds good. Will update.

// Command used to invoke the process using the Endpoint.
Command string
// Port number of the endpoint.
Port uint16
// Transport is the transport protocol used by the Endpoint. (TCP or UDP).
Transport Transport
// IsIPv6 indicates whether or not the Endpoint is IPv6.
IsIPv6 bool
}

type EndpointEnv map[string]interface{}
Expand Down Expand Up @@ -83,6 +103,17 @@ func EndpointToEnv(endpoint Endpoint) (EndpointEnv, error) {
},
"transport": o.Transport,
}, nil
case HostPort:
ruleTypes["port"] = true
return map[string]interface{}{
"type": ruleTypes,
"endpoint": endpoint.Target,
"name": o.Name,
"command": o.Command,
"is_ipv6": o.IsIPv6,
"port": o.Port,
"transport": o.Transport,
}, nil

default:
return nil, fmt.Errorf("unknown endpoint details type %T", endpoint.Details)
Expand Down
27 changes: 27 additions & 0 deletions extension/observer/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,33 @@ func TestEndpointToEnv(t *testing.T) {
},
wantErr: false,
},
{
name: "Host port",
endpoint: Endpoint{
ID: EndpointID("port_id"),
Target: "127.0.0.1",
Details: HostPort{
Name: "process_name",
Command: "./cmd --config config.yaml",
Port: 2379,
Transport: ProtocolUDP,
IsIPv6: true,
},
},
want: EndpointEnv{
"type": map[string]interface{}{
"port": true,
"pod": false,
},
"endpoint": "127.0.0.1",
"name": "process_name",
"command": "./cmd --config config.yaml",
"is_ipv6": true,
"port": uint16(2379),
"transport": ProtocolUDP,
},
wantErr: false,
},
{
name: "Unsupported endpoint",
endpoint: Endpoint{
Expand Down
11 changes: 9 additions & 2 deletions extension/observer/endpointswatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
// use of EndpointsWatcher to poll for endpoints by embedding this struct
// in the observer struct.
type EndpointsWatcher struct {
ListEndpoints func() []Endpoint
Endpointslister EndpointsLister
RefreshInterval time.Duration
existingEndpoints map[EndpointID]Endpoint
stop chan struct{}
Expand Down Expand Up @@ -57,7 +57,7 @@ func (ew *EndpointsWatcher) ListAndWatch(listener Notify) {
// refreshEndpoints updates the listener with the latest list
// of active endpoints.
func (ew *EndpointsWatcher) refreshEndpoints(listener Notify) {
latestEndpoints := ew.ListEndpoints()
latestEndpoints := ew.Endpointslister.ListEndpoints()

// Create map from ID to endpoint for lookup.
latestEndpointsMap := make(map[EndpointID]bool, len(latestEndpoints))
Expand Down Expand Up @@ -107,3 +107,10 @@ func (ew *EndpointsWatcher) refreshEndpoints(listener Notify) {
func (ew *EndpointsWatcher) StopListAndWatch() {
close(ew.stop)
}

// EndpointsLister that provides a list of endpoints.
type EndpointsLister interface {
// ListEndpoints provides a list of endpoints and is expected to be
// implemented by an observer looking for endpoints.
ListEndpoints() []Endpoint
}
116 changes: 69 additions & 47 deletions extension/observer/endpointswatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,17 @@ package observer

import (
"strconv"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestRefreshEndpointsOnStartup(t *testing.T) {
endpointsMap = map[EndpointID]Endpoint{}
ml, ew, mn := setup()

ew := EndpointsWatcher{
ListEndpoints: listEndpoints,
RefreshInterval: 2 * time.Second,
existingEndpoints: map[EndpointID]Endpoint{},
}

mn := mockNotifier{}

addEndpoint(0)
ml.addEndpoint(0)
ew.ListAndWatch(mn)
ew.StopListAndWatch()

Expand All @@ -44,79 +37,108 @@ func TestRefreshEndpointsOnStartup(t *testing.T) {
}

func TestRefreshEndpoints(t *testing.T) {
endpointsMap = map[EndpointID]Endpoint{}

ew := EndpointsWatcher{
ListEndpoints: listEndpoints,
RefreshInterval: 2 * time.Second,
existingEndpoints: map[EndpointID]Endpoint{},
}

mn := mockNotifier{}
ml, ew, mn := setup()

addEndpoint(0)
ml.addEndpoint(0)
ew.refreshEndpoints(mn)

expected := map[EndpointID]Endpoint{"0": {ID: "0"}}
require.Equal(t, expected, ew.existingEndpoints)

addEndpoint(1)
addEndpoint(2)
removeEndpoint(0)
ml.addEndpoint(1)
ml.addEndpoint(2)
ml.removeEndpoint(0)
ew.refreshEndpoints(mn)

expected["1"] = Endpoint{ID: "1"}
expected["2"] = Endpoint{ID: "2"}
delete(expected, "0")
require.Equal(t, expected, ew.existingEndpoints)

updateEndpoint(2, "updated_target")
ml.updateEndpoint(2, "updated_target")
ew.refreshEndpoints(mn)

expected["2"] = Endpoint{ID: "2", Target: "updated_target"}
require.Equal(t, expected, ew.existingEndpoints)
}

var endpointsMap map[EndpointID]Endpoint
func setup() (*mockEndpointsLister, EndpointsWatcher, mockNotifier) {
ml := &mockEndpointsLister{
endpointsMap: map[EndpointID]Endpoint{},
}

ew := EndpointsWatcher{
Endpointslister: ml,
RefreshInterval: 2 * time.Second,
existingEndpoints: map[EndpointID]Endpoint{},
}

mn := mockNotifier{}

return ml, ew, mn
}

type mockNotifier struct {
}

var _ Notify = (*mockNotifier)(nil)

func (m mockNotifier) OnAdd([]Endpoint) {
}

func (m mockNotifier) OnRemove([]Endpoint) {
}

func (m mockNotifier) OnChange([]Endpoint) {
}

type mockEndpointsLister struct {
sync.Mutex
endpointsMap map[EndpointID]Endpoint
}

func (m *mockEndpointsLister) addEndpoint(n int) {
m.Lock()
defer m.Unlock()

func addEndpoint(n int) {
id := EndpointID(strconv.Itoa(n))
e := Endpoint{ID: id}
endpointsMap[id] = e
m.endpointsMap[id] = e
}

func removeEndpoint(n int) {
func (m *mockEndpointsLister) removeEndpoint(n int) {
m.Lock()
defer m.Unlock()

id := EndpointID(strconv.Itoa(n))
delete(endpointsMap, id)
delete(m.endpointsMap, id)
}

func updateEndpoint(n int, target string) {
func (m *mockEndpointsLister) updateEndpoint(n int, target string) {
m.Lock()
defer m.Unlock()

id := EndpointID(strconv.Itoa(n))
e := Endpoint{
ID: id,
Target: target,
}
endpointsMap[id] = e
m.endpointsMap[id] = e
}

func listEndpoints() []Endpoint {
endpoints := make([]Endpoint, 0)
for _, e := range endpointsMap {
endpoints = append(endpoints, e)
}
return endpoints
}
func (m *mockEndpointsLister) ListEndpoints() []Endpoint {
m.Lock()
defer m.Unlock()

type mockNotifier struct {
}

var _ Notify = (*mockNotifier)(nil)
out := make([]Endpoint, len(m.endpointsMap))

func (m mockNotifier) OnAdd([]Endpoint) {
}
i := 0
for _, e := range m.endpointsMap {
out[i] = e
i++
}

func (m mockNotifier) OnRemove([]Endpoint) {
return out
}

func (m mockNotifier) OnChange([]Endpoint) {
}
var _ EndpointsLister = (*mockEndpointsLister)(nil)
1 change: 1 addition & 0 deletions extension/observer/hostobserver/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../../Makefile.Common
30 changes: 30 additions & 0 deletions extension/observer/hostobserver/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Host Observer Extension

**Status: beta**

The `host_observer` looks at the current host for listening network endpoints.

It will look for all listening sockets on TCP and UDP over IPv4 and IPv6.

It uses the /proc filesystem and requires the SYS_PTRACE and DAC_READ_SEARCH capabilities so that it can determine what processes own the listening sockets.

### Configuration

#### `refresh_interval`

Determines how often to look for changes in endpoints.

default: `10s`

### Endpoint Variables

Endpoint variables exposed by this observer are as follows.

| Variable | Description |
|-----------|--------------------------------------------------------------------------------------------|
| type.port | `true` |
| name | name of the process associated to the port |
| port | port number |
| command | full command used to invoke this process, including the executable itself at the beginning |
| is_ipv6 | `true` if the endpoint is IPv6 |
| transport | "TCP" or "UDP" |
30 changes: 30 additions & 0 deletions extension/observer/hostobserver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package hostobserver

import (
"time"

"go.opentelemetry.io/collector/config/configmodels"
)

// Config defines configuration for host observer.
type Config struct {
configmodels.ExtensionSettings `mapstructure:",squash"`

// RefreshInterval determines how frequency at which the observer
// needs to poll for collecting information about new processes.
RefreshInterval time.Duration `mapstructure:"refresh_interval"`
}