diff --git a/pkg/elasticsearch/client.go b/pkg/elasticsearch/client.go
index ad27728b6..c3ff122a9 100644
--- a/pkg/elasticsearch/client.go
+++ b/pkg/elasticsearch/client.go
@@ -83,7 +83,8 @@ type Client interface {
 	CreateIndexTemplate(name string, template *estypes.IndexTemplate) error
 	DeleteIndexTemplate(name string) error
 	ListTemplates() (sets.String, error)
-	GetIndexTemplates() (map[string]interface{}, error)
+	GetIndexTemplates() (map[string]estypes.GetIndexTemplate, error)
+	UpdateTemplatePrimaryShards(shardCount int32) error
 
 	SetSendRequestFn(fn FnEsSendRequest)
 }
diff --git a/pkg/elasticsearch/replicas.go b/pkg/elasticsearch/replicas.go
index e240bb437..2d9374f3b 100644
--- a/pkg/elasticsearch/replicas.go
+++ b/pkg/elasticsearch/replicas.go
@@ -1,7 +1,6 @@
 package elasticsearch
 
 import (
-	"encoding/json"
 	"fmt"
 	"net/http"
 	"strconv"
@@ -64,49 +63,6 @@ func (ec *esClient) GetIndexReplicaCounts() (map[string]interface{}, error) {
 	return payload.ResponseBody, payload.Error
 }
 
-func (ec *esClient) updateAllIndexTemplateReplicas(replicaCount int32) (bool, error) {
-
-	// get the index template and then update the replica and put it
-	indexTemplates, _ := ec.GetIndexTemplates()
-
-	for templateName := range indexTemplates {
-
-		if template, ok := indexTemplates[templateName].(map[string]interface{}); ok {
-			if settings, ok := template["settings"].(map[string]interface{}); ok {
-				if index, ok := settings["index"].(map[string]interface{}); ok {
-					currentReplicas, ok := index["number_of_replicas"].(string)
-
-					if ok && currentReplicas != fmt.Sprintf("%d", replicaCount) {
-						template["settings"].(map[string]interface{})["index"].(map[string]interface{})["number_of_replicas"] = fmt.Sprintf("%d", replicaCount)
-
-						templateJson, _ := json.Marshal(template)
-
-						payload := &EsRequest{
-							Method:      http.MethodPut,
-							URI:         fmt.Sprintf("_template/%s", templateName),
-							RequestBody: string(templateJson),
-						}
-
-						ec.fnSendEsRequest(ec.cluster, ec.namespace, payload, ec.k8sClient)
-
-						acknowledged := false
-						if acknowledgedBool, ok := payload.ResponseBody["acknowledged"].(bool); ok {
-							acknowledged = acknowledgedBool
-						}
-
-						if !(payload.StatusCode == 200 && acknowledged) {
-							log.Error(payload.Error, "unable to update tmeplate", "template", templateName)
-						}
-					}
-				}
-			}
-		}
-
-	}
-
-	return true, nil
-}
-
 func (ec *esClient) updateIndexReplicas(index string, replicaCount int32) (bool, error) {
 	payload := &EsRequest{
 		Method:      http.MethodPut,
diff --git a/pkg/elasticsearch/templates.go b/pkg/elasticsearch/templates.go
index 3850ab3ea..45c45fe8d 100644
--- a/pkg/elasticsearch/templates.go
+++ b/pkg/elasticsearch/templates.go
@@ -1,9 +1,11 @@
 package elasticsearch
 
 import (
+	"encoding/json"
 	"fmt"
 	"net/http"
 
+	"github.com/openshift/elasticsearch-operator/pkg/log"
 	estypes "github.com/openshift/elasticsearch-operator/pkg/types/elasticsearch"
 	"github.com/openshift/elasticsearch-operator/pkg/utils"
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -67,7 +69,7 @@ func (ec *esClient) ListTemplates() (sets.String, error) {
 	return response, nil
 }
 
-func (ec *esClient) GetIndexTemplates() (map[string]interface{}, error) {
+func (ec *esClient) GetIndexTemplates() (map[string]estypes.GetIndexTemplate, error) {
 	payload := &EsRequest{
 		Method: http.MethodGet,
 		URI:    "_template/common.*",
@@ -75,5 +77,98 @@ func (ec *esClient) GetIndexTemplates() (map[string]interface{}, error) {
 
 	ec.fnSendEsRequest(ec.cluster, ec.namespace, payload, ec.k8sClient)
 
-	return payload.ResponseBody, payload.Error
+	// unmarshal response body and return that
+	templates := map[string]estypes.GetIndexTemplate{}
+	err := json.Unmarshal([]byte(payload.RawResponseBody), &templates)
+	if err != nil {
+		return templates, fmt.Errorf("failed decoding raw response body into `map[string]estypes.GetIndexTemplate` for %s in namespace %s: %v", ec.cluster, ec.namespace, err)
+	}
+
+	return templates, payload.Error
+}
+
+func (ec *esClient) updateAllIndexTemplateReplicas(replicaCount int32) (bool, error) {
+
+	// get the index template and then update the replica and put it
+	indexTemplates, err := ec.GetIndexTemplates()
+	if err != nil {
+		return false, err
+	}
+
+	replicaString := fmt.Sprintf("%d", replicaCount)
+
+	for templateName, template := range indexTemplates {
+
+		currentReplicas := template.Settings.Index.NumberOfReplicas
+		if currentReplicas != replicaString {
+			template.Settings.Index.NumberOfReplicas = replicaString
+
+			templateJson, err := json.Marshal(template)
+			if err != nil {
+				return false, err
+			}
+
+			payload := &EsRequest{
+				Method:      http.MethodPut,
+				URI:         fmt.Sprintf("_template/%s", templateName),
+				RequestBody: string(templateJson),
+			}
+
+			ec.fnSendEsRequest(ec.cluster, ec.namespace, payload, ec.k8sClient)
+
+			acknowledged := false
+			if acknowledgedBool, ok := payload.ResponseBody["acknowledged"].(bool); ok {
+				acknowledged = acknowledgedBool
+			}
+
+			if !(payload.StatusCode == 200 && acknowledged) {
+				log.Error(payload.Error, "unable to update template", "cluster", ec.cluster, "namespace", ec.namespace, "template", templateName)
+			}
+		}
+	}
+
+	return true, nil
+}
+
+func (ec *esClient) UpdateTemplatePrimaryShards(shardCount int32) error {
+
+	// get the index template and then update the shards and put it
+	indexTemplates, err := ec.GetIndexTemplates()
+	if err != nil {
+		return err
+	}
+
+	shardString := fmt.Sprintf("%d", shardCount)
+
+	for templateName, template := range indexTemplates {
+
+		currentShards := template.Settings.Index.NumberOfShards
+		if currentShards != shardString {
+			template.Settings.Index.NumberOfShards = shardString
+
+			templateJson, err := json.Marshal(template)
+			if err != nil {
+				return err
+			}
+
+			payload := &EsRequest{
+				Method:      http.MethodPut,
+				URI:         fmt.Sprintf("_template/%s", templateName),
+				RequestBody: string(templateJson),
+			}
+
+			ec.fnSendEsRequest(ec.cluster, ec.namespace, payload, ec.k8sClient)
+
+			acknowledged := false
+			if acknowledgedBool, ok := payload.ResponseBody["acknowledged"].(bool); ok {
+				acknowledged = acknowledgedBool
+			}
+
+			if !(payload.StatusCode == 200 && acknowledged) {
+				log.Error(payload.Error, "unable to update template", "cluster", ec.cluster, "namespace", ec.namespace, "template", templateName)
+			}
+		}
+	}
+
+	return nil
 }
diff --git a/pkg/k8shandler/cluster.go b/pkg/k8shandler/cluster.go
index a73f31fd4..2dc3a1fb0 100644
--- a/pkg/k8shandler/cluster.go
+++ b/pkg/k8shandler/cluster.go
@@ -156,6 +156,9 @@ func (er *ElasticsearchRequest) CreateOrUpdateElasticsearchCluster() error {
 	// ensure that MinMasters is (n / 2 + 1)
 	er.updateMinMasters()
 
+	// update our template primary shard counts in case they changed
+	er.updatePrimaryShards()
+
 	// ensure we always have shard allocation to All if we aren't doing an update...
 	er.tryEnsureAllShardAllocation()
 
diff --git a/pkg/k8shandler/configmaps.go b/pkg/k8shandler/configmaps.go
index 1a6dde325..e19ce4ed4 100644
--- a/pkg/k8shandler/configmaps.go
+++ b/pkg/k8shandler/configmaps.go
@@ -28,11 +28,11 @@ const (
 
 // esYmlStruct is used to render esYmlTmpl to a proper elasticsearch.yml format
 type esYmlStruct struct {
-	KibanaIndexMode       string
-	EsUnicastHost         string
-	NodeQuorum            string
-	RecoverExpectedShards string
-	SystemCallFilter      string
+	KibanaIndexMode      string
+	EsUnicastHost        string
+	NodeQuorum           string
+	RecoverExpectedNodes string
+	SystemCallFilter     string
 }
 
 type log4j2PropertiesStruct struct {
@@ -114,7 +114,7 @@ func (er *ElasticsearchRequest) CreateOrUpdateConfigMaps() (err error) {
 		esUnicastHost(dpl.Name, dpl.Namespace),
 		strconv.Itoa(masterNodeCount/2+1),
 		strconv.Itoa(dataNodeCount),
-		strconv.Itoa(dataNodeCount),
+		strconv.Itoa(calculatePrimaryCount(dpl)),
 		strconv.Itoa(calculateReplicaCount(dpl)),
 		strconv.FormatBool(runtime.GOARCH == "amd64"),
 		logConfig,
@@ -166,11 +166,11 @@ func (er *ElasticsearchRequest) CreateOrUpdateConfigMaps() (err error) {
 	return nil
 }
 
-func renderData(kibanaIndexMode, esUnicastHost, nodeQuorum, recoverExpectedShards, primaryShardsCount, replicaShardsCount, systemCallFilter string, logConfig LogConfig) (error, map[string]string) {
+func renderData(kibanaIndexMode, esUnicastHost, nodeQuorum, recoverExpectedNodes, primaryShardsCount, replicaShardsCount, systemCallFilter string, logConfig LogConfig) (error, map[string]string) {
 	data := map[string]string{}
 
 	buf := &bytes.Buffer{}
-	if err := renderEsYml(buf, kibanaIndexMode, esUnicastHost, nodeQuorum, recoverExpectedShards, systemCallFilter); err != nil {
+	if err := renderEsYml(buf, kibanaIndexMode, esUnicastHost, nodeQuorum, recoverExpectedNodes, systemCallFilter); err != nil {
 		return err, data
 	}
 	data[esConfig] = buf.String()
@@ -192,9 +192,9 @@ func renderData(kibanaIndexMode, esUnicastHost, nodeQuorum, recoverExpectedShard
 
 // newConfigMap returns a v1.ConfigMap object
 func newConfigMap(configMapName, namespace string, labels map[string]string,
-	kibanaIndexMode, esUnicastHost, nodeQuorum, recoverExpectedShards, primaryShardsCount, replicaShardsCount, systemCallFilter string, logConfig LogConfig) *v1.ConfigMap {
+	kibanaIndexMode, esUnicastHost, nodeQuorum, recoverExpectedNodes, primaryShardsCount, replicaShardsCount, systemCallFilter string, logConfig LogConfig) *v1.ConfigMap {
 
-	err, data := renderData(kibanaIndexMode, esUnicastHost, nodeQuorum, recoverExpectedShards, primaryShardsCount, replicaShardsCount, systemCallFilter, logConfig)
+	err, data := renderData(kibanaIndexMode, esUnicastHost, nodeQuorum, recoverExpectedNodes, primaryShardsCount, replicaShardsCount, systemCallFilter, logConfig)
 	if err != nil {
 		return nil
 	}
@@ -238,7 +238,7 @@ func configMapContentChanged(old, new *v1.ConfigMap) bool {
 	return false
 }
 
-func renderEsYml(w io.Writer, kibanaIndexMode, esUnicastHost, nodeQuorum, recoverExpectedShards, systemCallFilter string) error {
+func renderEsYml(w io.Writer, kibanaIndexMode, esUnicastHost, nodeQuorum, recoverExpectedNodes, systemCallFilter string) error {
 	t := template.New("elasticsearch.yml")
 	config := esYmlTmpl
 	t, err := t.Parse(config)
@@ -246,11 +246,11 @@ func renderEsYml(w io.Writer, kibanaIndexMode, esUnicastHost, nodeQuorum, recove
 		return err
 	}
 	esy := esYmlStruct{
-		KibanaIndexMode:       kibanaIndexMode,
-		EsUnicastHost:         esUnicastHost,
-		NodeQuorum:            nodeQuorum,
-		RecoverExpectedShards: recoverExpectedShards,
-		SystemCallFilter:      systemCallFilter,
+		KibanaIndexMode:      kibanaIndexMode,
+		EsUnicastHost:        esUnicastHost,
+		NodeQuorum:           nodeQuorum,
+		RecoverExpectedNodes: recoverExpectedNodes,
+		SystemCallFilter:     systemCallFilter,
 	}
 
 	return t.Execute(w, esy)
diff --git a/pkg/k8shandler/configuration_tmpl.go b/pkg/k8shandler/configuration_tmpl.go
index 30bf16c23..3afc2dcab 100644
--- a/pkg/k8shandler/configuration_tmpl.go
+++ b/pkg/k8shandler/configuration_tmpl.go
@@ -25,7 +25,7 @@ discovery.zen:
 
 gateway:
   recover_after_nodes: {{.NodeQuorum}}
-  expected_nodes: {{.RecoverExpectedShards}}
+  expected_nodes: {{.RecoverExpectedNodes}}
  recover_after_time: ${RECOVER_AFTER_TIME}
 
 path:
diff --git a/pkg/k8shandler/defaults.go b/pkg/k8shandler/defaults.go
index 73a033f4c..876ea5b0d 100644
--- a/pkg/k8shandler/defaults.go
+++ b/pkg/k8shandler/defaults.go
@@ -20,7 +20,8 @@ const (
 	defaultESProxyMemoryLimit   = "64Mi"
 	defaultESProxyMemoryRequest = "64Mi"
 
-	maxMasterCount = 3
+	maxMasterCount       = 3
+	maxPrimaryShardCount = 5
 
 	elasticsearchCertsPath  = "/etc/openshift/elasticsearch/secret"
 	elasticsearchConfigPath = "/usr/share/java/elasticsearch/config"
@@ -48,8 +49,19 @@ func esUnicastHost(clusterName, namespace string) string {
 	return fmt.Sprintf("%v-cluster.%v.svc", clusterName, namespace)
 }
 
+func calculatePrimaryCount(dpl *api.Elasticsearch) int {
+	dataNodeCount := int(getDataCount(dpl))
+	if dataNodeCount > maxPrimaryShardCount {
+		return maxPrimaryShardCount
+	}
+
+	// we can just return this without error checking because we validate
+	// we have at least one data node in the cluster
+	return dataNodeCount
+}
+
 func calculateReplicaCount(dpl *api.Elasticsearch) int {
-	dataNodeCount := int((getDataCount(dpl)))
+	dataNodeCount := int(getDataCount(dpl))
 	repType := dpl.Spec.RedundancyPolicy
 	switch repType {
 	case api.FullRedundancy:
diff --git a/pkg/k8shandler/defaults_test.go b/pkg/k8shandler/defaults_test.go
new file mode 100644
index 000000000..d00830995
--- /dev/null
+++ b/pkg/k8shandler/defaults_test.go
@@ -0,0 +1,67 @@
+package k8shandler
+
+import (
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	api "github.com/openshift/elasticsearch-operator/pkg/apis/logging/v1"
+)
+
+var (
+	dpl           *api.Elasticsearch
+	dataNodeCount int
+	dataNode      api.ElasticsearchNode
+)
+
+var _ = Describe("defaults", func() {
+	defer GinkgoRecover()
+
+	BeforeEach(func() {
+
+		dataNode = api.ElasticsearchNode{
+			Roles: []api.ElasticsearchNodeRole{
+				api.ElasticsearchRoleClient,
+				api.ElasticsearchRoleData,
+			},
+		}
+	})
+
+	Describe("#getPrimaryShardCount with excess data nodes", func() {
+
+		JustBeforeEach(func() {
+
+			dataNodeCount = 20
+			dataNode.NodeCount = int32(dataNodeCount)
+
+			dpl = &api.Elasticsearch{
+				Spec: api.ElasticsearchSpec{
+					Nodes: []api.ElasticsearchNode{
+						dataNode,
+					},
+				},
+			}
+		})
+		It("should return maxPrimaryShardCount", func() {
+			Expect(calculatePrimaryCount(dpl)).To(Equal(maxPrimaryShardCount))
+		})
+	})
+
+	Describe("#getPrimaryShardCount with 3 data nodes", func() {
+
+		JustBeforeEach(func() {
+
+			dataNodeCount = 3
+			dataNode.NodeCount = int32(dataNodeCount)
+
+			dpl = &api.Elasticsearch{
+				Spec: api.ElasticsearchSpec{
+					Nodes: []api.ElasticsearchNode{
+						dataNode,
+					},
+				},
+			}
+		})
+		It("should return data node count", func() {
+			Expect(calculatePrimaryCount(dpl)).To(Equal(dataNodeCount))
+		})
+	})
+})
diff --git a/pkg/k8shandler/elasticsearch.go b/pkg/k8shandler/elasticsearch.go
index c84c58e5d..ce0c8a39c 100644
--- a/pkg/k8shandler/elasticsearch.go
+++ b/pkg/k8shandler/elasticsearch.go
@@ -91,3 +91,12 @@ func (er *ElasticsearchRequest) updateReplicas() {
 		}
 	}
 }
+
+func (er *ElasticsearchRequest) updatePrimaryShards() {
+	if er.ClusterReady() {
+		primaryCount := int32(calculatePrimaryCount(er.cluster))
+		if err := er.esClient.UpdateTemplatePrimaryShards(primaryCount); err != nil {
+			er.L().Error(err, "Unable to update primary count")
+		}
+	}
+}
diff --git a/pkg/types/elasticsearch/types.go b/pkg/types/elasticsearch/types.go
index ea92640e3..cca888edf 100644
--- a/pkg/types/elasticsearch/types.go
+++ b/pkg/types/elasticsearch/types.go
@@ -50,6 +50,38 @@ type IndexTemplate struct {
 	Aliases map[string]IndexAlias `json:"aliases,omitempty"`
 }
 
+type GetIndexTemplate struct {
+	Order         int32                           `json:"order,omitempty"`
+	IndexPatterns []string                        `json:"index_patterns,omitempty"`
+	Settings      GetIndexTemplateSettings        `json:"settings,omitempty"`
+	Aliases       map[string]IndexAlias           `json:"aliases,omitempty"`
+	Mappings      map[string]IndexMappingSettings `json:"mappings,omitempty"`
+}
+
+type GetIndexTemplateSettings struct {
+	Index IndexTemplateSettings `json:"index,omitempty"`
+}
+
+type IndexTemplateSettings struct {
+	Unassigned       UnassignedIndexSetting `json:"unassigned,omitempty"`
+	Translog         TranslogIndexSetting   `json:"translog,omitempty"`
+	RefreshInterval  string                 `json:"refresh_interval,omitempty"`
+	NumberOfShards   string                 `json:"number_of_shards,omitempty"`
+	NumberOfReplicas string                 `json:"number_of_replicas,omitempty"`
+}
+
+type UnassignedIndexSetting struct {
+	NodeLeft NodeLeftSetting `json:"node_left,omitempty"`
+}
+
+type NodeLeftSetting struct {
+	DelayedTimeout string `json:"delayed_timeout,omitempty"`
+}
+
+type TranslogIndexSetting struct {
+	FlushThresholdSize string `json:"flush_threshold_size,omitempty"`
+}
+
 type Aliases struct {
 }
diff --git a/test/utils/utils.go b/test/utils/utils.go
index 897ae2bba..64bc2bc3c 100644
--- a/test/utils/utils.go
+++ b/test/utils/utils.go
@@ -2,6 +2,7 @@ package utils
 
 import (
 	"context"
+	"fmt"
 	"io/ioutil"
 	"reflect"
 	"strconv"
@@ -279,6 +280,8 @@ func WaitForIndexTemplateReplicas(t *testing.T, kubeclient kubernetes.Interface,
 	mockClient := fake.NewFakeClient(getMockedSecret(clusterName, namespace))
 	esClient := elasticsearch.NewClient(clusterName, namespace, mockClient)
 
+	stringReplicas := fmt.Sprintf("%d", replicas)
+
 	err := wait.Poll(retryInterval, timeout, func() (done bool, err error) {
 		// get all index replica count
 		indexTemplates, err := esClient.GetIndexTemplates()
@@ -289,21 +292,15 @@ func WaitForIndexTemplateReplicas(t *testing.T, kubeclient kubernetes.Interface,
 
 		// for each index -- check replica count
 		for templateName, template := range indexTemplates {
-			if numberOfReplicas := parseString("settings.index.number_of_replicas", template.(map[string]interface{})); numberOfReplicas != "" {
-				currentReplicas, err := strconv.ParseInt(numberOfReplicas, 10, 32)
-				if err != nil {
-					return false, err
-				}
+			currentReplicas := template.Settings.Index.NumberOfReplicas
 
-				if int32(currentReplicas) == replicas {
-					continue
-				}
-
-				t.Logf("Index template %s did not have correct replica count (%d/%d)", templateName, currentReplicas, replicas)
-				return false, nil
-			} else {
-				return false, nil
+			if currentReplicas == stringReplicas {
+				continue
 			}
+
+			t.Logf("Index template %s did not have correct replica count (%s/%d)", templateName, currentReplicas, replicas)
+			return false, nil
+
 		}
 
 		return true, nil