Skip to content

Commit

Permalink
Merge pull request #151 from openshift-cherrypick-robot/cherry-pick-1…
Browse files Browse the repository at this point in the history
…49-to-release-4.12

[release-4.12] OCPBUGS-4847: Fix stale cache issue on createMachine
  • Loading branch information
openshift-merge-robot committed Dec 19, 2022
2 parents 3a90f74 + 714c9b4 commit a7723fa
Show file tree
Hide file tree
Showing 10 changed files with 216 additions and 79 deletions.
15 changes: 14 additions & 1 deletion cmd/control-plane-machine-set-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/klog/v2/klogr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/healthz"

cpmscontroller "github.com/openshift/cluster-control-plane-machine-set-operator/pkg/controllers/controlplanemachineset"
Expand All @@ -59,7 +60,7 @@ const (
unknownVersionValue = "unknown"
)

func main() { //nolint:funlen
func main() { //nolint:funlen,cyclop
scheme := runtime.NewScheme()
setupLog := ctrl.Log.WithName("setup")

Expand Down Expand Up @@ -119,8 +120,20 @@ func main() { //nolint:funlen
os.Exit(1)
}

// Define an uncached client.
// It is more resource intensive than the default client and should be used
// only in situations where we want to avoid the cache.
// We specifically declare an uncached client.Client rather than a client.Reader
// so that it is interchangeable with the default client and can easily be
// substituted for it as needed.
uncachedClient, err := client.New(cfg, client.Options{Scheme: mgr.GetScheme(), Mapper: mgr.GetRESTMapper()})
if err != nil {
setupLog.Error(err, "unable to set up uncached client")
}

if err := (&cpmscontroller.ControlPlaneMachineSetReconciler{
Client: mgr.GetClient(),
UncachedClient: client.NewNamespacedClient(uncachedClient, managedNamespace),
Scheme: mgr.GetScheme(),
Namespace: managedNamespace,
OperatorName: "control-plane-machine-set",
Expand Down
14 changes: 8 additions & 6 deletions pkg/controllers/controlplanemachineset/cluster_operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ var _ = Describe("Cluster Operator Status with a running controller", func() {
Expect(err).ToNot(HaveOccurred(), "Manager should be able to be created")

reconciler := &ControlPlaneMachineSetReconciler{
Client: k8sClient,
Namespace: namespaceName,
OperatorName: operatorName,
Client: k8sClient,
UncachedClient: k8sClient,
Namespace: namespaceName,
OperatorName: operatorName,
}
Expect(reconciler.SetupWithManager(mgr)).To(Succeed(), "Reconciler should be able to setup with manager")

Expand Down Expand Up @@ -196,9 +197,10 @@ var _ = Describe("Cluster Operator Status", func() {
cpmsBuilder = resourcebuilder.ControlPlaneMachineSet().WithName(clusterControlPlaneMachineSetName).WithNamespace(namespaceName)

reconciler = &ControlPlaneMachineSetReconciler{
Client: k8sClient,
Namespace: namespaceName,
OperatorName: operatorName,
Client: k8sClient,
UncachedClient: k8sClient,
Namespace: namespaceName,
OperatorName: operatorName,
}

// CVO will create a blank cluster operator for us before the operator starts.
Expand Down
5 changes: 3 additions & 2 deletions pkg/controllers/controlplanemachineset/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ var (
// ControlPlaneMachineSetReconciler reconciles a ControlPlaneMachineSet object.
type ControlPlaneMachineSetReconciler struct {
client.Client
Scheme *runtime.Scheme
RESTMapper meta.RESTMapper
Scheme *runtime.Scheme
RESTMapper meta.RESTMapper
UncachedClient client.Client

// Namespace is the namespace in which the ControlPlaneMachineSet controller should operate.
// Any ControlPlaneMachineSet not in this namespace should be ignored.
Expand Down
28 changes: 16 additions & 12 deletions pkg/controllers/controlplanemachineset/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,10 @@ var _ = Describe("With a running controller", func() {
Expect(err).ToNot(HaveOccurred(), "Manager should be able to be created")

reconciler := &ControlPlaneMachineSetReconciler{
Client: mgr.GetClient(),
Namespace: namespaceName,
OperatorName: operatorName,
Client: mgr.GetClient(),
UncachedClient: mgr.GetClient(),
Namespace: namespaceName,
OperatorName: operatorName,
}
Expect(reconciler.SetupWithManager(mgr)).To(Succeed(), "Reconciler should be able to setup with manager")

Expand Down Expand Up @@ -1073,9 +1074,10 @@ var _ = Describe("ensureFinalizer", func() {
namespaceName = ns.GetName()

reconciler = &ControlPlaneMachineSetReconciler{
Client: k8sClient,
Scheme: testScheme,
Namespace: namespaceName,
Client: k8sClient,
UncachedClient: k8sClient,
Scheme: testScheme,
Namespace: namespaceName,
}

// The ControlPlaneMachineSet should already exist by the time we get here.
Expand Down Expand Up @@ -1216,10 +1218,11 @@ var _ = Describe("ensureOwnerRefrences", func() {
namespaceName = ns.GetName()

reconciler = &ControlPlaneMachineSetReconciler{
Client: k8sClient,
Scheme: testScheme,
RESTMapper: testRESTMapper,
Namespace: namespaceName,
Client: k8sClient,
UncachedClient: k8sClient,
Scheme: testScheme,
RESTMapper: testRESTMapper,
Namespace: namespaceName,
}

// The ControlPlaneMachineSet should already exist by the time we get here.
Expand Down Expand Up @@ -1723,8 +1726,9 @@ var _ = Describe("validateClusterState", func() {
}

reconciler := &ControlPlaneMachineSetReconciler{
Client: k8sClient,
Namespace: namespaceName,
Client: k8sClient,
UncachedClient: k8sClient,
Namespace: namespaceName,
}

cpms := in.cpmsBuilder.Build()
Expand Down
7 changes: 4 additions & 3 deletions pkg/controllers/controlplanemachineset/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ var _ = Describe("Status", func() {
By("Setting up the reconciler")
logger = test.NewTestLogger()
reconciler = &ControlPlaneMachineSetReconciler{
Namespace: namespaceName,
Scheme: testScheme,
Client: k8sClient,
Namespace: namespaceName,
Scheme: testScheme,
Client: k8sClient,
UncachedClient: k8sClient,
}

By("Setting up supporting resources")
Expand Down
66 changes: 59 additions & 7 deletions pkg/controllers/controlplanemachineset/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ const (
// attempting to delete replacement Machine.
errorDeletingMachine = "Error deleting machine"

// alreadyPresentReplacement is a log message used to inform the user that a new Machine was not created to
// replace an existing Machine, because a replacement has already been created.
alreadyPresentReplacement = "Replacement machine already present"

// invalidStrategyMessage is used to inform the user that they have provided an invalid value
// for the update strategy.
invalidStrategyMessage = "invalid value for spec.strategy.type"
Expand Down Expand Up @@ -353,7 +357,7 @@ func (r *ControlPlaneMachineSetReconciler) createOnDeleteReplacementMachines(ctx
// Trigger a Machine creation.
logger := logger.WithValues("index", idx, "namespace", r.Namespace, "name", unknownMachineName)

result, err := createMachine(ctx, logger, machineProvider, idx)
result, err := r.createMachine(ctx, logger, machineProvider, idx)
if err != nil {
return false, result, err
}
Expand All @@ -368,7 +372,7 @@ func (r *ControlPlaneMachineSetReconciler) createOnDeleteReplacementMachines(ctx

if isDeletedMachine(machines[0]) {
// if deleted create the replacement
result, err := createMachine(ctx, logger, machineProvider, idx)
result, err := r.createMachine(ctx, logger, machineProvider, idx)
if err != nil {
return false, result, err
}
Expand Down Expand Up @@ -399,7 +403,7 @@ func (r *ControlPlaneMachineSetReconciler) createRollingUpdateReplacementMachine
// Trigger a Machine creation.
logger := logger.WithValues("index", idx, "namespace", r.Namespace, "name", unknownMachineName)

result, err := createMachineWithSurge(ctx, logger, machineProvider, idx, maxSurge, surgeCount)
result, err := r.createMachineWithSurge(ctx, logger, machineProvider, idx, maxSurge, surgeCount)
if err != nil {
return false, result, err
}
Expand All @@ -415,7 +419,7 @@ func (r *ControlPlaneMachineSetReconciler) createRollingUpdateReplacementMachine
outdatedMachine := machinesNeedingReplacement[0]
logger := logger.WithValues("index", outdatedMachine.Index, "namespace", r.Namespace, "name", outdatedMachine.MachineRef.ObjectMeta.Name)

result, err := createMachineWithSurge(ctx, logger, machineProvider, outdatedMachine.Index, maxSurge, surgeCount)
result, err := r.createMachineWithSurge(ctx, logger, machineProvider, outdatedMachine.Index, maxSurge, surgeCount)
if err != nil {
return false, result, err
}
Expand All @@ -441,7 +445,22 @@ func deleteMachine(ctx context.Context, logger logr.Logger, machineProvider mach
}

// createMachine creates the Machine provided.
func createMachine(ctx context.Context, logger logr.Logger, machineProvider machineproviders.MachineProvider, idx int32) (ctrl.Result, error) {
func (r *ControlPlaneMachineSetReconciler) createMachine(ctx context.Context, logger logr.Logger, machineProvider machineproviders.MachineProvider, idx int32) (ctrl.Result, error) {
// Check if a replacement machine already exists and
// was not previously detected due to potential stale cache.
exists, err := r.checkForExistingReplacement(ctx, logger, machineProvider, idx)
if err != nil {
return ctrl.Result{}, fmt.Errorf("error checking for existing replacement: %w", err)
}

if exists {
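// A machine within the index for which we are about to
// create a replacement already has a replacement machine.
// This means the machine provider cache was stale when we previously checked.
// No need to create a replacement.
// NOTE(review): as written, this branch only logs; execution continues to the
// CreateMachine call below. Confirm whether an early return is missing here.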
logger.V(2).Info(alreadyPresentReplacement)
}

if err := machineProvider.CreateMachine(ctx, logger, idx); err != nil {
werr := fmt.Errorf("error creating new Machine for index %d: %w", idx, err)
logger.Error(werr, errorCreatingMachine)
Expand All @@ -457,7 +476,7 @@ func createMachine(ctx context.Context, logger logr.Logger, machineProvider mach
// createMachineWithSurge creates the Machine provided while observing the surge count.
// This function will not create machines if the current surgeCount is greater
// than the maxSurge. If it does create a machine, it will increase the surgeCount.
func createMachineWithSurge(ctx context.Context, logger logr.Logger, machineProvider machineproviders.MachineProvider, idx int32, maxSurge int, surgeCount *int) (ctrl.Result, error) {
func (r *ControlPlaneMachineSetReconciler) createMachineWithSurge(ctx context.Context, logger logr.Logger, machineProvider machineproviders.MachineProvider, idx int32, maxSurge int, surgeCount *int) (ctrl.Result, error) {
// Check if a surge in Machines is allowed.
if *surgeCount >= maxSurge {
// No more room to surge
Expand All @@ -468,7 +487,7 @@ func createMachineWithSurge(ctx context.Context, logger logr.Logger, machineProv

// There is still room to surge,
// trigger a Replacement Machine creation.
result, err := createMachine(ctx, logger, machineProvider, idx)
result, err := r.createMachine(ctx, logger, machineProvider, idx)
if err != nil {
return result, err
}
Expand All @@ -478,6 +497,28 @@ func createMachineWithSurge(ctx context.Context, logger logr.Logger, machineProv
return result, nil
}

// checkForExistingReplacement reports whether the given index already holds an
// up to date replacement Machine. The lookup is performed through a copy of the
// machine provider that uses the reconciler's UncachedClient, so the answer does
// not depend on the state of the default client's cache.
//
// It returns true when at least one MachineInfo for idx does not need an update
// and has no deletion timestamp set. Any error returned while listing the
// Machines is wrapped and returned alongside false.
func (r *ControlPlaneMachineSetReconciler) checkForExistingReplacement(ctx context.Context, logger logr.Logger, machineProvider machineproviders.MachineProvider, idx int32) (bool, error) {
	// Swap in the uncached client and list the Machines in one step.
	machineInfos, err := machineProvider.WithClient(r.UncachedClient).GetMachineInfos(ctx, logger)
	if err != nil {
		return false, fmt.Errorf("error getting Machines: %w", err)
	}

	for _, info := range machineInfos {
		// Only Machines in the requested index are of interest.
		if info.Index != idx {
			continue
		}

		// An outdated Machine cannot be the replacement.
		if info.NeedsUpdate {
			continue
		}

		// A Machine that is being deleted does not count either.
		if info.MachineRef.ObjectMeta.DeletionTimestamp != nil {
			continue
		}

		// An up to date, non-deleted Machine exists for this index,
		// so a replacement is already present.
		return true, nil
	}

	return false, nil
}

// isDeletedMachine checks if a machine is deleted.
func isDeletedMachine(m machineproviders.MachineInfo) bool {
return m.MachineRef.ObjectMeta.DeletionTimestamp != nil
Expand Down Expand Up @@ -604,6 +645,17 @@ func sortMachineInfosByIndex(indexedMachineInfos map[int32][]machineproviders.Ma
return slice
}

// machineInfosMaptoSlice flattens a map of index to MachineInfos into a single
// slice containing every MachineInfo from every index.
// The result is always non-nil, even for an empty or nil map. The relative order
// of the groups follows Go's map iteration order and is therefore unspecified.
func machineInfosMaptoSlice(indexedMachineInfos map[int32][]machineproviders.MachineInfo) []machineproviders.MachineInfo {
	// First pass: work out how many MachineInfos there are in total.
	total := 0
	for _, infos := range indexedMachineInfos {
		total += len(infos)
	}

	// Second pass: copy each group into the pre-sized result.
	flattened := make([]machineproviders.MachineInfo, 0, total)
	for idx := range indexedMachineInfos {
		flattened = append(flattened, indexedMachineInfos[idx]...)
	}

	return flattened
}

// deviseExistingSurge computes the current amount of replicas surge for the ControlPlaneMachineSet.
func deviseExistingSurge(cpms *machinev1.ControlPlaneMachineSet, mis []indexToMachineInfos) int {
desiredReplicas := int(*cpms.Spec.Replicas)
Expand Down
Loading

0 comments on commit a7723fa

Please sign in to comment.