3 changes: 2 additions & 1 deletion hack/test-e2e.sh
@@ -51,6 +51,7 @@ TEST_NAMESPACE=${TEST_NAMESPACE} go test ./test/e2e/... \
-namespacedMan ${manifest} \
-v \
-parallel=1 \
-singleNamespace
-singleNamespace \
-timeout 900s

oc delete namespace ${TEST_NAMESPACE}
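(Note: go test's default per-binary timeout is 10 minutes; the added -timeout 900s gives the e2e suite 15 minutes, and the restored trailing backslash keeps -singleNamespace and the new flag on the same command line.)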
9 changes: 5 additions & 4 deletions pkg/apis/logging/v1/elasticsearch_types.go
@@ -113,10 +113,11 @@ type ElasticsearchNodeStatus struct {
}

type ElasticsearchNodeUpgradeStatus struct {
ScheduledForUpgrade v1.ConditionStatus `json:"scheduledUpgrade,omitempty"`
ScheduledForRedeploy v1.ConditionStatus `json:"scheduledRedeploy,omitempty"`
UnderUpgrade v1.ConditionStatus `json:"underUpgrade,omitempty"`
UpgradePhase ElasticsearchUpgradePhase `json:"upgradePhase,omitempty"`
ScheduledForUpgrade v1.ConditionStatus `json:"scheduledUpgrade,omitempty"`
ScheduledForRedeploy v1.ConditionStatus `json:"scheduledRedeploy,omitempty"`
ScheduledForCertRedeploy v1.ConditionStatus `json:"scheduledCertRedeploy,omitempty"`
UnderUpgrade v1.ConditionStatus `json:"underUpgrade,omitempty"`
UpgradePhase ElasticsearchUpgradePhase `json:"upgradePhase,omitempty"`
}

// ClusterCondition contains details for the current condition of this elasticsearch cluster.
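The new ScheduledForCertRedeploy condition is what drives the full-cluster-restart path in cluster.go below. A minimal sketch of how a caller might set it, assuming a hypothetical secretChanged check that detects certificate rotation (not part of this PR):

if secretChanged {
	for i := range cluster.Status.Nodes {
		// hypothetical: flag every node so getScheduledCertRedeployNodes picks it up
		cluster.Status.Nodes[i].UpgradeStatus.ScheduledForCertRedeploy = v1.ConditionTrue
	}
}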
225 changes: 169 additions & 56 deletions pkg/k8shandler/cluster.go
@@ -44,57 +44,42 @@ func (elasticsearchRequest *ElasticsearchRequest) CreateOrUpdateElasticsearchClu
elasticsearchRequest.getNodes()

progressUnshedulableNodes(elasticsearchRequest.cluster)
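// cert redeploys take priority over any other node work: performFullClusterRestart
// is re-entrant (driven by cluster conditions), so on error we persist status and
// let the next reconcile pass resume the restart state machine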
err = elasticsearchRequest.performFullClusterRestart()
if err != nil {
return elasticsearchRequest.UpdateClusterStatus()
}

// if there is a node currently being upgraded, work on that first
upgradeInProgressNode := getNodeUpgradeInProgress(elasticsearchRequest.cluster)
scheduledUpgradeNodes := getScheduledUpgradeNodes(elasticsearchRequest.cluster)
if upgradeInProgressNode != nil {

clusterStatus := elasticsearchRequest.cluster.Status.DeepCopy()
index, nodeStatus := getNodeStatus(upgradeInProgressNode.name(), clusterStatus)
_, nodeStatus := getNodeStatus(upgradeInProgressNode.name(), clusterStatus)

if _, ok := containsNodeTypeInterface(upgradeInProgressNode, scheduledUpgradeNodes); ok {
logrus.Debugf("Continuing update for %v", upgradeInProgressNode.name())
upgradeInProgressNode.update(nodeStatus)
} else {
logrus.Debugf("Continuing restart for %v", upgradeInProgressNode.name())
upgradeInProgressNode.restart(nodeStatus)
}

nodeState := upgradeInProgressNode.state()

nodeStatus.UpgradeStatus.ScheduledForUpgrade = nodeState.UpgradeStatus.ScheduledForUpgrade
nodeStatus.UpgradeStatus.ScheduledForRedeploy = nodeState.UpgradeStatus.ScheduledForRedeploy

if index == NOT_FOUND_INDEX {
clusterStatus.Nodes = append(clusterStatus.Nodes, *nodeStatus)
} else {
clusterStatus.Nodes[index] = *nodeStatus
upgradeInProgressNode.rollingRestart(nodeStatus)
}

elasticsearchRequest.updateNodeStatus(*clusterStatus)
addNodeState(upgradeInProgressNode, nodeStatus)
elasticsearchRequest.setNodeStatus(upgradeInProgressNode, nodeStatus, clusterStatus)

} else {

if len(scheduledUpgradeNodes) > 0 {
for _, node := range scheduledUpgradeNodes {
logrus.Debugf("Perform a update for %v", node.name())
clusterStatus := elasticsearchRequest.cluster.Status.DeepCopy()
index, nodeStatus := getNodeStatus(node.name(), clusterStatus)
_, nodeStatus := getNodeStatus(node.name(), clusterStatus)

err := node.update(nodeStatus)
nodeState := node.state()

nodeStatus.UpgradeStatus.ScheduledForUpgrade = nodeState.UpgradeStatus.ScheduledForUpgrade
nodeStatus.UpgradeStatus.ScheduledForRedeploy = nodeState.UpgradeStatus.ScheduledForRedeploy

if index == NOT_FOUND_INDEX {
clusterStatus.Nodes = append(clusterStatus.Nodes, *nodeStatus)
} else {
clusterStatus.Nodes[index] = *nodeStatus
}

elasticsearchRequest.updateNodeStatus(*clusterStatus)
addNodeState(node, nodeStatus)
elasticsearchRequest.setNodeStatus(node, nodeStatus, clusterStatus)

if err != nil {
logrus.Warnf("Error occurred while updating node %v: %v", node.name(), err)
@@ -110,28 +95,19 @@ func (elasticsearchRequest *ElasticsearchRequest) CreateOrUpdateElasticsearchClu
for _, node := range scheduledRedeployNodes {
logrus.Debugf("Perform a redeploy for %v", node.name())
clusterStatus := elasticsearchRequest.cluster.Status.DeepCopy()
index, nodeStatus := getNodeStatus(node.name(), clusterStatus)
_, nodeStatus := getNodeStatus(node.name(), clusterStatus)

node.restart(nodeStatus)
nodeState := node.state()
node.rollingRestart(nodeStatus)

nodeStatus.UpgradeStatus.ScheduledForUpgrade = nodeState.UpgradeStatus.ScheduledForUpgrade
nodeStatus.UpgradeStatus.ScheduledForRedeploy = nodeState.UpgradeStatus.ScheduledForRedeploy

if index == NOT_FOUND_INDEX {
clusterStatus.Nodes = append(clusterStatus.Nodes, *nodeStatus)
} else {
clusterStatus.Nodes[index] = *nodeStatus
}

elasticsearchRequest.updateNodeStatus(*clusterStatus)
addNodeState(node, nodeStatus)
elasticsearchRequest.setNodeStatus(node, nodeStatus, clusterStatus)
}

} else {

for _, node := range nodes[nodeMapKey(elasticsearchRequest.cluster.Name, elasticsearchRequest.cluster.Namespace)] {
clusterStatus := elasticsearchRequest.cluster.Status.DeepCopy()
index, nodeStatus := getNodeStatus(node.name(), clusterStatus)
_, nodeStatus := getNodeStatus(node.name(), clusterStatus)

// Verify that we didn't scale up too many masters
if err := elasticsearchRequest.isValidConf(); err != nil {
@@ -147,23 +123,9 @@ func (elasticsearchRequest *ElasticsearchRequest) CreateOrUpdateElasticsearchClu
if err := node.create(); err != nil {
return err
}
nodeState := node.state()

nodeStatus.UpgradeStatus.ScheduledForUpgrade = nodeState.UpgradeStatus.ScheduledForUpgrade
nodeStatus.UpgradeStatus.ScheduledForRedeploy = nodeState.UpgradeStatus.ScheduledForRedeploy

if index == NOT_FOUND_INDEX {
// this is a new status, just append
nodeStatus.DeploymentName = nodeState.DeploymentName
nodeStatus.StatefulSetName = nodeState.StatefulSetName
clusterStatus.Nodes = append(clusterStatus.Nodes, *nodeStatus)
} else {
// this is an existing status, update in place
clusterStatus.Nodes[index] = *nodeStatus
}

// update status here
elasticsearchRequest.updateNodeStatus(*clusterStatus)
addNodeState(node, nodeStatus)
elasticsearchRequest.setNodeStatus(node, nodeStatus, clusterStatus)

elasticsearchRequest.updateMinMasters()
}
@@ -366,6 +328,53 @@ func getScheduledRedeployOnlyNodes(cluster *api.Elasticsearch) []NodeTypeInterfa
return redeployNodes
}

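// getScheduledCertRedeployNodes returns the nodes whose status is flagged
// ScheduledForCertRedeploy, matched back to their in-memory NodeTypeInterface entries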
func getScheduledCertRedeployNodes(cluster *api.Elasticsearch) []NodeTypeInterface {
redeployCertNodes := []NodeTypeInterface{}
dataNodes := []NodeTypeInterface{}

for _, node := range cluster.Status.Nodes {
if node.UpgradeStatus.ScheduledForCertRedeploy == v1.ConditionTrue {
for _, nodeTypeInterface := range nodes[nodeMapKey(cluster.Name, cluster.Namespace)] {
if node.DeploymentName == nodeTypeInterface.name() {
dataNodes = append(dataNodes, nodeTypeInterface)
}

if node.StatefulSetName == nodeTypeInterface.name() {
redeployCertNodes = append(redeployCertNodes, nodeTypeInterface)
}
}
}
}

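// deployment-backed data nodes go last so the statefulset-backed nodes are restarted first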
redeployCertNodes = append(redeployCertNodes, dataNodes...)

return redeployCertNodes
}

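// addNodeState copies the node's in-memory upgrade status and workload names
// onto the status entry that will be persisted to the CR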
func addNodeState(node NodeTypeInterface, nodeStatus *api.ElasticsearchNodeStatus) {

nodeState := node.state()

nodeStatus.UpgradeStatus.ScheduledForUpgrade = nodeState.UpgradeStatus.ScheduledForUpgrade
nodeStatus.UpgradeStatus.ScheduledForRedeploy = nodeState.UpgradeStatus.ScheduledForRedeploy
nodeStatus.UpgradeStatus.ScheduledForCertRedeploy = nodeState.UpgradeStatus.ScheduledForCertRedeploy
nodeStatus.DeploymentName = nodeState.DeploymentName
nodeStatus.StatefulSetName = nodeState.StatefulSetName
}

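// setNodeStatus upserts nodeStatus into clusterStatus (append when new, replace
// in place otherwise) and persists the result via updateNodeStatus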
func (elasticsearchRequest *ElasticsearchRequest) setNodeStatus(node NodeTypeInterface, nodeStatus *api.ElasticsearchNodeStatus, clusterStatus *api.ElasticsearchStatus) {

index, _ := getNodeStatus(node.name(), clusterStatus)

if index == NOT_FOUND_INDEX {
clusterStatus.Nodes = append(clusterStatus.Nodes, *nodeStatus)
} else {
clusterStatus.Nodes[index] = *nodeStatus
}

elasticsearchRequest.updateNodeStatus(*clusterStatus)
}

func (elasticsearchRequest *ElasticsearchRequest) updateNodeStatus(status api.ElasticsearchStatus) error {

cluster := elasticsearchRequest.cluster
@@ -399,3 +408,107 @@ func (elasticsearchRequest *ElasticsearchRequest) updateNodeStatus(status api.El

return nil
}

// A full cluster restart is required when certs need to be refreshed.
// This is a very high-priority action, since the cluster may be fractured or
// unusable if the certs aren't all rolled out correctly or have expired.
func (elasticsearchRequest *ElasticsearchRequest) performFullClusterRestart() error {

// first, check whether any nodes are scheduled for a full cluster restart
certRedeployNodes := getScheduledCertRedeployNodes(elasticsearchRequest.cluster)
clusterStatus := &elasticsearchRequest.cluster.Status

// 1 -- precheck
// no restarting conditions set
if len(certRedeployNodes) > 0 {

if containsClusterCondition(api.Restarting, v1.ConditionFalse, clusterStatus) &&
containsClusterCondition(api.UpdatingSettings, v1.ConditionFalse, clusterStatus) {

// We don't want to gate on cluster health -- we may be in a bad state
// because pods aren't all available
logrus.Infof("Beginning full cluster restart for cert redeploy on %v", elasticsearchRequest.cluster.Name)

// set conditions here for next check
updateUpdatingSettingsCondition(clusterStatus, v1.ConditionTrue)
}

// 2 -- prep for restart
// condition updatingsettings true
if containsClusterCondition(api.Restarting, v1.ConditionFalse, clusterStatus) &&
containsClusterCondition(api.UpdatingSettings, v1.ConditionTrue, clusterStatus) {

// disable shard allocation
if ok, err := SetShardAllocation(elasticsearchRequest.cluster.Name, elasticsearchRequest.cluster.Namespace, api.ShardAllocationNone, elasticsearchRequest.client); !ok {
logrus.Warnf("Unable to disable shard allocation: %v", err)
}

// flush nodes
if ok, err := DoSynchronizedFlush(elasticsearchRequest.cluster.Name, elasticsearchRequest.cluster.Namespace, elasticsearchRequest.client); !ok {
logrus.Warnf("Unable to perform synchronized flush: %v", err)
}

updateRestartingCondition(clusterStatus, v1.ConditionTrue)
updateUpdatingSettingsCondition(clusterStatus, v1.ConditionFalse)
}

// 3 -- restart
// condition restarting true
if containsClusterCondition(api.Restarting, v1.ConditionTrue, clusterStatus) &&
containsClusterCondition(api.UpdatingSettings, v1.ConditionFalse, clusterStatus) {

// call fullClusterRestart on each node that is scheduled for a full cluster restart
for _, node := range certRedeployNodes {
_, nodeStatus := getNodeStatus(node.name(), clusterStatus)
node.fullClusterRestart(nodeStatus)
addNodeState(node, nodeStatus)
elasticsearchRequest.setNodeStatus(node, nodeStatus, clusterStatus)
}

// verify that every node restarted by checking whether any are still scheduled for cert redeploy
if len(getScheduledCertRedeployNodes(elasticsearchRequest.cluster)) > 0 {
return fmt.Errorf("Not all nodes were able to be restarted yet...")
}

updateUpdatingSettingsCondition(clusterStatus, v1.ConditionTrue)
}
}

// 4 -- post restart
// condition restarting true and updatingsettings true
if containsClusterCondition(api.Restarting, v1.ConditionTrue, clusterStatus) &&
containsClusterCondition(api.UpdatingSettings, v1.ConditionTrue, clusterStatus) {

// verify all nodes rejoined
// check that we have no failed/notReady nodes
podStatus := elasticsearchRequest.GetCurrentPodStateMap()
if len(podStatus[api.ElasticsearchRoleData][api.PodStateTypeNotReady]) > 0 ||
len(podStatus[api.ElasticsearchRoleMaster][api.PodStateTypeNotReady]) > 0 {

logrus.Warnf("Waiting for all cluster nodes to rejoin after full cluster restart...")
return fmt.Errorf("Waiting for all cluster nodes to rejoin after full cluster restart...")
}

// reenable shard allocation
if ok, err := SetShardAllocation(elasticsearchRequest.cluster.Name, elasticsearchRequest.cluster.Namespace, api.ShardAllocationAll, elasticsearchRequest.client); !ok {
logrus.Warnf("Unable to enable shard allocation: %v", err)
return err
}

updateUpdatingSettingsCondition(clusterStatus, v1.ConditionFalse)
}

// 5 -- recovery
// wait for cluster to go green again
if containsClusterCondition(api.Restarting, v1.ConditionTrue, clusterStatus) {
if status, _ := GetClusterHealthStatus(elasticsearchRequest.cluster.Name, elasticsearchRequest.cluster.Namespace, elasticsearchRequest.client); status != "green" {
logrus.Infof("Waiting for cluster to complete recovery: %v / green", status)
return fmt.Errorf("Cluster has not completed recovery after restart: %v / green", status)
}

logrus.Infof("Completed full cluster restart for cert redeploy on %v", elasticsearchRequest.cluster.Name)
updateRestartingCondition(clusterStatus, v1.ConditionFalse)
}

return nil
}
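For context, a rough sketch of what the two Elasticsearch-level helpers used in steps 2 and 4 amount to at the REST layer; this is an assumption about the operator's client wrappers, not code from this PR, and the synced-flush endpoint only exists on the ES 5.x/6.x line:

// SetShardAllocation(name, namespace, api.ShardAllocationNone, client) ~
//   PUT /_cluster/settings
//   {"transient": {"cluster.routing.allocation.enable": "none"}}
//
// DoSynchronizedFlush(name, namespace, client) ~
//   POST /_flush/synced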