Skip to content

Commit

Permalink
PR feedback, renamed resolver and removed port from config
Browse files Browse the repository at this point in the history
  • Loading branch information
snuggie12 committed Feb 27, 2024
1 parent 0beac79 commit b2f3a19
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 145 deletions.
29 changes: 20 additions & 9 deletions exporter/loadbalancingexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Note that either the Trace ID or Service name is used for the decision on which

This load balancer is especially useful for backends configured with tail-based samplers or red-metrics-collectors, which make a decision based on the view of the full trace.

When a list of backends is updated, some of the signals will be rerouted to different backends.
When a list of backends is updated, some of the signals will be rerouted to different backends.
Around R/N of the "routes" will be rerouted differently, where:

* A "route" is either a trace ID or a service name mapped to a certain backend.
Expand Down Expand Up @@ -71,12 +71,11 @@ Refer to [config.yaml](./testdata/config.yaml) for detailed examples on using th
* `service` Kubernetes service to resolve, e.g. `lb-svc.lb-ns`. If no namespace is specified, an attempt will be made to infer the namespace for this collector, and if this fails it will fall back to the `default` namespace.
* `ports` port to be used for exporting the traces to the addresses resolved from `service`. If `ports` is not specified, the default port 4317 is used. When multiple ports are specified, two backends are added to the load balancer as if they were at different pods.
* The `routing_key` property is used to route spans to exporters based on different parameters. This functionality is currently enabled only for `trace` pipeline types. It supports one of the following values:
* `service`: exports spans based on their service name. This is useful when using processors like the span metrics, so all spans for each service are sent to consistent collector instances for metric collection. Otherwise, metrics for the same services are sent to different collectors, making aggregations inaccurate.
* `service`: exports spans based on their service name. This is useful when using processors like the span metrics, so all spans for each service are sent to consistent collector instances for metric collection. Otherwise, metrics for the same services are sent to different collectors, making aggregations inaccurate.
* `traceID` (default): exports spans based on their `traceID`.
* If not configured, defaults to `traceID` based routing.
* The `srv` node accepts the following optional properties:
* The `dnssrvnoa` node accepts the following optional properties:
* `hostname` DNS SRV hostname to resolve.
* `port` port to be used for exporting the traces to the IP addresses resolved from `hostname`. If `port` is not specified, the default port 4317 is used.
* `interval` resolver interval in go-Duration format, e.g. `5s`, `1d`, `30m`. If not specified, `5s` will be used.
* `timeout` resolver timeout in go-Duration format, e.g. `5s`, `1d`, `30m`. If not specified, `1s` will be used.

Expand Down Expand Up @@ -105,9 +104,9 @@ exporters:
- backend-2:4317
- backend-3:4317
- backend-4:4317
# Notice to config a headless service DNS in Kubernetes
# Notice to config a headless service DNS in Kubernetes
# dns:
# hostname: otelcol-headless.observability.svc.cluster.local
# hostname: otelcol-headless.observability.svc.cluster.local

service:
pipelines:
Expand Down Expand Up @@ -167,8 +166,21 @@ service:
- loadbalancing
```

The SRV Resolver is useful in situations when you want to return hostnames instead of IPs for endpoints. An example would be a `StatefulSet`-backed headless kubernetes `Service` with istio. Example:
The DNSSRVNOA Resolver is useful in situations when you want to return hostnames instead of IPs for endpoints. An example would be a `StatefulSet`-backed headless kubernetes `Service` with istio.

The format for the name of an SRV record is `_service._proto.name` such as `_ldap._tcp.example.com`. The full record contains:

| _service._proto.name | TTL | Class | SRV(type) | Priority | Weight | Port | Target |
|----------------------|-----|-------|-----------|----------|--------|------|--------|
| _otlp._tcp.otel-collector.example.com | 900 | IN | SRV | 10 | 5 | 4317 | otel-collector.example.com |

Note that we do not define a port in the config since the port is provided by the record. The target must be either an A or AAAA record. For more information see https://www.ietf.org/rfc/rfc2782.txt

> [!IMPORTANT]
> Currently priority and weight are not supported features. Additionally, all targets should map to a single IP address.

Example Config:
```yaml
receivers:
otlp:
Expand All @@ -183,9 +195,8 @@ exporters:
protocol:
otlp: {}
resolver:
srv:
dnssrvnoa:
hostname: _<svc-port-name>._<svc-port-protocol>.<svc-name>.<svc-namespace>.svc.cluster.local
routing_key: traceID

