Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reprocess services when an ip is available because of a service change #1645

Merged
merged 3 commits into from Oct 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 10 additions & 3 deletions controller/main.go
Expand Up @@ -70,12 +70,20 @@ func (c *controller) SetBalancer(l log.Logger, name string, svcRo *v1.Service, _
// copy makes the code much easier to follow, and we have a GC for
// a reason.
svc := svcRo.DeepCopy()
successRes := controllers.SyncStateSuccess
wasAllocated := c.isServiceAllocated(name)
if !c.convergeBalancer(l, name, svc) {
return controllers.SyncStateError
}
if wasAllocated && !c.isServiceAllocated(name) { // convergeBalancer may deallocate our service and this means it did it.
// if the service was deallocated, it may have have left room
// for another one, so we reprocess
level.Info(l).Log("event", "serviceUpdated", "msg", "removed loadbalancer from service, services will be reprocessed")
successRes = controllers.SyncStateReprocessAll
}
if reflect.DeepEqual(svcRo, svc) {
level.Debug(l).Log("event", "noChange", "msg", "service converged, no change")
return controllers.SyncStateSuccess
return successRes
}

if !reflect.DeepEqual(svcRo.Status, svc.Status) {
Expand All @@ -88,8 +96,7 @@ func (c *controller) SetBalancer(l log.Logger, name string, svcRo *v1.Service, _
}
}
level.Info(l).Log("event", "serviceUpdated", "msg", "updated service object")

return controllers.SyncStateSuccess
return successRes
}

func (c *controller) deleteBalancer(l log.Logger, name string) {
Expand Down
6 changes: 5 additions & 1 deletion controller/service.go
Expand Up @@ -41,7 +41,7 @@ func (c *controller) convergeBalancer(l log.Logger, key string, svc *v1.Service)
var err error
// Not a LoadBalancer, early exit. It might have been a balancer
// in the past, so we still need to clear LB state.
if svc.Spec.Type != "LoadBalancer" {
if svc.Spec.Type != v1.ServiceTypeLoadBalancer {
level.Debug(l).Log("event", "clearAssignment", "reason", "notLoadBalancer", "msg", "not a LoadBalancer")
c.clearServiceState(key, svc)
// Early return, we explicitly do *not* want to reallocate
Expand Down Expand Up @@ -220,6 +220,10 @@ func (c *controller) allocateIPs(key string, svc *v1.Service) ([]net.IP, error)
return c.ips.Allocate(key, serviceIPFamily, k8salloc.Ports(svc), k8salloc.SharingKey(svc), k8salloc.BackendKey(svc))
}

func (c *controller) isServiceAllocated(key string) bool {
return c.ips.Pool(key) != ""
}

func getDesiredLbIPs(svc *v1.Service) ([]net.IP, ipfamily.Family, error) {
var desiredLbIPs []net.IP
desiredLbIPsStr := svc.Annotations[annotationLoadBalancerIPs]
Expand Down
119 changes: 119 additions & 0 deletions e2etest/l2tests/assignment.go
@@ -0,0 +1,119 @@
// SPDX-License-Identifier:Apache-2.0

package l2tests

import (
"context"
"fmt"
"time"

"github.com/onsi/ginkgo"
"github.com/onsi/ginkgo/extensions/table"
"github.com/onsi/gomega"
metallbv1beta1 "go.universe.tf/metallb/api/v1beta1"
"go.universe.tf/metallb/e2etest/pkg/config"
"go.universe.tf/metallb/e2etest/pkg/k8s"
"go.universe.tf/metallb/e2etest/pkg/service"
internalconfig "go.universe.tf/metallb/internal/config"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/test/e2e/framework"
e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
admissionapi "k8s.io/pod-security-admission/api"
)

var _ = ginkgo.Describe("IP Assignment", func() {
var cs clientset.Interface

var f *framework.Framework
ginkgo.AfterEach(func() {
if ginkgo.CurrentGinkgoTestDescription().Failed {
k8s.DumpInfo(Reporter, ginkgo.CurrentGinkgoTestDescription().TestText)
}

// Clean previous configuration.
err := ConfigUpdater.Clean()
framework.ExpectNoError(err)
})

f = framework.NewDefaultFramework("assignment")
f.NamespacePodSecurityEnforceLevel = admissionapi.LevelPrivileged

ginkgo.BeforeEach(func() {
cs = f.ClientSet

ginkgo.By("Clearing any previous configuration")
err := ConfigUpdater.Clean()
framework.ExpectNoError(err)
})

ginkgo.Context("IPV4 Assignment", func() {
table.DescribeTable("should remove the ip from a service assign it to a free one when", func(modify func(svc *v1.Service) error) {
ip, err := config.GetIPFromRangeByIndex(IPV4ServiceRange, 0)
framework.ExpectNoError(err)

resources := internalconfig.ClusterResources{
Pools: []metallbv1beta1.IPAddressPool{
{
ObjectMeta: metav1.ObjectMeta{
Name: "singleip-pool",
},
Spec: metallbv1beta1.IPAddressPoolSpec{
Addresses: []string{
fmt.Sprintf("%s/32", ip),
},
},
},
},
}
err = ConfigUpdater.Update(resources)
framework.ExpectNoError(err)

jig := e2eservice.NewTestJig(cs, f.Namespace.Name, "singleip")
svc, err := jig.CreateLoadBalancerService(10*time.Second, service.TrafficPolicyCluster)
framework.ExpectNoError(err)

ginkgo.By("Creating another service")
svc1, err := jig.CreateTCPService(func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeLoadBalancer
svc.Name = "singleip1"
})
framework.ExpectNoError(err)
gomega.Consistently(func() int {
s, err := cs.CoreV1().Services(svc1.Namespace).Get(context.Background(), svc1.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
return len(s.Status.LoadBalancer.Ingress)
}, 5*time.Second, 1*time.Second).Should(gomega.BeZero())

err = modify(svc)
framework.ExpectNoError(err)

ginkgo.By("Changing the service type so the ip is free to be used again")
framework.ExpectNoError(err)

ginkgo.By("Checking the second service gets the ip assigned")

gomega.Eventually(func() string {
s, err := cs.CoreV1().Services(svc1.Namespace).Get(context.Background(), svc1.Name, metav1.GetOptions{})
framework.ExpectNoError(err)
if len(s.Status.LoadBalancer.Ingress) == 0 {
return ""
}
return s.Status.LoadBalancer.Ingress[0].IP
}, time.Minute, 1*time.Second).Should(gomega.Equal(ip))
},
table.Entry("changing the service type to clusterIP",
func(svc *v1.Service) error {
svc.Spec.Type = v1.ServiceTypeClusterIP
_, err := cs.CoreV1().Services(svc.Namespace).Update(context.Background(), svc, metav1.UpdateOptions{})
return err
}),
table.Entry("deleting the service",
func(svc *v1.Service) error {
err := cs.CoreV1().Services(svc.Namespace).Delete(context.Background(), svc.Name, metav1.DeleteOptions{})
return err
}))
})
})