Skip to content
This repository is currently being migrated. It's locked while the migration is in progress.

report if etcd peer is unreachable #20

Merged
merged 2 commits into from
Jun 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions api/v1alpha1/etcdpeer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ type EtcdPeerStatus struct {
// ServerVersion contains the Member server version
ServerVersion string `json:"serverVersion"`

// Healthy states whether the peer is reachable from the controller
// +optional
Healthy bool `json:"healthy"`

// LastHealthy states when the peer was last healthy
// +optional
LastHealthy metav1.Time `json:"lastHealthy"`

// TLS configuration
// +optional
TLSEnabled bool `json:"tlsEnabled"`
Expand Down
4 changes: 3 additions & 1 deletion api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions config/bases/crd/bases/etcd.improbable.io_etcdpeers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1068,6 +1068,14 @@ spec:
status:
description: EtcdPeerStatus defines the observed state of EtcdPeer
properties:
healthy:
description: Healthy states whether the peer is reachable from the
controller
type: boolean
lastHealthy:
description: LastHealthy states when the peer was last healthy
format: date-time
type: string
serverVersion:
description: ServerVersion contains the Member server version
type: string
Expand Down
15 changes: 15 additions & 0 deletions config/rbac/role.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

170 changes: 165 additions & 5 deletions controllers/etcdpeer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,23 @@ import (
"strings"
"time"

"github.com/pkg/errors"

storage "k8s.io/api/storage/v1"

"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -40,17 +46,23 @@ type EtcdPeerReconciler struct {
Log logr.Logger
EtcdRepository string
Etcd etcd.APIBuilder
Recorder record.EventRecorder
}

const (
// peerLabel is the label key whose value is the name of the EtcdPeer that
// owns an object; findPodForPeer selects pods on it.
peerLabel = "etcd.improbable.io/peer-name"
// pvcCleanupFinalizer is a finalizer name.
// NOTE(review): presumably set on an EtcdPeer to trigger PVC cleanup on
// deletion; its use is outside this view, so confirm before relying on it.
pvcCleanupFinalizer = "etcdpeer.etcd.improbable.io/pvc-cleanup"
)

// NodeNotOffline is returned by restartUnhealthyPod when the node hosting the
// peer's pod has no Ready condition with a status other than True, i.e. the
// node does not look offline and the pod is therefore left alone.
// NOTE(review): Go convention would name this ErrNodeNotOffline; the name is
// exported, so it is kept as-is for compatibility.
var NodeNotOffline = errors.New("node not offline")

// +kubebuilder:rbac:groups=etcd.improbable.io,resources=etcdpeers,verbs=get;list;watch;patch
// +kubebuilder:rbac:groups=etcd.improbable.io,resources=etcdpeers/status;etcdpeers/finalizers,verbs=get;update;patch
// +kubebuilder:rbac:groups=apps,resources=replicasets,verbs=list;get;create;watch
// +kubebuilder:rbac:groups=core,resources=pods,verbs=list;delete
// +kubebuilder:rbac:groups=core,resources=persistentvolumeclaims,verbs=list;get;create;watch;delete
// +kubebuilder:rbac:groups=storage,resources=volumeattachments,verbs=list;delete
// +kubebuilder:rbac:groups=core,resources=nodes,verbs=list;get

func initialMemberURL(member etcdv1alpha1.InitialClusterMember, etcdScheme string) *url.URL {
return &url.URL{
Expand Down Expand Up @@ -580,6 +592,133 @@ func (r *EtcdPeerReconciler) updateStatus(peer *etcdv1alpha1.EtcdPeer, serverVer
peer.Status.ServerVersion = serverVersion
}

// updateHealth stores the given health flag in the peer's status. It only
// mutates the in-memory object; it makes no API call itself.
func (r *EtcdPeerReconciler) updateHealth(peer *etcdv1alpha1.EtcdPeer, healthy bool) {
	status := &peer.Status
	status.Healthy = healthy
}

// updateLastHealthy stores lastHealthy in the peer's status as a metav1.Time.
// Passing the zero time.Time clears the recorded timestamp. It only mutates
// the in-memory object; it makes no API call itself.
func (r *EtcdPeerReconciler) updateLastHealthy(peer *etcdv1alpha1.EtcdPeer, lastHealthy time.Time) {
	stamp := metav1.Time{Time: lastHealthy}
	peer.Status.LastHealthy = stamp
}

// getPVCForPeer fetches the PersistentVolumeClaim that exists in the cluster
// under the namespace/name that pvcForPeer derives for the given peer.
//
// Any error from the client (including not-found) is returned unwrapped.
func (r *EtcdPeerReconciler) getPVCForPeer(ctx context.Context, peer *etcdv1alpha1.EtcdPeer) (*corev1.PersistentVolumeClaim, error) {
	// Only the key of the desired PVC is needed to look up the live object.
	key := client.ObjectKeyFromObject(pvcForPeer(peer))

	found := &corev1.PersistentVolumeClaim{}
	if err := r.Client.Get(ctx, key, found); err != nil {
		return nil, err
	}
	return found, nil
}

// findPodForPeer returns the single pod in the peer's namespace that carries
// both the peer label and the cluster label for the given peer.
//
// An error is returned if the list call fails or if the number of matching
// pods is anything other than exactly one.
func (r *EtcdPeerReconciler) findPodForPeer(ctx context.Context, peer *etcdv1alpha1.EtcdPeer) (*corev1.Pod, error) {
	selector := labels.SelectorFromSet(labels.Set{
		peerLabel:    peer.Name,
		clusterLabel: peer.Spec.ClusterName,
	})
	opts := &client.ListOptions{
		LabelSelector: selector,
		Namespace:     peer.Namespace,
	}

	pods := corev1.PodList{}
	if err := r.Client.List(ctx, &pods, opts); err != nil {
		return nil, errors.Wrap(err, "failed to list pods")
	}

	if found := len(pods.Items); found != 1 {
		return nil, errors.Errorf("number of pods for peer not as expected, found %v, expected to find 1", found)
	}
	return &pods.Items[0], nil
}

// findVolumeAttachmentForPeer returns the VolumeAttachment whose source
// PersistentVolume is the volume bound to the peer's PVC.
//
// It returns an error when the PVC cannot be fetched, when the PVC is not yet
// bound to a volume, when listing VolumeAttachments fails, or when no
// attachment references the PVC's volume.
func (r *EtcdPeerReconciler) findVolumeAttachmentForPeer(ctx context.Context, peer *etcdv1alpha1.EtcdPeer) (*storage.VolumeAttachment, error) {
	pvc, err := r.getPVCForPeer(ctx, peer)
	if err != nil {
		return nil, errors.Wrap(err, "failed to find pvc whilst looking for volume attachment")
	}

	// An unbound PVC has no volume name, so no attachment can reference it;
	// fail early with a message that says so instead of a generic not-found.
	if pvc.Spec.VolumeName == "" {
		return nil, fmt.Errorf("pvc %s/%s is not bound to a volume, cannot look up its volume attachment", pvc.Namespace, pvc.Name)
	}

	volumeAttachments := &storage.VolumeAttachmentList{}
	err = r.Client.List(ctx, volumeAttachments, &client.ListOptions{
		// VolumeAttachments aren't namespaced (i.e they all exist in the "" namespace)
		// so this needs to be here - otherwise it defaults to "default", which doesn't contain any VolumeAttachments
		Namespace: "",
	})
	if err != nil {
		return nil, errors.Wrap(err, "failed to list volume attachments")
	}

	// Iterate by index so the returned pointer refers to the list element
	// itself rather than to a per-iteration copy of it.
	for i := range volumeAttachments.Items {
		va := &volumeAttachments.Items[i]
		if pvName := va.Spec.Source.PersistentVolumeName; pvName != nil && *pvName == pvc.Spec.VolumeName {
			return va, nil
		}
	}

	// Identify the PVC by namespace/name; formatting the whole object would
	// dump its entire spec and status into the error message.
	return nil, fmt.Errorf("failed to find volume attachment for pvc %s/%s (volume %q)", pvc.Namespace, pvc.Name, pvc.Spec.VolumeName)
}

// cleanupOrphanedVolumeAttachmentForPeer deletes, with a zero grace period,
// the VolumeAttachment found for the peer, but only when that attachment is on
// expectedNodeName.
//
// It returns an error if the attachment cannot be found, if it is attached to
// a different node, or if the delete call fails.
func (r *EtcdPeerReconciler) cleanupOrphanedVolumeAttachmentForPeer(ctx context.Context, peer *etcdv1alpha1.EtcdPeer, expectedNodeName string) error {
	attachment, err := r.findVolumeAttachmentForPeer(ctx, peer)
	if err != nil {
		return errors.Wrap(err, "failed to find volume attachment to delete")
	}

	// Sanity check: only the attachment on the old node may be removed, never
	// a new one that has since been created elsewhere.
	if onExpectedNode := attachment.Spec.NodeName == expectedNodeName; !onExpectedNode {
		return errors.New("volume attachment is not on the expected node - not deleting")
	}

	immediately := int64(0)
	deleteOpts := &client.DeleteOptions{GracePeriodSeconds: &immediately}
	return r.Client.Delete(ctx, attachment, deleteOpts)
}

// restartUnhealthyPod attempts to kickstart a new etcd pod for the peer.
//
// It looks up the peer's pod and the node that pod is assigned to. If the node
// has a Ready condition whose status is not True, it first deletes the volume
// attachment on that node (the attachment can take a while to be deleted when
// a node goes down, which is why it is removed manually) and then deletes the
// pod with a zero grace period.
//
// NodeNotOffline is returned, and nothing is deleted, when the node does not
// look offline. Any lookup or delete failure is returned wrapped.
func (r *EtcdPeerReconciler) restartUnhealthyPod(ctx context.Context, peer *etcdv1alpha1.EtcdPeer) error {
	pod, err := r.findPodForPeer(ctx, peer)
	if err != nil {
		return errors.Wrap(err, "failed to find pod for peer")
	}

	// Nodes are cluster-scoped, hence the empty namespace in the key.
	hostNode := &corev1.Node{}
	hostKey := client.ObjectKey{Namespace: "", Name: pod.Spec.NodeName}
	if err := r.Client.Get(ctx, hostKey, hostNode); err != nil {
		return errors.Wrap(err, "failed to get node hosting broken etcd peer")
	}

	// The node counts as offline as soon as any Ready condition reports a
	// status other than True.
	offline := false
	for i := range hostNode.Status.Conditions {
		cond := &hostNode.Status.Conditions[i]
		if cond.Type != corev1.NodeReady {
			continue
		}
		if cond.Status != corev1.ConditionTrue {
			offline = true
			break
		}
	}
	if !offline {
		return NodeNotOffline
	}

	if err := r.cleanupOrphanedVolumeAttachmentForPeer(ctx, peer, pod.Spec.NodeName); err != nil {
		return errors.Wrap(err, "failed to cleanup volume attachment for unhealthy pod")
	}

	immediately := int64(0)
	deleteOpts := &client.DeleteOptions{GracePeriodSeconds: &immediately}
	if err := r.Client.Delete(ctx, pod, deleteOpts); err != nil {
		return errors.Wrap(err, "failed to delete unhealthy pod")
	}
	return nil
}

func (r *EtcdPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, reterr error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
Expand Down Expand Up @@ -646,23 +785,44 @@ func (r *EtcdPeerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_
TLS: tlsConfig,
}

// Always requeue after ten seconds, as we don't watch on the membership list. So we don't auto-detect changes made
// to the etcd membership API.
// TODO(#76) Implement custom watch on etcd membership API, and remove this `requeueAfter`
result := ctrl.Result{RequeueAfter: time.Second * 10}

var healthy bool
lastHealthy := peer.Status.LastHealthy.Time

if c, err := r.Etcd.New(etcdConfig); err != nil {
log.Error(err, "Unable to connect to etcd")
healthy = false
r.Recorder.Event(&peer, "Warning", "MemberConnectionFailure", "Cannot connect to etcd member")
if lastHealthy.IsZero() {
log.Info("Peer has never been healthy")
} else if time.Since(lastHealthy) > time.Second*30 {
log.Info("Peer unhealthy for more than 30s - attempting to delete pod", "unhealthy_time", time.Since(lastHealthy).Seconds())
err := r.restartUnhealthyPod(ctx, &peer)
if err != nil {
log.Error(err, "failed to restart unhealthy pod - continuing")
} else {
r.updateLastHealthy(&peer, time.Time{})
}

}
} else {
defer c.Close()
if version, err := c.GetVersion(ctx); err != nil {
log.Error(err, "Unable to get Etcd version", "error", err)
} else {
serverVersion = version.Server
}
healthy = true

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very minor nitpick: in case `c.GetVersion` takes a long time for some reason, it would be better to record `lastHealthy` just before that call.

r.updateLastHealthy(&peer, time.Now())

}

r.updateStatus(&peer, serverVersion)

// Always requeue after ten seconds, as we don't watch on the membership list. So we don't auto-detect changes made
// to the etcd membership API.
// TODO(#76) Implement custom watch on etcd membership API, and remove this `requeueAfter`
result := ctrl.Result{RequeueAfter: time.Second * 10}
r.updateHealth(&peer, healthy)

// Check if the peer has been marked for deletion
if !peer.ObjectMeta.DeletionTimestamp.IsZero() {
Expand Down
1 change: 1 addition & 0 deletions controllers/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (s *controllerSuite) setupTest(t *testing.T, etcdAPI etcd.APIBuilder) (tear
Log: logger.WithName("EtcdPeer"),
Etcd: etcdAPI,
EtcdRepository: "quay.io/coreos/etcd",
Recorder: mgr.GetEventRecorderFor("etcdpeer-reconciler"),
}
err = peerController.SetupWithManager(mgr)
require.NoError(t, err, "failed to set up EtcdPeer controller")
Expand Down
6 changes: 4 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,10 @@ func main() {
))

if err = (&controllers.EtcdPeerReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("EtcdPeer"),
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("EtcdPeer"),
Recorder: mgr.GetEventRecorderFor("etcdpeer-reconciler"),

Etcd: &etcd.ClientEtcdAPIBuilder{},
EtcdRepository: etcdRepository,
}).SetupWithManager(mgr); err != nil {
Expand Down