
[exporter/loadbalancingexporter] support k8s service resolver #22776

Merged
1 commit merged into open-telemetry:main on Jul 26, 2023

Conversation

fyuan1316
Contributor

@fyuan1316 fyuan1316 commented May 25, 2023

Description:

Add k8s service resolver for exporter/loadbalancingexporter

The exporter/loadbalancingexporter component currently supports static and DNS resolvers, but cannot yet load-balance across pods when the collector runs in a Kubernetes environment. This PR adds that capability: backend addresses are discovered by watching Kubernetes Endpoints resources.
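As a sketch, the minimal exporter configuration with the new resolver looks like the one tested later in this thread (the service name is illustrative):

```yaml
exporters:
  loadbalancing:
    protocol:
      otlp:
        tls:
          insecure: true
    resolver:
      k8s:
        service: backends-collector-headless.observability
```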

Link to tracking Issue:
Suitable for scenarios where services are located in a Kubernetes cluster: #18412

Testing:

Documentation:

@fyuan1316 fyuan1316 requested review from a team and jpkrohling as code owners May 25, 2023 10:29
@linux-foundation-easycla

linux-foundation-easycla bot commented May 25, 2023

CLA Signed

The committers listed above are authorized under a signed CLA.

  • ✅ login: fyuan1316 / name: fyuan1316 (d201b41)

@fyuan1316
Contributor Author

@codeboten Hi 😄 I submitted a pull request a few days ago and haven't heard back from you. Do you need more information or anything else from me? Thank you for your time!

@github-actions
Contributor

This PR was marked stale due to lack of activity. It will be closed in 14 days.

@github-actions github-actions bot added the Stale label Jun 14, 2023
@fyuan1316
Contributor Author

@jpkrohling
I would like to know whether it is feasible to add a k8s resolver. If there are other, better ways to support this scenario, I'd love to learn about them.
I'm new here; any feedback and guidance would be greatly appreciated.

@github-actions github-actions bot removed the Stale label Jun 15, 2023
Contributor

@codeboten codeboten left a comment

Thanks for the PR @fyuan1316, pinging @jpkrohling as the code owner to review the code

Member

@jpkrohling jpkrohling left a comment

This looks really cool, thank you! There are a few comments, and I couldn't test it manually as I couldn't build this due to:

go: open /home/jpkroehling/Projects/github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector/go.mod: no such file or directory
go: open /home/jpkroehling/Projects/github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector/go.mod: no such file or directory
go test -race -timeout 300s -parallel 4 --tags="" ./...
go: open /home/jpkroehling/Projects/github.com/open-telemetry/opentelemetry-collector-contrib/connector/routingconnector/go.mod: no such file or directory
make: *** [Makefile.Common:98: test] Error 1

I'll give it another try once you rebase this.

exporter/loadbalancingexporter/loadbalancer_test.go
var tests = []struct {
name string
cfg *Config
want *loadBalancerImp
Member

You can remove this one

},
},
},
want: nil,
Member

If both tests are using the same value(nil), you don't need it here.

t.Run(tt.name, func(t *testing.T) {
p, err := newLoadBalancer(exportertest.NewNopCreateSettings(), tt.cfg, nil)

if tt.wantErr != nil {
Member

You can replace the if/else here with:

assert.Equal(t, tt.wantErr, err)

case *corev1.Endpoints:
endpoints = convertToEndpoints(object)
default: // unsupported
return
Member

If this happens, you definitely want to know about it. Log a warning here.

Contributor Author

done (log a warning in the unsupported branch)

}
h.logger.Debug("onAdd check", zap.Bool("changed", changed))
if changed {
h.callback(context.TODO())
Member

Either a context.Background(), or something with a timeout. Same in other similar places.

Contributor Author

done

endpoints := convertToEndpoints(newEps)
changed := false
for _, ep := range endpoints {
if _, loaded := h.endpoints.LoadOrStore(ep, ep); !loaded {
Member

Instead of storing a map[string]string, you could store a map[string]bool, and set this value here to true. You'd save some space with that, while still having the semantics you need.
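A minimal sketch of the suggestion above: store a boolean instead of duplicating the key, while keeping LoadOrStore's "newly seen" signal. The `endpointSet` type is illustrative, not the PR's actual code:

```go
package main

import (
	"fmt"
	"sync"
)

// endpointSet tracks known backend endpoints. Values are bool per the
// reviewer's suggestion, saving space versus storing the key twice.
type endpointSet struct{ m sync.Map }

// add records ep and reports whether it was newly seen — the "changed"
// signal the resolver uses to decide whether to invoke its callback.
func (s *endpointSet) add(ep string) bool {
	_, loaded := s.m.LoadOrStore(ep, true)
	return !loaded
}

func main() {
	var s endpointSet
	fmt.Println(s.add("10.0.0.1:4317")) // true: first sighting, changed
	fmt.Println(s.add("10.0.0.1:4317")) // false: already known, unchanged
}
```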

Contributor Author

done

@@ -66,6 +66,12 @@ exporters:
- backend-2:4317
- backend-3:4317
- backend-4:4317
## use k8s service resolver, if collector runs in kubernetes environment
Member

This new resolver deserves more information than that. There are other parts of the documentation that need to be changed, especially line 31.

Contributor Author

done

return true
}, time.Second, 20*time.Millisecond)

// step-3 delete, simulate deletion of backends
Member

I'd prefer to have this on its own test.

Contributor Author

Sorry, I didn't get your point here. Do you mean a separate test method for the delete operation?

Member

Sorry, I meant that the step 3 should be on its own test function: instead of having one test doing everything, have smaller tests exercising smaller units of your code.

@fyuan1316
Contributor Author

@jpkrohling
Thank you very much for your guidance! 😄
I've just rebased the code; please take another look.
I haven't come up with a better way to adjust the output for user-specific error messages. Do you have any suggestions?

Member

@jpkrohling jpkrohling left a comment

This is looking good, I had just a couple more comments.

@@ -28,13 +28,16 @@ This also supports service name based exporting for traces. If you have two or m
Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using the processor.

* The `otlp` property configures the template used for building the OTLP exporter. Refer to the OTLP Exporter documentation for information on which options are available. Note that the `endpoint` property should not be set and will be overridden by this exporter with the backend endpoint.
* The `resolver` accepts either a `static` node, or a `dns`. If both are specified, `dns` takes precedence.
* The `resolver` accepts a `static` node, a `dns` or a `k8s` node. If all three are specified, `k8s` takes precedence.
Member

Suggested change
* The `resolver` accepts a `static` node, a `dns` or a `k8s` node. If all three are specified, `k8s` takes precedence.
* The `resolver` accepts a `static` node, a `dns` or a `k8s` service. If all three are specified, `k8s` takes precedence.

* The `hostname` property inside a `dns` node specifies the hostname to query in order to obtain the list of IP addresses.
* The `dns` node also accepts the following optional properties:
* `hostname` DNS hostname to resolve.
* `port` port to be used for exporting the traces to the IP addresses resolved from `hostname`. If `port` is not specified, the default port 4317 is used.
* `interval` resolver interval in go-Duration format, e.g. `5s`, `1d`, `30m`. If not specified, `5s` will be used.
* `timeout` resolver timeout in go-Duration format, e.g. `5s`, `1d`, `30m`. If not specified, `1s` will be used.
* The `k8s` node accepts the following optional properties:
* `service` Kubernetes service to resolve, e.g. `lb-svc.lb-ns`. If no namespace is specified, an attempt will be made to infer the namespace for this collector, and if this fails it will fall back to the `default` namespace.
* `ports` port to be used for exporting the traces to the addresses resolved from `service`. If `ports` is not specified, the default port 4317 is used.
Member

Suggested change
* `ports` port to be used for exporting the traces to the addresses resolved from `service`. If `ports` is not specified, the default port 4317 is used.
* `ports` port to be used for exporting the traces to the addresses resolved from `service`. If `ports` is not specified, the default port 4317 is used. When multiple ports are specified, two backends are added to the load balancer as if they were at different pods.
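A sketch of what the multi-port behavior described in the suggestion implies, assuming `ports` accepts a list (service name and port numbers are illustrative):

```yaml
resolver:
  k8s:
    service: lb-svc.lb-ns
    ports:
      - 4317
      - 4319
# An endpoint address 10.0.0.5 would register as both 10.0.0.5:4317 and
# 10.0.0.5:4319, and the load balancer treats them as distinct backends,
# as if they were different pods.
```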

@@ -83,6 +86,49 @@ service:
- loadbalancing
```

K8s resolver example
Member

Suggested change
K8s resolver example
Kubernetes resolver example


var (
errNoSvc = errors.New("no service specified to resolve the backends")
k8sResolverMutator = tag.Upsert(tag.MustNewKey("resolver"), "k8s service")
Member

I would use k8s here, as the other ones use the config key as the resolver name.

Suggested change
k8sResolverMutator = tag.Upsert(tag.MustNewKey("resolver"), "k8s service")
k8sResolverMutator = tag.Upsert(tag.MustNewKey("resolver"), "k8s")

if len(nAddr) > 1 {
namespace = nAddr[1]
} else {
logger.Info("no namespace was provided, introspection firstly")
Member

Suggested change
logger.Info("no namespace was provided, introspection firstly")
logger.Info("the namespace for the Kubernetes service wasn't provided, trying to determine the current namespace", zap.String("name", name))

case *corev1.Endpoints:
endpoints = convertToEndpoints(object)
default: // unsupported
h.logger.Warn("onAdd unable to handle object", zap.Any("obj", obj))
Member

It would be best to record the object's type, rather than the object itself.

Member

The message could also be more user-focused: Got an unexpected Kubernetes data type during the inclusion of a new pods for the service (and add the service).

Contributor Author

Thanks, that's much better, will adjust accordingly.

h.callback(context.Background())
}
default: // unsupported
h.logger.Warn("onUpdate unable to handle object", zap.Any("obj", oldObj))
Member

Suggested change
h.logger.Warn("onUpdate unable to handle object", zap.Any("obj", oldObj))
h.logger.Warn("Got an unexpected Kubernetes data type during the update of the pods for a service", zap.Any("obj", oldObj))

Contributor Author

Thanks a lot, this makes the meaning very clear.

endpoints = convertToEndpoints(object)
}
default: // unsupported
h.logger.Warn("onDelete unable to handle object", zap.Any("obj", obj))
Member

Same here: improve the message, perhaps based on the previous suggestions.

Contributor Author

done


@JaredTan95
Member

@fyuan1316 Hi, after resolving the conflicts I think we can get this PR merged. :-P

JaredTan95 added a commit to openinsight-proj/opentelemetry-collector-contrib that referenced this pull request Jul 7, 2023
refs:open-telemetry#22776

Signed-off-by: Jared Tan <jian.tan@daocloud.io>
@github-actions github-actions bot added the cmd/configschema configschema command label Jul 7, 2023
@github-actions github-actions bot requested a review from pmcollins July 7, 2023 16:15
Contributor

@codeboten codeboten left a comment

I resolved the go.mod conflicts. @jpkrohling, can you resolve the conversations if they've been addressed and give this another review?

Contributor

@codeboten codeboten left a comment

Please add a changelog for the enhancement

JaredTan95 added a commit to openinsight-proj/opentelemetry-collector-contrib that referenced this pull request Jul 9, 2023
refs:open-telemetry#22776

Signed-off-by: Jared Tan <jian.tan@daocloud.io>
@fyuan1316
Contributor Author

Looks good, just a few small things that shouldn't block this PR. If you prefer to complete the pending items on a follow-up PR, that's fine by me, but please let me know the issue number tracking those items.

I tested this with the following config:

apiVersion: v1
kind: Namespace
metadata:
  name: observability
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: loadbalancer-role
rules:
- apiGroups:
  - ""
  resources:
  - endpoints
  verbs:
  - list
  - watch
  - get
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: loadbalancer
  namespace: observability
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: loadbalancer-rolebinding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: loadbalancer-role
subjects:
- kind: ServiceAccount
  name: loadbalancer
  namespace: observability
---
apiVersion: opentelemetry.io/v1alpha1
kind: OpenTelemetryCollector
metadata:
  name: loadbalancer
  namespace: observability
spec:
  image: docker.io/jpkroehling/otelcol-with-k8sresolver:latest
  serviceAccount: loadbalancer
  config: |
    receivers:
      otlp:
        protocols:
          grpc:

    processors:

    exporters:
      loadbalancing:
        protocol:
          otlp:
            tls:
              insecure: true
        resolver:
          k8s:
            service: backends-collector-headless.observability

    service:
      pipelines:
        traces:
          receivers:
            - otlp
          processors: []
          exporters:
            - loadbalancing
---
apiVersion: opentelemetry.io/v1alpha1
kind: OpenTelemetryCollector
metadata:
  name: backends
  namespace: observability
spec:
  replicas: 5
  config: |
    receivers:
      otlp:
        protocols:
          grpc:

    processors:

    exporters:
      logging:

    service:
      pipelines:
        traces:
          receivers:
            - otlp
          processors: []
          exporters:
            - logging

I think this configuration is short and clear, and it states precisely the minimum requirements to run the k8s service resolver.

Can we put it in the readme as a documentation description?

@JaredTan95
Member

JaredTan95 commented Jul 14, 2023

I think this configuration is short and clear, and it states precisely the minimum requirements to run the k8s service resolver.

Can we put it in the readme as a documentation description?

+1 from me. I used this approach to test in my environment before, so I'm in favor of using those CRs to describe it (maybe in a separate follow-up PR).

@fyuan1316
Contributor Author

I created a new issue, #24287, to follow up on the documentation change.
Can we merge this PR? Thanks again for the help!

// Check whether the namespace file exists.
// If not, we are not running in cluster so can't guess the namespace.
if _, err := os.Stat(inClusterNamespacePath); os.IsNotExist(err) {
return "", fmt.Errorf("not running in-cluster, please specify LeaderElectionNamespace")
Member

LeaderElectionNamespace?

Contributor Author

ok, maybe we just need to use the namespace; "LeaderElectionNamespace" has no role in this context.
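The namespace detection being discussed can be sketched as follows, assuming the standard in-cluster service-account mount path and the fallback to `default` described in the README changes (the `currentNamespace` helper is illustrative, not the PR's actual code):

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// Path where Kubernetes mounts the pod's namespace inside a container.
const inClusterNamespacePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"

// currentNamespace returns the namespace of the pod we run in. When the
// namespace file does not exist we are not running in-cluster, so we fall
// back to "default", mirroring the behavior documented for the k8s resolver.
func currentNamespace() string {
	data, err := os.ReadFile(inClusterNamespacePath)
	if err != nil {
		return "default" // not in-cluster: cannot introspect the namespace
	}
	if ns := strings.TrimSpace(string(data)); ns != "" {
		return ns
	}
	return "default"
}

func main() {
	fmt.Println(currentNamespace())
}
```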

Contributor Author

done

endpoints = convertToEndpoints(object)
default: // unsupported
h.logger.Warn("Got an unexpected Kubernetes data type during the inclusion of a new pods for the service", zap.Any("obj", obj))
_ = stats.RecordWithTags(context.Background(), k8sResolverSuccessFalseMutators, mNumResolutions.M(1))
Member

Please, create an issue to track this. Note that the same pattern repeats elsewhere.

@fyuan1316
Contributor Author

Hi @jpkrohling:
Adjustments to LeaderElectionNamespace have been completed.

There are currently two follow-up issues:
#24287 (docs/example)
#24365 (improve metrics for unsupported k8s runtime objects in the k8s service resolver)

PTAL

@jpkrohling
Member

It looks like the CI is still failing:

diff --git a/exporter/loadbalancingexporter/resolver_k8s_handler.go b/exporter/loadbalancingexporter/resolver_k8s_handler.go
index 61b3603..b83e0f8 100644
--- a/exporter/loadbalancingexporter/resolver_k8s_handler.go
+++ b/exporter/loadbalancingexporter/resolver_k8s_handler.go
@@ -1,7 +1,7 @@
 // Copyright The OpenTelemetry Authors
 // SPDX-License-Identifier: Apache-2.0
 
-package loadbalancingexporter
+package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter"
 
 import (
 	"context"
Porto links are out of date, please run "make goporto" and commit the changes in this PR.

@github-actions github-actions bot added the cmd/configschema configschema command label Jul 20, 2023
@fyuan1316 fyuan1316 force-pushed the k8s-resolver branch 2 times, most recently from 0918ff2 to c9e78df Compare July 20, 2023 02:34
@fyuan1316
Contributor Author

Thanks! @jpkrohling
Running make -j2 golint GROUP=other and make gotest GROUP=exporter locally succeeds.

@jpkrohling
Member

The tests are failing:

--- FAIL: TestNewLoadBalancerInvalidK8sResolver (0.00s)
    loadbalancer_test.go:81: 
        	Error Trace:	/home/runner/work/opentelemetry-collector-contrib/opentelemetry-collector-contrib/exporter/loadbalancingexporter/loadbalancer_test.go:81
        	Error:      	Not equal: 
        	            	expected: *errors.errorString(&errors.errorString{s:"no service specified to resolve the backends"})
        	            	actual  : clientcmd.errConfigurationInvalid(clientcmd.errConfigurationInvalid{(*clientcmd.errEmptyConfig)(0xc000069cc0)})
        	Test:       	TestNewLoadBalancerInvalidK8sResolver
FAIL
FAIL	github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter	0.409s
?   	github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata	[no test files]
FAIL
make[2]: *** [../../Makefile.Common:99: test] Error 1
make[1]: *** [Makefile:176: exporter/loadbalancingexporter] Error 2
make: *** [Makefile:100: gotest] Error 2

@fyuan1316
Contributor Author

The tests are failing: […]

Sorry, there's still an error. It looks like the difference between the local configuration (with a kubeconfig configured) and CI (without one) leads to different expected results.

I would prefer a simple solution to this problem as follows:

p, err := newLoadBalancer(exportertest.NewNopCreateSettings(), cfg, nil)
assert.Nil(t, p)

assert.Equal(t, errNoSvc, err)

modified to

p, err := newLoadBalancer(exportertest.NewNopCreateSettings(), cfg, nil)
assert.Nil(t, p)

assert.True(t, errors.Is(err, clientcmd.ErrEmptyConfig) || errors.Is(err, errNoSvc)) 

What do you think? @jpkrohling

@jpkrohling
Member

I believe there's an assert.ErrorAs that could be used.

@fyuan1316
Contributor Author

fyuan1316 commented Jul 21, 2023

I believe there's a assert.ErrorAs that could be used.

Since clientcmd.errConfigurationInvalid is not exported and errNoSvc is not a struct type, my attempts at assert.ErrorAs were unsuccessful.
I suggest the following tweak, and have updated the PR accordingly:

assert.True(t, clientcmd.IsConfigurationInvalid(err) || errors.Is(err, errNoSvc))

@fyuan1316
Contributor Author

My bad — it looks like the previous modification got lost. I'll update it later, after I test it locally. By the way, I'm curious whether I can trigger the CI pipeline myself; for example, is there a label that does it?

Signed-off-by: Yuan Fang <yuanfang@alauda.io>
@fyuan1316
Contributor Author

Tested locally and passed, hopefully this time it will work.

make -j2 golint GROUP=other
make -j2 goporto
make gotest-with-cover GROUP=other
make gotest GROUP=other

@jpkrohling
Member

I'm curious to know if I can trigger the CI pipeline to execute by myself, for example, is there a label way to do it?

It only requires approval for the first contribution. Once this is merged, your next contributions won't require an approval to be executed.

@fyuan1316
Contributor Author

I'm curious to know if I can trigger the CI pipeline to execute by myself, for example, is there a label way to do it?

It only requires approval for the first contribution. Once this is merged, your next contributions won't require an approval to be executed.

Got it, thanks.
I noticed that there are two non-required check failures; I'm not sure whether they are PR-related. Does this affect the merge?

@fyuan1316
Contributor Author

@jpkrohling Please take a look. Is it possible to merge now?

@jpkrohling jpkrohling merged commit be04baa into open-telemetry:main Jul 26, 2023
87 of 89 checks passed
@github-actions github-actions bot added this to the next release milestone Jul 26, 2023
Labels: cmd/configschema (configschema command), cmd/otelcontribcol (otelcontribcol command), exporter/loadbalancing

5 participants