Skip to content

Commit

Permalink
[receivercreator, observers]: Cleanup endpoints code (#2004)
Browse files Browse the repository at this point in the history
This is a more limited change from #1898 that just cleans up the endpoint code
but doesn't change how endpoint types are selected.

**Testing:**
Tested end to end with helm chart.

**Documentation:**
Updated receiver_creator docs to document hostport.
  • Loading branch information
jrcamp committed Jan 14, 2021
1 parent f74b765 commit ebd62bd
Show file tree
Hide file tree
Showing 15 changed files with 170 additions and 120 deletions.
148 changes: 99 additions & 49 deletions extension/observer/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,47 @@

package observer

import "fmt"
import (
"errors"
"fmt"
)

type EndpointID string
type (
// EndpointID unique identifies an endpoint per-observer instance.
EndpointID string
// EndpointEnv is a map of endpoint attributes.
EndpointEnv map[string]interface{}
// EndpointType is a type of an endpoint like a port or pod.
EndpointType string
)

const (
// PortType is a port endpoint.
PortType EndpointType = "port"
// PodType is a pod endpoint.
PodType EndpointType = "pod"
// HostPortType is a hostport endpoint.
HostPortType EndpointType = "hostport"
)

var (
// EndpointTypes is a map of all known endpoint types.
EndpointTypes = map[EndpointType]bool{
PortType: true,
PodType: true,
HostPortType: true,
}

_ EndpointDetails = (*Pod)(nil)
_ EndpointDetails = (*Port)(nil)
_ EndpointDetails = (*HostPort)(nil)
)

// EndpointDetails provides additional context about an endpoint such as a Pod or Port.
type EndpointDetails interface {
Env() EndpointEnv
Type() EndpointType
}

// Endpoint is a service that can be contacted remotely.
type Endpoint struct {
Expand All @@ -25,7 +63,26 @@ type Endpoint struct {
// Target is an IP address or hostname of the endpoint.
Target string
// Details contains additional context about the endpoint such as a Pod or Port.
Details interface{}
Details EndpointDetails
}

// Env converts an endpoint into a map suitable for expr evaluation.
func (e *Endpoint) Env() (EndpointEnv, error) {
if e.Details == nil {
return nil, errors.New("endpoint is missing details")
}
env := e.Details.Env()
env["endpoint"] = e.Target

// Populate type field for evaluating rules with `type.port && ...`.
// Use string instead of EndpointType for rule evaluation.
types := map[string]bool{}
for endpointType := range EndpointTypes {
types[string(endpointType)] = false
}
types[string(e.Details.Type())] = true
env["type"] = types
return env, nil
}

func (e *Endpoint) String() string {
Expand All @@ -42,6 +99,18 @@ type Pod struct {
Annotations map[string]string
}

func (p *Pod) Env() EndpointEnv {
return map[string]interface{}{
"name": p.Name,
"labels": p.Labels,
"annotations": p.Annotations,
}
}

func (p *Pod) Type() EndpointType {
return PodType
}

// Port is an endpoint that has a target as well as a port.
type Port struct {
// Name is the name of the container port.
Expand All @@ -54,6 +123,23 @@ type Port struct {
Transport Transport
}

func (p *Port) Env() EndpointEnv {
return map[string]interface{}{
"name": p.Name,
"port": p.Port,
"pod": map[string]interface{}{
"name": p.Pod.Name,
"labels": p.Pod.Labels,
"annotations": p.Pod.Annotations,
},
"transport": p.Transport,
}
}

func (p *Port) Type() EndpointType {
return PortType
}

// HostPort is an endpoint discovered on a host.
type HostPort struct {
// Name of the process associated to Endpoint. If host_observer
Expand All @@ -70,52 +156,16 @@ type HostPort struct {
IsIPv6 bool
}

type EndpointEnv map[string]interface{}

// EndpointToEnv converts an endpoint into a map suitable for expr evaluation.
func EndpointToEnv(endpoint Endpoint) (EndpointEnv, error) {
ruleTypes := map[string]interface{}{
"port": false,
"pod": false,
func (h *HostPort) Env() EndpointEnv {
return map[string]interface{}{
"name": h.Name,
"command": h.Command,
"is_ipv6": h.IsIPv6,
"port": h.Port,
"transport": h.Transport,
}
}

switch o := endpoint.Details.(type) {
case Pod:
ruleTypes["pod"] = true
return map[string]interface{}{
"type": ruleTypes,
"endpoint": endpoint.Target,
"name": o.Name,
"labels": o.Labels,
"annotations": o.Annotations,
}, nil
case Port:
ruleTypes["port"] = true
return map[string]interface{}{
"type": ruleTypes,
"endpoint": endpoint.Target,
"name": o.Name,
"port": o.Port,
"pod": map[string]interface{}{
"name": o.Pod.Name,
"labels": o.Pod.Labels,
"annotations": o.Pod.Annotations,
},
"transport": o.Transport,
}, nil
case HostPort:
ruleTypes["port"] = true
return map[string]interface{}{
"type": ruleTypes,
"endpoint": endpoint.Target,
"name": o.Name,
"command": o.Command,
"is_ipv6": o.IsIPv6,
"port": o.Port,
"transport": o.Transport,
}, nil

default:
return nil, fmt.Errorf("unknown endpoint details type %T", endpoint.Details)
}
func (h *HostPort) Type() EndpointType {
return HostPortType
}
45 changes: 19 additions & 26 deletions extension/observer/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"testing"
)

func TestEndpointToEnv(t *testing.T) {
func TestEndpointEnv(t *testing.T) {
tests := []struct {
name string
endpoint Endpoint
Expand All @@ -31,7 +31,7 @@ func TestEndpointToEnv(t *testing.T) {
endpoint: Endpoint{
ID: EndpointID("pod_id"),
Target: "192.68.73.2",
Details: Pod{
Details: &Pod{
Name: "pod_name",
Labels: map[string]string{
"label_key": "label_val",
Expand All @@ -42,9 +42,10 @@ func TestEndpointToEnv(t *testing.T) {
},
},
want: EndpointEnv{
"type": map[string]interface{}{
"port": false,
"pod": true,
"type": map[string]bool{
"pod": true,
"hostport": false,
"port": false,
},
"endpoint": "192.68.73.2",
"name": "pod_name",
Expand All @@ -62,7 +63,7 @@ func TestEndpointToEnv(t *testing.T) {
endpoint: Endpoint{
ID: EndpointID("port_id"),
Target: "192.68.73.2",
Details: Port{
Details: &Port{
Name: "port_name",
Pod: Pod{
Name: "pod_name",
Expand All @@ -78,9 +79,10 @@ func TestEndpointToEnv(t *testing.T) {
},
},
want: EndpointEnv{
"type": map[string]interface{}{
"port": true,
"pod": false,
"type": map[string]bool{
"pod": false,
"hostport": false,
"port": true,
},
"endpoint": "192.68.73.2",
"name": "port_name",
Expand All @@ -103,7 +105,7 @@ func TestEndpointToEnv(t *testing.T) {
endpoint: Endpoint{
ID: EndpointID("port_id"),
Target: "127.0.0.1",
Details: HostPort{
Details: &HostPort{
Name: "process_name",
Command: "./cmd --config config.yaml",
Port: 2379,
Expand All @@ -112,9 +114,10 @@ func TestEndpointToEnv(t *testing.T) {
},
},
want: EndpointEnv{
"type": map[string]interface{}{
"port": true,
"pod": false,
"type": map[string]bool{
"hostport": true,
"pod": false,
"port": false,
},
"endpoint": "127.0.0.1",
"name": "process_name",
Expand All @@ -125,26 +128,16 @@ func TestEndpointToEnv(t *testing.T) {
},
wantErr: false,
},
{
name: "Unsupported endpoint",
endpoint: Endpoint{
ID: EndpointID("port_id"),
Target: "127.0.0.1:2379",
Details: map[string]interface{}{},
},
want: nil,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := EndpointToEnv(tt.endpoint)
got, err := tt.endpoint.Env()
if (err != nil) != tt.wantErr {
t.Errorf("EndpointToEnv() error = %v, wantErr %v", err, tt.wantErr)
t.Errorf("Env() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("EndpointToEnv() got = %v, want %v", got, tt.want)
t.Errorf("Env() got = %v, want %v", got, tt.want)
}
})
}
Expand Down
4 changes: 2 additions & 2 deletions extension/observer/hostobserver/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (e endpointsLister) collectEndpoints(conns []net.ConnectionStat) []observer
endpoints = append(endpoints, observer.Endpoint{
ID: id,
Target: cd.target,
Details: observer.HostPort{
Details: &observer.HostPort{
Port: cd.port,
Transport: cd.transport,
// TODO: Move this field to observer.Endpoint and
Expand Down Expand Up @@ -166,7 +166,7 @@ func (e endpointsLister) collectEndpoints(conns []net.ConnectionStat) []observer
e := observer.Endpoint{
ID: id,
Target: cd.target,
Details: observer.HostPort{
Details: &observer.HostPort{
Name: pd.name,
Command: pd.args,
Port: cd.port,
Expand Down
4 changes: 2 additions & 2 deletions extension/observer/hostobserver/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func TestHostObserver(t *testing.T) {
require.NotEmpty(t, actualEndpoint, "expected endpoint ID not found. ID: %v", expectedID)
assert.Equal(t, expectedID, actualEndpoint.ID, "unexpected endpoint ID found")

details, ok := actualEndpoint.Details.(observer.HostPort)
details, ok := actualEndpoint.Details.(*observer.HostPort)
assert.True(t, ok, "failed to get Endpoint.Details")
assert.Equal(t, filepath.Base(exe), details.Name)
assert.Equal(t, tt.protocol, details.Transport)
Expand Down Expand Up @@ -463,7 +463,7 @@ func TestCollectEndpoints(t *testing.T) {
{
ID: observer.EndpointID("()123.345.567.789-80-TCP"),
Target: "123.345.567.789:80",
Details: observer.HostPort{
Details: &observer.HostPort{
Name: "",
Port: 80,
Transport: observer.ProtocolTCP,
Expand Down
4 changes: 2 additions & 2 deletions extension/observer/k8sobserver/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestExtensionObserve(t *testing.T) {
assert.Equal(t, observer.Endpoint{
ID: "k8s_observer/pod1-UID",
Target: "1.2.3.4",
Details: observer.Pod{
Details: &observer.Pod{
Name: "pod1",
Labels: map[string]string{
"env": "prod",
Expand All @@ -74,7 +74,7 @@ func TestExtensionObserve(t *testing.T) {
assert.Equal(t, observer.Endpoint{
ID: "k8s_observer/pod1-UID",
Target: "1.2.3.4",
Details: observer.Pod{
Details: &observer.Pod{
Name: "pod1",
Labels: map[string]string{
"env": "prod",
Expand Down
4 changes: 2 additions & 2 deletions extension/observer/k8sobserver/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (h *handler) convertPodToEndpoints(pod *v1.Pod) []observer.Endpoint {
endpoints := []observer.Endpoint{{
ID: podID,
Target: podIP,
Details: podDetails,
Details: &podDetails,
}}

// Map of running containers by name.
Expand All @@ -84,7 +84,7 @@ func (h *handler) convertPodToEndpoints(pod *v1.Pod) []observer.Endpoint {
endpoints = append(endpoints, observer.Endpoint{
ID: endpointID,
Target: fmt.Sprintf("%s:%d", podIP, port.ContainerPort),
Details: observer.Port{
Details: &observer.Port{
Pod: podDetails,
Name: port.Name,
Port: uint16(port.ContainerPort),
Expand Down
Loading

0 comments on commit ebd62bd

Please sign in to comment.