Skip to content

Commit

Permalink
observer extension: Add generic endpoint watcher
Browse files Browse the repository at this point in the history
  • Loading branch information
asuresh4 committed Jul 13, 2020
1 parent 554000d commit daf3f9c
Show file tree
Hide file tree
Showing 12 changed files with 424 additions and 86 deletions.
87 changes: 87 additions & 0 deletions extension/observer/endpoints.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package observer

import "fmt"

// Endpoint is a service that can be contacted remotely.
type Endpoint struct {
// ID uniquely identifies this endpoint.
ID string
// Target is an IP address or hostname of the endpoint.
Target string
// Details contains additional context about the endpoint such as a Pod or Port.
Details interface{}
}

func (e *Endpoint) String() string {
return fmt.Sprintf("Endpoint{ID: %v, Target: %v, Details: %T%+v}", e.ID, e.Target, e.Details, e.Details)
}

// Pod is a discovered k8s pod.
type Pod struct {
// Name of the pod.
Name string
// Labels is a map of user-specified metadata.
Labels map[string]string
// Annotations is a map of user-specified metadata.
Annotations map[string]string
}

// Port is an endpoint that has a target as well as a port.
type Port struct {
Name string
Pod Pod
Port uint16
Protocol Protocol
}

type EndpointEnv map[string]interface{}

// EndpointToEnv converts an endpoint into a map suitable for expr evaluation.
func EndpointToEnv(endpoint Endpoint) (EndpointEnv, error) {
ruleTypes := map[string]interface{}{
"port": false,
"pod": false,
}

switch o := endpoint.Details.(type) {
case Pod:
ruleTypes["pod"] = true
return map[string]interface{}{
"type": ruleTypes,
"endpoint": endpoint.Target,
"name": o.Name,
"labels": o.Labels,
"annotations": o.Annotations,
}, nil
case Port:
ruleTypes["port"] = true
return map[string]interface{}{
"type": ruleTypes,
"endpoint": endpoint.Target,
"name": o.Name,
"port": o.Port,
"pod": map[string]interface{}{
"name": o.Pod.Name,
"labels": o.Pod.Labels,
},
"protocol": o.Protocol,
}, nil

default:
return nil, fmt.Errorf("unknown endpoint details type %T", endpoint.Details)
}
}
121 changes: 121 additions & 0 deletions extension/observer/endpoints_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package observer

import (
"reflect"
"testing"
)

func TestEndpointToEnv(t *testing.T) {
tests := []struct {
name string
endpoint Endpoint
want EndpointEnv
wantErr bool
}{
{
name: "Pod",
endpoint: Endpoint{
ID: "pod_id",
Target: "192.68.73.2",
Details: Pod{
Name: "pod_name",
Labels: map[string]string{
"label_key": "label_val",
},
Annotations: map[string]string{
"annotation_1": "value_1",
},
},
},
want: EndpointEnv{
"type": map[string]interface{}{
"port": false,
"pod": true,
},
"endpoint": "192.68.73.2",
"name": "pod_name",
"labels": map[string]string{
"label_key": "label_val",
},
"annotations": map[string]string{
"annotation_1": "value_1",
},
},
wantErr: false,
},
{
name: "K8s port",
endpoint: Endpoint{
ID: "port_id",
Target: "192.68.73.2",
Details: Port{
Name: "port_name",
Pod: Pod{
Name: "pod_name",
Labels: map[string]string{
"label_key": "label_val",
},
Annotations: map[string]string{
"annotation_1": "value_1",
},
},
Port: 2379,
Protocol: ProtocolTCP,
},
},
want: EndpointEnv{
"type": map[string]interface{}{
"port": true,
"pod": false,
},
"endpoint": "192.68.73.2",
"name": "port_name",
"port": uint16(2379),
"pod": map[string]interface{}{
"name": "pod_name",
"labels": map[string]string{
"label_key": "label_val",
},
},
"protocol": ProtocolTCP,
},
wantErr: false,
},
{
name: "Unsupported endpoint",
endpoint: Endpoint{
ID: "port_id",
Target: "127.0.0.1:2379",
Details: map[string]interface{}{},
},
want: nil,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := EndpointToEnv(tt.endpoint)
if (err != nil) != tt.wantErr {
t.Errorf("EndpointToEnv() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("EndpointToEnv() got = %v, want %v", got, tt.want)
}
})
}
}
98 changes: 98 additions & 0 deletions extension/observer/endpointswatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package observer

