Complete fix for hostport staleness - pod watcher
Follow-up to #11328, based on `alpeb/hostport-fixup-stopgap`.

Implements a new pod watcher, instantiated alongside the other watchers
in the Destination server. It is generic enough to catch all pod events
in the cluster, so it is up to subscribers to filter out the events
they are interested in and to set up any metrics.
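As a rough sketch, a subscriber fulfills a contract like the one below;
the interface name is an assumption (the pod watcher itself is not part
of this excerpt), but the methods mirror what `hostPortAdaptor`
implements further down:

```go
import corev1 "k8s.io/api/core/v1"

// podUpdateListener sketches the assumed subscriber contract.
type podUpdateListener interface {
    Update(pod *corev1.Pod) // pod added or updated; must not block
    Remove(pod *corev1.Pod) // pod deleted; must not block
    MetricsInc()            // called on Subscribe, e.g. to bump a gauge
    MetricsDec()            // called on Unsubscribe
}
```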

In the Destination server's `subscribeToEndpointProfile` method, we
create a new `hostPortAdaptor` that subscribes to the pod watcher and
forwards the pod and protocol updates to the
`endpointProfileTranslator`. Server subscriptions are now handled by
this adaptor and are recycled whenever the pod changes.
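A minimal sketch of that wiring, assuming the server's field names and
a `Subscribe`/`Unsubscribe` pair on the pod watcher (`server.go` is not
included in this excerpt); the `newHostPortAdaptor` argument order
matches the constructor in the diff below:

```go
translator := newEndpointProfileTranslator(port, stream, log)
adaptor := newHostPortAdaptor(
    s.k8sAPI, s.metadataAPI, s.servers,
    s.enableH2Upgrade, s.controllerNS, s.identityTrustDomain,
    s.defaultOpaquePorts, ip, port, translator, &address, log,
)
if err := adaptor.Sync(); err != nil { // sends the initial profile
    return err
}
s.pods.Subscribe(adaptor) // assumed pod watcher API
defer func() {
    s.pods.Unsubscribe(adaptor)
    adaptor.Clean() // drops any Server subscription still held
}()
```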

A new gauge metric, `host_port_subscribers`, tracks the number of
subscribers for a given hostIP+port combination.
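For illustration, this is how the gauge would appear on a metrics
scrape (the label values here are made up):

```
# HELP host_port_subscribers Counter of subscribes to Pod changes for a given hostIP and port
# TYPE host_port_subscribers gauge
host_port_subscribers{hostIP="172.18.0.2",port="7777"} 1
```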

## Other Changes

- Moved the `server.createAddress` method into the package-level
  function `watcher.CreateAddress` in `endpoints_watcher.go`, for
  better reusability.
- The "Return profile for host port pods" test introduced in #11328 was
  extended to track the ensuing events after a pod is deleted and then
  recreated (:taco: to @adleong for the test).
- Since that test consumes multiple events, the `profileStream` test
  helper was changed to allow the `GetProfile` call to block. Callers
  of `profileStream` now need to cancel the returned stream manually
  (see the sketch after this list).
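A sketch of the new test flow; `profileStream`'s signature and the
stream's method names are assumptions based on the description above,
not the helper's actual API:

```go
stream := profileStream(t, server, hostIP, port, "") // assumed signature
defer stream.Cancel() // GetProfile now blocks until the stream is cancelled

first := <-stream.updates  // initial profile for the running pod
deletePod(t, server, pod)  // hypothetical helper
second := <-stream.updates // the ensuing event after the pod is deleted
```
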
alpeb committed Sep 4, 2023
1 parent 31a6623 commit 87ffe11
Showing 7 changed files with 562 additions and 107 deletions.
36 changes: 16 additions & 20 deletions controller/api/destination/endpoint_profile_translator.go
@@ -7,59 +7,55 @@ import (
 )
 
 type endpointProfileTranslator struct {
-    pod      *v1.Pod
-    port     uint32
-    endpoint *pb.WeightedAddr
-    stream   pb.Destination_GetProfileServer
-    log      *logrus.Entry
+    port   uint32
+    stream pb.Destination_GetProfileServer
+    log    *logrus.Entry
 }
 
 // newEndpointProfileTranslator translates protocol updates to
 // DestinationProfiles for endpoints. When a Server on the cluster is updated
 // it is possible that it selects an endpoint that is being watched, if that
 // is the case then an update will be sent to the client if the Server has
 // changed the endpoint's supported protocol—mainly being opaque or not.
-func newEndpointProfileTranslator(pod *v1.Pod, port uint32, endpoint *pb.WeightedAddr, stream pb.Destination_GetProfileServer, log *logrus.Entry) *endpointProfileTranslator {
+func newEndpointProfileTranslator(port uint32, stream pb.Destination_GetProfileServer, log *logrus.Entry) *endpointProfileTranslator {
     return &endpointProfileTranslator{
-        pod:      pod,
-        port:     port,
-        endpoint: endpoint,
-        stream:   stream,
-        log:      log,
+        port:   port,
+        stream: stream,
+        log:    log,
     }
 }
 
-func (ept *endpointProfileTranslator) UpdateProtocol(opaqueProtocol bool) {
+func (ept *endpointProfileTranslator) UpdateProtocol(pod *v1.Pod, endpoint *pb.WeightedAddr, opaqueProtocol bool) {
     // The protocol for an endpoint should only be updated if there is a pod,
     // endpoint, and the endpoint has a protocol hint. If there is an endpoint
     // but it does not have a protocol hint, that means we could not determine
     // if it has a peer proxy so a opaque traffic would not be supported.
-    if ept.pod != nil && ept.endpoint != nil && ept.endpoint.ProtocolHint != nil {
+    if pod != nil && endpoint != nil && endpoint.ProtocolHint != nil {
         if !opaqueProtocol {
-            ept.endpoint.ProtocolHint.OpaqueTransport = nil
-        } else if ept.endpoint.ProtocolHint.OpaqueTransport == nil {
-            port, err := getInboundPort(&ept.pod.Spec)
+            endpoint.ProtocolHint.OpaqueTransport = nil
+        } else if endpoint.ProtocolHint.OpaqueTransport == nil {
+            port, err := getInboundPort(&pod.Spec)
             if err != nil {
                 ept.log.Error(err)
             } else {
-                ept.endpoint.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{
+                endpoint.ProtocolHint.OpaqueTransport = &pb.ProtocolHint_OpaqueTransport{
                     InboundPort: port,
                 }
             }
         }
 
     }
-    profile := ept.createDefaultProfile(opaqueProtocol)
+    profile := ept.createDefaultProfile(endpoint, opaqueProtocol)
     ept.log.Debugf("sending protocol update: %+v", profile)
     if err := ept.stream.Send(profile); err != nil {
         ept.log.Errorf("failed to send protocol update: %s", err)
     }
 }
 
-func (ept *endpointProfileTranslator) createDefaultProfile(opaqueProtocol bool) *pb.DestinationProfile {
+func (ept *endpointProfileTranslator) createDefaultProfile(endpoint *pb.WeightedAddr, opaqueProtocol bool) *pb.DestinationProfile {
     return &pb.DestinationProfile{
         RetryBudget:    defaultRetryBudget(),
-        Endpoint:       ept.endpoint,
+        Endpoint:       endpoint,
         OpaqueProtocol: opaqueProtocol,
     }
 }
257 changes: 257 additions & 0 deletions controller/api/destination/hostport_adaptor.go
@@ -0,0 +1,257 @@
package destination

import (
    "fmt"
    "strconv"
    "sync"

    pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
    "github.com/linkerd/linkerd2/controller/api/destination/watcher"
    "github.com/linkerd/linkerd2/controller/k8s"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    logging "github.com/sirupsen/logrus"
    corev1 "k8s.io/api/core/v1"
)

// hostPortAdaptor receives events from a podWatcher and forwards the pod and
// protocol updates to an endpointProfileTranslator. If required, it subscribes
// to associated Server updates. Pod updates are only taken into account to the
// extent they imply a change in its readiness
type hostPortAdaptor struct {
    servers    *watcher.ServerWatcher
    listener   *endpointProfileTranslator
    ip         string
    port       uint32
    address    *watcher.Address
    endpoint   *pb.WeightedAddr
    subscribed bool
    podReady   bool

    enableH2Upgrade     bool
    controllerNS        string
    identityTrustDomain string
    defaultOpaquePorts  map[uint32]struct{}

    k8sAPI      *k8s.API
    metadataAPI *k8s.MetadataAPI
    metrics     prometheus.Gauge
    log         *logging.Entry

    mu sync.Mutex
}

// hostIPMetrics is a prometheus gauge shared amongst hostPortAdaptor instances
var hostIPMetrics = promauto.NewGaugeVec(
    prometheus.GaugeOpts{
        Name: "host_port_subscribers",
        Help: "Counter of subscribes to Pod changes for a given hostIP and port",
    },
    []string{"hostIP", "port"},
)

func newHostPortAdaptor(
    k8sAPI *k8s.API,
    metadataAPI *k8s.MetadataAPI,
    servers *watcher.ServerWatcher,
    enableH2Upgrade bool,
    controllerNS string,
    identityTrustDomain string,
    defaultOpaquePorts map[uint32]struct{},
    ip string,
    port uint32,
    listener *endpointProfileTranslator,
    address *watcher.Address,
    log *logging.Entry,
) *hostPortAdaptor {
    log = log.WithField("component", "hostport-adaptor").WithField("ip", ip)

    podReady := isRunningAndReady(address.Pod)

    // if the label map has already been created, it'll get reused
    metrics := hostIPMetrics.With(prometheus.Labels{
        "hostIP": ip,
        "port":   strconv.FormatUint(uint64(port), 10),
    })

    return &hostPortAdaptor{
        servers:             servers,
        listener:            listener,
        ip:                  ip,
        port:                port,
        address:             address,
        defaultOpaquePorts:  defaultOpaquePorts,
        podReady:            podReady,
        enableH2Upgrade:     enableH2Upgrade,
        controllerNS:        controllerNS,
        identityTrustDomain: identityTrustDomain,
        k8sAPI:              k8sAPI,
        metadataAPI:         metadataAPI,
        metrics:             metrics,
        log:                 log,
    }
}

func (pt *hostPortAdaptor) Sync() error {
    pt.mu.Lock()
    defer pt.mu.Unlock()

    opaquePorts := getAnnotatedOpaquePorts(pt.address.Pod, pt.defaultOpaquePorts)
    endpoint, err := pt.createEndpoint(*pt.address, opaquePorts)
    if err != nil {
        return fmt.Errorf("failed to create endpoint: %w", err)
    }
    pt.endpoint = endpoint
    pt.log.Debugf("Sync for endpoint %s", pt.endpoint)
    pt.subscribed = false

    // If the endpoint's port is annotated as opaque, we don't need to
    // subscribe for updates because it will always be opaque
    // regardless of any Servers that may select it.
    if _, ok := opaquePorts[pt.port]; ok {
        pt.UpdateProtocol(true)
    } else if pt.address.Pod == nil {
        pt.UpdateProtocol(false)
    } else {
        pt.UpdateProtocol(pt.address.OpaqueProtocol)
        pt.servers.Subscribe(pt.address.Pod, pt.port, pt)
        pt.subscribed = true
    }

    return nil
}

func (pt *hostPortAdaptor) Clean() {
    if pt.subscribed {
        pt.mu.Lock()
        defer pt.mu.Unlock()

        pt.log.Debugf("Clean for endpoint %s", pt.endpoint)
        pt.servers.Unsubscribe(pt.address.Pod, pt.port, pt)
        pt.subscribed = false
    }
}

func (pt *hostPortAdaptor) UpdateProtocol(opaqueProtocol bool) {
    pt.listener.UpdateProtocol(pt.address.Pod, pt.endpoint, opaqueProtocol)
}

// Update is an informer event handler - All operations should be non-blocking
func (pt *hostPortAdaptor) Update(pod *corev1.Pod) {
    pt.mu.Lock()
    defer pt.mu.Unlock()

    if !pt.matchesIPPort(pod) {
        return
    }

    if pt.podReady && pt.address.Pod.UID != pod.UID {
        pt.log.Tracef("Current pod still ready, ignoring event on %s.%s", pod.Name, pod.Namespace)
        return
    }

    if pt.podReady && !isRunningAndReady(pod) {
        pt.log.Debugf("Pod %s.%s became unready - remove", pod.Name, pod.Namespace)
        pt.updateAddress(nil)
        return
    }

    if !pt.podReady && !isRunningAndReady(pod) {
        pt.log.Tracef("Ignore event on %s.%s until it becomes ready", pod.Name, pod.Namespace)
        return
    }

    if !pt.podReady && isRunningAndReady(pod) {
        pt.log.Debugf("Pod %s.%s became ready", pod.Name, pod.Namespace)
        pt.updateAddress(pod)
        return
    }

    pt.log.Tracef("Ignored event on pod %s.%s", pod.Name, pod.Namespace)
}

// Remove is an informer event handler - All operations should be non-blocking
func (pt *hostPortAdaptor) Remove(pod *corev1.Pod) {
    pt.mu.Lock()
    defer pt.mu.Unlock()

    if !pt.matchesIPPort(pod) {
        return
    }

    if pt.address.Pod == nil {
        pt.log.Tracef("Pod %s.%s already removed; discard event", pod.Name, pod.Namespace)
        return
    }

    pt.log.Debugf("Remove pod %s.%s", pod.Name, pod.Namespace)
    pt.updateAddress(nil)
}

func (pt *hostPortAdaptor) MetricsInc() {
    pt.metrics.Inc()
}

func (pt *hostPortAdaptor) MetricsDec() {
    pt.metrics.Dec()
}

func (pt *hostPortAdaptor) updateAddress(pod *corev1.Pod) {
    go func() {
        pt.podReady = pod != nil
        address, err := watcher.CreateAddress(pt.k8sAPI, pt.metadataAPI, pod, pt.ip, pt.port)
        if err != nil {
            pt.log.Errorf("failed to create address: %s", err)
        } else {
            pt.Clean()
            pt.address = &address
            if err := pt.Sync(); err != nil {
                pt.log.Errorf("error syncing hostport adaptor: %s", err)
            }
        }
    }()
}

func (pt *hostPortAdaptor) createEndpoint(address watcher.Address, opaquePorts map[uint32]struct{},
) (*pb.WeightedAddr, error) {
    weightedAddr, err := createWeightedAddr(address, opaquePorts, pt.enableH2Upgrade, pt.identityTrustDomain, pt.controllerNS, pt.log)
    if err != nil {
        return nil, err
    }

    // `Get` doesn't include the namespace in the per-endpoint
    // metadata, so it needs to be special-cased.
    if address.Pod != nil {
        weightedAddr.MetricLabels["namespace"] = address.Pod.Namespace
    }

    return weightedAddr, err
}

func (pt *hostPortAdaptor) matchesIPPort(pod *corev1.Pod) bool {
    if pod.Status.HostIP != pt.ip {
        return false
    }
    for _, container := range pod.Spec.Containers {
        for _, containerPort := range container.Ports {
            if uint32(containerPort.HostPort) == pt.port {
                return true
            }
        }
    }

    return false
}

func isRunningAndReady(pod *corev1.Pod) bool {
    if pod == nil || pod.Status.Phase != corev1.PodRunning {
        return false
    }
    for _, condition := range pod.Status.Conditions {
        if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue {
            return true
        }
    }

    return false
}
