Skip to content

Commit

Permalink
dst: Update GetProfile's stream when pod associated to HostPort loo…
Browse files Browse the repository at this point in the history
…kup changes (#11334)

Followup to #11328

Implements a new pod watcher, instantiated along the other ones in the Destination server. It also watches on Servers and carries all the logic from ServerWatcher, which has now been decommissioned.

The `CreateAddress()` function has been moved into a function of the PodWatcher, because now we're calling it on every update given the pod associated to an ip:port might change and we need to regenerate the Address object. That function also takes care of capturing opaque protocol info from associated Servers, which is not new and had some logic that was duped in the now defunct ServerWatcher. `getAnnotatedOpaquePorts()` got also moved for similar reasons.

Other things to note about PodWatcher:

- It publishes a new pair of metrics `ip_port_subscribers` and `ip_port_updates` leveraging the framework in `prometheus.go`.
- The complexity in `updatePod()` is due to only send stream updates when there are changes in the pod's readiness, to avoid sending duped messages on every pod lifecycle event.
- 
Finally, endpointProfileTranslator's `endpoint` (*pb.WeightedAddr) not being a static object anymore, the `Update()` function now receives an Address that allows it to rebuild the endpoint on the fly (and so `createEndpoint()` was converted into a method of endpointProfileTranslator).
  • Loading branch information
alpeb committed Sep 28, 2023
1 parent 0244282 commit 65ddba4
Show file tree
Hide file tree
Showing 10 changed files with 944 additions and 618 deletions.
114 changes: 78 additions & 36 deletions controller/api/destination/endpoint_profile_translator.go
@@ -1,65 +1,107 @@
package destination

import (
"fmt"

pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
"github.com/linkerd/linkerd2/controller/k8s"
log "github.com/sirupsen/logrus"
)

type endpointProfileTranslator struct {
pod *v1.Pod
port uint32
endpoint *pb.WeightedAddr
stream pb.Destination_GetProfileServer
log *logrus.Entry
enableH2Upgrade bool
controllerNS string
identityTrustDomain string
defaultOpaquePorts map[uint32]struct{}
stream pb.Destination_GetProfileServer
lastMessage string

k8sAPI *k8s.API
metadataAPI *k8s.MetadataAPI
log *log.Entry
}

// newEndpointProfileTranslator translates protocol updates to
// DestinationProfiles for endpoints. When a Server on the cluster is updated
// it is possible that it selects an endpoint that is being watched, if that
// is the case then an update will be sent to the client if the Server has
// changed the endpoint's supported protocol—mainly being opaque or not.
func newEndpointProfileTranslator(pod *v1.Pod, port uint32, endpoint *pb.WeightedAddr, stream pb.Destination_GetProfileServer, log *logrus.Entry) *endpointProfileTranslator {
// newEndpointProfileTranslator translates pod updates and protocol updates to
// DestinationProfiles for endpoints
func newEndpointProfileTranslator(
enableH2Upgrade bool,
controllerNS,
identityTrustDomain string,
defaultOpaquePorts map[uint32]struct{},
stream pb.Destination_GetProfileServer,
k8sAPI *k8s.API,
metadataAPI *k8s.MetadataAPI,
) *endpointProfileTranslator {
return &endpointProfileTranslator{
pod: pod,
port: port,
endpoint: endpoint,
stream: stream,
log: log,
enableH2Upgrade: enableH2Upgrade,
controllerNS: controllerNS,
identityTrustDomain: identityTrustDomain,
defaultOpaquePorts: defaultOpaquePorts,
stream: stream,
k8sAPI: k8sAPI,
metadataAPI: metadataAPI,
log: log.WithField("component", "endpoint-profile-translator"),
}
}

func (ept *endpointProfileTranslator) UpdateProtocol(opaqueProtocol bool) {
// Update sends a DestinationProfile message into the stream, if the same
// message hasn't been sent already. If it has, false is returned.
func (ept *endpointProfileTranslator) Update(address *watcher.Address) (bool, error) {
opaquePorts := watcher.GetAnnotatedOpaquePorts(address.Pod, ept.defaultOpaquePorts)
endpoint, err := ept.createEndpoint(*address, opaquePorts)
if err != nil {
return false, fmt.Errorf("failed to create endpoint: %w", err)
}

// The protocol for an endpoint should only be updated if there is a pod,
// endpoint, and the endpoint has a protocol hint. If there is an endpoint
// but it does not have a protocol hint, that means we could not determine
// if it has a peer proxy so a opaque traffic would not be supported.
if ept.pod != nil && ept.endpoint != nil && ept.endpoint.ProtocolHint != nil {
if !opaqueProtocol {
ept.endpoint.ProtocolHint.OpaqueTransport = nil
} else if ept.endpoint.ProtocolHint.OpaqueTransport == nil {
port, err := getInboundPort(&ept.pod.Spec)
if address.Pod != nil && endpoint != nil && endpoint.ProtocolHint != nil {
if !address.OpaqueProtocol {
endpoint.ProtocolHint.OpaqueTransport = nil
} else if endpoint.ProtocolHint.OpaqueTransport == nil {
port, err := getInboundPort(&address.Pod.Spec)
if err != nil {
ept.log.Error(err)
} else {
ept.endpoint.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{
InboundPort: port,
}
return false, err
}

endpoint.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{
InboundPort: port,
}
}

}
profile := ept.createDefaultProfile(opaqueProtocol)
profile := &pb.DestinationProfile{
RetryBudget: defaultRetryBudget(),
Endpoint: endpoint,
OpaqueProtocol: address.OpaqueProtocol,
}
msg := profile.String()
if msg == ept.lastMessage {
return false, nil
}
ept.lastMessage = msg
ept.log.Debugf("sending protocol update: %+v", profile)
if err := ept.stream.Send(profile); err != nil {
ept.log.Errorf("failed to send protocol update: %s", err)
return false, fmt.Errorf("failed to send protocol update: %w", err)
}

return true, nil
}

func (ept *endpointProfileTranslator) createDefaultProfile(opaqueProtocol bool) *pb.DestinationProfile {
return &pb.DestinationProfile{
RetryBudget: defaultRetryBudget(),
Endpoint: ept.endpoint,
OpaqueProtocol: opaqueProtocol,
func (ept *endpointProfileTranslator) createEndpoint(address watcher.Address, opaquePorts map[uint32]struct{}) (*pb.WeightedAddr, error) {
weightedAddr, err := createWeightedAddr(address, opaquePorts, ept.enableH2Upgrade, ept.identityTrustDomain, ept.controllerNS, ept.log)
if err != nil {
return nil, err
}

// `Get` doesn't include the namespace in the per-endpoint
// metadata, so it needs to be special-cased.
if address.Pod != nil {
weightedAddr.MetricLabels["namespace"] = address.Pod.Namespace
}

return weightedAddr, err
}
2 changes: 1 addition & 1 deletion controller/api/destination/endpoint_translator.go
Expand Up @@ -273,7 +273,7 @@ func (et *endpointTranslator) sendClientAdd(set watcher.AddressSet) {
err error
)
if address.Pod != nil {
opaquePorts = getAnnotatedOpaquePorts(address.Pod, et.defaultOpaquePorts)
opaquePorts = watcher.GetAnnotatedOpaquePorts(address.Pod, et.defaultOpaquePorts)
wa, err = createWeightedAddr(address, opaquePorts, et.enableH2Upgrade, et.identityTrustDomain, et.controllerNS, et.log)
} else {
var authOverride *pb.AuthorityOverride
Expand Down

0 comments on commit 65ddba4

Please sign in to comment.