Skip to content

Commit

Permalink
egress ip: conditionally use grpc for health monitoring
Browse files Browse the repository at this point in the history
When the ovnkube container, in both master and node pods, is started with
the newly introduced flag 'egressip-node-healthcheck-port', the EgressIP
implementation uses gRPC on that TCP port for node health monitoring.

Signed-off-by: Flavio Fernandes <flaviof@redhat.com>
  • Loading branch information
flavio-fernandes committed Jul 28, 2022
1 parent c48e6c5 commit 2773b7f
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 0 deletions.
7 changes: 7 additions & 0 deletions go-controller/pkg/config/config.go
Expand Up @@ -336,6 +336,7 @@ type OVNKubernetesFeatureConfig struct {
EgressIPReachabiltyTotalTimeout int `gcfg:"egressip-reachability-total-timeout"`
EnableEgressFirewall bool `gcfg:"enable-egress-firewall"`
EnableEgressQoS bool `gcfg:"enable-egress-qos"`
EgressIPNodeHealthCheckPort int `gcfg:"egressip-node-healthcheck-port"`
}

// GatewayMode holds the node gateway mode
Expand Down Expand Up @@ -875,6 +876,12 @@ var OVNK8sFeatureFlags = []cli.Flag{
Destination: &cliConfig.OVNKubernetesFeature.EnableEgressQoS,
Value: OVNKubernetesFeature.EnableEgressQoS,
},
&cli.IntFlag{
Name: "egressip-node-healthcheck-port",
Usage: "Configure EgressIP node reachability using gRPC on this TCP port.",
Destination: &cliConfig.OVNKubernetesFeature.EgressIPNodeHealthCheckPort,
// Value: 0,
},
}

// K8sFlags capture Kubernetes-related options
Expand Down
27 changes: 27 additions & 0 deletions go-controller/pkg/node/node.go
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/kube"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/libovsdbops"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/node/controllers/upgrade"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/healthcheck"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/sbdb"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"
Expand Down Expand Up @@ -580,6 +581,32 @@ func (n *OvnNode) Start(ctx context.Context, wg *sync.WaitGroup) error {
}
}

// start the health checking server (used by egressip)
health_check_port := config.OVNKubernetesFeature.EgressIPNodeHealthCheckPort
if health_check_port != 0 {
var node_mgmt_ip net.IP
if mgmtPortConfig.ipv4 != nil {
node_mgmt_ip = mgmtPortConfig.ipv4.ifAddr.IP
} else if mgmtPortConfig.ipv6 != nil {
node_mgmt_ip = mgmtPortConfig.ipv6.ifAddr.IP
} else {
return fmt.Errorf("unable to start health checking server: no mgmt ip")
}

health_server, err := healthcheck.NewEgressIPHealthServer(node_mgmt_ip, health_check_port)
if err != nil {
return fmt.Errorf("unable to allocate health checking server: %v", err)
}

wg.Add(1)
go func() {
defer wg.Done()
health_server.Run(n.stopChan)
}()
} else {
klog.Infof("Egress IP health check server skipped: no port specified")
}

klog.Infof("OVN Kube Node initialized and ready.")
return nil
}
Expand Down
38 changes: 38 additions & 0 deletions go-controller/pkg/ovn/egressip.go
@@ -1,6 +1,7 @@
package ovn

