From 9e0083e0806b1afc3e0bf58b5668a10d5cb314ee Mon Sep 17 00:00:00 2001
From: Eric Wolinetz
Date: Wed, 11 Sep 2019 13:25:36 -0500
Subject: [PATCH] for nodes that have been deleted during upgrade, recreate them

---
 pkg/k8shandler/deployment.go             | 16 ++++++++++++++++
 pkg/k8shandler/nodetypefactory.go        |  1 +
 pkg/k8shandler/persistentvolumeclaims.go | 24 +++++++++++++++++++-----
 pkg/k8shandler/statefulset.go            | 17 +++++++++++++++++
 4 files changed, 53 insertions(+), 5 deletions(-)

diff --git a/pkg/k8shandler/deployment.go b/pkg/k8shandler/deployment.go
index 4c6236b7b..d9591c75c 100644
--- a/pkg/k8shandler/deployment.go
+++ b/pkg/k8shandler/deployment.go
@@ -295,6 +295,17 @@ func (node *deploymentNode) waitForNodeLeaveCluster() (error, bool) {
 	return err, (err == nil)
 }
 
+func (node *deploymentNode) isMissing() bool {
+	getNode := &apps.Deployment{}
+	if getErr := node.client.Get(context.TODO(), types.NamespacedName{Name: node.name(), Namespace: node.self.Namespace}, getNode); getErr != nil {
+		if errors.IsNotFound(getErr) {
+			return true
+		}
+	}
+
+	return false
+}
+
 func (node *deploymentNode) restart(upgradeStatus *api.ElasticsearchNodeStatus) {
 
 	if upgradeStatus.UpgradeStatus.UnderUpgrade != v1.ConditionTrue {
@@ -351,6 +362,11 @@ func (node *deploymentNode) restart(upgradeStatus *api.ElasticsearchNodeStatus)
 
 	if upgradeStatus.UpgradeStatus.UpgradePhase == api.NodeRestarting {
 
+		// if the node doesn't exist -- create it
+		if node.isMissing() {
+			node.create()
+		}
+
 		if err := node.setReplicaCount(1); err != nil {
 			logrus.Warnf("Unable to scale up %v", node.name())
 			return
diff --git a/pkg/k8shandler/nodetypefactory.go b/pkg/k8shandler/nodetypefactory.go
index a12924082..af725472a 100644
--- a/pkg/k8shandler/nodetypefactory.go
+++ b/pkg/k8shandler/nodetypefactory.go
@@ -19,6 +19,7 @@ type NodeTypeInterface interface {
 	name() string
 	updateReference(node NodeTypeInterface)
 	delete()
+	isMissing() bool
 }
 
 // NodeTypeFactory is a factory to construct either statefulset or deployment
diff --git a/pkg/k8shandler/persistentvolumeclaims.go b/pkg/k8shandler/persistentvolumeclaims.go
index 12a57a008..f85f7416f 100644
--- a/pkg/k8shandler/persistentvolumeclaims.go
+++ b/pkg/k8shandler/persistentvolumeclaims.go
@@ -4,8 +4,10 @@ import (
 	"context"
 	"fmt"
 
+	"github.com/sirupsen/logrus"
 	"k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/types"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
@@ -13,11 +15,23 @@
 
 func createOrUpdatePersistentVolumeClaim(pvc v1.PersistentVolumeClaimSpec, newName, namespace string, client client.Client) error {
 
-	claim := createPersistentVolumeClaim(newName, namespace, pvc)
-	err := client.Create(context.TODO(), claim)
-	if err != nil {
-		if !errors.IsAlreadyExists(err) {
-			return fmt.Errorf("Unable to create PVC: %v", err)
+	// for some reason if the PVC already exists but creating it again would violate
+	// quota we get an error regarding quota not that it already exists
+	// so check to see if it already exists
+	claim := &v1.PersistentVolumeClaim{}
+
+	if getErr := client.Get(context.TODO(), types.NamespacedName{Name: newName, Namespace: namespace}, claim); getErr != nil {
+		if errors.IsNotFound(getErr) {
+			claim = createPersistentVolumeClaim(newName, namespace, pvc)
+			err := client.Create(context.TODO(), claim)
+			if err != nil {
+				if !errors.IsAlreadyExists(err) {
+					return fmt.Errorf("Unable to create PVC: %v", err)
+				}
+			}
+		} else {
+			logrus.Debugf("Could not get PVC %v: %v", newName, getErr)
+			return getErr
 		}
 	}
 
diff --git a/pkg/k8shandler/statefulset.go b/pkg/k8shandler/statefulset.go
index 27826490d..2f9c54a13 100644
--- a/pkg/k8shandler/statefulset.go
+++ b/pkg/k8shandler/statefulset.go
@@ -219,6 +219,17 @@ func (node *statefulSetNode) replicaCount() (int32, error) {
 	return desired.Status.Replicas, nil
 }
 
+func (node *statefulSetNode) isMissing() bool {
+	getNode := &apps.StatefulSet{}
+	if getErr := node.client.Get(context.TODO(), types.NamespacedName{Name: node.name(), Namespace: node.self.Namespace}, getNode); getErr != nil {
+		if errors.IsNotFound(getErr) {
+			return true
+		}
+	}
+
+	return false
+}
+
 func (node *statefulSetNode) restart(upgradeStatus *api.ElasticsearchNodeStatus) {
 
 	if upgradeStatus.UpgradeStatus.UnderUpgrade != v1.ConditionTrue {
@@ -254,6 +265,12 @@ func (node *statefulSetNode) restart(upgradeStatus *api.ElasticsearchNodeStatus)
 
 	if upgradeStatus.UpgradeStatus.UpgradePhase == api.NodeRestarting {
 
+		// if the node doesn't exist -- create it
+		// TODO: we can skip this logic after
+		if node.isMissing() {
+			node.create()
+		}
+
 		ordinal, err := node.partition()
 		if err != nil {
 			logrus.Infof("Unable to get node ordinal value: %v", err)
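
Note (not part of the patch): both isMissing() helpers above follow the same get-then-check-IsNotFound pattern against the controller-runtime client. A minimal standalone sketch of that pattern follows; the package and function names (example, isDeploymentMissing) are illustrative only and do not exist in this repository.

// Sketch only: mirrors the isMissing() helpers added in this patch,
// assuming a controller-runtime client.Client. Not operator code.
package example

import (
	"context"

	apps "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// isDeploymentMissing reports whether the named Deployment is absent from the
// cluster. Only a NotFound error counts as missing; any other Get error is
// treated as "still present" so the caller does not recreate an object it
// merely failed to read.
func isDeploymentMissing(c client.Client, name, namespace string) bool {
	obj := &apps.Deployment{}
	if err := c.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: namespace}, obj); err != nil {
		return errors.IsNotFound(err)
	}
	return false
}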