
Commit

Merge pull request #3806 from sedefsavas/refactor_kcp_healthcheck
🌱 Refactor controlplane health check in KCP
k8s-ci-robot committed Oct 23, 2020
2 parents b1444af + 572e7f0 commit 3886ffa
Showing 11 changed files with 363 additions and 365 deletions.
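
Before the file-by-file diff, here is a minimal, self-contained sketch of the shape this refactor produces, inferred from the changes below rather than copied from the repository: the health checks move off the management-cluster interface (TargetClusterControlPlaneIsHealthy / TargetClusterEtcdIsHealthy are deleted) and onto the workload-cluster client returned by GetWorkloadCluster, and KCP runs both checks once in reconcileControlPlaneHealth, aggregating failures instead of requeueing from inside each scale operation. The HealthCheckResult stand-in type and the use of errors.Join (the real code uses kerrors.NewAggregate) are assumptions made only to keep this example runnable.

package main

import (
	"context"
	"errors"
	"fmt"
)

// HealthCheckResult stands in for internal.HealthCheckResult; its concrete
// definition is not shown in this diff, so this alias is an assumption.
type HealthCheckResult map[string]error

// workloadHealthChecker captures the two methods this PR moves onto the
// workload-cluster client.
type workloadHealthChecker interface {
	ControlPlaneIsHealthy(ctx context.Context) (HealthCheckResult, error)
	EtcdIsHealthy(ctx context.Context) (HealthCheckResult, error)
}

// fakeWorkload mirrors the fakeWorkloadCluster used in fakes_test.go below.
type fakeWorkload struct {
	controlPlaneHealthy bool
	etcdHealthy         bool
}

func (f fakeWorkload) ControlPlaneIsHealthy(ctx context.Context) (HealthCheckResult, error) {
	if !f.controlPlaneHealthy {
		return nil, errors.New("control plane is not healthy")
	}
	return nil, nil
}

func (f fakeWorkload) EtcdIsHealthy(ctx context.Context) (HealthCheckResult, error) {
	if !f.etcdHealthy {
		return nil, errors.New("etcd is not healthy")
	}
	return nil, nil
}

// checkControlPlaneHealth shows the consolidated pattern used by
// reconcileControlPlaneHealth: run both checks against the workload cluster
// and aggregate the failures into a single error.
func checkControlPlaneHealth(ctx context.Context, w workloadHealthChecker) error {
	var errList []error
	if _, err := w.ControlPlaneIsHealthy(ctx); err != nil {
		errList = append(errList, fmt.Errorf("failed to pass control-plane health check: %w", err))
	}
	if _, err := w.EtcdIsHealthy(ctx); err != nil {
		errList = append(errList, fmt.Errorf("failed to pass etcd health check: %w", err))
	}
	// errors.Join returns nil when errList is empty.
	return errors.Join(errList...)
}

func main() {
	err := checkControlPlaneHealth(context.Background(), fakeWorkload{controlPlaneHealthy: true, etcdHealthy: false})
	fmt.Println(err) // failed to pass etcd health check: etcd is not healthy
}

The practical effect, visible in scale.go below, is that scaleUpControlPlane and scaleDownControlPlane no longer perform their own health checks; reconcile gates on health once before any rollout or scaling decision.
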
78 changes: 63 additions & 15 deletions controlplane/kubeadm/controllers/controller.go
@@ -31,13 +31,6 @@ import (
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/source"

clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
bootstrapv1 "sigs.k8s.io/cluster-api/bootstrap/kubeadm/api/v1alpha3"
kubeadmv1 "sigs.k8s.io/cluster-api/bootstrap/kubeadm/types/v1beta1"
@@ -51,6 +44,12 @@ import (
"sigs.k8s.io/cluster-api/util/patch"
"sigs.k8s.io/cluster-api/util/predicates"
"sigs.k8s.io/cluster-api/util/secret"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/source"
)

// +kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;patch
@@ -303,7 +302,13 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *

// Aggregate the operational state of all the machines; while aggregating we are adding the
// source ref (reason@machine/name) so the problem can be easily tracked down to its source machine.
conditions.SetAggregate(controlPlane.KCP, controlplanev1.MachinesReadyCondition, ownedMachines.ConditionGetters(), conditions.AddSourceRef())
conditions.SetAggregate(controlPlane.KCP, controlplanev1.MachinesReadyCondition, controlPlane.Machines.ConditionGetters(), conditions.AddSourceRef())

// reconcileControlPlaneHealth returns an error if there is a machine being deleted or if the control plane is unhealthy.
// If the control plane is not yet initialized, this call shouldn't fail.
if result, err := r.reconcileControlPlaneHealth(ctx, cluster, kcp, controlPlane); err != nil || !result.IsZero() {
return result, err
}

// Rollout of control plane machines due to configuration changes (e.g. upgrades) takes precedence over other operations.
needRollout := controlPlane.MachinesNeedingRollout()
@@ -379,6 +384,28 @@ func (r *KubeadmControlPlaneReconciler) reconcileDelete(ctx context.Context, clu
logger := r.Log.WithValues("namespace", kcp.Namespace, "kubeadmControlPlane", kcp.Name, "cluster", cluster.Name)
logger.Info("Reconcile KubeadmControlPlane deletion")

// Ignore the health check results and errors here; the health check functions are only run to set health-related conditions on Machines.
// Errors may be due to not being able to get workload cluster nodes.
workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster))
if err != nil {
r.Log.V(2).Info("Cannot get remote client to workload cluster during delete reconciliation", "err", err.Error())
} else {
// Do a health check of the Control Plane components
_, err = workloadCluster.ControlPlaneIsHealthy(ctx)
if err != nil {
// Do nothing
r.Log.V(2).Info("Control plane did not pass control plane health check during delete reconciliation", "err", err.Error())
}

// Do a health check of the etcd
_, err = workloadCluster.EtcdIsHealthy(ctx)
if err != nil {
// Do nothing
r.Log.V(2).Info("Control plane did not pass etcd health check during delete reconciliation", "err", err.Error())
}
}

// Gets all machines, not just control plane machines.
allMachines, err := r.managementCluster.GetMachinesForCluster(ctx, util.ObjectKey(cluster))
if err != nil {
return ctrl.Result{}, err
@@ -442,22 +469,40 @@ func (r *KubeadmControlPlaneReconciler) ClusterToKubeadmControlPlane(o handler.M
return nil
}

// reconcileHealth performs health checks for control plane components and etcd
// reconcileControlPlaneHealth performs health checks for control plane components and etcd
// It removes any etcd members that do not have a corresponding node.
// Also, as a final step, it checks whether there are any machines being deleted.
func (r *KubeadmControlPlaneReconciler) reconcileHealth(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, controlPlane *internal.ControlPlane) (ctrl.Result, error) {
func (r *KubeadmControlPlaneReconciler) reconcileControlPlaneHealth(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, controlPlane *internal.ControlPlane) (ctrl.Result, error) {
// If there are no KCP-owned control-plane machines, the control plane has not been initialized yet.
if controlPlane.Machines.Len() == 0 {
return ctrl.Result{}, nil
}

workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster))
if err != nil {
// Failing to connect to the workload cluster can mean the workload cluster is unhealthy for a variety of reasons, such as etcd quorum loss.
return ctrl.Result{}, errors.Wrap(err, "cannot get remote client to workload cluster")
}

errList := []error{}

// Do a health check of the Control Plane components
if err := r.managementCluster.TargetClusterControlPlaneIsHealthy(ctx, util.ObjectKey(cluster)); err != nil {
checkResult, err := workloadCluster.ControlPlaneIsHealthy(ctx)
if err != nil {
errList = append(errList, errors.Wrap(err, "failed to pass control-plane health check"))
} else if err := checkResult.Aggregate(controlPlane); err != nil {
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy",
"Waiting for control plane to pass control plane health check to continue reconciliation: %v", err)
return ctrl.Result{RequeueAfter: healthCheckFailedRequeueAfter}, nil
errList = append(errList, errors.Wrap(err, "failed to pass control-plane health check"))
}

// If KCP should manage etcd, ensure etcd is healthy.
if controlPlane.IsEtcdManaged() {
if err := r.managementCluster.TargetClusterEtcdIsHealthy(ctx, util.ObjectKey(cluster)); err != nil {
errList := []error{errors.Wrap(err, "failed to pass etcd health check")}
checkResult, err := workloadCluster.EtcdIsHealthy(ctx)
if err != nil {
errList = append(errList, errors.Wrap(err, "failed to pass etcd health check"))
} else if err := checkResult.Aggregate(controlPlane); err != nil {
errList = append(errList, errors.Wrap(err, "failed to pass etcd health check"))
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy",
"Waiting for control plane to pass etcd health check to continue reconciliation: %v", err)
// If there are any etcd members that do not have corresponding nodes, remove them from etcd and from the kubeadm configmap.
@@ -468,10 +513,13 @@ func (r *KubeadmControlPlaneReconciler) reconcileHealth(ctx context.Context, clu
} else if err := workloadCluster.ReconcileEtcdMembers(ctx); err != nil {
errList = append(errList, errors.Wrap(err, "failed attempt to remove potential hanging etcd members to pass etcd health check to continue reconciliation"))
}
return ctrl.Result{}, kerrors.NewAggregate(errList)
}
}

if len(errList) > 0 {
return ctrl.Result{}, kerrors.NewAggregate(errList)
}

// We need this check for scale up as well as down to avoid scaling up when there is a machine being deleted.
// This should be at the end of this method, as there is no need to wait for a machine to be completely deleted before reconciling etcd.
// TODO: Revisit during machine remediation implementation which may need to cover other machine phases.
61 changes: 36 additions & 25 deletions controlplane/kubeadm/controllers/controller_test.go
@@ -396,9 +396,11 @@ func TestKubeadmControlPlaneReconciler_adoption(t *testing.T) {
kcp.Spec.Version = version

fmc := &fakeManagementCluster{
Machines: internal.FilterableMachineCollection{},
ControlPlaneHealthy: true,
EtcdHealthy: true,
Machines: internal.FilterableMachineCollection{},
Workload: fakeWorkloadCluster{
ControlPlaneHealthy: true,
EtcdHealthy: true,
},
}
objs := []runtime.Object{cluster.DeepCopy(), kcp.DeepCopy(), tmpl.DeepCopy()}
for i := 0; i < 3; i++ {
@@ -465,9 +467,11 @@ func TestKubeadmControlPlaneReconciler_adoption(t *testing.T) {
kcp.Spec.Version = version

fmc := &fakeManagementCluster{
Machines: internal.FilterableMachineCollection{},
ControlPlaneHealthy: true,
EtcdHealthy: true,
Machines: internal.FilterableMachineCollection{},
Workload: fakeWorkloadCluster{
ControlPlaneHealthy: true,
EtcdHealthy: true,
},
}
objs := []runtime.Object{cluster.DeepCopy(), kcp.DeepCopy(), tmpl.DeepCopy()}
for i := 0; i < 3; i++ {
@@ -572,17 +576,19 @@ func TestKubeadmControlPlaneReconciler_adoption(t *testing.T) {
g := NewWithT(t)

cluster, kcp, tmpl := createClusterWithControlPlane()
cluster.Spec.ControlPlaneEndpoint.Host = "nodomain.example.com"
cluster.Spec.ControlPlaneEndpoint.Host = "nodomain.example.com1"
cluster.Spec.ControlPlaneEndpoint.Port = 6443
kcp.Spec.Version = version

now := metav1.Now()
kcp.DeletionTimestamp = &now

fmc := &fakeManagementCluster{
Machines: internal.FilterableMachineCollection{},
ControlPlaneHealthy: true,
EtcdHealthy: true,
Machines: internal.FilterableMachineCollection{},
Workload: fakeWorkloadCluster{
ControlPlaneHealthy: true,
EtcdHealthy: true,
},
}
objs := []runtime.Object{cluster.DeepCopy(), kcp.DeepCopy(), tmpl.DeepCopy()}
for i := 0; i < 3; i++ {
@@ -642,7 +648,7 @@ func TestKubeadmControlPlaneReconciler_adoption(t *testing.T) {
g := NewWithT(t)

cluster, kcp, tmpl := createClusterWithControlPlane()
cluster.Spec.ControlPlaneEndpoint.Host = "nodomain.example.com"
cluster.Spec.ControlPlaneEndpoint.Host = "nodomain.example.com2"
cluster.Spec.ControlPlaneEndpoint.Port = 6443
kcp.Spec.Version = "v1.17.0"

@@ -665,8 +671,10 @@ func TestKubeadmControlPlaneReconciler_adoption(t *testing.T) {
},
},
},
ControlPlaneHealthy: true,
EtcdHealthy: true,
Workload: fakeWorkloadCluster{
ControlPlaneHealthy: true,
EtcdHealthy: true,
},
}

fakeClient := newFakeClient(g, cluster.DeepCopy(), kcp.DeepCopy(), tmpl.DeepCopy(), fmc.Machines["test0"].DeepCopy())
@@ -1178,10 +1186,11 @@ func TestKubeadmControlPlaneReconciler_reconcileDelete(t *testing.T) {
r := &KubeadmControlPlaneReconciler{
Client: fakeClient,
managementCluster: &fakeManagementCluster{
ControlPlaneHealthy: true,
EtcdHealthy: true,
Management: &internal.Management{Client: fakeClient},
Workload: fakeWorkloadCluster{},
Management: &internal.Management{Client: fakeClient},
Workload: fakeWorkloadCluster{
ControlPlaneHealthy: true,
EtcdHealthy: true,
},
},
Log: log.Log,
recorder: record.NewFakeRecorder(32),
@@ -1230,10 +1239,11 @@ func TestKubeadmControlPlaneReconciler_reconcileDelete(t *testing.T) {
r := &KubeadmControlPlaneReconciler{
Client: fakeClient,
managementCluster: &fakeManagementCluster{
ControlPlaneHealthy: true,
EtcdHealthy: true,
Management: &internal.Management{Client: fakeClient},
Workload: fakeWorkloadCluster{},
Management: &internal.Management{Client: fakeClient},
Workload: fakeWorkloadCluster{
ControlPlaneHealthy: true,
EtcdHealthy: true,
},
},
Log: log.Log,
recorder: record.NewFakeRecorder(32),
@@ -1264,10 +1274,11 @@ func TestKubeadmControlPlaneReconciler_reconcileDelete(t *testing.T) {
r := &KubeadmControlPlaneReconciler{
Client: fakeClient,
managementCluster: &fakeManagementCluster{
ControlPlaneHealthy: true,
EtcdHealthy: true,
Management: &internal.Management{Client: fakeClient},
Workload: fakeWorkloadCluster{},
Management: &internal.Management{Client: fakeClient},
Workload: fakeWorkloadCluster{
ControlPlaneHealthy: true,
EtcdHealthy: true,
},
},
recorder: record.NewFakeRecorder(32),
Log: log.Log,
42 changes: 21 additions & 21 deletions controlplane/kubeadm/controllers/fakes_test.go
@@ -30,12 +30,10 @@ import (

type fakeManagementCluster struct {
// TODO: once all client interactions are moved to the Management cluster this can go away
Management *internal.Management
ControlPlaneHealthy bool
EtcdHealthy bool
Machines internal.FilterableMachineCollection
Workload fakeWorkloadCluster
Reader client.Reader
Management *internal.Management
Machines internal.FilterableMachineCollection
Workload fakeWorkloadCluster
Reader client.Reader
}

func (f *fakeManagementCluster) Get(ctx context.Context, key client.ObjectKey, obj runtime.Object) error {
@@ -57,23 +55,11 @@ func (f *fakeManagementCluster) GetMachinesForCluster(c context.Context, n clien
return f.Machines, nil
}

func (f *fakeManagementCluster) TargetClusterControlPlaneIsHealthy(_ context.Context, _ client.ObjectKey) error {
if !f.ControlPlaneHealthy {
return errors.New("control plane is not healthy")
}
return nil
}

func (f *fakeManagementCluster) TargetClusterEtcdIsHealthy(_ context.Context, _ client.ObjectKey) error {
if !f.EtcdHealthy {
return errors.New("etcd is not healthy")
}
return nil
}

type fakeWorkloadCluster struct {
*internal.Workload
Status internal.ClusterStatus
Status internal.ClusterStatus
ControlPlaneHealthy bool
EtcdHealthy bool
}

func (f fakeWorkloadCluster) ForwardEtcdLeadership(_ context.Context, _ *clusterv1.Machine, _ *clusterv1.Machine) error {
@@ -112,6 +98,20 @@ func (f fakeWorkloadCluster) UpdateKubeletConfigMap(ctx context.Context, version
return nil
}

func (f fakeWorkloadCluster) ControlPlaneIsHealthy(ctx context.Context) (internal.HealthCheckResult, error) {
if !f.ControlPlaneHealthy {
return nil, errors.New("control plane is not healthy")
}
return nil, nil
}

func (f fakeWorkloadCluster) EtcdIsHealthy(ctx context.Context) (internal.HealthCheckResult, error) {
if !f.EtcdHealthy {
return nil, errors.New("etcd is not healthy")
}
return nil, nil
}

type fakeMigrator struct {
migrateCalled bool
migrateErr error
17 changes: 0 additions & 17 deletions controlplane/kubeadm/controllers/scale.go
@@ -26,7 +26,6 @@ import (
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1alpha3"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/machinefilters"
capierrors "sigs.k8s.io/cluster-api/errors"
"sigs.k8s.io/cluster-api/util"
ctrl "sigs.k8s.io/controller-runtime"
)
@@ -63,11 +62,6 @@ func (r *KubeadmControlPlaneReconciler) initializeControlPlane(ctx context.Conte
func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, controlPlane *internal.ControlPlane) (ctrl.Result, error) {
logger := controlPlane.Logger()

// reconcileHealth returns err if there is a machine being delete which is a required condition to check before scaling up
if result, err := r.reconcileHealth(ctx, cluster, kcp, controlPlane); err != nil || !result.IsZero() {
return result, err
}

// Create the bootstrap configuration
bootstrapSpec := controlPlane.JoinControlPlaneConfig()
fd := controlPlane.NextFailureDomainForScaleUp()
@@ -90,10 +84,6 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(
) (ctrl.Result, error) {
logger := controlPlane.Logger()

if result, err := r.reconcileHealth(ctx, cluster, kcp, controlPlane); err != nil || !result.IsZero() {
return result, err
}

workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster))
if err != nil {
logger.Error(err, "Failed to create client to workload cluster")
@@ -123,13 +113,6 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(
}
}

if err := r.managementCluster.TargetClusterControlPlaneIsHealthy(ctx, util.ObjectKey(cluster)); err != nil {
logger.V(2).Info("Waiting for control plane to pass control plane health check before removing a control plane machine", "cause", err)
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy",
"Waiting for control plane to pass control plane health check before removing a control plane machine: %v", err)
return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter}

}
if err := workloadCluster.RemoveMachineFromKubeadmConfigMap(ctx, machineToDelete); err != nil {
logger.Error(err, "Failed to remove machine from kubeadm ConfigMap")
return ctrl.Result{}, err
