Skip to content

Commit

Permalink
egress ip: conditionally use grpc for health monitoring
Browse files Browse the repository at this point in the history
When the ovnkube container, in both master and node pods, is started with
the newly introduced flag 'egressip-node-healthcheck-port', the EgressIP
implementation uses gRPC on that TCP port for node health monitoring.

Signed-off-by: Flavio Fernandes <flaviof@redhat.com>
  • Loading branch information
flavio-fernandes committed Aug 3, 2022
1 parent 3672dd7 commit a9f8ac0
Show file tree
Hide file tree
Showing 6 changed files with 268 additions and 0 deletions.
7 changes: 7 additions & 0 deletions go-controller/pkg/config/config.go
Expand Up @@ -336,6 +336,7 @@ type OVNKubernetesFeatureConfig struct {
EgressIPReachabiltyTotalTimeout int `gcfg:"egressip-reachability-total-timeout"`
EnableEgressFirewall bool `gcfg:"enable-egress-firewall"`
EnableEgressQoS bool `gcfg:"enable-egress-qos"`
EgressIPNodeHealthCheckPort int `gcfg:"egressip-node-healthcheck-port"`
}

// GatewayMode holds the node gateway mode
Expand Down Expand Up @@ -875,6 +876,12 @@ var OVNK8sFeatureFlags = []cli.Flag{
Destination: &cliConfig.OVNKubernetesFeature.EnableEgressQoS,
Value: OVNKubernetesFeature.EnableEgressQoS,
},
&cli.IntFlag{
Name: "egressip-node-healthcheck-port",
Usage: "Configure EgressIP node reachability using gRPC on this TCP port.",
Destination: &cliConfig.OVNKubernetesFeature.EgressIPNodeHealthCheckPort,
// Value: 0,
},
}

