Skip to content

Commit

Permalink
Support multiple observer.Notify subscribers in observer.EndpointsWat…
Browse files Browse the repository at this point in the history
…cher (#12480)

Adding a feature - These changes update the observer.EndpointsWatcher to support multiple observer.Notify instances receiving Endpoint events where currently only one is allowed. This enables multiple receiver creator instances to use the same observer instances as well as* future components that are designed to receive events from observers.

These changes add a new observer.NotifyID type for Notify identification and update the EndpointsWatcher to keep an internal state for each subscribed Notify instance to account for arbitrary subscription times.
  • Loading branch information
rmfitzpatrick committed Aug 5, 2022
1 parent b0646a9 commit 9b2a885
Show file tree
Hide file tree
Showing 24 changed files with 624 additions and 135 deletions.
2 changes: 1 addition & 1 deletion extension/observer/dockerobserver/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func newObserver(logger *zap.Logger, config *Config) (component.Extension, error
logger: logger, config: config,
once: &sync.Once{},
}
d.EndpointsWatcher = &observer.EndpointsWatcher{Endpointslister: d, RefreshInterval: time.Second}
d.EndpointsWatcher = observer.NewEndpointsWatcher(d, time.Second, logger)
return d, nil
}

Expand Down
1 change: 1 addition & 0 deletions extension/observer/dockerobserver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions extension/observer/dockerobserver/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ func stopObserver(t *testing.T, obvs *dockerObserver) {
assert.NoError(t, obvs.Shutdown(context.Background()))
}

var _ observer.Notify = (*mockNotifier)(nil)

