Skip to content

Commit

Permalink
Implement NodeExcludeBalancers to exclude nodes as external loadbalancer
Browse files Browse the repository at this point in the history
If the node has labeled "node.kubernetes.io/exclude-from-external-load-balancers", It specifies that the node should not be considered
 as a target for external load-balancers which use nodes as a second hop.

Signed-off-by: cyclinder <kuocyclinder@gmail.com>
  • Loading branch information
cyclinder committed Jan 23, 2024
1 parent 4aaee51 commit 1751aab
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 2 deletions.
84 changes: 84 additions & 0 deletions e2etest/l2tests/l2.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"errors"
"fmt"
"net"
"sort"
"strconv"
"time"

Expand Down Expand Up @@ -286,6 +287,89 @@ var _ = ginkgo.Describe("L2", func() {
return node.Name
}, time.Minute, time.Second).Should(gomega.Equal(nodeToSet))
})

ginkgo.It("It should be work when adding NodeExcludeBalancers label to a node", func() {
svc, _ := service.CreateWithBackend(cs, f.Namespace.Name, "external-local-lb", service.TrafficPolicyCluster)
defer func() {
err := cs.CoreV1().Services(svc.Namespace).Delete(context.TODO(), svc.Name, metav1.DeleteOptions{})
framework.ExpectNoError(err)
}()

allNodes, err := cs.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
framework.ExpectNoError(err)

ginkgo.By("getting the advertising node")
var nodeToSet string

gomega.Eventually(func() error {
node, err := k8s.GetSvcNode(cs, svc.Namespace, svc.Name, allNodes)
if err != nil {
return err
}
nodeToSet = node.Name
return nil
}, time.Minute, time.Second).ShouldNot(gomega.HaveOccurred())

ginkgo.By("add the NodeExcludeBalancers label of the node")
k8s.AddLabelToNode(nodeToSet, corev1.LabelNodeExcludeBalancers, "", cs)
defer func() {
ginkgo.By("removing the NodeExcludeBalancers label of the node")
k8s.RemoveLabelFromNode(nodeToSet, corev1.LabelNodeExcludeBalancers, cs)
}()
time.Sleep(time.Second)

events, _ := cs.CoreV1().Events(svc.Namespace).List(context.Background(), metav1.ListOptions{FieldSelector: "reason=nodeAssigned"})

svcEvents := make([]corev1.Event, 0)
for _, e := range events.Items {
if e.InvolvedObject.Name == svc.Name {
svcEvents = append(svcEvents, e)
}
}

for _, event := range svcEvents {
ginkgo.By(fmt.Sprintf("1 event.Message = %v, event.Time = %v \n", event.Message, event.LastTimestamp.Time.UnixMicro()))
}

sort.Slice(svcEvents, func(i, j int) bool {
// return svcEvents[i].LastTimestamp.Time.UnixNano() > svcEvents[j].LastTimestamp.Time.UnixNano()
return svcEvents[i].LastTimestamp.After(svcEvents[j].LastTimestamp.Time)
})

for _, event := range svcEvents {
ginkgo.By(fmt.Sprintf("2 event.Message = %v, event.Time = %v \n", event.Message, event.LastTimestamp.Time.UnixMicro()))
}

sort.Slice(svcEvents, func(i, j int) bool {
return svcEvents[i].LastTimestamp.Time.UnixMicro() > svcEvents[j].LastTimestamp.Time.UnixMicro()
// return svcEvents[i].LastTimestamp.After(svcEvents[j].LastTimestamp.Time)
})

for _, event := range svcEvents {
ginkgo.By(fmt.Sprintf("3 event.Message = %v, event.Time = %v \n", event.Message, event.LastTimestamp.Time.UnixMicro()))
}

ginkgo.By("validating the service is announced from a different node")
gomega.Eventually(func() string {
node, err := k8s.GetSvcNode(cs, svc.Namespace, svc.Name, allNodes)
if err != nil {
return err.Error()
}
return node.Name
}, time.Minute, time.Second).ShouldNot(gomega.Equal(nodeToSet))

ginkgo.By("removing the NodeExcludeBalancers label of the node")
k8s.RemoveLabelFromNode(nodeToSet, corev1.LabelNodeExcludeBalancers, cs)

ginkgo.By("validating the service is announced back again from the previous node")
gomega.Eventually(func() string {
node, err := k8s.GetSvcNode(cs, svc.Namespace, svc.Name, allNodes)
if err != nil {
return err.Error()
}
return node.Name
}, time.Minute, time.Second).Should(gomega.Equal(nodeToSet))
})
})

