Skip to content
5 changes: 3 additions & 2 deletions pkg/apis/elasticsearch/v1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ const (

// ElasticsearchNodeSpec represents configuration of an individual Elasticsearch node
type ElasticsearchNodeSpec struct {
Image string `json:"image,omitempty"`
Resources v1.ResourceRequirements `json:"resources"`
Image string `json:"image,omitempty"`
Resources v1.ResourceRequirements `json:"resources"`
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
}

type ElasticsearchRequiredAction string
Expand Down
12 changes: 12 additions & 0 deletions pkg/apis/elasticsearch/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/k8shandler/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func newPodTemplateSpec(nodeName, clusterName, namespace string, node api.Elasti
),
proxyContainer,
},
NodeSelector: node.NodeSelector,
NodeSelector: mergeSelectors(node.NodeSelector, commonSpec.NodeSelector),
ServiceAccountName: clusterName,
Volumes: newVolumes(clusterName, nodeName, namespace, node),
},
Expand Down
13 changes: 13 additions & 0 deletions pkg/k8shandler/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ func (node *deploymentNode) create() error {
if err != nil {
if !errors.IsAlreadyExists(err) {
return fmt.Errorf("Could not create node resource: %v", err)
} else {
return node.pause()
}
}

Expand Down Expand Up @@ -487,13 +489,24 @@ func (node *deploymentNode) isChanged() bool {
changed := false

desired := node.self.DeepCopy()

// we want to blank this out before a get to ensure we get the correct information back (possible sdk issue with maps?)
node.self.Spec = apps.DeploymentSpec{}

err := sdk.Get(&node.self)
// error check that it exists, etc
if err != nil {
// if it doesn't exist, return true
return false
}

// check the pod's nodeselector
if !areSelectorsSame(node.self.Spec.Template.Spec.NodeSelector, desired.Spec.Template.Spec.NodeSelector) {
logrus.Debugf("Resource '%s' has different nodeSelector than desired", node.self.Name)
node.self.Spec.Template.Spec.NodeSelector = desired.Spec.Template.Spec.NodeSelector
changed = true
}

// Only Image and Resources (CPU & memory) differences trigger rolling restart
for index := 0; index < len(node.self.Spec.Template.Spec.Containers); index++ {
nodeContainer := node.self.Spec.Template.Spec.Containers[index]
Expand Down
11 changes: 8 additions & 3 deletions pkg/k8shandler/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func CreateOrUpdateServices(dpl *api.Elasticsearch) error {
annotations,
true,
ownerRef,
map[string]string{},
)
if err != nil {
return fmt.Errorf("Failure creating service %v", err)
Expand All @@ -43,6 +44,7 @@ func CreateOrUpdateServices(dpl *api.Elasticsearch) error {
annotations,
false,
ownerRef,
map[string]string{},
)
if err != nil {
return fmt.Errorf("Failure creating service %v", err)
Expand All @@ -54,21 +56,24 @@ func CreateOrUpdateServices(dpl *api.Elasticsearch) error {
dpl.Namespace,
dpl.Name,
"metrics",
9200,
60000,
selectorForES("es-node-client", dpl.Name),
annotations,
false,
ownerRef,
map[string]string{
"scrape-metrics": "enabled",
},
)
if err != nil {
return fmt.Errorf("Failure creating service %v", err)
}
return nil
}

func createOrUpdateService(serviceName, namespace, clusterName, targetPortName string, port int32, selector, annotations map[string]string, publishNotReady bool, owner metav1.OwnerReference) error {
func createOrUpdateService(serviceName, namespace, clusterName, targetPortName string, port int32, selector, annotations map[string]string, publishNotReady bool, owner metav1.OwnerReference, labels map[string]string) error {

labels := appendDefaultLabel(clusterName, map[string]string{})
labels = appendDefaultLabel(clusterName, labels)

service := newService(
serviceName,
Expand Down
3 changes: 2 additions & 1 deletion pkg/k8shandler/service_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func CreateOrUpdateServiceMonitors(dpl *v1.Elasticsearch) error {
owner := getOwnerRef(dpl)

labelsWithDefault := appendDefaultLabel(dpl.Name, dpl.Labels)
labelsWithDefault["scrape-metrics"] = "enabled"

elasticsearchScMonitor := createServiceMonitor(serviceMonitorName, dpl.Name, dpl.Namespace, labelsWithDefault)
addOwnerRefToObject(elasticsearchScMonitor, owner)
Expand All @@ -45,7 +46,7 @@ func createServiceMonitor(serviceMonitorName, clusterName, namespace string, lab
// ServerName can be e.g. elasticsearch-metrics.openshift-logging.svc
}
endpoint := monitoringv1.Endpoint{
Port: fmt.Sprintf("%s-%s", clusterName, "metrics"),
Port: clusterName,
Path: "/_prometheus/metrics",
Scheme: "https",
BearerTokenFile: "/var/run/secrets/kubernetes.io/serviceaccount/token",
Expand Down
14 changes: 14 additions & 0 deletions pkg/k8shandler/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ func (node *statefulSetNode) create() error {
if err != nil {
if !errors.IsAlreadyExists(err) {
return fmt.Errorf("Could not create node resource: %v", err)
} else {
node.scale()
return nil
}
}

Expand Down Expand Up @@ -444,13 +447,24 @@ func (node *statefulSetNode) isChanged() bool {
changed := false

desired := node.self.DeepCopy()

// we want to blank this out before a get to ensure we get the correct information back (possible sdk issue with maps?)
node.self.Spec = apps.StatefulSetSpec{}

err := sdk.Get(&node.self)
// error check that it exists, etc
if err != nil {
// if it doesn't exist, return true
return false
}

// check the pod's nodeselector
if !areSelectorsSame(node.self.Spec.Template.Spec.NodeSelector, desired.Spec.Template.Spec.NodeSelector) {
logrus.Debugf("Resource '%s' has different nodeSelector than desired", node.self.Name)
node.self.Spec.Template.Spec.NodeSelector = desired.Spec.Template.Spec.NodeSelector
changed = true
}

// Only Image and Resources (CPU & memory) differences trigger rolling restart
for index := 0; index < len(node.self.Spec.Template.Spec.Containers); index++ {
nodeContainer := node.self.Spec.Template.Spec.Containers[index]
Expand Down
4 changes: 3 additions & 1 deletion pkg/k8shandler/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,9 @@ func updateNodeConditions(clusterName, namespace string, status *api.Elasticsear

isUnschedulable := false
for _, podCondition := range nodePod.Status.Conditions {
if podCondition.Type == v1.PodReasonUnschedulable {
if podCondition.Type == v1.PodScheduled && podCondition.Status == v1.ConditionFalse {
podCondition.Type = v1.PodReasonUnschedulable
podCondition.Status = v1.ConditionTrue
updatePodUnschedulableCondition(node, podCondition)
isUnschedulable = true
}
Expand Down
29 changes: 29 additions & 0 deletions pkg/k8shandler/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,35 @@ func appendDefaultLabel(clusterName string, labels map[string]string) map[string
return labels
}

func areSelectorsSame(lhs, rhs map[string]string) bool {

if len(lhs) != len(rhs) {
return false
}

for lhsKey, lhsVal := range lhs {
rhsVal, ok := rhs[lhsKey]
if !ok || lhsVal != rhsVal {
return false
}
}

return true
}

func mergeSelectors(nodeSelectors, commonSelectors map[string]string) map[string]string {

if commonSelectors == nil {
commonSelectors = make(map[string]string)
}

for k, v := range nodeSelectors {
commonSelectors[k] = v
}

return commonSelectors
}

// getPodNames returns the pod names of the array of pods passed in
func getPodNames(pods []v1.Pod) []string {
var podNames []string
Expand Down
106 changes: 106 additions & 0 deletions pkg/k8shandler/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package k8shandler

import (
"testing"
)

func TestSelectorsBothUndefined(t *testing.T) {

commonSelector := map[string]string{}

nodeSelector := map[string]string{}

expected := map[string]string{}

actual := mergeSelectors(nodeSelector, commonSelector)

if !areSelectorsSame(actual, expected) {
t.Errorf("Expected %v but got %v", expected, actual)
}
}

func TestSelectorsCommonDefined(t *testing.T) {

commonSelector := map[string]string{
"common": "test",
}

nodeSelector := map[string]string{}

expected := map[string]string{
"common": "test",
}

actual := mergeSelectors(nodeSelector, commonSelector)

if !areSelectorsSame(actual, expected) {
t.Errorf("Expected %v but got %v", expected, actual)
}
}

func TestSelectorsNodeDefined(t *testing.T) {

commonSelector := map[string]string{}

nodeSelector := map[string]string{
"node": "test",
}

expected := map[string]string{
"node": "test",
}

actual := mergeSelectors(nodeSelector, commonSelector)

if !areSelectorsSame(actual, expected) {
t.Errorf("Expected %v but got %v", expected, actual)
}
}

func TestSelectorsCommonAndNodeDefined(t *testing.T) {

commonSelector := map[string]string{
"common": "test",
}

nodeSelector := map[string]string{
"node": "test",
}

expected := map[string]string{
"common": "test",
"node": "test",
}

actual := mergeSelectors(nodeSelector, commonSelector)

if !areSelectorsSame(actual, expected) {
t.Errorf("Expected %v but got %v", expected, actual)
}
}

func TestSelectorsCommonOverwritten(t *testing.T) {

commonSelector := map[string]string{
"common": "test",
"node": "test",
"test": "common",
}

nodeSelector := map[string]string{
"common": "node",
"test": "node",
}

expected := map[string]string{
"common": "node",
"node": "test",
"test": "node",
}

actual := mergeSelectors(nodeSelector, commonSelector)

if !areSelectorsSame(actual, expected) {
t.Errorf("Expected %v but got %v", expected, actual)
}
}