Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

observer extension: Add generic endpoint watcher #427

Merged
merged 5 commits into from
Jul 15, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
87 changes: 87 additions & 0 deletions extension/observer/endpoints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package observer

import "fmt"

// Endpoint is a service that can be contacted remotely.
type Endpoint struct {
// ID uniquely identifies this endpoint.
ID string
// Target is an IP address or hostname of the endpoint.
Target string
// Details contains additional context about the endpoint such as a Pod or Port.
Details interface{}
}

func (e *Endpoint) String() string {
return fmt.Sprintf("Endpoint{ID: %v, Target: %v, Details: %T%+v}", e.ID, e.Target, e.Details, e.Details)
}

// Pod is a discovered k8s pod.
type Pod struct {
// Name of the pod.
Name string
// Labels is a map of user-specified metadata.
Labels map[string]string
// Annotations is a map of user-specified metadata.
Annotations map[string]string
}

// Port is an endpoint that has a target as well as a port.
type Port struct {
Name string
Pod Pod
Port uint16
Protocol Protocol
}

type EndpointEnv map[string]interface{}

// EndpointToEnv converts an endpoint into a map suitable for expr evaluation.
func EndpointToEnv(endpoint Endpoint) (EndpointEnv, error) {
ruleTypes := map[string]interface{}{
"port": false,
"pod": false,
}

switch o := endpoint.Details.(type) {
case Pod:
ruleTypes["pod"] = true
return map[string]interface{}{
"type": ruleTypes,
"endpoint": endpoint.Target,
"name": o.Name,
"labels": o.Labels,
"annotations": o.Annotations,
}, nil
case Port:
ruleTypes["port"] = true
return map[string]interface{}{
"type": ruleTypes,
"endpoint": endpoint.Target,
"name": o.Name,
"port": o.Port,
"pod": map[string]interface{}{
"name": o.Pod.Name,
asuresh4 marked this conversation as resolved.
Show resolved Hide resolved
"labels": o.Pod.Labels,
},
"protocol": o.Protocol,
}, nil

default:
return nil, fmt.Errorf("unknown endpoint details type %T", endpoint.Details)
}
}
121 changes: 121 additions & 0 deletions extension/observer/endpoints_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package observer

import (
"reflect"
"testing"
)

func TestEndpointToEnv(t *testing.T) {
tests := []struct {
name string
endpoint Endpoint
want EndpointEnv
wantErr bool
}{
{
name: "Pod",
endpoint: Endpoint{
ID: "pod_id",
Target: "192.68.73.2",
Details: Pod{
Name: "pod_name",
Labels: map[string]string{
"label_key": "label_val",
},
Annotations: map[string]string{
"annotation_1": "value_1",
},
},
},
want: EndpointEnv{
"type": map[string]interface{}{
"port": false,
"pod": true,
},
"endpoint": "192.68.73.2",
"name": "pod_name",
"labels": map[string]string{
"label_key": "label_val",
},
"annotations": map[string]string{
"annotation_1": "value_1",
},
},
wantErr: false,
},
{
name: "K8s port",
endpoint: Endpoint{
ID: "port_id",
Target: "192.68.73.2",
Details: Port{
Name: "port_name",
Pod: Pod{
Name: "pod_name",
Labels: map[string]string{
"label_key": "label_val",
},
Annotations: map[string]string{
"annotation_1": "value_1",
},
},
Port: 2379,
Protocol: ProtocolTCP,
},
},
want: EndpointEnv{
"type": map[string]interface{}{
"port": true,
"pod": false,
},
"endpoint": "192.68.73.2",
"name": "port_name",
"port": uint16(2379),
"pod": map[string]interface{}{
"name": "pod_name",
"labels": map[string]string{
"label_key": "label_val",
},
},
"protocol": ProtocolTCP,
},
wantErr: false,
},
{
name: "Unsupported endpoint",
endpoint: Endpoint{
ID: "port_id",
Target: "127.0.0.1:2379",
Details: map[string]interface{}{},
},
want: nil,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := EndpointToEnv(tt.endpoint)
if (err != nil) != tt.wantErr {
t.Errorf("EndpointToEnv() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("EndpointToEnv() got = %v, want %v", got, tt.want)
}
})
}
}
98 changes: 98 additions & 0 deletions extension/observer/endpointswatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package observer

import (
"time"
)

