Skip to content

Commit

Permalink
Make the controller preserve state on reboot
Browse files Browse the repository at this point in the history
This commit changes the behavior of the service reconciler
to fix a bug that the controller de-assign an ip for a service
after reboot.

Make the service reconciler initially ignore the services,
up until the first reprocessAll event finishes, where we sort
and handle all of the services with assigned IP first.
By doing so, we make the controller aware of the LB services with
existing external IPs and sync the internal state.
Only after we reprocessed all services once, and know what
services are allocated and what ips are in use, return to
work as normal.

Add unit tests for the service controller

Add unit test cases to cover the FirstCongifurtaion flag.
Testcase 1: Testing the service reconcile with the flag set to true.
Testcase 2: Testing the reprocessAll with the flag set to true:
validate that the value is modifeid to false by the controller.

Signed-off-by: liornoy <lnoy@redhat.com>
  • Loading branch information
liornoy committed Nov 1, 2023
1 parent 6ec5dfa commit c60f711
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 9 deletions.
9 changes: 9 additions & 0 deletions internal/k8s/controllers/service_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ type ServiceReconciler struct {
Endpoints NeedEndPoints
LoadBalancerClass string
Reload chan event.GenericEvent
// initialLoadPerformed is set after the first time we call reprocessAll.
// This is required because we want the first time we load the services to follow the assigned first, non assigned later order.
// This allows avoiding to have services with already assigned IP to get their IP stolen by other services.
initialLoadPerformed bool
}