import (
"time"
)

// EndpointsWatcher provides a generic mechanism to run ListEndpoints every
// RefreshInterval and report any new or removed endpoints using Notify
// passed into ListAndWatch. Any observer that lists endpoints can make
// use of EndpointsWatcher to poll for endpoints by embedding this struct
// in the observer struct.
type EndpointsWatcher struct {
ListEndpoints func() []Endpoint
RefreshInterval time.Duration
existingEndpoints map[string]Endpoint
stop chan struct{}
}

// ListAndWatch runs ListEndpoints on a regular interval and keeps the list.
func (ew *EndpointsWatcher) ListAndWatch(listener Notify) {
ew.existingEndpoints = make(map[string]Endpoint)
ew.stop = make(chan struct{})

ticker := time.NewTicker(ew.RefreshInterval)

// Do the initial listing immediately so that services can be monitored ASAP.
ew.refreshEndpoints(listener)

go func() {
for {
select {
case <-ew.stop:
ticker.Stop()
return
case <-ticker.C:
ew.refreshEndpoints(listener)
}
}
}()
}

// refreshEndpoints updates the listener with the latest list
// of active endpoints.
func (ew *EndpointsWatcher) refreshEndpoints(listener Notify) {
latestEndpoints := ew.ListEndpoints()

// Create map from ID to endpoint for lookup.
latestEndpointsMap := make(map[string]Endpoint, len(latestEndpoints))
for _, e := range latestEndpoints {
latestEndpointsMap[e.ID] = e
}

var removedEndpoints, addedEndpoints []Endpoint
// Iterate over the latest endpoints obtained. An endpoint needs
// to be added in case it is not already available in existingEndpoints.
for _, e := range latestEndpoints {
if _, ok := ew.existingEndpoints[e.ID]; !ok {
ew.existingEndpoints[e.ID] = e
addedEndpoints = append(addedEndpoints, e)
}
}

// If endpoint present in existingEndpoints does not exist in the latest
// list, it needs to be removed.
for id, e := range ew.existingEndpoints {
if _, ok := latestEndpointsMap[e.ID]; !ok {
delete(ew.existingEndpoints, id)
removedEndpoints = append(removedEndpoints, e)
}
}

if len(removedEndpoints) > 0 {
listener.OnRemove(removedEndpoints)
}

if len(addedEndpoints) > 0 {
listener.OnAdd(addedEndpoints)
}
}

// StopListAndWatch polling the ListEndpoints.
func (ew *EndpointsWatcher) StopListAndWatch() {
close(ew.stop)
}
89 changes: 89 additions & 0 deletions extension/observer/endpointswatcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2020, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package observer

import (
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestRefreshEndpoints(t *testing.T) {
endpointsMap = map[string]Endpoint{}

ew := EndpointsWatcher{
ListEndpoints: listEndpoints,
RefreshInterval: 2,
existingEndpoints: map[string]Endpoint{},
}

mn := mockNotifier{}

addEndpoint(0)
ew.ListAndWatch(mn)

// Endpoints available before the ListAndWatch call should be
// readily discovered.
expected := map[string]Endpoint{"0": {ID: "0"}}
require.Equal(t, expected, ew.existingEndpoints)

addEndpoint(1)
addEndpoint(2)
removeEndpoint(0)

expected["1"] = Endpoint{ID: "1"}
expected["2"] = Endpoint{ID: "2"}
delete(expected, "0")

time.Sleep(1 * time.Second)

require.Equal(t, expected, ew.existingEndpoints)
ew.StopListAndWatch()
}

var endpointsMap map[string]Endpoint

func addEndpoint(n int) {
e := Endpoint{ID: strconv.Itoa(n)}
endpointsMap[e.ID] = e
}

func removeEndpoint(n int) {
delete(endpointsMap, strconv.Itoa(n))
}

func listEndpoints() []Endpoint {
endpoints := make([]Endpoint, 0)
for _, e := range endpointsMap {
endpoints = append(endpoints, e)
}
return endpoints
}

type mockNotifier struct {
}

var _ Notify = (*mockNotifier)(nil)

func (m mockNotifier) OnAdd([]Endpoint) {
}

func (m mockNotifier) OnRemove([]Endpoint) {
}

func (m mockNotifier) OnChange([]Endpoint) {
}

0 comments on commit daf3f9c

Please sign in to comment.