service:
pipelines:
Expand Down
15 changes: 7 additions & 8 deletions exporter/loadbalancingexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ type Protocol struct {

// ResolverSettings defines the configurations for the backend resolver
type ResolverSettings struct {
Static *StaticResolver `mapstructure:"static"`
DNS *DNSResolver `mapstructure:"dns"`
K8sSvc *K8sSvcResolver `mapstructure:"k8s"`
SRV *SRVResolver `mapstructure:"srv"`
Static *StaticResolver `mapstructure:"static"`
DNS *DNSResolver `mapstructure:"dns"`
K8sSvc *K8sSvcResolver `mapstructure:"k8s"`
DNSSRVNOA *DNSSRVNOAResolver `mapstructure:"srv"`
}

// StaticResolver defines the configuration for the resolver providing a fixed list of backends
Expand All @@ -57,11 +57,10 @@ type K8sSvcResolver struct {
Ports []int32 `mapstructure:"ports"`
}

// TODO: Should a common struct be used for dns-based resolvers?
// SRVResolver defines the configuration for the DNS resolver of SRV records for headless Services
type SRVResolver struct {
// TODO: Make a common struct to be used for dns-based resolvers
// DNSSRVResolver defines the configuration for the DNS resolver of SRV records for headless Services
type DNSSRVNOAResolver struct {
Hostname string `mapstructure:"hostname"`
Port string `mapstructure:"port"`
Interval time.Duration `mapstructure:"interval"`
Timeout time.Duration `mapstructure:"timeout"`
}
6 changes: 3 additions & 3 deletions exporter/loadbalancingexporter/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,11 @@ func newLoadBalancer(params exporter.CreateSettings, cfg component.Config, facto
return nil, err
}
}
if oCfg.Resolver.SRV != nil {
srvLogger := params.Logger.With(zap.String("resolver", "DNS SRV"))
if oCfg.Resolver.DNSSRVNOA != nil {
dnssrvnoaLogger := params.Logger.With(zap.String("resolver", "dnssrvnoa"))

var err error
res, err = newSRVResolver(srvLogger, oCfg.Resolver.SRV.Hostname, oCfg.Resolver.SRV.Port, oCfg.Resolver.SRV.Interval, oCfg.Resolver.SRV.Timeout)
res, err = newDNSSRVNOAResolver(dnssrvnoaLogger, oCfg.Resolver.DNSSRVNOA.Hostname, oCfg.Resolver.DNSSRVNOA.Interval, oCfg.Resolver.DNSSRVNOA.Timeout)
if err != nil {
return nil, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"net"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -18,33 +19,33 @@ import (
"golang.org/x/exp/slices"
)

// TODO: What is this?
var _ resolver = (*srvResolver)(nil)
var _ resolver = (*dnssrvnoaResolver)(nil)

// TODO: Should these be moved to somethingl ike resolver_common.go?
// const (
// defaultResInterval = 5 * time.Second
// defaultResTimeout = time.Second
// )
/*
TODO: These are from resolver_dns.go, but are used here. We should move them to a common place
const (
defaultResInterval = 5 * time.Second
defaultResTimeout = time.Second
)
*/

var (
errNoSRV = errors.New("no SRV record found")
errBadSRV = errors.New("SRV hostname must be in the form of _service._proto.name")
errNotSingleIP = errors.New("underlying A record must return a single IP address")

srvResolverMutator = tag.Upsert(tag.MustNewKey("resolver"), "srv")
dnssrvnoaResolverMutator = tag.Upsert(tag.MustNewKey("resolver"), "dnssrvnoa")

srvResolverSuccessTrueMutators = []tag.Mutator{srvResolverMutator, successTrueMutator}
srvResolverSuccessFalseMutators = []tag.Mutator{srvResolverMutator, successFalseMutator}
dnssrvnoaResolverSuccessTrueMutators = []tag.Mutator{dnssrvnoaResolverMutator, successTrueMutator}
dnssrvnoaResolverSuccessFalseMutators = []tag.Mutator{dnssrvnoaResolverMutator, successFalseMutator}
)

type srvResolver struct {
type dnssrvnoaResolver struct {
logger *zap.Logger

srvService string
srvProto string
srvName string
port string
resolver multiResolver
resInterval time.Duration
resTimeout time.Duration
Expand All @@ -64,7 +65,7 @@ type multiResolver interface {
LookupSRV(ctx context.Context, service, proto, name string) (cname string, addrs []*net.SRV, err error)
}

func newSRVResolver(logger *zap.Logger, srvHostname string, port string, interval time.Duration, timeout time.Duration) (*srvResolver, error) {
func newDNSSRVNOAResolver(logger *zap.Logger, srvHostname string, interval time.Duration, timeout time.Duration) (*dnssrvnoaResolver, error) {
if len(srvHostname) == 0 {
return nil, errNoSRV
}
Expand All @@ -75,38 +76,40 @@ func newSRVResolver(logger *zap.Logger, srvHostname string, port string, interva
timeout = defaultResTimeout
}

service, proto, name, err := parseSRVHostname(srvHostname)
parsedSRVHostname, err := parseSRVHostname(srvHostname)
if err != nil {
logger.Warn("failed to parse SRV hostname", zap.Error(err))
logger.Warn("failed to parse SRV hostname")
return nil, err
}

return &srvResolver{
return &dnssrvnoaResolver{
logger: logger,
srvService: service,
srvProto: proto,
srvName: name,
port: port,
srvService: parsedSRVHostname.service,
srvProto: parsedSRVHostname.proto,
srvName: parsedSRVHostname.name,
resolver: &net.Resolver{},
resInterval: interval,
resTimeout: timeout,
stopCh: make(chan struct{}),
}, nil
}

func (r *srvResolver) start(ctx context.Context) error {
func (r *dnssrvnoaResolver) start(ctx context.Context) error {
if _, err := r.resolve(ctx); err != nil {
r.logger.Warn("failed to resolve", zap.Error(err))
}

go r.periodicallyResolve()

r.logger.Debug("SRV resolver started",
zap.String("SRV name", r.srvName), zap.String("port", r.port),
zap.Duration("interval", r.resInterval), zap.Duration("timeout", r.resTimeout))
zap.String("SRV name", r.srvName),
zap.Duration("interval", r.resInterval),
zap.Duration("timeout", r.resTimeout),
)
return nil
}

func (r *srvResolver) shutdown(_ context.Context) error {
func (r *dnssrvnoaResolver) shutdown(_ context.Context) error {
r.changeCallbackLock.Lock()
r.onChangeCallbacks = nil
r.changeCallbackLock.Unlock()
Expand All @@ -116,7 +119,7 @@ func (r *srvResolver) shutdown(_ context.Context) error {
return nil
}

func (r *srvResolver) periodicallyResolve() {
func (r *dnssrvnoaResolver) periodicallyResolve() {
ticker := time.NewTicker(r.resInterval)

for {
Expand All @@ -135,42 +138,43 @@ func (r *srvResolver) periodicallyResolve() {
}
}

func (r *srvResolver) resolve(ctx context.Context) ([]string, error) {
func (r *dnssrvnoaResolver) resolve(ctx context.Context) ([]string, error) {
r.shutdownWg.Add(1)
defer r.shutdownWg.Done()

_, srvs, err := r.resolver.LookupSRV(ctx, r.srvService, r.srvProto, r.srvName)
if err != nil {
_ = stats.RecordWithTags(ctx, srvResolverSuccessFalseMutators, mNumResolutions.M(1))
_ = stats.RecordWithTags(ctx, dnssrvnoaResolverSuccessFalseMutators, mNumResolutions.M(1))
return nil, err
}

_ = stats.RecordWithTags(ctx, srvResolverSuccessTrueMutators, mNumResolutions.M(1))
_ = stats.RecordWithTags(ctx, dnssrvnoaResolverSuccessTrueMutators, mNumResolutions.M(1))

// backendsWithInfo stores the port and later the IP addresses for comparison
backendsWithInfo := make(map[string]string)

// backendsWithIPs tracks the IP addresses for changes
backendsWithIPs := make(map[string]string)
for _, srv := range srvs {
target := strings.TrimSuffix(srv.Target, ".")
backendsWithIPs[target] = ""
port := strconv.FormatUint(uint64(srv.Port), 10)
backendsWithInfo[target] = port
}
// backends is what we use to compare against the current endpoints
var backends []string

// freshBackends is used to compare against the existance of endpoints and if the IPs have changed
var freshBackends []string

// Lookup the IP addresses for the A records
for aRec := range backendsWithIPs {
for aRec, port := range backendsWithInfo {

// handle backends first
backend := aRec
// if a port is specified in the configuration, add it
if r.port != "" {
backend = fmt.Sprintf("%s:%s", backend, r.port)
}
backend := fmt.Sprintf("%s:%s", aRec, port)
backends = append(backends, backend)

ips, err := r.resolver.LookupIPAddr(ctx, aRec)
// Return the A record. If we can't resolve them, we'll try again next iteration
if err != nil {
_ = stats.RecordWithTags(ctx, srvResolverSuccessFalseMutators, mNumResolutions.M(1))
_ = stats.RecordWithTags(ctx, dnssrvnoaResolverSuccessFalseMutators, mNumResolutions.M(1))
continue
}
// A headless Service SRV target only returns 1 IP address for its A record
Expand All @@ -180,22 +184,19 @@ func (r *srvResolver) resolve(ctx context.Context) ([]string, error) {

ip := ips[0]
if ip.IP.To4() != nil {
backendsWithIPs[aRec] = ip.String()
backendsWithInfo[aRec] = ip.String()
} else {
// it's an IPv6 address
backendsWithIPs[aRec] = fmt.Sprintf("[%s]", ip.String())
backendsWithInfo[aRec] = fmt.Sprintf("[%s]", ip.String())
}
}

var freshBackends []string
for endpoint := range backendsWithIPs {
// If the old map doesn't have the endpoint, it's fresh
if _, ok := r.endpointsWithIPs[endpoint]; !ok {
freshBackends = append(freshBackends, endpoint)
if _, ok := r.endpointsWithIPs[aRec]; !ok {
freshBackends = append(freshBackends, backend)
// If the old map has the endpoint and IPs match it's still fresh
// Else freshBackends will be smaller and used later during callbacks
} else if backendsWithIPs[endpoint] == r.endpointsWithIPs[endpoint] {
freshBackends = append(freshBackends, endpoint)
} else if backendsWithInfo[aRec] == r.endpointsWithIPs[aRec] {
freshBackends = append(freshBackends, backend)
}
}

Expand All @@ -210,12 +211,11 @@ func (r *srvResolver) resolve(ctx context.Context) ([]string, error) {

// the list has changed!
r.updateLock.Lock()
r.logger.Debug("Updating endpoints", zap.Strings("new endpoints", backends))
r.logger.Debug("Endpoints with IPs", zap.Any("old", r.endpointsWithIPs), zap.Any("new", backendsWithIPs))
r.logger.Debug("Updating endpoints", zap.Strings("new endpoints", backends), zap.Any("old endpoints with IPs", r.endpointsWithIPs), zap.Any("new endpoints with IPs", backendsWithInfo))
r.endpoints = backends
r.endpointsWithIPs = backendsWithIPs
r.endpointsWithIPs = backendsWithInfo
r.updateLock.Unlock()
_ = stats.RecordWithTags(ctx, srvResolverSuccessTrueMutators, mNumBackends.M(int64(len(backends))))
_ = stats.RecordWithTags(ctx, dnssrvnoaResolverSuccessTrueMutators, mNumBackends.M(int64(len(backends))))

// propagate the change
r.changeCallbackLock.RLock()
Expand All @@ -232,21 +232,31 @@ func (r *srvResolver) resolve(ctx context.Context) ([]string, error) {
return r.endpoints, nil
}

func (r *srvResolver) onChange(f func([]string)) {
func (r *dnssrvnoaResolver) onChange(f func([]string)) {
r.changeCallbackLock.Lock()
defer r.changeCallbackLock.Unlock()
r.onChangeCallbacks = append(r.onChangeCallbacks, f)
}

func parseSRVHostname(srvHostname string) (service string, proto string, name string, err error) {
type parsedSRVHostname struct {
service string
proto string
name string
}

func parseSRVHostname(srvHostname string) (result *parsedSRVHostname, err error) {
parts := strings.Split(srvHostname, ".")
if len(parts) < 3 {
return "", "", "", errBadSRV
return nil, errBadSRV
}

service = strings.TrimPrefix(parts[0], "_")
proto = strings.TrimPrefix(parts[1], "_")
name = strings.Join(parts[2:], ".")
service := strings.TrimPrefix(parts[0], "_")
proto := strings.TrimPrefix(parts[1], "_")
name := strings.Join(parts[2:], ".")

return service, proto, name, nil
return &parsedSRVHostname{
service: service,
proto: proto,
name: name,
}, nil
}

0 comments on commit b2f3a19

Please sign in to comment.