From e8a8c8336faa9fc2d1ceb21978372948a63f3e28 Mon Sep 17 00:00:00 2001 From: Eric Wolinetz Date: Wed, 24 Jul 2019 17:11:07 -0500 Subject: [PATCH] perform a full cluster restart in case of cert redeploy --- hack/test-e2e.sh | 3 +- pkg/apis/logging/v1/elasticsearch_types.go | 9 +- pkg/k8shandler/cluster.go | 225 ++++++++++++++++----- pkg/k8shandler/deployment.go | 68 ++++++- pkg/k8shandler/nodetypefactory.go | 3 +- pkg/k8shandler/statefulset.go | 79 +++++++- pkg/k8shandler/status.go | 33 ++- test/e2e/elasticsearch_test.go | 88 +++++++- test/utils/utils.go | 75 +++++++ 9 files changed, 494 insertions(+), 89 deletions(-) diff --git a/hack/test-e2e.sh b/hack/test-e2e.sh index f9d68c1bf..7754c5559 100755 --- a/hack/test-e2e.sh +++ b/hack/test-e2e.sh @@ -51,6 +51,7 @@ TEST_NAMESPACE=${TEST_NAMESPACE} go test ./test/e2e/... \ -namespacedMan ${manifest} \ -v \ -parallel=1 \ - -singleNamespace + -singleNamespace \ + -timeout 900s oc delete namespace ${TEST_NAMESPACE} diff --git a/pkg/apis/logging/v1/elasticsearch_types.go b/pkg/apis/logging/v1/elasticsearch_types.go index 9066de59b..c7c0bb51e 100644 --- a/pkg/apis/logging/v1/elasticsearch_types.go +++ b/pkg/apis/logging/v1/elasticsearch_types.go @@ -113,10 +113,11 @@ type ElasticsearchNodeStatus struct { } type ElasticsearchNodeUpgradeStatus struct { - ScheduledForUpgrade v1.ConditionStatus `json:"scheduledUpgrade,omitempty"` - ScheduledForRedeploy v1.ConditionStatus `json:"scheduledRedeploy,omitempty"` - UnderUpgrade v1.ConditionStatus `json:"underUpgrade,omitempty"` - UpgradePhase ElasticsearchUpgradePhase `json:"upgradePhase,omitempty"` + ScheduledForUpgrade v1.ConditionStatus `json:"scheduledUpgrade,omitempty"` + ScheduledForRedeploy v1.ConditionStatus `json:"scheduledRedeploy,omitempty"` + ScheduledForCertRedeploy v1.ConditionStatus `json:"scheduledCertRedeploy,omitempty"` + UnderUpgrade v1.ConditionStatus `json:"underUpgrade,omitempty"` + UpgradePhase ElasticsearchUpgradePhase `json:"upgradePhase,omitempty"` } // ClusterCondition contains details for the current condition of this elasticsearch cluster. 
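Note on the new field: the hunk above adds a scheduledCertRedeploy condition to ElasticsearchNodeUpgradeStatus. As a minimal illustrative sketch (not part of this patch), the hypothetical helper needsCertRedeploy below shows how a caller could inspect the new field on a cluster's node statuses; the patch's own consumer of the field is getScheduledCertRedeployNodes in pkg/k8shandler/cluster.go further down.

package k8shandler

import (
	api "github.com/openshift/elasticsearch-operator/pkg/apis/logging/v1"
	v1 "k8s.io/api/core/v1"
)

// needsCertRedeploy reports whether any node status carries the new
// scheduledCertRedeploy condition. Hypothetical helper for illustration only;
// it mirrors the check performed by getScheduledCertRedeployNodes.
func needsCertRedeploy(cluster *api.Elasticsearch) bool {
	for _, node := range cluster.Status.Nodes {
		if node.UpgradeStatus.ScheduledForCertRedeploy == v1.ConditionTrue {
			return true
		}
	}
	return false
}
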
diff --git a/pkg/k8shandler/cluster.go b/pkg/k8shandler/cluster.go index fb320ae0e..2246e707c 100644 --- a/pkg/k8shandler/cluster.go +++ b/pkg/k8shandler/cluster.go @@ -44,6 +44,10 @@ func (elasticsearchRequest *ElasticsearchRequest) CreateOrUpdateElasticsearchClu elasticsearchRequest.getNodes() progressUnshedulableNodes(elasticsearchRequest.cluster) + err = elasticsearchRequest.performFullClusterRestart() + if err != nil { + return elasticsearchRequest.UpdateClusterStatus() + } // if there is a node currently being upgraded, work on that first upgradeInProgressNode := getNodeUpgradeInProgress(elasticsearchRequest.cluster) @@ -51,28 +55,18 @@ func (elasticsearchRequest *ElasticsearchRequest) CreateOrUpdateElasticsearchClu if upgradeInProgressNode != nil { clusterStatus := elasticsearchRequest.cluster.Status.DeepCopy() - index, nodeStatus := getNodeStatus(upgradeInProgressNode.name(), clusterStatus) + _, nodeStatus := getNodeStatus(upgradeInProgressNode.name(), clusterStatus) if _, ok := containsNodeTypeInterface(upgradeInProgressNode, scheduledUpgradeNodes); ok { logrus.Debugf("Continuing update for %v", upgradeInProgressNode.name()) upgradeInProgressNode.update(nodeStatus) } else { logrus.Debugf("Continuing restart for %v", upgradeInProgressNode.name()) - upgradeInProgressNode.restart(nodeStatus) - } - - nodeState := upgradeInProgressNode.state() - - nodeStatus.UpgradeStatus.ScheduledForUpgrade = nodeState.UpgradeStatus.ScheduledForUpgrade - nodeStatus.UpgradeStatus.ScheduledForRedeploy = nodeState.UpgradeStatus.ScheduledForRedeploy - - if index == NOT_FOUND_INDEX { - clusterStatus.Nodes = append(clusterStatus.Nodes, *nodeStatus) - } else { - clusterStatus.Nodes[index] = *nodeStatus + upgradeInProgressNode.rollingRestart(nodeStatus) } - elasticsearchRequest.updateNodeStatus(*clusterStatus) + addNodeState(upgradeInProgressNode, nodeStatus) + elasticsearchRequest.setNodeStatus(upgradeInProgressNode, nodeStatus, clusterStatus) } else { @@ -80,21 +74,12 @@ func (elasticsearchRequest *ElasticsearchRequest) CreateOrUpdateElasticsearchClu for _, node := range scheduledUpgradeNodes { logrus.Debugf("Perform a update for %v", node.name()) clusterStatus := elasticsearchRequest.cluster.Status.DeepCopy() - index, nodeStatus := getNodeStatus(node.name(), clusterStatus) + _, nodeStatus := getNodeStatus(node.name(), clusterStatus) err := node.update(nodeStatus) - nodeState := node.state() - - nodeStatus.UpgradeStatus.ScheduledForUpgrade = nodeState.UpgradeStatus.ScheduledForUpgrade - nodeStatus.UpgradeStatus.ScheduledForRedeploy = nodeState.UpgradeStatus.ScheduledForRedeploy - - if index == NOT_FOUND_INDEX { - clusterStatus.Nodes = append(clusterStatus.Nodes, *nodeStatus) - } else { - clusterStatus.Nodes[index] = *nodeStatus - } - elasticsearchRequest.updateNodeStatus(*clusterStatus) + addNodeState(node, nodeStatus) + elasticsearchRequest.setNodeStatus(node, nodeStatus, clusterStatus) if err != nil { logrus.Warnf("Error occurred while updating node %v: %v", node.name(), err) @@ -110,28 +95,19 @@ func (elasticsearchRequest *ElasticsearchRequest) CreateOrUpdateElasticsearchClu for _, node := range scheduledRedeployNodes { logrus.Debugf("Perform a redeploy for %v", node.name()) clusterStatus := elasticsearchRequest.cluster.Status.DeepCopy() - index, nodeStatus := getNodeStatus(node.name(), clusterStatus) + _, nodeStatus := getNodeStatus(node.name(), clusterStatus) - node.restart(nodeStatus) - nodeState := node.state() + node.rollingRestart(nodeStatus) - nodeStatus.UpgradeStatus.ScheduledForUpgrade = 
nodeState.UpgradeStatus.ScheduledForUpgrade - nodeStatus.UpgradeStatus.ScheduledForRedeploy = nodeState.UpgradeStatus.ScheduledForRedeploy - - if index == NOT_FOUND_INDEX { - clusterStatus.Nodes = append(clusterStatus.Nodes, *nodeStatus) - } else { - clusterStatus.Nodes[index] = *nodeStatus - } - - elasticsearchRequest.updateNodeStatus(*clusterStatus) + addNodeState(node, nodeStatus) + elasticsearchRequest.setNodeStatus(node, nodeStatus, clusterStatus) } } else { for _, node := range nodes[nodeMapKey(elasticsearchRequest.cluster.Name, elasticsearchRequest.cluster.Namespace)] { clusterStatus := elasticsearchRequest.cluster.Status.DeepCopy() - index, nodeStatus := getNodeStatus(node.name(), clusterStatus) + _, nodeStatus := getNodeStatus(node.name(), clusterStatus) // Verify that we didn't scale up too many masters if err := elasticsearchRequest.isValidConf(); err != nil { @@ -147,23 +123,9 @@ func (elasticsearchRequest *ElasticsearchRequest) CreateOrUpdateElasticsearchClu if err := node.create(); err != nil { return err } - nodeState := node.state() - - nodeStatus.UpgradeStatus.ScheduledForUpgrade = nodeState.UpgradeStatus.ScheduledForUpgrade - nodeStatus.UpgradeStatus.ScheduledForRedeploy = nodeState.UpgradeStatus.ScheduledForRedeploy - - if index == NOT_FOUND_INDEX { - // this is a new status, just append - nodeStatus.DeploymentName = nodeState.DeploymentName - nodeStatus.StatefulSetName = nodeState.StatefulSetName - clusterStatus.Nodes = append(clusterStatus.Nodes, *nodeStatus) - } else { - // this is an existing status, update in place - clusterStatus.Nodes[index] = *nodeStatus - } - // update status here - elasticsearchRequest.updateNodeStatus(*clusterStatus) + addNodeState(node, nodeStatus) + elasticsearchRequest.setNodeStatus(node, nodeStatus, clusterStatus) elasticsearchRequest.updateMinMasters() } @@ -366,6 +328,53 @@ func getScheduledRedeployOnlyNodes(cluster *api.Elasticsearch) []NodeTypeInterfa return redeployNodes } +func getScheduledCertRedeployNodes(cluster *api.Elasticsearch) []NodeTypeInterface { + redeployCertNodes := []NodeTypeInterface{} + dataNodes := []NodeTypeInterface{} + + for _, node := range cluster.Status.Nodes { + if node.UpgradeStatus.ScheduledForCertRedeploy == v1.ConditionTrue { + for _, nodeTypeInterface := range nodes[nodeMapKey(cluster.Name, cluster.Namespace)] { + if node.DeploymentName == nodeTypeInterface.name() { + dataNodes = append(dataNodes, nodeTypeInterface) + } + + if node.StatefulSetName == nodeTypeInterface.name() { + redeployCertNodes = append(redeployCertNodes, nodeTypeInterface) + } + } + } + } + + redeployCertNodes = append(redeployCertNodes, dataNodes...) 
+ + return redeployCertNodes +} + +func addNodeState(node NodeTypeInterface, nodeStatus *api.ElasticsearchNodeStatus) { + + nodeState := node.state() + + nodeStatus.UpgradeStatus.ScheduledForUpgrade = nodeState.UpgradeStatus.ScheduledForUpgrade + nodeStatus.UpgradeStatus.ScheduledForRedeploy = nodeState.UpgradeStatus.ScheduledForRedeploy + nodeStatus.UpgradeStatus.ScheduledForCertRedeploy = nodeState.UpgradeStatus.ScheduledForCertRedeploy + nodeStatus.DeploymentName = nodeState.DeploymentName + nodeStatus.StatefulSetName = nodeState.StatefulSetName +} + +func (elasticsearchRequest *ElasticsearchRequest) setNodeStatus(node NodeTypeInterface, nodeStatus *api.ElasticsearchNodeStatus, clusterStatus *api.ElasticsearchStatus) { + + index, _ := getNodeStatus(node.name(), clusterStatus) + + if index == NOT_FOUND_INDEX { + clusterStatus.Nodes = append(clusterStatus.Nodes, *nodeStatus) + } else { + clusterStatus.Nodes[index] = *nodeStatus + } + + elasticsearchRequest.updateNodeStatus(*clusterStatus) +} + func (elasticsearchRequest *ElasticsearchRequest) updateNodeStatus(status api.ElasticsearchStatus) error { cluster := elasticsearchRequest.cluster @@ -399,3 +408,107 @@ func (elasticsearchRequest *ElasticsearchRequest) updateNodeStatus(status api.El return nil } + +// Full cluster restart is required when certs need to be refreshed +// this is a very high priority action since the cluster may be fractured/unusable +// in the case where certs aren't all rolled out correctly or are expired +func (elasticsearchRequest *ElasticsearchRequest) performFullClusterRestart() error { + + // make sure we have nodes that are scheduled for full cluster restart first + certRedeployNodes := getScheduledCertRedeployNodes(elasticsearchRequest.cluster) + clusterStatus := &elasticsearchRequest.cluster.Status + + // 1 -- precheck + // no restarting conditions set + if len(certRedeployNodes) > 0 { + + if containsClusterCondition(api.Restarting, v1.ConditionFalse, clusterStatus) && + containsClusterCondition(api.UpdatingSettings, v1.ConditionFalse, clusterStatus) { + + // We don't want to gate on cluster health -- we may be in a bad state + // because pods aren't all available + logrus.Infof("Beginning full cluster restart for cert redeploy on %v", elasticsearchRequest.cluster.Name) + + // set conditions here for next check + updateUpdatingSettingsCondition(clusterStatus, v1.ConditionTrue) + } + + // 2 -- prep for restart + // condition updatingsettings true + if containsClusterCondition(api.Restarting, v1.ConditionFalse, clusterStatus) && + containsClusterCondition(api.UpdatingSettings, v1.ConditionTrue, clusterStatus) { + + // disable shard allocation + if ok, err := SetShardAllocation(elasticsearchRequest.cluster.Name, elasticsearchRequest.cluster.Namespace, api.ShardAllocationNone, elasticsearchRequest.client); !ok { + logrus.Warnf("Unable to disable shard allocation: %v", err) + } + + // flush nodes + if ok, err := DoSynchronizedFlush(elasticsearchRequest.cluster.Name, elasticsearchRequest.cluster.Namespace, elasticsearchRequest.client); !ok { + logrus.Warnf("Unable to perform synchronized flush: %v", err) + } + + updateRestartingCondition(clusterStatus, v1.ConditionTrue) + updateUpdatingSettingsCondition(clusterStatus, v1.ConditionFalse) + } + + // 3 -- restart + // condition restarting true + if containsClusterCondition(api.Restarting, v1.ConditionTrue, clusterStatus) && + containsClusterCondition(api.UpdatingSettings, v1.ConditionFalse, clusterStatus) { + + // call fullClusterRestart on each node that is scheduled 
for a full cluster restart + for _, node := range certRedeployNodes { + _, nodeStatus := getNodeStatus(node.name(), clusterStatus) + node.fullClusterRestart(nodeStatus) + addNodeState(node, nodeStatus) + elasticsearchRequest.setNodeStatus(node, nodeStatus, clusterStatus) + } + + // check that all nodes have been restarted by seeing if they still have the need to cert restart + if len(getScheduledCertRedeployNodes(elasticsearchRequest.cluster)) > 0 { + return fmt.Errorf("Not all nodes were able to be restarted yet...") + } + + updateUpdatingSettingsCondition(clusterStatus, v1.ConditionTrue) + } + } + + // 4 -- post restart + // condition restarting true and updatingsettings true + if containsClusterCondition(api.Restarting, v1.ConditionTrue, clusterStatus) && + containsClusterCondition(api.UpdatingSettings, v1.ConditionTrue, clusterStatus) { + + // verify all nodes rejoined + // check that we have no failed/notReady nodes + podStatus := elasticsearchRequest.GetCurrentPodStateMap() + if len(podStatus[api.ElasticsearchRoleData][api.PodStateTypeNotReady]) > 0 || + len(podStatus[api.ElasticsearchRoleMaster][api.PodStateTypeNotReady]) > 0 { + + logrus.Warnf("Waiting for all cluster nodes to rejoin after full cluster restart...") + return fmt.Errorf("Waiting for all cluster nodes to rejoin after full cluster restart...") + } + + // reenable shard allocation + if ok, err := SetShardAllocation(elasticsearchRequest.cluster.Name, elasticsearchRequest.cluster.Namespace, api.ShardAllocationAll, elasticsearchRequest.client); !ok { + logrus.Warnf("Unable to enable shard allocation: %v", err) + return err + } + + updateUpdatingSettingsCondition(clusterStatus, v1.ConditionFalse) + } + + // 5 -- recovery + // wait for cluster to go green again + if containsClusterCondition(api.Restarting, v1.ConditionTrue, clusterStatus) { + if status, _ := GetClusterHealthStatus(elasticsearchRequest.cluster.Name, elasticsearchRequest.cluster.Namespace, elasticsearchRequest.client); status != "green" { + logrus.Infof("Waiting for cluster to complete recovery: %v / green", status) + return fmt.Errorf("Cluster has not completed recovery after restart: %v / green", status) + } + + logrus.Infof("Completed full cluster restart for cert redeploy on %v", elasticsearchRequest.cluster.Name) + updateRestartingCondition(clusterStatus, v1.ConditionFalse) + } + + return nil +} diff --git a/pkg/k8shandler/deployment.go b/pkg/k8shandler/deployment.go index d9591c75c..3d2af1d8b 100644 --- a/pkg/k8shandler/deployment.go +++ b/pkg/k8shandler/deployment.go @@ -85,8 +85,9 @@ func (node *deploymentNode) name() string { func (node *deploymentNode) state() api.ElasticsearchNodeStatus { - var rolloutForReload v1.ConditionStatus + //var rolloutForReload v1.ConditionStatus var rolloutForUpdate v1.ConditionStatus + var rolloutForCertReload v1.ConditionStatus // see if we need to update the deployment object if node.isChanged() { @@ -102,14 +103,14 @@ func (node *deploymentNode) state() api.ElasticsearchNodeStatus { // check if the secretHash changed newSecretHash := getSecretDataHash(node.clusterName, node.self.Namespace, node.client) if newSecretHash != node.secretHash { - rolloutForReload = v1.ConditionTrue + rolloutForCertReload = v1.ConditionTrue } return api.ElasticsearchNodeStatus{ DeploymentName: node.self.Name, UpgradeStatus: api.ElasticsearchNodeUpgradeStatus{ - ScheduledForUpgrade: rolloutForUpdate, - ScheduledForRedeploy: rolloutForReload, + ScheduledForUpgrade: rolloutForUpdate, + ScheduledForCertRedeploy: rolloutForCertReload, }, } } @@ 
-306,7 +307,7 @@ func (node *deploymentNode) isMissing() bool { return false } -func (node *deploymentNode) restart(upgradeStatus *api.ElasticsearchNodeStatus) { +func (node *deploymentNode) rollingRestart(upgradeStatus *api.ElasticsearchNodeStatus) { if upgradeStatus.UpgradeStatus.UnderUpgrade != v1.ConditionTrue { if status, _ := GetClusterHealthStatus(node.clusterName, node.self.Namespace, node.client); status != "green" { @@ -400,6 +401,63 @@ func (node *deploymentNode) restart(upgradeStatus *api.ElasticsearchNodeStatus) } } +func (node *deploymentNode) fullClusterRestart(upgradeStatus *api.ElasticsearchNodeStatus) { + + if upgradeStatus.UpgradeStatus.UnderUpgrade != v1.ConditionTrue { + upgradeStatus.UpgradeStatus.UnderUpgrade = v1.ConditionTrue + size, err := GetClusterNodeCount(node.clusterName, node.self.Namespace, node.client) + if err != nil { + logrus.Warnf("Unable to get cluster size prior to restart for %v", node.name()) + return + } + node.clusterSize = size + } + + if upgradeStatus.UpgradeStatus.UpgradePhase == "" || + upgradeStatus.UpgradeStatus.UpgradePhase == api.ControllerUpdated { + + err, replicas := node.replicaCount() + if err != nil { + logrus.Warnf("Unable to get replica count for %v", node.name()) + } + + if replicas > 0 { + // check for available replicas empty + // node.self.Status.Replicas + // if we aren't at 0, then we need to scale down to 0 + if err = node.setReplicaCount(0); err != nil { + logrus.Warnf("Unable to scale down %v", node.name()) + return + } + + if err, _ = node.waitForNodeLeaveCluster(); err != nil { + logrus.Infof("Timed out waiting for %v to leave the cluster", node.name()) + return + } + } + + upgradeStatus.UpgradeStatus.UpgradePhase = api.NodeRestarting + } + + if upgradeStatus.UpgradeStatus.UpgradePhase == api.NodeRestarting { + + if err := node.setReplicaCount(1); err != nil { + logrus.Warnf("Unable to scale up %v", node.name()) + return + } + + node.refreshHashes() + + upgradeStatus.UpgradeStatus.UpgradePhase = api.RecoveringData + } + + if upgradeStatus.UpgradeStatus.UpgradePhase == api.RecoveringData { + + upgradeStatus.UpgradeStatus.UpgradePhase = api.ControllerUpdated + upgradeStatus.UpgradeStatus.UnderUpgrade = "" + } +} + func (node *deploymentNode) update(upgradeStatus *api.ElasticsearchNodeStatus) error { // set our state to being under upgrade diff --git a/pkg/k8shandler/nodetypefactory.go b/pkg/k8shandler/nodetypefactory.go index af725472a..de08fdf13 100644 --- a/pkg/k8shandler/nodetypefactory.go +++ b/pkg/k8shandler/nodetypefactory.go @@ -14,7 +14,8 @@ type NodeTypeInterface interface { state() api.ElasticsearchNodeStatus // this will get the current -- used for status create() error // this will create the node in the case where it is new update(upgradeStatus *api.ElasticsearchNodeStatus) error // this will handle updates - restart(upgradeStatus *api.ElasticsearchNodeStatus) + rollingRestart(upgradeStatus *api.ElasticsearchNodeStatus) + fullClusterRestart(upgradeStatus *api.ElasticsearchNodeStatus) progressUnshedulableNode(upgradeStatus *api.ElasticsearchNodeStatus) error name() string updateReference(node NodeTypeInterface) diff --git a/pkg/k8shandler/statefulset.go b/pkg/k8shandler/statefulset.go index 2f9c54a13..b9092985e 100644 --- a/pkg/k8shandler/statefulset.go +++ b/pkg/k8shandler/statefulset.go @@ -78,8 +78,9 @@ func (current *statefulSetNode) updateReference(desired NodeTypeInterface) { } func (node *statefulSetNode) state() api.ElasticsearchNodeStatus { - var rolloutForReload v1.ConditionStatus + //var 
rolloutForReload v1.ConditionStatus var rolloutForUpdate v1.ConditionStatus + var rolloutForCertReload v1.ConditionStatus // see if we need to update the deployment object if node.isChanged() { @@ -95,14 +96,14 @@ func (node *statefulSetNode) state() api.ElasticsearchNodeStatus { // check if the secretHash changed newSecretHash := getSecretDataHash(node.clusterName, node.self.Namespace, node.client) if newSecretHash != node.secretHash { - rolloutForReload = v1.ConditionTrue + rolloutForCertReload = v1.ConditionTrue } return api.ElasticsearchNodeStatus{ StatefulSetName: node.self.Name, UpgradeStatus: api.ElasticsearchNodeUpgradeStatus{ - ScheduledForUpgrade: rolloutForUpdate, - ScheduledForRedeploy: rolloutForReload, + ScheduledForUpgrade: rolloutForUpdate, + ScheduledForCertRedeploy: rolloutForCertReload, }, } } @@ -230,7 +231,7 @@ func (node *statefulSetNode) isMissing() bool { return false } -func (node *statefulSetNode) restart(upgradeStatus *api.ElasticsearchNodeStatus) { +func (node *statefulSetNode) rollingRestart(upgradeStatus *api.ElasticsearchNodeStatus) { if upgradeStatus.UpgradeStatus.UnderUpgrade != v1.ConditionTrue { if status, _ := GetClusterHealthStatus(node.clusterName, node.self.Namespace, node.client); status != "green" { @@ -320,6 +321,74 @@ func (node *statefulSetNode) restart(upgradeStatus *api.ElasticsearchNodeStatus) } } +func (node *statefulSetNode) fullClusterRestart(upgradeStatus *api.ElasticsearchNodeStatus) { + + if upgradeStatus.UpgradeStatus.UnderUpgrade != v1.ConditionTrue { + replicas, err := node.replicaCount() + if err != nil { + logrus.Warnf("Unable to get number of replicas prior to restart for %v", node.name()) + return + } + + size, err := GetClusterNodeCount(node.clusterName, node.self.Namespace, node.client) + if err != nil { + logrus.Warnf("Unable to get cluster size prior to restart for %v", node.name()) + return + } + + node.setPartition(replicas) + node.clusterSize = size + upgradeStatus.UpgradeStatus.UnderUpgrade = v1.ConditionTrue + } + + if upgradeStatus.UpgradeStatus.UpgradePhase == "" || + upgradeStatus.UpgradeStatus.UpgradePhase == api.ControllerUpdated { + + // nothing to do here -- just maintaining a framework structure + + upgradeStatus.UpgradeStatus.UpgradePhase = api.NodeRestarting + } + + if upgradeStatus.UpgradeStatus.UpgradePhase == api.NodeRestarting { + + ordinal, err := node.partition() + if err != nil { + logrus.Infof("Unable to get node ordinal value: %v", err) + return + } + + for index := ordinal; index > 0; index-- { + // get podName based on ordinal index and node.name() + podName := fmt.Sprintf("%v-%v", node.name(), index-1) + + // delete the pod + if err := DeletePod(podName, node.self.Namespace, node.client); err != nil { + logrus.Infof("Unable to delete pod %v for restart: %v", podName, err) + return + } + + // wait for node to leave cluster + if err, _ := node.waitForNodeLeaveCluster(); err != nil { + logrus.Infof("Timed out waiting for %v to leave the cluster", podName) + return + } + + // used for tracking in case of timeout + node.setPartition(index - 1) + } + + node.refreshHashes() + + upgradeStatus.UpgradeStatus.UpgradePhase = api.RecoveringData + } + + if upgradeStatus.UpgradeStatus.UpgradePhase == api.RecoveringData { + + upgradeStatus.UpgradeStatus.UpgradePhase = api.ControllerUpdated + upgradeStatus.UpgradeStatus.UnderUpgrade = "" + } +} + func (node *statefulSetNode) delete() { node.client.Delete(context.TODO(), &node.self) } diff --git a/pkg/k8shandler/status.go b/pkg/k8shandler/status.go index
4fbf5a2aa..470ef4a07 100644 --- a/pkg/k8shandler/status.go +++ b/pkg/k8shandler/status.go @@ -82,6 +82,27 @@ func (elasticsearchRequest *ElasticsearchRequest) UpdateClusterStatus() error { return nil } +func (elasticsearchRequest *ElasticsearchRequest) GetCurrentPodStateMap() map[api.ElasticsearchNodeRole]api.PodStateMap { + return rolePodStateMap(elasticsearchRequest.cluster.Namespace, elasticsearchRequest.cluster.Name, elasticsearchRequest.client) +} + +func containsClusterCondition(condition api.ClusterConditionType, status v1.ConditionStatus, elasticsearchStatus *api.ElasticsearchStatus) bool { + // if we're looking for a status of v1.ConditionTrue then we want to see if the + // condition is present and the status is the same + // + // if we're looking for a status of v1.ConditionFalse then we want the condition + // to either be present with status of false or to not find the condition + defaultValue := (status != v1.ConditionTrue) + + for _, clusterCondition := range elasticsearchStatus.Conditions { + if clusterCondition.Type == condition { + return clusterCondition.Status == status + } + } + + return defaultValue +} + // if a status doesn't exist, provide a new one func getNodeStatus(name string, status *api.ElasticsearchStatus) (int, *api.ElasticsearchNodeStatus) { for index, status := range status.Nodes { @@ -655,17 +676,9 @@ func updateInvalidReplicationCondition(status *api.ElasticsearchStatus, value v1 } func updateUpdatingSettingsCondition(status *api.ElasticsearchStatus, value v1.ConditionStatus) bool { - var message string - if value == v1.ConditionTrue { - message = "Config Map is different" - } else { - message = "Config Map is up to date" - } return updateESNodeCondition(status, &api.ClusterCondition{ - Type: api.UpdatingSettings, - Status: value, - Reason: "ConfigChange", - Message: message, + Type: api.UpdatingSettings, + Status: value, }) } diff --git a/test/e2e/elasticsearch_test.go b/test/e2e/elasticsearch_test.go index 5315463f6..d0f8edea9 100644 --- a/test/e2e/elasticsearch_test.go +++ b/test/e2e/elasticsearch_test.go @@ -71,6 +71,38 @@ func createRequiredSecret(f *framework.Framework, ctx *framework.TestCtx) error return nil } +func updateRequiredSecret(f *framework.Framework, ctx *framework.TestCtx) error { + namespace, err := ctx.GetNamespace() + if err != nil { + return fmt.Errorf("Could not get namespace: %v", err) + } + + elasticsearchSecret := &v1.Secret{} + + secretName := types.NamespacedName{Name: elasticsearchCRName, Namespace: namespace} + if err = f.Client.Get(goctx.TODO(), secretName, elasticsearchSecret); err != nil { + return fmt.Errorf("Could not get secret %s: %v", elasticsearchCRName, err) + } + + elasticsearchSecret.Data = map[string][]byte{ + "elasticsearch.key": utils.GetFileContents("test/files/elasticsearch.key"), + "elasticsearch.crt": utils.GetFileContents("test/files/elasticsearch.crt"), + "logging-es.key": utils.GetFileContents("test/files/logging-es.key"), + "logging-es.crt": utils.GetFileContents("test/files/logging-es.crt"), + "admin-key": utils.GetFileContents("test/files/system.admin.key"), + "admin-cert": utils.GetFileContents("test/files/system.admin.crt"), + "admin-ca": utils.GetFileContents("test/files/ca.crt"), + "dummy": []byte("blah"), + } + + err = f.Client.Update(goctx.TODO(), elasticsearchSecret) + if err != nil { + return err + } + + return nil +} + func elasticsearchFullClusterTest(t *testing.T, f *framework.Framework, ctx *framework.TestCtx) error { namespace, err := ctx.GetNamespace() if err != nil { @@ -208,18 
+240,60 @@ func elasticsearchFullClusterTest(t *testing.T, f *framework.Framework, ctx *fra } /* - FIXME: this is commented out as we currently do not run our e2e tests in a container on the test cluster - to be added back in as a follow up - err = utils.WaitForIndexTemplateReplicas(t, f.KubeClient, namespace, "example-elasticsearch", 1, retryInterval, timeout) + FIXME: this is commented out as we currently do not run our e2e tests in a container on the test cluster + to be added back in as a follow up + err = utils.WaitForIndexTemplateReplicas(t, f.KubeClient, namespace, "example-elasticsearch", 1, retryInterval, timeout) + if err != nil { + return fmt.Errorf("timed out waiting for all index templates to have correct replica count") + } + + err = utils.WaitForIndexReplicas(t, f.KubeClient, namespace, "example-elasticsearch", 1, retryInterval, timeout) + if err != nil { + return fmt.Errorf("timed out waiting for all indices to have correct replica count") + } + */ + + // Update the secret to force a full cluster redeploy + err = updateRequiredSecret(f, ctx) if err != nil { - return fmt.Errorf("timed out waiting for all index templates to have correct replica count") + return fmt.Errorf("Unable to update secret: %v", err) } - err = utils.WaitForIndexReplicas(t, f.KubeClient, namespace, "example-elasticsearch", 1, retryInterval, timeout) + // wait for all nodes to report the scheduledCertRedeploy condition as true + desiredCondition := elasticsearch.ElasticsearchNodeUpgradeStatus{ + ScheduledForCertRedeploy: v1.ConditionTrue, + } + + err = utils.WaitForNodeStatusCondition(t, f, namespace, elasticsearchCRName, desiredCondition, retryInterval, time.Second*30) if err != nil { - return fmt.Errorf("timed out waiting for all indices to have correct replica count") + return fmt.Errorf("Timed out waiting for full cluster restart to begin") + } + + // then wait for conditions to be gone + desiredClusterCondition := elasticsearch.ClusterCondition{ + Type: elasticsearch.Restarting, + Status: v1.ConditionFalse, + } + err = utils.WaitForClusterStatusCondition(t, f, namespace, elasticsearchCRName, desiredClusterCondition, retryInterval, time.Second*300) + if err != nil { + return fmt.Errorf("Timed out waiting for full cluster restart to complete") + } + + // ensure all prior nodes are ready again + err = e2eutil.WaitForDeployment(t, f.KubeClient, namespace, fmt.Sprintf("example-elasticsearch-cdm-%v-1", dataUUID), 1, retryInterval, timeout) + if err != nil { + return fmt.Errorf("timed out waiting for Deployment %v: %v", fmt.Sprintf("example-elasticsearch-cdm-%v-1", dataUUID), err) + } + + err = e2eutil.WaitForDeployment(t, f.KubeClient, namespace, fmt.Sprintf("example-elasticsearch-cdm-%v-2", dataUUID), 1, retryInterval, timeout) + if err != nil { + return fmt.Errorf("timed out waiting for Deployment %v: %v", fmt.Sprintf("example-elasticsearch-cdm-%v-2", dataUUID), err) + } + + err = utils.WaitForStatefulset(t, f.KubeClient, namespace, fmt.Sprintf("example-elasticsearch-cm-%v", nonDataUUID), 1, retryInterval, timeout) + if err != nil { + return fmt.Errorf("timed out waiting for Statefulset %v: %v", fmt.Sprintf("example-elasticsearch-cm-%v", nonDataUUID), err) } - */ // Incorrect scale up and verify we don't see a 4th master created if err = f.Client.Get(goctx.TODO(), exampleName, exampleElasticsearch); err != nil { diff --git a/test/utils/utils.go b/test/utils/utils.go index 86475734c..65c48391e 100644 --- a/test/utils/utils.go +++ b/test/utils/utils.go @@ -2,6 +2,7 @@ package utils import ( "io/ioutil" + "reflect" "strconv" "strings"
"testing" @@ -9,6 +10,7 @@ import ( "github.com/sirupsen/logrus" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" @@ -16,6 +18,9 @@ import ( "github.com/openshift/elasticsearch-operator/pkg/utils" "sigs.k8s.io/controller-runtime/pkg/client/fake" + goctx "context" + api "github.com/openshift/elasticsearch-operator/pkg/apis/logging/v1" + framework "github.com/operator-framework/operator-sdk/pkg/test" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -45,6 +50,76 @@ func Secret(secretName string, namespace string, data map[string][]byte) *v1.Sec } } +func WaitForNodeStatusCondition(t *testing.T, f *framework.Framework, namespace, name string, condition api.ElasticsearchNodeUpgradeStatus, retryInterval, timeout time.Duration) error { + elasticsearchCR := &api.Elasticsearch{} + elasticsearchName := types.NamespacedName{Name: name, Namespace: namespace} + + err := wait.Poll(retryInterval, timeout, func() (done bool, err error) { + err = f.Client.Get(goctx.TODO(), elasticsearchName, elasticsearchCR) + if err != nil { + if apierrors.IsNotFound(err) { + t.Logf("Waiting for availability of %s elasticsearch\n", name) + return false, nil + } + return false, err + } + + allMatch := true + + for _, node := range elasticsearchCR.Status.Nodes { + if !reflect.DeepEqual(node.UpgradeStatus, condition) { + allMatch = false + } + } + + if allMatch { + return true, nil + } + t.Logf("Waiting for full condition match of %s elasticsearch\n", name) + return false, nil + }) + if err != nil { + return err + } + t.Logf("Full condition matches\n") + return nil +} + +func WaitForClusterStatusCondition(t *testing.T, f *framework.Framework, namespace, name string, condition api.ClusterCondition, retryInterval, timeout time.Duration) error { + elasticsearchCR := &api.Elasticsearch{} + elasticsearchName := types.NamespacedName{Name: name, Namespace: namespace} + + err := wait.Poll(retryInterval, timeout, func() (done bool, err error) { + err = f.Client.Get(goctx.TODO(), elasticsearchName, elasticsearchCR) + if err != nil { + if apierrors.IsNotFound(err) { + t.Logf("Waiting for availability of %s elasticsearch\n", name) + return false, nil + } + return false, err + } + + contained := false + + for _, clusterCondition := range elasticsearchCR.Status.Conditions { + if reflect.DeepEqual(clusterCondition, condition) { + contained = true + } + } + + if contained { + return true, nil + } + t.Logf("Waiting for full condition match of %s elasticsearch\n", name) + return false, nil + }) + if err != nil { + return err + } + t.Logf("Full condition matches\n") + return nil +} + func WaitForStatefulset(t *testing.T, kubeclient kubernetes.Interface, namespace, name string, replicas int, retryInterval, timeout time.Duration) error { err := wait.Poll(retryInterval, timeout, func() (done bool, err error) { statefulset, err := kubeclient.AppsV1().StatefulSets(namespace).Get(name, metav1.GetOptions{IncludeUninitialized: true})