import (
"context"
"encoding/hex"
"encoding/json"
"errors"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/libovsdbops"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/metrics"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/healthcheck"

"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"
Expand Down Expand Up @@ -1754,6 +1756,7 @@ func (oc *Controller) initEgressIPAllocator(node *kapi.Node) (err error) {
egressIPConfig: parsedEgressIPConfig,
mgmtIPs: mgmtIPs,
allocations: make(map[string]string),
healthClient: healthcheck.NewEgressIPHealthClient(node.Name),
}
}
return nil
Expand Down Expand Up @@ -1785,6 +1788,9 @@ func (oc *Controller) deleteNodeForEgress(node *v1.Node) error {
return err
}
oc.eIPC.allocator.Lock()
if eNode, exists := oc.eIPC.allocator.cache[node.Name]; exists {
eNode.healthClient.Disconnect()
}
delete(oc.eIPC.allocator.cache, node.Name)
oc.eIPC.allocator.Unlock()
return nil
Expand Down Expand Up @@ -1815,6 +1821,7 @@ type egressNode struct {
egressIPConfig *util.ParsedNodeEgressIPConfiguration
mgmtIPs []net.IP
allocations map[string]string
healthClient healthcheck.EgressIPHealthClient
isReady bool
isReachable bool
isEgressAssignable bool
Expand Down Expand Up @@ -1878,6 +1885,8 @@ type egressIPController struct {
watchFactory *factory.WatchFactory
// EgressIP Node reachability total timeout configuration
egressIPTotalTimeout int
// EgressIP Node reachability gRPC port (0 means it should use dial instead)
egressIPNodeHealthCheckPort int
}

// addPodEgressIPAssignment will program OVN with logical router policies
Expand Down Expand Up @@ -2159,6 +2168,13 @@ func checkEgressNodesReachabilityIterate(oc *Controller) {
}

// isReachable reports whether the given egress node is reachable. When a
// health check port is configured the check goes over gRPC; with no port
// configured (zero) it falls back to the dial-based check.
func (oc *Controller) isReachable(node *egressNode) bool {
	if port := oc.eIPC.egressIPNodeHealthCheckPort; port != 0 {
		return oc.isReachableViaGRPC(node, port)
	}
	return oc.isReachableViaDial(node)
}

func (oc *Controller) isReachableViaDial(node *egressNode) bool {
var retryTimeOut, initialRetryTimeOut time.Duration

numMgmtIPs := len(node.mgmtIPs)
Expand Down Expand Up @@ -2222,6 +2238,28 @@ func (e *egressIPDial) dial(ip net.IP, timeout time.Duration) bool {
return true
}

// isReachableViaGRPC checks node reachability through the node's gRPC health
// client, using health_check_port as the target port.
//
// A total timeout of zero means no check is performed and the node is
// reported reachable. A node without management IPs is reported unreachable.
// Otherwise the call is bounded by the configured total timeout (in seconds).
func (oc *Controller) isReachableViaGRPC(node *egressNode, health_check_port int) bool {
	timeoutSecs := oc.eIPC.egressIPTotalTimeout
	switch {
	case timeoutSecs == 0:
		// Nothing to check.
		return true
	case len(node.mgmtIPs) == 0:
		return false
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutSecs)*time.Second)
	defer cancel()

	if node.healthClient.IsConnected() {
		// The gRPC session is already established. Send a probe, which will
		// either succeed or close the session.
		return node.healthClient.Probe(ctx)
	}

	// The gRPC session is not up. Attempt to connect; if that succeeds, the
	// node is declared reachable.
	return node.healthClient.Connect(ctx, node.mgmtIPs, health_check_port)
}

func getClusterSubnets() ([]*net.IPNet, []*net.IPNet) {
var v4ClusterSubnets = []*net.IPNet{}
var v6ClusterSubnets = []*net.IPNet{}
Expand Down
166 changes: 166 additions & 0 deletions go-controller/pkg/ovn/healthcheck/egressip_healthcheck.go
@@ -0,0 +1,166 @@
package healthcheck

import (
	"fmt"
	"net"
	"strconv"
	"sync"

	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"k8s.io/klog/v2"
)

const (
// ServiceEgressIpNode is the service name the health client sends in its
// check request; the health server answers SERVING only for this name and
// NOT_SERVING for any other.
ServiceEgressIpNode = "Service_Egress_IP"
)

// healthServer is the Health service implementation that Run registers on
// the EgressIP gRPC health server.
//
// UnimplementedHealthServer must be embedded to have forward compatible implementations.
type healthServer struct {
UnimplementedHealthServer
}

// Check answers a health check request. The status is SERVING when the
// request names the EgressIP node service (ServiceEgressIpNode) and
// NOT_SERVING for any other service name. The returned error is always nil.
func (healthServer) Check(_ context.Context, req *HealthCheckRequest) (*HealthCheckResponse, error) {
	status := HealthCheckResponse_NOT_SERVING
	if req.GetService() == ServiceEgressIpNode {
		status = HealthCheckResponse_SERVING
	}
	return &HealthCheckResponse{Status: status}, nil
}

// egressIPHealthServer holds the address the EgressIP gRPC health server
// listens on. Run uses both fields to build the listen address.
type egressIPHealthServer struct {
// IP address of the node's management port that the server binds to
node_mgmt_ip net.IP

// TCP port the gRPC health server listens on
// NOTE(review): callers are expected to create the server only when the
// configured port is non-zero — confirm against the node start-up code.
health_check_port int
}

// NewEgressIPHealthServer builds a health server that will listen on the
// given management IP and port once Run is called. It only records the two
// values; the returned error is always nil.
func NewEgressIPHealthServer(node_mgmt_ip net.IP, health_check_port int) (*egressIPHealthServer, error) {
	server := new(egressIPHealthServer)
	server.node_mgmt_ip = node_mgmt_ip
	server.health_check_port = health_check_port
	return server, nil
}

// Run starts the EgressIP gRPC health server and blocks until stopCh is
// closed.
//
// It listens on the node management IP and the configured health check port,
// serves the Health service from a background goroutine and, once stopCh is
// closed, stops the gRPC server and waits for the serving goroutine to exit.
// A failure to listen or to serve is reported through klog.Fatalf.
func (ehs *egressIPHealthServer) Run(stopCh <-chan struct{}) {
	// net.JoinHostPort brackets IPv6 literals (e.g. "[fd00::2]:9107"). A plain
	// "%s:%d" produces "fd00::2:9107" for an IPv6 management IP, which
	// net.Listen rejects as not being a valid host:port.
	addr := net.JoinHostPort(ehs.node_mgmt_ip.String(), strconv.Itoa(ehs.health_check_port))

	lis, err := net.Listen("tcp", addr)
	if err != nil {
		klog.Fatalf("Health checking listen failed: %v", err)
		return
	}

	wg := &sync.WaitGroup{}

	// TODO (FF): add TLS credentials support
	grpcServer := grpc.NewServer()

	wg.Add(1)
	go func() {
		defer wg.Done()

		RegisterHealthServer(grpcServer, &healthServer{})
		klog.Infof("Starting Egress IP Health Server on %s", addr)
		// ErrServerStopped is what Serve returns after Stop; it is the normal
		// shutdown path and not a failure.
		if err := grpcServer.Serve(lis); err != nil && err != grpc.ErrServerStopped {
			klog.Fatalf("Health checking serve failed: %v", err)
		}
		klog.Infof("Stopped Egress IP Health Server on %s", addr)
	}()

	<-stopCh

	klog.Info("Shutting down Egress IP Health Server")
	grpcServer.Stop()
	wg.Wait()
	klog.Info("Shut down Egress IP Health Server")
}

// EgressIPHealthClient is the client side of the EgressIP node health check.
// The method notes below describe the egressIPHealthClient implementation in
// this file.
type EgressIPHealthClient interface {
// IsConnected reports whether a gRPC connection is currently held.
IsConnected() bool
// Connect tries the given management IPs in order on health_check_port and
// keeps the first connection that is established. It returns false when no
// connection could be made.
Connect(dial_ctx context.Context, mgmtIPs []net.IP, health_check_port int) bool
// Disconnect closes the held connection, if any.
Disconnect()
// Probe sends one health check over the held connection and reports the
// outcome. On a failed check the connection is closed; the first failure
// after a success is still reported as true.
Probe(dial_ctx context.Context) bool
}

// egressIPHealthClient is the EgressIPHealthClient implementation. It keeps
// the gRPC connection to a single node's health server together with the
// state of the last probe.
type egressIPHealthClient struct {
nodeName string // node name given at construction; only used in log messages
nodeAddr string // address of the last successful Connect; only used in log messages
// conn is nil while disconnected; Connect sets it and Disconnect clears it
conn *grpc.ClientConn
// the probe_failed state is used to mitigate situations when
// connection just went down. With that, we do not declare node
// unreachable unless connection could not be re-established
probe_failed bool
}

// NewEgressIPHealthClient returns a health client for the named node. The
// client starts out disconnected; nodeName is only used in log messages.
func NewEgressIPHealthClient(nodeName string) EgressIPHealthClient {
	client := new(egressIPHealthClient)
	client.nodeName = nodeName
	return client
}

// IsConnected reports whether the client currently holds a gRPC connection,
// i.e. Connect succeeded and the connection has not been closed since.
func (ehc *egressIPHealthClient) IsConnected() bool {
	connected := ehc.conn != nil
	return connected
}

// Connect dials the node's health check server over gRPC, trying each
// management IP in order and keeping the first connection that comes up.
//
// The same dial_ctx is passed to every attempt, and each dial blocks
// (grpc.WithBlock) until the connection is established or the context is
// done. On success the connection and the address used are stored on the
// client and true is returned; otherwise a warning is logged and false is
// returned.
func (ehc *egressIPHealthClient) Connect(dial_ctx context.Context, mgmtIPs []net.IP, health_check_port int) bool {
	var (
		conn     *grpc.ClientConn
		nodeAddr string
		err      error
	)

	// The dial options and the port string do not depend on the target
	// address, so build them once instead of on every iteration.
	opts := []grpc.DialOption{
		grpc.WithBlock(),
		// TODO (FF): replace with TLS credentials
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	}
	port := strconv.Itoa(health_check_port)

	for _, mgmtIP := range mgmtIPs {
		// net.JoinHostPort brackets IPv6 literals ("[fd00::2]:9107"). A plain
		// "%s:%d" yields "fd00::2:9107", which is not a valid host:port and
		// would not address the intended port on an IPv6 management IP.
		nodeAddr = net.JoinHostPort(mgmtIP.String(), port)
		conn, err = grpc.DialContext(dial_ctx, nodeAddr, opts...)
		if err == nil && conn != nil {
			break
		}
	}
	if conn == nil {
		klog.Warningf("Could not connect to %s (%s): %v", ehc.nodeName, nodeAddr, err)
		return false
	}

	klog.Infof("Connected to %s (%s)", ehc.nodeName, nodeAddr)
	ehc.nodeAddr = nodeAddr
	ehc.conn = conn
	return true
}

// Disconnect closes the gRPC connection, if one is held, and marks the client
// as disconnected. It is a no-op when there is no connection. A failure to
// close is logged; the client is considered disconnected either way.
func (ehc *egressIPHealthClient) Disconnect() {
	if ehc.conn == nil {
		return
	}

	klog.Infof("Closing connection with %s (%s)", ehc.nodeName, ehc.nodeAddr)
	// Surface the close error instead of silently dropping it.
	if err := ehc.conn.Close(); err != nil {
		klog.Warningf("Closing connection with %s (%s) failed: %v", ehc.nodeName, ehc.nodeAddr, err)
	}
	ehc.conn = nil
}

// Probe sends one health check request for the EgressIP node service over the
// established connection and reports whether the node should be considered
// reachable.
//
// A successful check returns true only when the reported status is SERVING.
// A failed check closes the connection. The first failure after a success is
// tolerated (true is returned) so that a session that merely went down can be
// re-established; a further failure before any successful probe returns
// false. Calling Probe without a connection returns false.
func (ehc *egressIPHealthClient) Probe(dial_ctx context.Context) bool {
	if ehc.conn == nil {
		// should never happen
		klog.Warningf("Unexpected probing before connecting %s", ehc.nodeName)
		return false
	}

	client := NewHealthClient(ehc.conn)
	request := &HealthCheckRequest{Service: ServiceEgressIpNode}

	response, err := client.Check(dial_ctx, request)
	if err == nil {
		ehc.probe_failed = false
		status := response.GetStatus()
		klog.V(5).Infof("Got response from %s (%s): %v", ehc.nodeName, ehc.nodeAddr, status)
		return status == HealthCheckResponse_SERVING
	}

	// The check failed. The result depends on whether the previous probe
	// failed too: a first failure is tolerated, a repeated one is not.
	klog.V(5).Infof("Probe failed %s (%s): %s", ehc.nodeName, ehc.nodeAddr, err)
	ehc.Disconnect()
	firstFailure := !ehc.probe_failed
	ehc.probe_failed = true
	return firstFailure
}
4 changes: 4 additions & 0 deletions go-controller/pkg/ovn/ovn.go
Expand Up @@ -300,6 +300,7 @@ func NewOvnController(ovnClient *util.OVNClientset, wf *factory.WatchFactory, st
nbClient: libovsdbOvnNBClient,
watchFactory: wf,
egressIPTotalTimeout: config.OVNKubernetesFeature.EgressIPReachabiltyTotalTimeout,
egressIPNodeHealthCheckPort: config.OVNKubernetesFeature.EgressIPNodeHealthCheckPort,
},
loadbalancerClusterCache: make(map[kapi.Protocol]string),
multicastSupport: config.EnableMulticast,
Expand Down Expand Up @@ -399,6 +400,9 @@ func (oc *Controller) Run(ctx context.Context, wg *sync.WaitGroup) error {
}
if config.OVNKubernetesFeature.EgressIPReachabiltyTotalTimeout == 0 {
klog.V(2).Infof("EgressIP node reachability check disabled")
} else if config.OVNKubernetesFeature.EgressIPNodeHealthCheckPort != 0 {
klog.Infof("EgressIP node reachability enabled and using gRPC port %d",
config.OVNKubernetesFeature.EgressIPNodeHealthCheckPort)
}
}

Expand Down

0 comments on commit 2773b7f

Please sign in to comment.