Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dst: Update GetProfile's stream when pod associated to HostPort lookup changes #11334

Merged
merged 9 commits into from Sep 28, 2023
114 changes: 78 additions & 36 deletions controller/api/destination/endpoint_profile_translator.go
@@ -1,65 +1,107 @@
package destination

import (
"fmt"

pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
"github.com/linkerd/linkerd2/controller/k8s"
log "github.com/sirupsen/logrus"
)

type endpointProfileTranslator struct {
pod *v1.Pod
port uint32
endpoint *pb.WeightedAddr
stream pb.Destination_GetProfileServer
log *logrus.Entry
enableH2Upgrade bool
controllerNS string
identityTrustDomain string
defaultOpaquePorts map[uint32]struct{}
stream pb.Destination_GetProfileServer
lastMessage string

k8sAPI *k8s.API
metadataAPI *k8s.MetadataAPI
log *log.Entry
}

// newEndpointProfileTranslator translates protocol updates to
// DestinationProfiles for endpoints. When a Server on the cluster is updated
// it is possible that it selects an endpoint that is being watched, if that
// is the case then an update will be sent to the client if the Server has
// changed the endpoint's supported protocol—mainly being opaque or not.
func newEndpointProfileTranslator(pod *v1.Pod, port uint32, endpoint *pb.WeightedAddr, stream pb.Destination_GetProfileServer, log *logrus.Entry) *endpointProfileTranslator {
// newEndpointProfileTranslator translates pod updates and protocol updates to
// DestinationProfiles for endpoints
func newEndpointProfileTranslator(
enableH2Upgrade bool,
controllerNS,
identityTrustDomain string,
defaultOpaquePorts map[uint32]struct{},
stream pb.Destination_GetProfileServer,
k8sAPI *k8s.API,
metadataAPI *k8s.MetadataAPI,
) *endpointProfileTranslator {
return &endpointProfileTranslator{
pod: pod,
port: port,
endpoint: endpoint,
stream: stream,
log: log,
enableH2Upgrade: enableH2Upgrade,
controllerNS: controllerNS,
identityTrustDomain: identityTrustDomain,
defaultOpaquePorts: defaultOpaquePorts,
stream: stream,
k8sAPI: k8sAPI,
metadataAPI: metadataAPI,
log: log.WithField("component", "endpoint-profile-translator"),
}
}

func (ept *endpointProfileTranslator) UpdateProtocol(opaqueProtocol bool) {
// Update sends a DestinationProfile message into the stream, if the same
// message hasn't been sent already. If it has, false is returned.
func (ept *endpointProfileTranslator) Update(address *watcher.Address) (bool, error) {
opaquePorts := watcher.GetAnnotatedOpaquePorts(address.Pod, ept.defaultOpaquePorts)
endpoint, err := ept.createEndpoint(*address, opaquePorts)
if err != nil {
return false, fmt.Errorf("failed to create endpoint: %w", err)
}

// The protocol for an endpoint should only be updated if there is a pod,
// endpoint, and the endpoint has a protocol hint. If there is an endpoint
// but it does not have a protocol hint, that means we could not determine
// if it has a peer proxy so a opaque traffic would not be supported.
if ept.pod != nil && ept.endpoint != nil && ept.endpoint.ProtocolHint != nil {
if !opaqueProtocol {
ept.endpoint.ProtocolHint.OpaqueTransport = nil
} else if ept.endpoint.ProtocolHint.OpaqueTransport == nil {
port, err := getInboundPort(&ept.pod.Spec)
if address.Pod != nil && endpoint != nil && endpoint.ProtocolHint != nil {
if !address.OpaqueProtocol {
endpoint.ProtocolHint.OpaqueTransport = nil
} else if endpoint.ProtocolHint.OpaqueTransport == nil {
port, err := getInboundPort(&address.Pod.Spec)
if err != nil {
ept.log.Error(err)
} else {
ept.endpoint.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{
InboundPort: port,
}
return false, err
}

endpoint.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{
InboundPort: port,
}
}

}
profile := ept.createDefaultProfile(opaqueProtocol)
profile := &pb.DestinationProfile{
RetryBudget: defaultRetryBudget(),
Endpoint: endpoint,
OpaqueProtocol: address.OpaqueProtocol,
}
msg := profile.String()
if msg == ept.lastMessage {
return false, nil
}
ept.lastMessage = msg
ept.log.Debugf("sending protocol update: %+v", profile)
if err := ept.stream.Send(profile); err != nil {
ept.log.Errorf("failed to send protocol update: %s", err)
return false, fmt.Errorf("failed to send protocol update: %w", err)
}

return true, nil
}

func (ept *endpointProfileTranslator) createDefaultProfile(opaqueProtocol bool) *pb.DestinationProfile {
return &pb.DestinationProfile{
RetryBudget: defaultRetryBudget(),
Endpoint: ept.endpoint,
OpaqueProtocol: opaqueProtocol,
func (ept *endpointProfileTranslator) createEndpoint(address watcher.Address, opaquePorts map[uint32]struct{}) (*pb.WeightedAddr, error) {
weightedAddr, err := createWeightedAddr(address, opaquePorts, ept.enableH2Upgrade, ept.identityTrustDomain, ept.controllerNS, ept.log)
if err != nil {
return nil, err
}

// `Get` doesn't include the namespace in the per-endpoint
// metadata, so it needs to be special-cased.
if address.Pod != nil {
weightedAddr.MetricLabels["namespace"] = address.Pod.Namespace
}

return weightedAddr, err
}
2 changes: 1 addition & 1 deletion controller/api/destination/endpoint_translator.go
Expand Up @@ -273,7 +273,7 @@ func (et *endpointTranslator) sendClientAdd(set watcher.AddressSet) {
err error
)
if address.Pod != nil {
opaquePorts = getAnnotatedOpaquePorts(address.Pod, et.defaultOpaquePorts)
opaquePorts = watcher.GetAnnotatedOpaquePorts(address.Pod, et.defaultOpaquePorts)
wa, err = createWeightedAddr(address, opaquePorts, et.enableH2Upgrade, et.identityTrustDomain, et.controllerNS, et.log)
} else {
var authOverride *pb.AuthorityOverride
Expand Down