// EndpointsWatcher provides a generic mechanism to run ListEndpoints every
// RefreshInterval and report any new or removed endpoints using Notify
// passed into ListAndWatch. Any observer that lists endpoints can make
// use of EndpointsWatcher to poll for endpoints by embedding this struct
// in the observer struct.
type EndpointsWatcher struct {
ListEndpoints func() []Endpoint
RefreshInterval time.Duration
existingEndpoints map[string]Endpoint
asuresh4 marked this conversation as resolved.
Show resolved Hide resolved
stop chan struct{}
}

// ListAndWatch runs ListEndpoints on a regular interval and keeps the list.
func (ew *EndpointsWatcher) ListAndWatch(listener Notify) {
ew.existingEndpoints = make(map[string]Endpoint)
asuresh4 marked this conversation as resolved.
Show resolved Hide resolved
ew.stop = make(chan struct{})

ticker := time.NewTicker(ew.RefreshInterval)

// Do the initial listing immediately so that services can be monitored ASAP.
ew.refreshEndpoints(listener)

go func() {
for {
asuresh4 marked this conversation as resolved.
Show resolved Hide resolved
select {
case <-ew.stop:
ticker.Stop()
return
case <-ticker.C:
ew.refreshEndpoints(listener)
}
}
}()
}

// refreshEndpoints updates the listener with the latest list
// of active endpoints.
func (ew *EndpointsWatcher) refreshEndpoints(listener Notify) {
latestEndpoints := ew.ListEndpoints()

// Create map from ID to endpoint for lookup.
latestEndpointsMap := make(map[string]Endpoint, len(latestEndpoints))
for _, e := range latestEndpoints {
latestEndpointsMap[e.ID] = e
asuresh4 marked this conversation as resolved.
Show resolved Hide resolved
}

var removedEndpoints, addedEndpoints []Endpoint
// Iterate over the latest endpoints obtained. An endpoint needs
// to be added in case it is not already available in existingEndpoints.
for _, e := range latestEndpoints {
if _, ok := ew.existingEndpoints[e.ID]; !ok {
ew.existingEndpoints[e.ID] = e
addedEndpoints = append(addedEndpoints, e)
}
}

// If endpoint present in existingEndpoints does not exist in the latest
// list, it needs to be removed.
for id, e := range ew.existingEndpoints {
if _, ok := latestEndpointsMap[e.ID]; !ok {
asuresh4 marked this conversation as resolved.
Show resolved Hide resolved
delete(ew.existingEndpoints, id)
removedEndpoints = append(removedEndpoints, e)
}
}

asuresh4 marked this conversation as resolved.
Show resolved Hide resolved
if len(removedEndpoints) > 0 {
listener.OnRemove(removedEndpoints)
}

if len(addedEndpoints) > 0 {
listener.OnAdd(addedEndpoints)
}
}

// StopListAndWatch polling the ListEndpoints.
func (ew *EndpointsWatcher) StopListAndWatch() {
close(ew.stop)
}
88 changes: 88 additions & 0 deletions extension/observer/endpointswatcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package observer

import (
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestRefreshEndpoints(t *testing.T) {
endpointsMap = map[string]Endpoint{}

ew := EndpointsWatcher{
ListEndpoints: listEndpoints,
RefreshInterval: 2,
existingEndpoints: map[string]Endpoint{},
}

mn := mockNotifier{}

addEndpoint(0)
ew.ListAndWatch(mn)

// Endpoints available before the ListAndWatch call should be
// readily discovered.
expected := map[string]Endpoint{"0": {ID: "0"}}
require.Equal(t, expected, ew.existingEndpoints)

addEndpoint(1)
addEndpoint(2)
removeEndpoint(0)

time.Sleep(1 * time.Second)
ew.StopListAndWatch()

expected["1"] = Endpoint{ID: "1"}
expected["2"] = Endpoint{ID: "2"}
delete(expected, "0")
require.Equal(t, expected, ew.existingEndpoints)
}

var endpointsMap map[string]Endpoint

func addEndpoint(n int) {
e := Endpoint{ID: strconv.Itoa(n)}
endpointsMap[e.ID] = e
}

func removeEndpoint(n int) {
delete(endpointsMap, strconv.Itoa(n))
}

func listEndpoints() []Endpoint {
endpoints := make([]Endpoint, 0)
for _, e := range endpointsMap {
endpoints = append(endpoints, e)
}
return endpoints
}

type mockNotifier struct {
}

var _ Notify = (*mockNotifier)(nil)

func (m mockNotifier) OnAdd([]Endpoint) {
}

func (m mockNotifier) OnRemove([]Endpoint) {
}

func (m mockNotifier) OnChange([]Endpoint) {
}