Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ type Client interface {
CreateIndexTemplate(name string, template *estypes.IndexTemplate) error
DeleteIndexTemplate(name string) error
ListTemplates() (sets.String, error)
GetIndexTemplates() (map[string]interface{}, error)
GetIndexTemplates() (map[string]estypes.GetIndexTemplate, error)
UpdateTemplatePrimaryShards(shardCount int32) error

SetSendRequestFn(fn FnEsSendRequest)
}
Expand Down
44 changes: 0 additions & 44 deletions pkg/elasticsearch/replicas.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package elasticsearch

import (
"encoding/json"
"fmt"
"net/http"
"strconv"
Expand Down Expand Up @@ -64,49 +63,6 @@ func (ec *esClient) GetIndexReplicaCounts() (map[string]interface{}, error) {
return payload.ResponseBody, payload.Error
}

func (ec *esClient) updateAllIndexTemplateReplicas(replicaCount int32) (bool, error) {

// get the index template and then update the replica and put it
indexTemplates, _ := ec.GetIndexTemplates()

for templateName := range indexTemplates {

if template, ok := indexTemplates[templateName].(map[string]interface{}); ok {
if settings, ok := template["settings"].(map[string]interface{}); ok {
if index, ok := settings["index"].(map[string]interface{}); ok {
currentReplicas, ok := index["number_of_replicas"].(string)

if ok && currentReplicas != fmt.Sprintf("%d", replicaCount) {
template["settings"].(map[string]interface{})["index"].(map[string]interface{})["number_of_replicas"] = fmt.Sprintf("%d", replicaCount)

templateJson, _ := json.Marshal(template)

payload := &EsRequest{
Method: http.MethodPut,
URI: fmt.Sprintf("_template/%s", templateName),
RequestBody: string(templateJson),
}

ec.fnSendEsRequest(ec.cluster, ec.namespace, payload, ec.k8sClient)

acknowledged := false
if acknowledgedBool, ok := payload.ResponseBody["acknowledged"].(bool); ok {
acknowledged = acknowledgedBool
}

if !(payload.StatusCode == 200 && acknowledged) {
log.Error(payload.Error, "unable to update tmeplate", "template", templateName)
}
}
}
}
}

}

return true, nil
}

func (ec *esClient) updateIndexReplicas(index string, replicaCount int32) (bool, error) {
payload := &EsRequest{
Method: http.MethodPut,
Expand Down
99 changes: 97 additions & 2 deletions pkg/elasticsearch/templates.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package elasticsearch

import (
"encoding/json"
"fmt"
"net/http"

"github.com/openshift/elasticsearch-operator/pkg/log"
estypes "github.com/openshift/elasticsearch-operator/pkg/types/elasticsearch"
"github.com/openshift/elasticsearch-operator/pkg/utils"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -67,13 +69,106 @@ func (ec *esClient) ListTemplates() (sets.String, error) {
return response, nil
}

func (ec *esClient) GetIndexTemplates() (map[string]interface{}, error) {
func (ec *esClient) GetIndexTemplates() (map[string]estypes.GetIndexTemplate, error) {
payload := &EsRequest{
Method: http.MethodGet,
URI: "_template/common.*",
}

ec.fnSendEsRequest(ec.cluster, ec.namespace, payload, ec.k8sClient)

return payload.ResponseBody, payload.Error
// unmarshal response body and return that
templates := map[string]estypes.GetIndexTemplate{}
err := json.Unmarshal([]byte(payload.RawResponseBody), &templates)
if err != nil {
return templates, fmt.Errorf("failed decoding raw response body into `map[string]estypes.GetIndexTemplate` for %s in namespace %s: %v", ec.cluster, ec.namespace, err)
}

return templates, payload.Error
}

func (ec *esClient) updateAllIndexTemplateReplicas(replicaCount int32) (bool, error) {

// get the index template and then update the replica and put it
indexTemplates, err := ec.GetIndexTemplates()
if err != nil {
return false, err
}

replicaString := fmt.Sprintf("%d", replicaCount)

for templateName, template := range indexTemplates {

currentReplicas := template.Settings.Index.NumberOfReplicas
if currentReplicas != replicaString {
template.Settings.Index.NumberOfReplicas = replicaString

templateJson, err := json.Marshal(template)
if err != nil {
return false, err
}

payload := &EsRequest{
Method: http.MethodPut,
URI: fmt.Sprintf("_template/%s", templateName),
RequestBody: string(templateJson),
}

ec.fnSendEsRequest(ec.cluster, ec.namespace, payload, ec.k8sClient)

acknowledged := false
if acknowledgedBool, ok := payload.ResponseBody["acknowledged"].(bool); ok {
acknowledged = acknowledgedBool
}

if !(payload.StatusCode == 200 && acknowledged) {
log.Error(payload.Error, "unable to update template", "cluster", ec.cluster, "namespace", ec.namespace, "template", templateName)
}
}
}

return true, nil
}

func (ec *esClient) UpdateTemplatePrimaryShards(shardCount int32) error {

// get the index template and then update the shards and put it
indexTemplates, err := ec.GetIndexTemplates()
if err != nil {
return err
}

shardString := fmt.Sprintf("%d", shardCount)

for templateName, template := range indexTemplates {

currentShards := template.Settings.Index.NumberOfShards
if currentShards != shardString {
template.Settings.Index.NumberOfShards = shardString

templateJson, err := json.Marshal(template)
if err != nil {
return err
}

payload := &EsRequest{
Method: http.MethodPut,
URI: fmt.Sprintf("_template/%s", templateName),
RequestBody: string(templateJson),
}

ec.fnSendEsRequest(ec.cluster, ec.namespace, payload, ec.k8sClient)

acknowledged := false
if acknowledgedBool, ok := payload.ResponseBody["acknowledged"].(bool); ok {
acknowledged = acknowledgedBool
}

if !(payload.StatusCode == 200 && acknowledged) {
log.Error(payload.Error, "unable to update template", "cluster", ec.cluster, "namespace", ec.namespace, "template", templateName)
}
}
}

return nil
}
3 changes: 3 additions & 0 deletions pkg/k8shandler/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ func (er *ElasticsearchRequest) CreateOrUpdateElasticsearchCluster() error {
// ensure that MinMasters is (n / 2 + 1)
er.updateMinMasters()

// update our template primary shard counts in case they changed
er.updatePrimaryShards()

// ensure we always have shard allocation to All if we aren't doing an update...
er.tryEnsureAllShardAllocation()

Expand Down
32 changes: 16 additions & 16 deletions pkg/k8shandler/configmaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ const (

// esYmlStruct is used to render esYmlTmpl to a proper elasticsearch.yml format
type esYmlStruct struct {
KibanaIndexMode string
EsUnicastHost string
NodeQuorum string
RecoverExpectedShards string
SystemCallFilter string
KibanaIndexMode string
EsUnicastHost string
NodeQuorum string
RecoverExpectedNodes string
SystemCallFilter string
}

type log4j2PropertiesStruct struct {
Expand Down Expand Up @@ -114,7 +114,7 @@ func (er *ElasticsearchRequest) CreateOrUpdateConfigMaps() (err error) {
esUnicastHost(dpl.Name, dpl.Namespace),
strconv.Itoa(masterNodeCount/2+1),
strconv.Itoa(dataNodeCount),
strconv.Itoa(dataNodeCount),
strconv.Itoa(calculatePrimaryCount(dpl)),
strconv.Itoa(calculateReplicaCount(dpl)),
strconv.FormatBool(runtime.GOARCH == "amd64"),
logConfig,
Expand Down Expand Up @@ -166,11 +166,11 @@ func (er *ElasticsearchRequest) CreateOrUpdateConfigMaps() (err error) {
return nil
}

func renderData(kibanaIndexMode, esUnicastHost, nodeQuorum, recoverExpectedShards, primaryShardsCount, replicaShardsCount, systemCallFilter string, logConfig LogConfig) (error, map[string]string) {
func renderData(kibanaIndexMode, esUnicastHost, nodeQuorum, recoverExpectedNodes, primaryShardsCount, replicaShardsCount, systemCallFilter string, logConfig LogConfig) (error, map[string]string) {

data := map[string]string{}
buf := &bytes.Buffer{}
if err := renderEsYml(buf, kibanaIndexMode, esUnicastHost, nodeQuorum, recoverExpectedShards, systemCallFilter); err != nil {
if err := renderEsYml(buf, kibanaIndexMode, esUnicastHost, nodeQuorum, recoverExpectedNodes, systemCallFilter); err != nil {
return err, data
}
data[esConfig] = buf.String()
Expand All @@ -192,9 +192,9 @@ func renderData(kibanaIndexMode, esUnicastHost, nodeQuorum, recoverExpectedShard

// newConfigMap returns a v1.ConfigMap object
func newConfigMap(configMapName, namespace string, labels map[string]string,
kibanaIndexMode, esUnicastHost, nodeQuorum, recoverExpectedShards, primaryShardsCount, replicaShardsCount, systemCallFilter string, logConfig LogConfig) *v1.ConfigMap {
kibanaIndexMode, esUnicastHost, nodeQuorum, recoverExpectedNodes, primaryShardsCount, replicaShardsCount, systemCallFilter string, logConfig LogConfig) *v1.ConfigMap {

err, data := renderData(kibanaIndexMode, esUnicastHost, nodeQuorum, recoverExpectedShards, primaryShardsCount, replicaShardsCount, systemCallFilter, logConfig)
err, data := renderData(kibanaIndexMode, esUnicastHost, nodeQuorum, recoverExpectedNodes, primaryShardsCount, replicaShardsCount, systemCallFilter, logConfig)
if err != nil {
return nil
}
Expand Down Expand Up @@ -238,19 +238,19 @@ func configMapContentChanged(old, new *v1.ConfigMap) bool {
return false
}

func renderEsYml(w io.Writer, kibanaIndexMode, esUnicastHost, nodeQuorum, recoverExpectedShards, systemCallFilter string) error {
func renderEsYml(w io.Writer, kibanaIndexMode, esUnicastHost, nodeQuorum, recoverExpectedNodes, systemCallFilter string) error {
t := template.New("elasticsearch.yml")
config := esYmlTmpl
t, err := t.Parse(config)
if err != nil {
return err
}
esy := esYmlStruct{
KibanaIndexMode: kibanaIndexMode,
EsUnicastHost: esUnicastHost,
NodeQuorum: nodeQuorum,
RecoverExpectedShards: recoverExpectedShards,
SystemCallFilter: systemCallFilter,
KibanaIndexMode: kibanaIndexMode,
EsUnicastHost: esUnicastHost,
NodeQuorum: nodeQuorum,
RecoverExpectedNodes: recoverExpectedNodes,
SystemCallFilter: systemCallFilter,
}

return t.Execute(w, esy)
Expand Down
2 changes: 1 addition & 1 deletion pkg/k8shandler/configuration_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ discovery.zen:

gateway:
recover_after_nodes: {{.NodeQuorum}}
expected_nodes: {{.RecoverExpectedShards}}
expected_nodes: {{.RecoverExpectedNodes}}
recover_after_time: ${RECOVER_AFTER_TIME}

path:
Expand Down
16 changes: 14 additions & 2 deletions pkg/k8shandler/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ const (
defaultESProxyMemoryLimit = "64Mi"
defaultESProxyMemoryRequest = "64Mi"

maxMasterCount = 3
maxMasterCount = 3
maxPrimaryShardCount = 5

elasticsearchCertsPath = "/etc/openshift/elasticsearch/secret"
elasticsearchConfigPath = "/usr/share/java/elasticsearch/config"
Expand Down Expand Up @@ -48,8 +49,19 @@ func esUnicastHost(clusterName, namespace string) string {
return fmt.Sprintf("%v-cluster.%v.svc", clusterName, namespace)
}

func calculatePrimaryCount(dpl *api.Elasticsearch) int {
dataNodeCount := int(getDataCount(dpl))
if dataNodeCount > maxPrimaryShardCount {
return maxPrimaryShardCount
}

// we can just return this without error checking because we validate
// we have at least one data node in the cluster
return dataNodeCount
}

func calculateReplicaCount(dpl *api.Elasticsearch) int {
dataNodeCount := int((getDataCount(dpl)))
dataNodeCount := int(getDataCount(dpl))
repType := dpl.Spec.RedundancyPolicy
switch repType {
case api.FullRedundancy:
Expand Down
67 changes: 67 additions & 0 deletions pkg/k8shandler/defaults_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package k8shandler

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
api "github.com/openshift/elasticsearch-operator/pkg/apis/logging/v1"
)

var (
dpl *api.Elasticsearch
dataNodeCount int
dataNode api.ElasticsearchNode
)

var _ = Describe("defaults", func() {
defer GinkgoRecover()

BeforeEach(func() {

dataNode = api.ElasticsearchNode{
Roles: []api.ElasticsearchNodeRole{
api.ElasticsearchRoleClient,
api.ElasticsearchRoleData,
},
}
})

Describe("#getPrimaryShardCount with excess data nodes", func() {

JustBeforeEach(func() {

dataNodeCount = 20
dataNode.NodeCount = int32(dataNodeCount)

dpl = &api.Elasticsearch{
Spec: api.ElasticsearchSpec{
Nodes: []api.ElasticsearchNode{
dataNode,
},
},
}
})
It("should return maxPrimaryShardCount", func() {
Expect(calculatePrimaryCount(dpl)).To(Equal(maxPrimaryShardCount))
})
})

Describe("#getPrimaryShardCount with 3 data nodes", func() {

JustBeforeEach(func() {

dataNodeCount = 3
dataNode.NodeCount = int32(dataNodeCount)

dpl = &api.Elasticsearch{
Spec: api.ElasticsearchSpec{
Nodes: []api.ElasticsearchNode{
dataNode,
},
},
}
})
It("should return data node count", func() {
Expect(calculatePrimaryCount(dpl)).To(Equal(dataNodeCount))
})
})
})
9 changes: 9 additions & 0 deletions pkg/k8shandler/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,12 @@ func (er *ElasticsearchRequest) updateReplicas() {
}
}
}

func (er *ElasticsearchRequest) updatePrimaryShards() {
if er.ClusterReady() {
primaryCount := int32(calculatePrimaryCount(er.cluster))
if err := er.esClient.UpdateTemplatePrimaryShards(primaryCount); err != nil {
er.L().Error(err, "Unable to update primary count")
}
}
}
Loading