// K8sFlags capture Kubernetes-related options
Expand Down
7 changes: 7 additions & 0 deletions go-controller/pkg/config/config_test.go
Expand Up @@ -209,6 +209,7 @@ mode=full
[ovnkubernetesfeature]
egressip-reachability-total-timeout=3
egressip-node-healthcheck-port=1234
`

var newData string
Expand Down Expand Up @@ -297,6 +298,7 @@ var _ = Describe("Config Operations", func() {
gomega.Expect(OvnKubeNode.MgmtPortNetdev).To(gomega.Equal(""))
gomega.Expect(Gateway.RouterSubnet).To(gomega.Equal(""))
gomega.Expect(OVNKubernetesFeature.EgressIPReachabiltyTotalTimeout).To(gomega.Equal(1))
gomega.Expect(OVNKubernetesFeature.EgressIPNodeHealthCheckPort).To(gomega.Equal(0))

for _, a := range []OvnAuthConfig{OvnNorth, OvnSouth} {
gomega.Expect(a.Scheme).To(gomega.Equal(OvnDBSchemeUnix))
Expand Down Expand Up @@ -600,6 +602,7 @@ var _ = Describe("Config Operations", func() {

gomega.Expect(HybridOverlay.Enabled).To(gomega.BeTrue())
gomega.Expect(OVNKubernetesFeature.EgressIPReachabiltyTotalTimeout).To(gomega.Equal(3))
gomega.Expect(OVNKubernetesFeature.EgressIPNodeHealthCheckPort).To(gomega.Equal(1234))
gomega.Expect(HybridOverlay.ClusterSubnets).To(gomega.Equal([]CIDRNetworkEntry{
{ovntest.MustParseIPNet("11.132.0.0/14"), 23},
}))
Expand Down Expand Up @@ -680,6 +683,7 @@ var _ = Describe("Config Operations", func() {

gomega.Expect(HybridOverlay.Enabled).To(gomega.BeTrue())
gomega.Expect(OVNKubernetesFeature.EgressIPReachabiltyTotalTimeout).To(gomega.Equal(5))
gomega.Expect(OVNKubernetesFeature.EgressIPNodeHealthCheckPort).To(gomega.Equal(4321))
gomega.Expect(HybridOverlay.ClusterSubnets).To(gomega.Equal([]CIDRNetworkEntry{
{ovntest.MustParseIPNet("11.132.0.0/14"), 23},
}))
Expand Down Expand Up @@ -735,6 +739,7 @@ var _ = Describe("Config Operations", func() {
"-ofctrl-wait-before-clear=5000",
"-metrics-enable-config-duration=true",
"-egressip-reachability-total-timeout=5",
"-egressip-node-healthcheck-port=4321",
}
err = app.Run(cliArgs)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
Expand Down Expand Up @@ -1071,6 +1076,7 @@ enable-pprof=true
gomega.Equal("ssl:6.5.4.1:6652,ssl:6.5.4.2:6652,ssl:6.5.4.3:6652"))
gomega.Expect(OvnSouth.CertCommonName).To(gomega.Equal("testsbcommonname"))
gomega.Expect(OVNKubernetesFeature.EgressIPReachabiltyTotalTimeout).To(gomega.Equal(3))
gomega.Expect(OVNKubernetesFeature.EgressIPNodeHealthCheckPort).To(gomega.Equal(12345))
return nil
}
cliArgs := []string{
Expand Down Expand Up @@ -1107,6 +1113,7 @@ enable-pprof=true
"-sb-client-cacert=/client/cacert2",
"-sb-cert-common-name=testsbcommonname",
"-egressip-reachability-total-timeout=3",
"-egressip-node-healthcheck-port=12345",
}
err = app.Run(cliArgs)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
Expand Down
34 changes: 34 additions & 0 deletions go-controller/pkg/node/node.go
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/informer"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/kube"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/node/controllers/upgrade"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/healthcheck"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"
"github.com/vishvananda/netlink"
Expand Down Expand Up @@ -570,10 +571,43 @@ func (n *OvnNode) Start(ctx context.Context, wg *sync.WaitGroup) error {
}
}

// Start the health checking server used by egressip, if EgressIPNodeHealthCheckPort is specified
if err := n.startEgressIPHealthCheckingServer(wg, mgmtPortConfig); err != nil {
return err
}

klog.Infof("OVN Kube Node initialized and ready.")
return nil
}

// startEgressIPHealthCheckingServer starts the health checking server used by
// egressip when config.OVNKubernetesFeature.EgressIPNodeHealthCheckPort is
// non-zero; with a zero port it only logs that the server is skipped.
//
// The server is created with the node's management port IP (IPv4 is preferred
// over IPv6) and the configured port. It runs in its own goroutine, which is
// registered with wg and is passed n.stopChan.
//
// An error is returned when no management port IP is available (including a
// nil mgmtPortConfig) or when the health server cannot be allocated.
func (n *OvnNode) startEgressIPHealthCheckingServer(wg *sync.WaitGroup, mgmtPortConfig *managementPortConfig) error {
	healthCheckPort := config.OVNKubernetesFeature.EgressIPNodeHealthCheckPort
	if healthCheckPort == 0 {
		klog.Infof("Egress IP health check server skipped: no port specified")
		return nil
	}

	var nodeMgmtIP net.IP
	switch {
	case mgmtPortConfig == nil:
		// Without a management port configuration there is no address to
		// serve on; report it instead of dereferencing a nil pointer.
		return fmt.Errorf("unable to start health checking server: no mgmt ip")
	case mgmtPortConfig.ipv4 != nil:
		nodeMgmtIP = mgmtPortConfig.ipv4.ifAddr.IP
	case mgmtPortConfig.ipv6 != nil:
		nodeMgmtIP = mgmtPortConfig.ipv6.ifAddr.IP
	default:
		return fmt.Errorf("unable to start health checking server: no mgmt ip")
	}

	healthServer, err := healthcheck.NewEgressIPHealthServer(nodeMgmtIP, healthCheckPort)
	if err != nil {
		// Wrap with %w so callers can still inspect the underlying error.
		return fmt.Errorf("unable to allocate health checking server: %w", err)
	}

	wg.Add(1)
	go func() {
		defer wg.Done()
		healthServer.Run(n.stopChan)
	}()
	return nil
}

func (n *OvnNode) WatchEndpoints() error {
_, err := n.watchFactory.AddEndpointsHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(old, new interface{}) {
Expand Down
38 changes: 38 additions & 0 deletions go-controller/pkg/ovn/egressip.go
@@ -1,6 +1,7 @@
package ovn

import (
"context"
"encoding/hex"
"encoding/json"
"errors"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/libovsdbops"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/metrics"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/nbdb"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/ovn/healthcheck"

"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/types"
"github.com/ovn-org/ovn-kubernetes/go-controller/pkg/util"
Expand Down Expand Up @@ -1754,6 +1756,7 @@ func (oc *Controller) initEgressIPAllocator(node *kapi.Node) (err error) {
egressIPConfig: parsedEgressIPConfig,
mgmtIPs: mgmtIPs,
allocations: make(map[string]string),
healthClient: healthcheck.NewEgressIPHealthClient(node.Name),
}
}
return nil
Expand Down Expand Up @@ -1785,6 +1788,9 @@ func (oc *Controller) deleteNodeForEgress(node *v1.Node) error {
return err
}
oc.eIPC.allocator.Lock()
if eNode, exists := oc.eIPC.allocator.cache[node.Name]; exists {
eNode.healthClient.Disconnect()
}
delete(oc.eIPC.allocator.cache, node.Name)
oc.eIPC.allocator.Unlock()
return nil
Expand Down Expand Up @@ -1815,6 +1821,7 @@ type egressNode struct {
egressIPConfig *util.ParsedNodeEgressIPConfiguration
mgmtIPs []net.IP
allocations map[string]string
healthClient healthcheck.EgressIPHealthClient
isReady bool
isReachable bool
isEgressAssignable bool
Expand Down Expand Up @@ -1878,6 +1885,8 @@ type egressIPController struct {
watchFactory *factory.WatchFactory
// EgressIP Node reachability total timeout configuration
egressIPTotalTimeout int
// EgressIP Node reachability gRPC port (0 means it should use dial instead)
egressIPNodeHealthCheckPort int
}

// addPodEgressIPAssignment will program OVN with logical router policies
Expand Down Expand Up @@ -2158,6 +2167,13 @@ func checkEgressNodesReachabilityIterate(oc *Controller) {
}

// isReachable reports whether the given egress node is reachable. When a
// non-zero EgressIP node health check port is configured on the controller,
// the check is done over gRPC on that port; otherwise it falls back to the
// dial-based check.
func (oc *Controller) isReachable(node *egressNode) bool {
	if port := oc.eIPC.egressIPNodeHealthCheckPort; port != 0 {
		return oc.isReachableViaGRPC(node, port)
	}
	return oc.isReachableViaDial(node)
}

func (oc *Controller) isReachableViaDial(node *egressNode) bool {
var retryTimeOut, initialRetryTimeOut time.Duration

numMgmtIPs := len(node.mgmtIPs)
Expand Down Expand Up @@ -2221,6 +2237,28 @@ func (e *egressIPDial) dial(ip net.IP, timeout time.Duration) bool {
return true
}

// isReachableViaGRPC reports node reachability using the node's gRPC health
// client on healthCheckPort.
//
// It returns true without checking when the reachability total timeout is 0,
// and false when the node has no management IPs. Otherwise, a single context
// bounded by the total timeout (in seconds) is used either to establish the
// gRPC session or, if one is already up, to send a probe over it.
func (oc *Controller) isReachableViaGRPC(node *egressNode, healthCheckPort int) bool {
	totalTimeout := oc.eIPC.egressIPTotalTimeout
	switch {
	case totalTimeout == 0:
		// Node reachability checking is disabled.
		return true
	case len(node.mgmtIPs) == 0:
		return false
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(totalTimeout)*time.Second)
	defer cancel()

	if node.healthClient.IsConnected() {
		// gRPC session is already established. Send a probe, which will
		// succeed, or close the session.
		return node.healthClient.Probe(ctx)
	}

	// gRPC session is not up. Attempt to connect and, if that succeeds,
	// declare the node as reachable.
	return node.healthClient.Connect(ctx, node.mgmtIPs, healthCheckPort)
}

func getClusterSubnets() ([]*net.IPNet, []*net.IPNet) {
var v4ClusterSubnets = []*net.IPNet{}
var v6ClusterSubnets = []*net.IPNet{}
Expand Down

0 comments on commit a9f8ac0

Please sign in to comment.