Skip to content
Merged
9 changes: 7 additions & 2 deletions docs/Examples.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,20 @@ spec:
EOF
```

This will automatically configure a cluster master with a single indexer
peer.
This will automatically configure a cluster with RF(replication_factor) number of indexer peers.

NOTE: Whenever we try to specify `replicas` on IndexerCluster CR less than RF(as set on ClusterMaster),
the operator will always scale the number of peers to either `replication_factor`(in case of single site indexer cluster)
or to `origin` count in `site_replication_factor`(in case of multi-site indexer cluster).

```
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
splunk-cm-cluster-master-0 1/1 Running 0 29s
splunk-default-monitoring-console-0 1/1 Running 0 15s
splunk-example-indexer-0 1/1 Running 0 29s
splunk-example-indexer-1 1/1 Running 0 29s
splunk-example-indexer-2 1/1 Running 0 29s
splunk-operator-7c5599546c-wt4xl 1/1 Running 0 14h
```
Notes:
Expand Down
4 changes: 3 additions & 1 deletion pkg/splunk/client/enterprise.go
Original file line number Diff line number Diff line change
Expand Up @@ -894,7 +894,9 @@ func (c *SplunkClient) UpdateMonitoringConsoleApp() error {

//ClusterInfo is the struct for checking ClusterInfo
type ClusterInfo struct {
MultiSite string `json:"multisite"`
MultiSite string `json:"multisite"`
ReplicationFactor int32 `json:"replication_factor"`
SiteReplicationFactor string `json:"site_replication_factor,omitempty"`
}

// GetClusterInfo queries the cluster about multi-site or single-site.
Expand Down
1 change: 1 addition & 0 deletions pkg/splunk/enterprise/finalizers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func splunkDeletionTester(t *testing.T, cr splcommon.MetaObject, delete func(spl
{MetaName: "*v1.StatefulSet-test-splunk-test-monitoring-console"},
{MetaName: "*v1.Secret-test-splunk-test-secret"},
{MetaName: "*v1.Secret-test-splunk-test-secret"},
{MetaName: "*v1beta1.ClusterMaster-test-master1"},
{MetaName: "*v1.Secret-test-splunk-test-secret"},
}
}
Expand Down
80 changes: 66 additions & 14 deletions pkg/splunk/enterprise/indexercluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"errors"
"fmt"
"regexp"
"strconv"
"time"

appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -73,6 +75,26 @@ func ApplyIndexerCluster(client splcommon.ControllerClient, cr *enterprisev1.Ind
return result, err
}

namespacedName := types.NamespacedName{
Namespace: cr.GetNamespace(),
Name: cr.Spec.ClusterMasterRef.Name,
}
masterIdxCluster := &enterprisev1.ClusterMaster{}
err = client.Get(context.TODO(), namespacedName, masterIdxCluster)
if err == nil {
cr.Status.ClusterMasterPhase = masterIdxCluster.Status.Phase
} else {
cr.Status.ClusterMasterPhase = splcommon.PhaseError
}
mgr := indexerClusterPodManager{log: scopedLog, cr: cr, secrets: namespaceScopedSecret, newSplunkClient: splclient.NewSplunkClient}
// Check if we have configured enough number(<= RF) of replicas
if mgr.cr.Status.ClusterMasterPhase == splcommon.PhaseReady {
err = mgr.verifyRFPeers(client)
if err != nil {
return result, err
}
}

// check if deletion has been requested
if cr.ObjectMeta.DeletionTimestamp != nil {
DeleteOwnerReferencesForResources(client, cr, nil)
Expand All @@ -97,24 +119,12 @@ func ApplyIndexerCluster(client splcommon.ControllerClient, cr *enterprisev1.Ind
return result, err
}

namespacedName := types.NamespacedName{
Namespace: cr.GetNamespace(),
Name: cr.Spec.ClusterMasterRef.Name,
}
masterIdxCluster := &enterprisev1.ClusterMaster{}
err = client.Get(context.TODO(), namespacedName, masterIdxCluster)
if err == nil {
cr.Status.ClusterMasterPhase = masterIdxCluster.Status.Phase
} else {
cr.Status.ClusterMasterPhase = splcommon.PhaseError
}

// create or update statefulset for the indexers
statefulSet, err := getIndexerStatefulSet(client, cr)
if err != nil {
return result, err
}
mgr := indexerClusterPodManager{log: scopedLog, cr: cr, secrets: namespaceScopedSecret, newSplunkClient: splclient.NewSplunkClient}

phase, err := mgr.Update(client, statefulSet, cr.Spec.Replicas)
if err != nil {
return result, err
Expand Down Expand Up @@ -278,12 +288,20 @@ func ApplyIdxcSecret(mgr *indexerClusterPodManager, replicas int32, mock bool) e

// Update for indexerClusterPodManager handles all updates for a statefulset of indexers
func (mgr *indexerClusterPodManager) Update(c splcommon.ControllerClient, statefulSet *appsv1.StatefulSet, desiredReplicas int32) (splcommon.Phase, error) {

var err error
//Don't even try to create a statefulset and secret if CM is not ready yet.
if mgr.cr.Status.ClusterMasterPhase != splcommon.PhaseReady {
mgr.log.Error(err, "Cluster Master is not ready yet")
return splcommon.PhaseError, err
}

// Assign client
if mgr.c == nil {
mgr.c = c
}
// update statefulset, if necessary
_, err := splctrl.ApplyStatefulSet(mgr.c, statefulSet)
_, err = splctrl.ApplyStatefulSet(mgr.c, statefulSet)
if err != nil {
return splcommon.PhaseError, err
}
Expand Down Expand Up @@ -414,6 +432,40 @@ func (mgr *indexerClusterPodManager) getClusterMasterClient() *splclient.SplunkC
return mgr.newSplunkClient(fmt.Sprintf("https://%s:8089", fqdnName), "admin", adminPwd)
}

// getSiteRepFactorOriginCount gets the origin count of the site_replication_factor
func getSiteRepFactorOriginCount(siteRepFactor string) int32 {
re := regexp.MustCompile(".*origin:(?P<rf>.*),.*")
match := re.FindStringSubmatch(siteRepFactor)
siteRF, _ := strconv.Atoi(match[1])
return int32(siteRF)
}

// verifyRFPeers verifies the number of peers specified in the replicas section
// of IndexerClsuster CR. If it is less than RF, than we set it to RF.
func (mgr *indexerClusterPodManager) verifyRFPeers(c splcommon.ControllerClient) error {
if mgr.c == nil {
mgr.c = c
}
cm := mgr.getClusterMasterClient()
clusterInfo, err := cm.GetClusterInfo(false)
if err != nil {
return fmt.Errorf("Could not get cluster info from cluster master")
}
var replicationFactor int32
// if it is a multisite indexer cluster, check site_replication_factor
if clusterInfo.MultiSite == "true" {
replicationFactor = getSiteRepFactorOriginCount(clusterInfo.SiteReplicationFactor)
} else { // for single site, check replication factor
replicationFactor = clusterInfo.ReplicationFactor
}

if mgr.cr.Spec.Replicas < replicationFactor {
mgr.log.Info("Changing number of replicas as it is less than RF number of peers", "replicas", mgr.cr.Spec.Replicas)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super nit: Maybe can change the logging to something like "Increasing number of indexer replicas to match RF number of peers" or something similar

mgr.cr.Spec.Replicas = replicationFactor
}
return nil
}

// updateStatus for indexerClusterPodManager uses the REST API to update the status for an IndexerCluster custom resource
func (mgr *indexerClusterPodManager) updateStatus(statefulSet *appsv1.StatefulSet) error {
mgr.cr.Status.ReadyReplicas = statefulSet.Status.ReadyReplicas
Expand Down
Loading