func (r *ServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand All @@ -63,6 +67,11 @@ func (r *ServiceReconciler) reconcileService(ctx context.Context, req ctrl.Reque

var service *v1.Service

if !r.initialLoadPerformed {
level.Debug(r.Logger).Log("controller", "ServiceReconciler", "message", "filtered service, still waiting for the initial load to be performed")
return ctrl.Result{}, nil
}

service, err := r.serviceFor(ctx, req.NamespacedName)
if err != nil {
level.Error(r.Logger).Log("controller", "ServiceReconciler", "message", "failed to get service", "service", req.NamespacedName, "error", err)
Expand Down
11 changes: 10 additions & 1 deletion internal/k8s/controllers/service_controller_reload.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controllers

import (
"context"
"sort"

"github.com/go-kit/log/level"

Expand Down Expand Up @@ -67,8 +68,14 @@ func (r *ServiceReconciler) reprocessAll(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, err
}

// Make it process the already assigned services first
sortedServices := services.Items
sort.Slice(sortedServices, func(i, j int) bool {
return len(sortedServices[i].Status.LoadBalancer.Ingress) > len(sortedServices[j].Status.LoadBalancer.Ingress)
})

retry := false
for _, service := range services.Items {
for _, service := range sortedServices {
service := service // so we can use &service
if filterByLoadBalancerClass(&service, r.LoadBalancerClass) {
level.Debug(r.Logger).Log("controller", "ServiceReconciler", "filtered service", req.NamespacedName)
Expand Down Expand Up @@ -101,6 +108,8 @@ func (r *ServiceReconciler) reprocessAll(ctx context.Context, req ctrl.Request)
level.Info(r.Logger).Log("controller", "ServiceReconciler - reprocessAll", "event", "force service reload")
return ctrl.Result{}, errRetry
}
r.initialLoadPerformed = true

return ctrl.Result{}, nil
}

Expand Down
53 changes: 45 additions & 8 deletions internal/k8s/controllers/service_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func TestServiceController(t *testing.T) {
shouldReprocessAll bool
expectReconcileFails bool
expectForceReloadCalled bool
initialLoadPerformed bool
}{
{
desc: "call reconcileService, handler returns SyncStateSuccess",
Expand All @@ -79,6 +80,7 @@ func TestServiceController(t *testing.T) {
shouldReprocessAll: false,
expectReconcileFails: false,
expectForceReloadCalled: false,
initialLoadPerformed: true,
},
{
desc: "call reconcileService, handler returns SyncStateSuccess - with endpoints",
Expand All @@ -88,6 +90,7 @@ func TestServiceController(t *testing.T) {
shouldReprocessAll: false,
expectReconcileFails: false,
expectForceReloadCalled: false,
initialLoadPerformed: true,
},
{
desc: "call reconcileService, handler returns SyncStateSuccess - with endpointSlices",
Expand All @@ -97,6 +100,7 @@ func TestServiceController(t *testing.T) {
shouldReprocessAll: false,
expectReconcileFails: false,
expectForceReloadCalled: false,
initialLoadPerformed: true,
},
{
desc: "call reconcileService, handler returns SyncStateError",
Expand All @@ -106,6 +110,7 @@ func TestServiceController(t *testing.T) {
shouldReprocessAll: false,
expectReconcileFails: true,
expectForceReloadCalled: false,
initialLoadPerformed: true,
},
{
desc: "call reconcileService, handler returns SyncStateErrorNoRetry",
Expand All @@ -115,6 +120,7 @@ func TestServiceController(t *testing.T) {
shouldReprocessAll: false,
expectReconcileFails: false,
expectForceReloadCalled: false,
initialLoadPerformed: true,
},
{
desc: "call reconcileService, handler returns SyncStateReprocessAll",
Expand All @@ -124,6 +130,17 @@ func TestServiceController(t *testing.T) {
shouldReprocessAll: false,
expectReconcileFails: false,
expectForceReloadCalled: true,
initialLoadPerformed: true,
},
{
desc: "call reconcileService, initialLoadPerformed initiated to false",
handlerRes: SyncStateReprocessAll,
needEndPoints: NoNeed,
initObjects: []client.Object{testService},
shouldReprocessAll: false,
expectReconcileFails: false,
expectForceReloadCalled: false,
initialLoadPerformed: false,
},
{
desc: "call reprocessAll, handler returns SyncStateSuccess",
Expand All @@ -133,6 +150,7 @@ func TestServiceController(t *testing.T) {
shouldReprocessAll: true,
expectReconcileFails: false,
expectForceReloadCalled: false,
initialLoadPerformed: true,
},
{
desc: "call reprocessAll, handler returns SyncStateSuccess - with endpoints",
Expand All @@ -142,6 +160,7 @@ func TestServiceController(t *testing.T) {
shouldReprocessAll: true,
expectReconcileFails: false,
expectForceReloadCalled: false,
initialLoadPerformed: true,
},
{
desc: "call reprocessAll, handler returns SyncStateSuccess - with endpointSlices",
Expand All @@ -151,6 +170,7 @@ func TestServiceController(t *testing.T) {
shouldReprocessAll: true,
expectReconcileFails: false,
expectForceReloadCalled: false,
initialLoadPerformed: true,
},
{
desc: "call reprocessAll, handler returns SyncStateError",
Expand All @@ -160,6 +180,7 @@ func TestServiceController(t *testing.T) {
shouldReprocessAll: true,
expectReconcileFails: true,
expectForceReloadCalled: false,
initialLoadPerformed: true,
},
{
desc: "call reprocessAll, handler returns SyncStateErrorNoRetry",
Expand All @@ -169,6 +190,7 @@ func TestServiceController(t *testing.T) {
shouldReprocessAll: true,
expectReconcileFails: false,
expectForceReloadCalled: false,
initialLoadPerformed: true,
},
{
desc: "call reprocessAll, handler returns SyncStateReprocessAll",
Expand All @@ -178,6 +200,17 @@ func TestServiceController(t *testing.T) {
shouldReprocessAll: true,
expectReconcileFails: true,
expectForceReloadCalled: false,
initialLoadPerformed: true,
},
{
desc: "call reprocessAll, initialLoadPerformed initiated to false",
handlerRes: SyncStateSuccess,
needEndPoints: NoNeed,
initObjects: []client.Object{testService},
shouldReprocessAll: true,
expectReconcileFails: false,
expectForceReloadCalled: false,
initialLoadPerformed: false,
},
}
for _, test := range tests {
Expand Down Expand Up @@ -207,15 +240,16 @@ func TestServiceController(t *testing.T) {
mockReload := make(chan event.GenericEvent, 1)

r := &ServiceReconciler{
Client: fakeClient,
Logger: log.NewNopLogger(),
Scheme: scheme,
Namespace: testNamespace,
Handler: mockHandler,
Endpoints: test.needEndPoints,
Reload: mockReload,
Client: fakeClient,
Logger: log.NewNopLogger(),
Scheme: scheme,
Namespace: testNamespace,
Handler: mockHandler,
Endpoints: test.needEndPoints,
Reload: mockReload,
initialLoadPerformed: false,
}

r.initialLoadPerformed = test.initialLoadPerformed
var req reconcile.Request
if test.shouldReprocessAll {
req = reconcile.Request{
Expand Down Expand Up @@ -254,6 +288,9 @@ func TestServiceController(t *testing.T) {
t.Errorf("test %s failed: call force reload expected: %v, got: %v",
test.desc, test.expectForceReloadCalled, calledForceReload)
}
if test.shouldReprocessAll && !r.initialLoadPerformed {
t.Errorf("test %s failed: reconciler's initialLoadPerformed flag didn't change to true", test.desc)
}
}
}

Expand Down

0 comments on commit c60f711

Please sign in to comment.