ginkgo.Context("validate different AddressPools for type=Loadbalancer", func() {
Expand Down
1 change: 1 addition & 0 deletions e2etest/pkg/k8s/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func GetSvcNode(cs clientset.Interface, svcNS string, svcName string, allNodes *
}

sort.Slice(svcEvents, func(i, j int) bool {
// return svcEvents[i].LastTimestamp.Time.UnixNano() > svcEvents[j].LastTimestamp.Time.UnixNano()
return svcEvents[i].LastTimestamp.After(svcEvents[j].LastTimestamp.Time)
})

Expand Down
12 changes: 12 additions & 0 deletions internal/k8s/nodes/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,15 @@ func conditionStatus(n *corev1.Node, ct corev1.NodeConditionType) corev1.Conditi

return corev1.ConditionUnknown
}

// IsNodeExcludedFromBalancers returns true if the given node has labeld node.kubernetes.io/exclude-from-external-load-balancers".
func IsNodeExcludedFromBalancers(n *corev1.Node) bool {
if n == nil {
return false
}

if _, ok := n.Labels[corev1.LabelNodeExcludeBalancers]; ok {
return true
}
return false
}
6 changes: 6 additions & 0 deletions speaker/bgp_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,12 @@ func (c *bgpController) ShouldAnnounce(l log.Logger, name string, _ []net.IP, po
level.Debug(l).Log("event", "skipping should announce bgp", "service", name, "reason", "speaker's node has NodeNetworkUnavailable condition")
return "nodeNetworkUnavailable"
}

if k8snodes.IsNodeExcludedFromBalancers(nodes[c.myNode]) {
level.Debug(l).Log("event", "skipping should announce bgp", "service", name, "reason", "speaker's node has labeled 'node.kubernetes.io/exclude-from-external-load-balancers'")
return "nodeLabeledExcludeBalancers"
}

// Should we advertise?
// Yes, if externalTrafficPolicy is
// Cluster && any healthy endpoint exists
Expand Down
5 changes: 5 additions & 0 deletions speaker/layer2_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ func speakersForPool(speakers map[string]bool, pool *config.Pool, nodes map[stri
if k8snodes.IsNetworkUnavailable(nodes[s]) {
continue
}

if k8snodes.IsNodeExcludedFromBalancers(nodes[s]) {
continue
}

if poolMatchesNodeL2(pool, s) {
res[s] = true
}
Expand Down
7 changes: 6 additions & 1 deletion speaker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ func (c *controller) SetConfig(l log.Logger, cfg *config.Config) controllers.Syn

func (c *controller) SetNode(l log.Logger, node *v1.Node) controllers.SyncState {
conditionChanged := isNetworkConditionChanged(node.Name, c.nodes, node)
labelNodeExcludeBalancersChanged := isLabelNodeExcludeBalancersChanged(node.Name, c.nodes, node)
c.nodes[node.Name] = node

for proto, handler := range c.protocolHandlers {
Expand All @@ -533,7 +534,7 @@ func (c *controller) SetNode(l log.Logger, node *v1.Node) controllers.SyncState
}
}

if conditionChanged {
if conditionChanged || labelNodeExcludeBalancersChanged {
return controllers.SyncStateReprocessAll
}

Expand All @@ -544,6 +545,10 @@ func isNetworkConditionChanged(nodeName string, oldNodes map[string]*v1.Node, ne
return k8snodes.IsNetworkUnavailable(oldNodes[nodeName]) != k8snodes.IsNetworkUnavailable(newNode)
}

func isLabelNodeExcludeBalancersChanged(nodeName string, oldNodes map[string]*v1.Node, newNode *v1.Node) bool {
return k8snodes.IsNodeExcludedFromBalancers(oldNodes[nodeName]) != k8snodes.IsNodeExcludedFromBalancers(newNode)
}

// A Protocol can advertise an IP address.
type Protocol interface {
SetConfig(log.Logger, *config.Config) error
Expand Down
15 changes: 14 additions & 1 deletion tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -842,8 +842,14 @@ def e2etest(ctx, name="kind", export=None, kubeconfig=None, system_namespaces="k
for ns in namespaces:
run("{} -n {} wait --for=condition=Ready --all pods --timeout 300s".format(kubectl_path, ns), hide=True)

"""
the control-plane node will be labeled "node.kubernetes.io/exclude-from-external-load-balancers" by default
when the cluster is created. and when https://github.com/metallb/metallb/pull/2073 was merged in, which will
affect the currently e2e tests. In order to code minimal changes, we should remove the label here.
"""
nodes = dont_exclude_from_lb(name)

if node_nics == "kind":
nodes = run("kind get nodes --name {name}".format(name=name)).stdout.strip().split("\n")
node_nics = _get_node_nics(nodes[0])

if local_nics == "kind":
Expand Down Expand Up @@ -880,6 +886,13 @@ def e2etest(ctx, name="kind", export=None, kubeconfig=None, system_namespaces="k
if testrun.failed:
raise Exit(message="E2E tests failed", code=testrun.return_code)

def dont_exclude_from_lb(name):
nodes = run("kind get nodes --name {name}".format(name=name)).stdout.strip().split("\n")
for node in nodes:
run("{} label nodes {} node.kubernetes.io/exclude-from-external-load-balancers-".format(kubectl_path,node), hide=True)

return nodes

@task
def bumplicense(ctx):
"""Bumps the license header on all go files that have it missing"""
Expand Down
1 change: 1 addition & 0 deletions website/content/release-notes/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ Chores:
- Metrics: add ipv4/6 addresses_in_use_total and addresses_total ([PR 2151](https://github.com/metallb/metallb/pull/2151))
- Squash the prefixes in FRR mode, avoiding duplicate prefixes in the FRR configuration ([PR 2234](https://github.com/metallb/metallb/pull/2234))
- BGP: move the matching peer logic one level up ([PR 2233](https://github.com/metallb/metallb/pull/2233))
- Implement NodeExcludeBalancers to exclude nodes as external loadbalancer ([PR 2073](https://github.com/metallb/metallb/pull/2073), [ISSUE 2021](https://github.com/metallb/metallb/issues/2021))

## Version 0.13.12

Expand Down

0 comments on commit 1751aab

Please sign in to comment.