type mockNotifier struct {
sync.Mutex
endpointsMap map[observer.EndpointID]observer.Endpoint
Expand All @@ -175,6 +177,10 @@ type mockNotifier struct {
changeCount int
}

func (m *mockNotifier) ID() observer.NotifyID {
return "mockNotifier"
}

func (m *mockNotifier) AddCount() int {
m.Lock()
defer m.Unlock()
Expand Down
5 changes: 1 addition & 4 deletions extension/observer/ecstaskobserver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,7 @@ func createExtension(
e.Extension = extension{
ShutdownFunc: e.Shutdown,
}
e.EndpointsWatcher = &observer.EndpointsWatcher{
Endpointslister: e,
RefreshInterval: obsCfg.RefreshInterval,
}
e.EndpointsWatcher = observer.NewEndpointsWatcher(e, obsCfg.RefreshInterval, params.TelemetrySettings.Logger)

return e, nil
}
Expand Down
1 change: 1 addition & 0 deletions extension/observer/ecstaskobserver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions extension/observer/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package observer // import "github.com/open-telemetry/opentelemetry-collector-co
import (
"errors"
"fmt"
"reflect"
)

type (
Expand Down Expand Up @@ -83,6 +84,25 @@ func (e *Endpoint) String() string {
return fmt.Sprintf("Endpoint{ID: %v, Target: %v, Details: %T%+v}", e.ID, e.Target, e.Details, e.Details)
}

func (e Endpoint) equals(other Endpoint) bool {
switch {
case e.ID != other.ID:
return false
case e.Target != other.Target:
return false
case e.Details == nil && other.Details != nil:
return false
case other.Details == nil && e.Details != nil:
return false
case e.Details == nil && other.Details == nil:
return true
case e.Details.Type() != other.Details.Type():
return false
default:
return reflect.DeepEqual(e.Details.Env(), other.Details.Env())
}
}

// Pod is a discovered k8s pod.
type Pod struct {
// Name of the pod.
Expand Down
177 changes: 177 additions & 0 deletions extension/observer/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package observer
import (
"reflect"
"testing"

"github.com/stretchr/testify/require"
)

func TestEndpointEnv(t *testing.T) {
Expand Down Expand Up @@ -223,3 +225,178 @@ func TestEndpointEnv(t *testing.T) {
})
}
}

func TestEndpointEquals(t *testing.T) {
tests := []struct {
name string
first Endpoint
second Endpoint
areEqual bool
}{
{
name: "equal empty endpoints",
first: Endpoint{}, second: Endpoint{},
areEqual: true,
},
{
name: "equal ID",
first: Endpoint{ID: "id"},
second: Endpoint{ID: "id"},
areEqual: true,
},
{
name: "unequal ID",
first: Endpoint{ID: "first"},
second: Endpoint{ID: "second"},
areEqual: false,
},
{
name: "equal Target",
first: Endpoint{Target: "target"},
second: Endpoint{Target: "target"},
areEqual: true,
},
{
name: "unequal Target",
first: Endpoint{Target: "first"},
second: Endpoint{Target: "second"},
areEqual: false,
},
{
name: "equal empty Port",
first: Endpoint{Details: &Port{}},
second: Endpoint{Details: &Port{}},
areEqual: true,
},
{
name: "equal Port Name",
first: Endpoint{Details: &Port{Name: "port_name"}},
second: Endpoint{Details: &Port{Name: "port_name"}},
areEqual: true,
},
{
name: "unequal Port Name",
first: Endpoint{Details: &Port{Name: "first"}},
second: Endpoint{Details: &Port{Name: "second"}},
areEqual: false,
},
{
name: "equal Port Port",
first: Endpoint{Details: &Port{Port: 2379}},
second: Endpoint{Details: &Port{Port: 2379}},
areEqual: true,
},
{
name: "unequal Port Port",
first: Endpoint{Details: &Port{Port: 0}},
second: Endpoint{Details: &Port{Port: 1}},
areEqual: false,
},
{
name: "equal Port Transport",
first: Endpoint{Details: &Port{Transport: "transport"}},
second: Endpoint{Details: &Port{Transport: "transport"}},
areEqual: true,
},
{
name: "unequal Port Transport",
first: Endpoint{Details: &Port{Transport: "first"}},
second: Endpoint{Details: &Port{Transport: "second"}},
areEqual: false,
},
{
name: "equal Port",
first: Endpoint{
ID: EndpointID("port_id"),
Target: "192.68.73.2",
Details: &Port{
Name: "port_name",
Pod: Pod{
Name: "pod_name",
Labels: map[string]string{
"label_key": "label_val",
},
Annotations: map[string]string{
"annotation_1": "value_1",
},
Namespace: "pod-namespace",
UID: "pod-uid",
},
Port: 2379,
Transport: ProtocolTCP,
},
},
second: Endpoint{
ID: EndpointID("port_id"),
Target: "192.68.73.2",
Details: &Port{
Name: "port_name",
Pod: Pod{
Name: "pod_name",
Labels: map[string]string{
"label_key": "label_val",
},
Annotations: map[string]string{
"annotation_1": "value_1",
},
Namespace: "pod-namespace",
UID: "pod-uid",
},
Port: 2379,
Transport: ProtocolTCP,
},
},
areEqual: true,
},
{
name: "unequal Port Pod Label",
first: Endpoint{
ID: EndpointID("port_id"),
Target: "192.68.73.2",
Details: &Port{
Name: "port_name",
Pod: Pod{
Name: "pod_name",
Labels: map[string]string{
"key_one": "val_one",
},
Annotations: map[string]string{
"annotation_1": "value_1",
},
Namespace: "pod-namespace",
UID: "pod-uid",
},
Port: 2379,
Transport: ProtocolTCP,
},
},
second: Endpoint{
ID: EndpointID("port_id"),
Target: "192.68.73.2",
Details: &Port{
Name: "port_name",
Pod: Pod{
Name: "pod_name",
Labels: map[string]string{
"key_two": "val_two",
},
Annotations: map[string]string{
"annotation_1": "value_1",
},
Namespace: "pod-namespace",
UID: "pod-uid",
},
Port: 2379,
Transport: ProtocolTCP,
},
},
areEqual: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
require.Equal(t, tt.first.equals(tt.second), tt.areEqual)
require.Equal(t, tt.second.equals(tt.first), tt.areEqual)
})
}
}
Loading

0 comments on commit 9b2a885

Please sign in to comment.