From 7215ca573396398b55aee9f74adb76808d4bd81c Mon Sep 17 00:00:00 2001 From: Richard Wall Date: Tue, 13 Mar 2018 15:56:59 +0000 Subject: [PATCH] Remove the loadbalanced CQL service and document an alternative way to connect. In no particular order: * Removed the CQL service. * Added a headless `nodes` service. * Renamed the `seedprovider` service to `seeds`. * Added documentation explaining the two headless services and how to connect a CQL client. * Refactored the two headless services into a single service control * Removed the service update code, which was not tested and which added unecessary complication. * Navigator will now only touch the services if they are missing. * Updated the E2E tests so that `cql_connect` always attempts to connect to the `nodes` service. * Removed the ServiceName from the NodePool statefulsets because it no longer made sense with multiple NodePools / StatefulSets. The SS servicename is supposed to match a service dedicated to that statefulset, not a single seedprovider service. We probably should dynamically create a service for each nodepool. * Removed the, also broken, CASSANDRA_SEEDS configuration which was pointing `seedProviderServiceName` rather than at a service name matching the name of the statefulset. * In E2E tests Reverted to a better mechanism for simulating node failure. Decommission leaves the node in a decommissioned state causing the C* process to immediately exit. * Added CONSISTENCY checks to the E2E CQL queries to verify that both C* nodes are reachable and have the test data. Fixes: #232 --- docs/cassandra.rst | 61 +++++++++ docs/quick-start/cassandra-cluster.yaml | 1 - hack/e2e.sh | 77 ++++++------ hack/libe2e.sh | 25 ++-- hack/testdata/cass-cluster-test.template.yaml | 1 - internal/test/unit/framework/state_fixture.go | 1 + pkg/apis/navigator/types.go | 1 - pkg/apis/navigator/v1alpha1/types.go | 2 - .../v1alpha1/zz_generated.conversion.go | 2 - pkg/controllers/cassandra/cassandra.go | 9 +- pkg/controllers/cassandra/cluster_control.go | 14 +-- .../cassandra/nodepool/resource.go | 17 +-- .../cassandra/seedlabeller/control.go | 8 +- .../cassandra/seedlabeller/control_test.go | 6 +- pkg/controllers/cassandra/service/control.go | 117 ++++++++++++++++++ .../cassandra/service/control_test.go | 99 +++++++++++++++ .../cassandra/service/cql/control.go | 44 ------- .../cassandra/service/cql/control_test.go | 36 ------ .../cassandra/service/cql/resource.go | 34 ----- .../cassandra/service/seedprovider/control.go | 45 ------- .../service/seedprovider/control_test.go | 36 ------ .../service/seedprovider/resource.go | 59 --------- .../cassandra/service/testing/testing.go | 60 --------- .../cassandra/service/util/util.go | 55 -------- pkg/controllers/cassandra/testing/testing.go | 27 ++-- pkg/controllers/cassandra/util/util.go | 13 +- pkg/controllers/executor.go | 1 + 27 files changed, 372 insertions(+), 479 deletions(-) create mode 100644 pkg/controllers/cassandra/service/control.go create mode 100644 pkg/controllers/cassandra/service/control_test.go delete mode 100644 pkg/controllers/cassandra/service/cql/control.go delete mode 100644 pkg/controllers/cassandra/service/cql/control_test.go delete mode 100644 pkg/controllers/cassandra/service/cql/resource.go delete mode 100644 pkg/controllers/cassandra/service/seedprovider/control.go delete mode 100644 pkg/controllers/cassandra/service/seedprovider/control_test.go delete mode 100644 pkg/controllers/cassandra/service/seedprovider/resource.go delete mode 100644 pkg/controllers/cassandra/service/testing/testing.go delete mode 100644 pkg/controllers/cassandra/service/util/util.go diff --git a/docs/cassandra.rst b/docs/cassandra.rst index 17504ba16..e86edb1e3 100644 --- a/docs/cassandra.rst +++ b/docs/cassandra.rst @@ -116,3 +116,64 @@ The ``resources`` field follows exactly the same specification as the Kubernetes (``pod.spec.containers[].resources``). See `Managing Compute Resources for Containers `_ for more information. + + +Connecting to Cassandra +----------------------- + +If you apply the YAML manifest from the example above, +Navigator will create a Cassandra cluster with three C* nodes running in three pods. +The IP addresses assigned to each C* node may change when pods are rescheduled or restarted, but there are stable DNS names which allow you to connect to the cluster. + +Services and DNS Names +~~~~~~~~~~~~~~~~~~~~~~ + +Navigator creates two `headless services `_ for every Cassandra cluster that it creates. +Each service has a corresponding DNS domain name: + +#. The *nodes* service (e.g. ``cass-demo-nodes``) has a DNS domain name which resolves to the IP addresses of **all** the C* nodes in cluster (nodes 0, 1, and 2 in this example). +#. The *seeds* service (e.g. ``cass-demo-seeds``) has a DNS domain name which resolves to the IP addresses of **only** the `seed nodes `_ (node 0 in this example). + +These DNS names have multiple HOST (`A`) records, one for each **healthy** C* node IP address. + +.. note:: + The DNS server only includes `healthy `_ nodes when answering requests for these two services. + +The DNS names can be resolved from any pod in the Kubernetes cluster: + +* If the pod is in the same namespace as the Cassandra cluster you need only use the left most label of the DNS name. E.g. ``cass-demo-nodes``. +* If the pod is in a different namespace you must use the fully qualified DNS name. E.g. ``cass-demo-nodes.my-namespace.svc.cluster.local``. + +.. note:: + Read `DNS for Services and Pods `_ for more information about DNS in Kubernetes. + +TCP Ports +~~~~~~~~~ + +The C* nodes all listen on the following TCP ports: + +#. **9042**: For CQL client connections. +#. **8080**: For Prometheus client connections. + +Connect using a CQL Client +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Navigator configures all the nodes in a Cassandra cluster to listen on TCP port 9042 for `CQL client connections `_. +And there are `CQL drivers for most popular programming languages `_. +Most drivers have the ability to connect to a single node and then discover all the other cluster nodes. + +For example, you could use the `Datastax Python driver `_ to connect to the Cassandra cluster as follows: + +.. code-block:: python + + from cassandra.cluster import Cluster + + cluster = Cluster(['cass-demo-nodes'], port=9042) + session = cluster.connect() + rows = session.execute('SELECT ... FROM ...') + for row in rows: + print row + +.. note:: + The IP address to which the driver makes the initial connection + depends on the DNS server and operating system configuration. diff --git a/docs/quick-start/cassandra-cluster.yaml b/docs/quick-start/cassandra-cluster.yaml index 0d6000d3d..8a6425c76 100644 --- a/docs/quick-start/cassandra-cluster.yaml +++ b/docs/quick-start/cassandra-cluster.yaml @@ -4,7 +4,6 @@ metadata: name: "demo" spec: version: "3.11.1" - cqlPort: 9042 sysctls: - "vm.max_map_count=0" nodePools: diff --git a/hack/e2e.sh b/hack/e2e.sh index 64cc90461..81a96b152 100755 --- a/hack/e2e.sh +++ b/hack/e2e.sh @@ -218,7 +218,7 @@ function test_cassandracluster() { --namespace "${namespace}" \ --filename \ <(envsubst \ - '$NAVIGATOR_IMAGE_REPOSITORY:$NAVIGATOR_IMAGE_TAG:$NAVIGATOR_IMAGE_PULLPOLICY:$CASS_NAME:$CASS_REPLICAS:$CASS_CQL_PORT:$CASS_VERSION' \ + '$NAVIGATOR_IMAGE_REPOSITORY:$NAVIGATOR_IMAGE_TAG:$NAVIGATOR_IMAGE_PULLPOLICY:$CASS_NAME:$CASS_REPLICAS:$CASS_VERSION' \ < "${SCRIPT_DIR}/testdata/cass-cluster-test.template.yaml") then fail_test "Failed to create cassandracluster" @@ -246,31 +246,31 @@ function test_cassandracluster() { # Wait 5 minutes for cassandra to start and listen for CQL queries. if ! retry TIMEOUT=300 cql_connect \ "${namespace}" \ - "cass-${CASS_NAME}-cql" \ - 9042; then + "cass-${CASS_NAME}-nodes" \ + "${CASS_CQL_PORT}"; then fail_test "Navigator controller failed to create cassandracluster service" fi if ! retry TIMEOUT=300 in_cluster_command \ "${namespace}" \ "alpine:3.6" \ - /bin/sh -c "apk add --no-cache curl && curl -vv http://cass-${CASS_NAME}-ringnodes-0.cass-${CASS_NAME}-seedprovider:8080"; then + /bin/sh -c "apk add --no-cache curl && curl -vv http://cass-${CASS_NAME}-nodes:8080"; then fail_test "Pilot did not start Prometheus metric exporter" fi # Create a database cql_connect \ "${namespace}" \ - "cass-${CASS_NAME}-cql" \ - 9042 \ + "cass-${CASS_NAME}-nodes" \ + "${CASS_CQL_PORT}" \ --debug \ < "${SCRIPT_DIR}/testdata/cassandra_test_database1.cql" # Insert a record cql_connect \ "${namespace}" \ - "cass-${CASS_NAME}-cql" \ - 9042 \ + "cass-${CASS_NAME}-nodes" \ + "${CASS_CQL_PORT}" \ --debug \ --execute="INSERT INTO space1.testtable1(key, value) VALUES('testkey1', 'testvalue1')" @@ -282,8 +282,8 @@ function test_cassandracluster() { not \ cql_connect \ "${namespace}" \ - "cass-${CASS_NAME}-cql" \ - 9042 \ + "cass-${CASS_NAME}-nodes" \ + "${CASS_CQL_PORT}" \ --debug # Kill the cassandra process gracefully which allows it to flush its data to disk. # kill_cassandra_process \ @@ -303,38 +303,21 @@ function test_cassandracluster() { stdout_matches "testvalue1" \ cql_connect \ "${namespace}" \ - "cass-${CASS_NAME}-cql" \ - 9042 \ + "cass-${CASS_NAME}-nodes" \ + "${CASS_CQL_PORT}" \ --debug \ --execute='SELECT * FROM space1.testtable1' then fail_test "Cassandra data was lost" fi - # Change the CQL port - export CASS_CQL_PORT=9043 - kubectl apply \ - --namespace "${namespace}" \ - --filename \ - <(envsubst \ - '$NAVIGATOR_IMAGE_REPOSITORY:$NAVIGATOR_IMAGE_TAG:$NAVIGATOR_IMAGE_PULLPOLICY:$CASS_NAME:$CASS_REPLICAS:$CASS_CQL_PORT:$CASS_VERSION' \ - < "${SCRIPT_DIR}/testdata/cass-cluster-test.template.yaml") - - # Wait 60s for cassandra CQL port to change - if ! retry TIMEOUT=60 cql_connect \ - "${namespace}" \ - "cass-${CASS_NAME}-cql" \ - 9043; then - fail_test "Navigator controller failed to update cassandracluster service" - fi - # Increment the replica count export CASS_REPLICAS=2 kubectl apply \ --namespace "${namespace}" \ --filename \ <(envsubst \ - '$NAVIGATOR_IMAGE_REPOSITORY:$NAVIGATOR_IMAGE_TAG:$NAVIGATOR_IMAGE_PULLPOLICY:$CASS_NAME:$CASS_REPLICAS:$CASS_CQL_PORT:$CASS_VERSION' \ + '$NAVIGATOR_IMAGE_REPOSITORY:$NAVIGATOR_IMAGE_TAG:$NAVIGATOR_IMAGE_PULLPOLICY:$CASS_NAME:$CASS_REPLICAS:$CASS_VERSION' \ < "${SCRIPT_DIR}/testdata/cass-cluster-test.template.yaml") if ! retry TIMEOUT=300 stdout_equals 2 kubectl \ @@ -348,7 +331,7 @@ function test_cassandracluster() { # TODO: A better test would be to query the endpoints and check that only # the `-0` pods are included. E.g. - # kubectl -n test-cassandra-1519754828-19864 get ep cass-cassandra-1519754828-19864-cassandra-seedprovider -o "jsonpath={.subsets[*].addresses[*].hostname}" + # kubectl -n test-cassandra-1519754828-19864 get ep cass-cassandra-1519754828-19864-cassandra-seeds -o "jsonpath={.subsets[*].addresses[*].hostname}" if ! stdout_equals "cass-${CASS_NAME}-ringnodes-0" \ kubectl get pods --namespace "${namespace}" \ --selector=navigator.jetstack.io/cassandra-seed=true \ @@ -357,16 +340,32 @@ function test_cassandracluster() { fail_test "First cassandra node not marked as seed" fi + if ! retry \ + stdout_matches "testvalue1" \ + cql_connect \ + "${namespace}" \ + "cass-${CASS_NAME}-nodes" \ + "${CASS_CQL_PORT}" \ + --debug \ + --execute='CONSISTENCY ALL; SELECT * FROM space1.testtable1' + then + fail_test "Data was not replicated to second node" + fi + simulate_unresponsive_cassandra_process \ "${namespace}" \ - "cass-${CASS_NAME}-ringnodes-0" \ - "cassandra" - - if ! retry cql_connect \ - "${namespace}" \ - "cass-${CASS_NAME}-cql" \ - 9043; then - fail_test "Cassandra readiness probe failed to bypass dead node" + "cass-${CASS_NAME}-ringnodes-0" + + if ! retry TIMEOUT=600 \ + stdout_matches "testvalue1" \ + cql_connect \ + "${namespace}" \ + "cass-${CASS_NAME}-nodes" \ + "${CASS_CQL_PORT}" \ + --debug \ + --execute='CONSISTENCY ALL; SELECT * FROM space1.testtable1' + then + fail_test "Cassandra liveness probe failed to restart dead node" fi } diff --git a/hack/libe2e.sh b/hack/libe2e.sh index 4909e57cd..5c2cba0c4 100644 --- a/hack/libe2e.sh +++ b/hack/libe2e.sh @@ -116,30 +116,33 @@ function kube_event_exists() { return 1 } -function simulate_unresponsive_cassandra_process() { - local namespace=$1 - local pod=$2 - local container=$3 - # Decommission causes cassandra to stop accepting CQL connections. +function decommission_cassandra_node() { + local namespace="${1}" + local pod="${2}" kubectl \ --namespace="${namespace}" \ - exec "${pod}" --container="${container}" -- \ + exec "${pod}" -- \ /bin/sh -c 'JVM_OPTS="" exec nodetool decommission' } function signal_cassandra_process() { - local namespace=$1 - local pod=$2 - local container=$3 - local signal=$4 + local namespace="${1}" + local pod="${2}" + local signal="${3}" # Send STOP signal to all the cassandra user's processes kubectl \ --namespace="${namespace}" \ - exec "${pod}" --container="${container}" -- \ + exec "${pod}" -- \ bash -c "kill -${signal}"' -- $(ps -u cassandra -o pid=) && ps faux' } +function simulate_unresponsive_cassandra_process() { + local namespace="${1}" + local pod="${2}" + signal_cassandra_process "${namespace}" "${pod}" "SIGSTOP" +} + function stdout_equals() { local expected="${1}" shift diff --git a/hack/testdata/cass-cluster-test.template.yaml b/hack/testdata/cass-cluster-test.template.yaml index bb814adbd..7bea36465 100644 --- a/hack/testdata/cass-cluster-test.template.yaml +++ b/hack/testdata/cass-cluster-test.template.yaml @@ -4,7 +4,6 @@ metadata: name: "${CASS_NAME}" spec: version: "${CASS_VERSION}" - cqlPort: ${CASS_CQL_PORT} sysctls: - "vm.max_map_count=0" nodePools: diff --git a/internal/test/unit/framework/state_fixture.go b/internal/test/unit/framework/state_fixture.go index 02acf5bca..e3223895f 100644 --- a/internal/test/unit/framework/state_fixture.go +++ b/internal/test/unit/framework/state_fixture.go @@ -60,6 +60,7 @@ func (s *StateFixture) Start() { ConfigMapLister: s.kubeSharedInformerFactory.Core().V1().ConfigMaps().Lister(), PilotLister: s.navigatorSharedInformerFactory.Navigator().V1alpha1().Pilots().Lister(), PodLister: s.kubeSharedInformerFactory.Core().V1().Pods().Lister(), + ServiceLister: s.kubeSharedInformerFactory.Core().V1().Services().Lister(), } s.stopCh = make(chan struct{}) s.kubeSharedInformerFactory.Start(s.stopCh) diff --git a/pkg/apis/navigator/types.go b/pkg/apis/navigator/types.go index a52ab51ff..8a5334d3a 100644 --- a/pkg/apis/navigator/types.go +++ b/pkg/apis/navigator/types.go @@ -32,7 +32,6 @@ type CassandraClusterSpec struct { NodePools []CassandraClusterNodePool Version version.Version Image *ImageSpec - CqlPort int32 } type CassandraClusterNodePool struct { diff --git a/pkg/apis/navigator/v1alpha1/types.go b/pkg/apis/navigator/v1alpha1/types.go index ac67790c3..87c4b1ed8 100644 --- a/pkg/apis/navigator/v1alpha1/types.go +++ b/pkg/apis/navigator/v1alpha1/types.go @@ -37,8 +37,6 @@ type CassandraClusterSpec struct { // Image describes the database image to use Image *ImageSpec `json:"image"` - CqlPort int32 `json:"cqlPort"` - // The version of the database to be used for nodes in the cluster. Version version.Version `json:"version"` } diff --git a/pkg/apis/navigator/v1alpha1/zz_generated.conversion.go b/pkg/apis/navigator/v1alpha1/zz_generated.conversion.go index 32ea83527..d89054d22 100644 --- a/pkg/apis/navigator/v1alpha1/zz_generated.conversion.go +++ b/pkg/apis/navigator/v1alpha1/zz_generated.conversion.go @@ -224,7 +224,6 @@ func autoConvert_v1alpha1_CassandraClusterSpec_To_navigator_CassandraClusterSpec } out.NodePools = *(*[]navigator.CassandraClusterNodePool)(unsafe.Pointer(&in.NodePools)) out.Image = (*navigator.ImageSpec)(unsafe.Pointer(in.Image)) - out.CqlPort = in.CqlPort out.Version = in.Version return nil } @@ -241,7 +240,6 @@ func autoConvert_navigator_CassandraClusterSpec_To_v1alpha1_CassandraClusterSpec out.NodePools = *(*[]CassandraClusterNodePool)(unsafe.Pointer(&in.NodePools)) out.Version = in.Version out.Image = (*ImageSpec)(unsafe.Pointer(in.Image)) - out.CqlPort = in.CqlPort return nil } diff --git a/pkg/controllers/cassandra/cassandra.go b/pkg/controllers/cassandra/cassandra.go index c24cd88f1..62ea7a501 100644 --- a/pkg/controllers/cassandra/cassandra.go +++ b/pkg/controllers/cassandra/cassandra.go @@ -29,8 +29,7 @@ import ( "github.com/jetstack/navigator/pkg/controllers/cassandra/role" "github.com/jetstack/navigator/pkg/controllers/cassandra/rolebinding" "github.com/jetstack/navigator/pkg/controllers/cassandra/seedlabeller" - servicecql "github.com/jetstack/navigator/pkg/controllers/cassandra/service/cql" - serviceseedprovider "github.com/jetstack/navigator/pkg/controllers/cassandra/service/seedprovider" + "github.com/jetstack/navigator/pkg/controllers/cassandra/service" "github.com/jetstack/navigator/pkg/controllers/cassandra/serviceaccount" ) @@ -98,15 +97,17 @@ func NewCassandra( cc.rolesListerSynced = roles.Informer().HasSynced cc.roleBindingsListerSynced = roleBindings.Informer().HasSynced cc.control = NewControl( - serviceseedprovider.NewControl( + service.NewControl( kubeClient, services.Lister(), recorder, + service.NodesServiceForCluster, ), - servicecql.NewControl( + service.NewControl( kubeClient, services.Lister(), recorder, + service.SeedsServiceForCluster, ), nodepool.NewControl( kubeClient, diff --git a/pkg/controllers/cassandra/cluster_control.go b/pkg/controllers/cassandra/cluster_control.go index 2336e38b8..5fad1581f 100644 --- a/pkg/controllers/cassandra/cluster_control.go +++ b/pkg/controllers/cassandra/cluster_control.go @@ -11,8 +11,6 @@ import ( "github.com/jetstack/navigator/pkg/controllers/cassandra/role" "github.com/jetstack/navigator/pkg/controllers/cassandra/rolebinding" "github.com/jetstack/navigator/pkg/controllers/cassandra/seedlabeller" - servicecql "github.com/jetstack/navigator/pkg/controllers/cassandra/service/cql" - serviceseedprovider "github.com/jetstack/navigator/pkg/controllers/cassandra/service/seedprovider" "github.com/jetstack/navigator/pkg/controllers/cassandra/serviceaccount" ) @@ -39,8 +37,8 @@ type ControlInterface interface { var _ ControlInterface = &defaultCassandraClusterControl{} type defaultCassandraClusterControl struct { - seedProviderServiceControl serviceseedprovider.Interface - cqlServiceControl servicecql.Interface + seedProviderServiceControl ControlInterface + nodesServiceControl ControlInterface nodepoolControl nodepool.Interface pilotControl pilot.Interface serviceAccountControl serviceaccount.Interface @@ -51,8 +49,8 @@ type defaultCassandraClusterControl struct { } func NewControl( - seedProviderServiceControl serviceseedprovider.Interface, - cqlServiceControl servicecql.Interface, + seedProviderServiceControl ControlInterface, + nodesServiceControl ControlInterface, nodepoolControl nodepool.Interface, pilotControl pilot.Interface, serviceAccountControl serviceaccount.Interface, @@ -63,7 +61,7 @@ func NewControl( ) ControlInterface { return &defaultCassandraClusterControl{ seedProviderServiceControl: seedProviderServiceControl, - cqlServiceControl: cqlServiceControl, + nodesServiceControl: nodesServiceControl, nodepoolControl: nodepoolControl, pilotControl: pilotControl, serviceAccountControl: serviceAccountControl, @@ -87,7 +85,7 @@ func (e *defaultCassandraClusterControl) Sync(c *v1alpha1.CassandraCluster) erro ) return err } - err = e.cqlServiceControl.Sync(c) + err = e.nodesServiceControl.Sync(c) if err != nil { e.recorder.Eventf( c, diff --git a/pkg/controllers/cassandra/nodepool/resource.go b/pkg/controllers/cassandra/nodepool/resource.go index e5ceea47b..40ed1cfe3 100644 --- a/pkg/controllers/cassandra/nodepool/resource.go +++ b/pkg/controllers/cassandra/nodepool/resource.go @@ -35,7 +35,6 @@ func StatefulSetForCluster( ) *apps.StatefulSet { statefulSetName := util.NodePoolResourceName(cluster, np) - seedProviderServiceName := util.SeedProviderServiceName(cluster) nodePoolLabels := util.NodePoolLabels(cluster, np.Name) image := cassImageToUse(&cluster.Spec) @@ -49,8 +48,7 @@ func StatefulSetForCluster( OwnerReferences: []metav1.OwnerReference{util.NewControllerRef(cluster)}, }, Spec: apps.StatefulSetSpec{ - Replicas: util.Int32Ptr(int32(np.Replicas)), - ServiceName: seedProviderServiceName, + Replicas: util.Int32Ptr(int32(np.Replicas)), Selector: &metav1.LabelSelector{ MatchLabels: nodePoolLabels, }, @@ -167,7 +165,7 @@ func StatefulSetForCluster( }, { Name: "cql", - ContainerPort: util.DefaultCqlPort, + ContainerPort: int32(9042), }, { Name: "prometheus", @@ -195,22 +193,13 @@ func StatefulSetForCluster( Name: "HEAP_NEWSIZE", Value: "100M", }, - { - Name: "CASSANDRA_SEEDS", - Value: fmt.Sprintf( - "%s-0.%s.%s.svc.cluster.local", - statefulSetName, - seedProviderServiceName, - cluster.Namespace, - ), - }, { Name: "CASSANDRA_ENDPOINT_SNITCH", Value: cassSnitch, }, { Name: "CASSANDRA_SERVICE", - Value: seedProviderServiceName, + Value: util.SeedsServiceName(cluster), }, { Name: "CASSANDRA_CLUSTER_NAME", diff --git a/pkg/controllers/cassandra/seedlabeller/control.go b/pkg/controllers/cassandra/seedlabeller/control.go index 81d6e8351..32ac9a5bf 100644 --- a/pkg/controllers/cassandra/seedlabeller/control.go +++ b/pkg/controllers/cassandra/seedlabeller/control.go @@ -11,7 +11,7 @@ import ( "k8s.io/client-go/tools/record" "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" - "github.com/jetstack/navigator/pkg/controllers/cassandra/service/seedprovider" + "github.com/jetstack/navigator/pkg/controllers/cassandra/service" "github.com/jetstack/navigator/pkg/controllers/cassandra/util" ) @@ -53,14 +53,14 @@ func (c *defaultSeedLabeller) labelSeedNodes( return nil } labels := pod.Labels - value := labels[seedprovider.SeedLabelKey] - if value == seedprovider.SeedLabelValue { + value := labels[service.SeedLabelKey] + if value == service.SeedLabelValue { return nil } if labels == nil { labels = map[string]string{} } - labels[seedprovider.SeedLabelKey] = seedprovider.SeedLabelValue + labels[service.SeedLabelKey] = service.SeedLabelValue podCopy := pod.DeepCopy() podCopy.SetLabels(labels) _, err = c.kubeClient.CoreV1().Pods(podCopy.Namespace).Update(podCopy) diff --git a/pkg/controllers/cassandra/seedlabeller/control_test.go b/pkg/controllers/cassandra/seedlabeller/control_test.go index 033085cdf..fad694c16 100644 --- a/pkg/controllers/cassandra/seedlabeller/control_test.go +++ b/pkg/controllers/cassandra/seedlabeller/control_test.go @@ -12,7 +12,7 @@ import ( "github.com/jetstack/navigator/pkg/controllers" "github.com/jetstack/navigator/pkg/controllers/cassandra/nodepool" "github.com/jetstack/navigator/pkg/controllers/cassandra/seedlabeller" - "github.com/jetstack/navigator/pkg/controllers/cassandra/service/seedprovider" + "github.com/jetstack/navigator/pkg/controllers/cassandra/service" casstesting "github.com/jetstack/navigator/pkg/controllers/cassandra/testing" ) @@ -24,7 +24,7 @@ func CheckSeedLabel(podName, podNamespace string, t *testing.T, state *controlle if err != nil { t.Fatal(err) } - if p.Labels[seedprovider.SeedLabelKey] != seedprovider.SeedLabelValue { + if p.Labels[service.SeedLabelKey] != service.SeedLabelValue { t.Errorf("unexpected seed label: %s", p.Labels) } } @@ -42,7 +42,7 @@ func TestSeedLabellerSync(t *testing.T) { pod0LabelMissing := pod0.DeepCopy() pod0LabelMissing.SetLabels(map[string]string{}) pod0ValueIncorrect := pod0LabelMissing.DeepCopy() - pod0ValueIncorrect.Labels[seedprovider.SeedLabelKey] = "blah" + pod0ValueIncorrect.Labels[service.SeedLabelKey] = "blah" type testT struct { kubeObjects []runtime.Object diff --git a/pkg/controllers/cassandra/service/control.go b/pkg/controllers/cassandra/service/control.go new file mode 100644 index 000000000..749542239 --- /dev/null +++ b/pkg/controllers/cassandra/service/control.go @@ -0,0 +1,117 @@ +package service + +import ( + "fmt" + + k8sErrors "k8s.io/apimachinery/pkg/api/errors" + + "github.com/golang/glog" + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/jetstack/navigator/pkg/controllers/cassandra/util" + + "k8s.io/client-go/kubernetes" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/record" + + v1alpha1 "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" +) + +const ( + SeedLabelKey = "navigator.jetstack.io/cassandra-seed" + SeedLabelValue = "true" +) + +type serviceFactory func(*v1alpha1.CassandraCluster) *apiv1.Service + +type control struct { + kubeClient kubernetes.Interface + serviceLister corelisters.ServiceLister + recorder record.EventRecorder + serviceFactory serviceFactory +} + +func NewControl( + kubeClient kubernetes.Interface, + serviceLister corelisters.ServiceLister, + recorder record.EventRecorder, + serviceFactory serviceFactory, +) *control { + return &control{ + kubeClient: kubeClient, + serviceLister: serviceLister, + recorder: recorder, + serviceFactory: serviceFactory, + } +} + +func (c *control) Sync(cluster *v1alpha1.CassandraCluster) error { + service := c.serviceFactory(cluster) + _, err := c.serviceLister.Services(service.Namespace).Get(service.Name) + if err == nil { + glog.V(4).Infof("Service already exists: %s", service.Name) + return nil + } + if !k8sErrors.IsNotFound(err) { + return err + } + glog.V(4).Infof("Creating service: %s", service.Name) + _, err = c.kubeClient.CoreV1().Services(service.Namespace).Create(service) + if k8sErrors.IsAlreadyExists(err) { + glog.V(4).Infof("Service exists: %s", service.Name) + return nil + } + return err +} + +func NodesServiceForCluster( + cluster *v1alpha1.CassandraCluster, +) *apiv1.Service { + labels := util.ClusterLabels(cluster) + return &apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-nodes", util.ResourceBaseName(cluster)), + Namespace: cluster.Namespace, + OwnerReferences: []metav1.OwnerReference{util.NewControllerRef(cluster)}, + Labels: labels, + }, + Spec: apiv1.ServiceSpec{ + ClusterIP: "None", + Type: apiv1.ServiceTypeClusterIP, + Selector: labels, + // Headless service should not require a port. + // But without it, DNS records are not registered. + // See https://github.com/kubernetes/kubernetes/issues/55158 + Ports: []apiv1.ServicePort{{Port: 65535}}, + }, + } +} + +var _ serviceFactory = NodesServiceForCluster + +func SeedsServiceForCluster(cluster *v1alpha1.CassandraCluster) *apiv1.Service { + labels := util.ClusterLabels(cluster) + service := &apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: util.SeedsServiceName(cluster), + Namespace: cluster.Namespace, + OwnerReferences: []metav1.OwnerReference{util.NewControllerRef(cluster)}, + Labels: labels, + }, + Spec: apiv1.ServiceSpec{ + ClusterIP: "None", + Type: apiv1.ServiceTypeClusterIP, + Selector: labels, + // Headless service should not require a port. + // But without it, DNS records are not registered. + // See https://github.com/kubernetes/kubernetes/issues/55158 + Ports: []apiv1.ServicePort{{Port: 65535}}, + }, + } + // Only mark nodes explicitly labeled as seeds as seed nodes + service.Spec.Selector[SeedLabelKey] = SeedLabelValue + return service +} + +var _ serviceFactory = SeedsServiceForCluster diff --git a/pkg/controllers/cassandra/service/control_test.go b/pkg/controllers/cassandra/service/control_test.go new file mode 100644 index 000000000..a4d78e6e7 --- /dev/null +++ b/pkg/controllers/cassandra/service/control_test.go @@ -0,0 +1,99 @@ +package service_test + +import ( + "testing" + + casstesting "github.com/jetstack/navigator/pkg/controllers/cassandra/testing" + + "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + "github.com/jetstack/navigator/internal/test/unit/framework" + v1alpha1 "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" + "github.com/jetstack/navigator/pkg/controllers" + "github.com/jetstack/navigator/pkg/controllers/cassandra/service" +) + +func TestSync(t *testing.T) { + cluster1 := casstesting.ClusterForTest() + service1 := service.NodesServiceForCluster(cluster1) + serviceFactory := service.NodesServiceForCluster + + type testT struct { + kubeObjects []runtime.Object + navObjects []runtime.Object + cluster *v1alpha1.CassandraCluster + fixtureManipulator func(*testing.T, *framework.StateFixture) + assertions func(*testing.T, *controllers.State, testT) + expectErr bool + } + + tests := map[string]testT{ + "create service": { + cluster: cluster1, + assertions: func(t *testing.T, state *controllers.State, test testT) { + expectedService := service1 + _, err := state.Clientset. + CoreV1(). + Services(expectedService.Namespace). + Get(expectedService.Name, v1.GetOptions{}) + if err != nil { + t.Error(err) + } + }, + }, + + "service exists": { + kubeObjects: []runtime.Object{service1}, + cluster: cluster1, + }, + "not yet listed": { + kubeObjects: []runtime.Object{}, + cluster: cluster1, + fixtureManipulator: func(t *testing.T, fixture *framework.StateFixture) { + _, err := fixture.KubeClient().CoreV1().Services(service1.Namespace).Create(service1) + if err != nil { + t.Fatal(err) + } + }, + }, + } + + for title, test := range tests { + t.Run( + title, + func(t *testing.T) { + fixture := &framework.StateFixture{ + T: t, + KubeObjects: test.kubeObjects, + NavigatorObjects: test.navObjects, + } + fixture.Start() + defer fixture.Stop() + if test.fixtureManipulator != nil { + test.fixtureManipulator(t, fixture) + } + state := fixture.State() + c := service.NewControl( + state.Clientset, + state.ServiceLister, + state.Recorder, + serviceFactory, + ) + err := c.Sync(test.cluster) + if err == nil { + if test.expectErr { + t.Error("Expected an error") + } + } else { + if !test.expectErr { + t.Error(err) + } + } + if test.assertions != nil { + test.assertions(t, state, test) + } + }, + ) + } +} diff --git a/pkg/controllers/cassandra/service/cql/control.go b/pkg/controllers/cassandra/service/cql/control.go deleted file mode 100644 index a6c291ae8..000000000 --- a/pkg/controllers/cassandra/service/cql/control.go +++ /dev/null @@ -1,44 +0,0 @@ -package cql - -import ( - "k8s.io/client-go/kubernetes" - corelisters "k8s.io/client-go/listers/core/v1" - "k8s.io/client-go/tools/record" - - v1alpha1 "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" - serviceutil "github.com/jetstack/navigator/pkg/controllers/cassandra/service/util" -) - -type Interface interface { - Sync(*v1alpha1.CassandraCluster) error -} - -type defaultCassandraClusterServiceControl struct { - kubeClient kubernetes.Interface - serviceLister corelisters.ServiceLister - recorder record.EventRecorder -} - -var _ Interface = &defaultCassandraClusterServiceControl{} - -func NewControl( - kubeClient kubernetes.Interface, - serviceLister corelisters.ServiceLister, - recorder record.EventRecorder, -) Interface { - return &defaultCassandraClusterServiceControl{ - kubeClient: kubeClient, - serviceLister: serviceLister, - recorder: recorder, - } -} - -func (e *defaultCassandraClusterServiceControl) Sync(cluster *v1alpha1.CassandraCluster) error { - return serviceutil.SyncService( - cluster, - e.kubeClient, - e.serviceLister, - ServiceForCluster, - updateServiceForCluster, - ) -} diff --git a/pkg/controllers/cassandra/service/cql/control_test.go b/pkg/controllers/cassandra/service/cql/control_test.go deleted file mode 100644 index cb43a7cf6..000000000 --- a/pkg/controllers/cassandra/service/cql/control_test.go +++ /dev/null @@ -1,36 +0,0 @@ -package cql_test - -import ( - "fmt" - "testing" - - apiv1 "k8s.io/api/core/v1" - - "github.com/jetstack/navigator/pkg/controllers/cassandra/service/cql" - servicetesting "github.com/jetstack/navigator/pkg/controllers/cassandra/service/testing" - casstesting "github.com/jetstack/navigator/pkg/controllers/cassandra/testing" -) - -func newFixture(t *testing.T) *casstesting.Fixture { - f := casstesting.NewFixture(t) - f.SeedProviderServiceControl = &casstesting.FakeControl{} - return f -} - -func newService(f *casstesting.Fixture) *apiv1.Service { - return cql.ServiceForCluster(f.Cluster) -} - -func TestCqlServiceSync(t *testing.T) { - servicetesting.RunStandardServiceTests(t, newFixture, newService) - t.Run( - "sync error", - func(t *testing.T) { - f := newFixture(t) - f.CqlServiceControl = &casstesting.FakeControl{ - SyncError: fmt.Errorf("simulated sync error"), - } - f.RunExpectError() - }, - ) -} diff --git a/pkg/controllers/cassandra/service/cql/resource.go b/pkg/controllers/cassandra/service/cql/resource.go deleted file mode 100644 index c15d6ec38..000000000 --- a/pkg/controllers/cassandra/service/cql/resource.go +++ /dev/null @@ -1,34 +0,0 @@ -package cql - -import ( - apiv1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/intstr" - - v1alpha1 "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" - serviceutil "github.com/jetstack/navigator/pkg/controllers/cassandra/service/util" - "github.com/jetstack/navigator/pkg/controllers/cassandra/util" -) - -func ServiceForCluster( - cluster *v1alpha1.CassandraCluster, -) *apiv1.Service { - return updateServiceForCluster(cluster, &apiv1.Service{}) -} - -func updateServiceForCluster( - cluster *v1alpha1.CassandraCluster, - service *apiv1.Service, -) *apiv1.Service { - service = service.DeepCopy() - service = serviceutil.SetStandardServiceAttributes(cluster, service) - service.SetName(util.CqlServiceName(cluster)) - service.Spec.Type = apiv1.ServiceTypeClusterIP - service.Spec.Ports = []apiv1.ServicePort{ - { - Name: "cql", - Port: cluster.Spec.CqlPort, - TargetPort: intstr.FromInt(util.DefaultCqlPort), - }, - } - return service -} diff --git a/pkg/controllers/cassandra/service/seedprovider/control.go b/pkg/controllers/cassandra/service/seedprovider/control.go deleted file mode 100644 index 591f2efcf..000000000 --- a/pkg/controllers/cassandra/service/seedprovider/control.go +++ /dev/null @@ -1,45 +0,0 @@ -package seedprovider - -import ( - serviceutil "github.com/jetstack/navigator/pkg/controllers/cassandra/service/util" - - "k8s.io/client-go/kubernetes" - corelisters "k8s.io/client-go/listers/core/v1" - "k8s.io/client-go/tools/record" - - v1alpha1 "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" -) - -type Interface interface { - Sync(*v1alpha1.CassandraCluster) error -} - -type defaultCassandraClusterServiceControl struct { - kubeClient kubernetes.Interface - serviceLister corelisters.ServiceLister - recorder record.EventRecorder -} - -var _ Interface = &defaultCassandraClusterServiceControl{} - -func NewControl( - kubeClient kubernetes.Interface, - serviceLister corelisters.ServiceLister, - recorder record.EventRecorder, -) Interface { - return &defaultCassandraClusterServiceControl{ - kubeClient: kubeClient, - serviceLister: serviceLister, - recorder: recorder, - } -} - -func (e *defaultCassandraClusterServiceControl) Sync(cluster *v1alpha1.CassandraCluster) error { - return serviceutil.SyncService( - cluster, - e.kubeClient, - e.serviceLister, - ServiceForCluster, - updateServiceForCluster, - ) -} diff --git a/pkg/controllers/cassandra/service/seedprovider/control_test.go b/pkg/controllers/cassandra/service/seedprovider/control_test.go deleted file mode 100644 index 7598b344b..000000000 --- a/pkg/controllers/cassandra/service/seedprovider/control_test.go +++ /dev/null @@ -1,36 +0,0 @@ -package seedprovider_test - -import ( - "fmt" - "testing" - - apiv1 "k8s.io/api/core/v1" - - "github.com/jetstack/navigator/pkg/controllers/cassandra/service/seedprovider" - servicetesting "github.com/jetstack/navigator/pkg/controllers/cassandra/service/testing" - casstesting "github.com/jetstack/navigator/pkg/controllers/cassandra/testing" -) - -func newFixture(t *testing.T) *casstesting.Fixture { - f := casstesting.NewFixture(t) - f.CqlServiceControl = &casstesting.FakeControl{} - return f -} - -func newService(f *casstesting.Fixture) *apiv1.Service { - return seedprovider.ServiceForCluster(f.Cluster) -} - -func TestSeedProviderServiceSync(t *testing.T) { - servicetesting.RunStandardServiceTests(t, newFixture, newService) - t.Run( - "sync error", - func(t *testing.T) { - f := newFixture(t) - f.SeedProviderServiceControl = &casstesting.FakeControl{ - SyncError: fmt.Errorf("simulated sync error"), - } - f.RunExpectError() - }, - ) -} diff --git a/pkg/controllers/cassandra/service/seedprovider/resource.go b/pkg/controllers/cassandra/service/seedprovider/resource.go deleted file mode 100644 index d4f68ac4f..000000000 --- a/pkg/controllers/cassandra/service/seedprovider/resource.go +++ /dev/null @@ -1,59 +0,0 @@ -package seedprovider - -import ( - apiv1 "k8s.io/api/core/v1" - - v1alpha1 "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" - serviceutil "github.com/jetstack/navigator/pkg/controllers/cassandra/service/util" - "github.com/jetstack/navigator/pkg/controllers/cassandra/util" -) - -const ( - TolerateUnreadyEndpointsAnnotationKey = "service.alpha.kubernetes.io/tolerate-unready-endpoints" - SeedLabelKey = "navigator.jetstack.io/cassandra-seed" - SeedLabelValue = "true" -) - -func ServiceForCluster( - cluster *v1alpha1.CassandraCluster, -) *apiv1.Service { - return updateServiceForCluster(cluster, &apiv1.Service{}) -} - -func updateServiceForCluster( - cluster *v1alpha1.CassandraCluster, - service *apiv1.Service, -) *apiv1.Service { - service = service.DeepCopy() - service = serviceutil.SetStandardServiceAttributes(cluster, service) - service.SetName(util.SeedProviderServiceName(cluster)) - service.Spec.Type = apiv1.ServiceTypeClusterIP - service.Spec.ClusterIP = "None" - - // Only mark nodes explicitly labeled as seeds as seed nodes - service.Spec.Selector[SeedLabelKey] = SeedLabelValue - - // Headless service should not require a port. - // But without it, DNS records are not registered. - // See https://github.com/kubernetes/kubernetes/issues/55158 - service.Spec.Ports = []apiv1.ServicePort{ - { - Port: 65535, - }, - } - // This ensures that DNS names are published regardless of whether the - // Cassandra pod ReadinessProbes (listening on their CQL port). - // It won't handle CQL connections until it has successfully connected and - // negotiated with a seed host - service.Spec.PublishNotReadyAddresses = true - // XXX: This annotation is only necessary for Kubernetes <=1.7, which do not - // pay attention to the field above. - // Remove it when we no longer support those versions. - annotations := service.GetAnnotations() - if annotations == nil { - annotations = map[string]string{} - } - annotations[TolerateUnreadyEndpointsAnnotationKey] = "true" - service.SetAnnotations(annotations) - return service -} diff --git a/pkg/controllers/cassandra/service/testing/testing.go b/pkg/controllers/cassandra/service/testing/testing.go deleted file mode 100644 index eb5e58e23..000000000 --- a/pkg/controllers/cassandra/service/testing/testing.go +++ /dev/null @@ -1,60 +0,0 @@ -package testing - -import ( - "testing" - - apiv1 "k8s.io/api/core/v1" - - casstesting "github.com/jetstack/navigator/pkg/controllers/cassandra/testing" -) - -type FixtureFactory func(t *testing.T) *casstesting.Fixture -type ServiceFactory func(f *casstesting.Fixture) *apiv1.Service - -func RunStandardServiceTests(t *testing.T, newFixture FixtureFactory, newService ServiceFactory) { - t.Run( - "service created", - func(t *testing.T) { - f := newFixture(t) - f.Run() - f.AssertServicesLength(1) - }, - ) - t.Run( - "service exists", - func(t *testing.T) { - f := newFixture(t) - f.AddObjectK(newService(f)) - f.Run() - f.AssertServicesLength(1) - }, - ) - t.Run( - "service needs sync", - func(t *testing.T) { - f := newFixture(t) - // Remove the ports from the default cluster and expect them to be - // re-created. - unsyncedService := newService(f) - unsyncedService.Spec.Selector = map[string]string{} - f.AddObjectK(unsyncedService) - f.Run() - services := f.Services() - service := services.Items[0] - if len(service.Spec.Selector) == 0 { - t.Log(service) - t.Error("Service was not updated") - } - }, - ) - t.Run( - "service with outside owner", - func(t *testing.T) { - f := newFixture(t) - unownedService := newService(f) - unownedService.OwnerReferences = nil - f.AddObjectK(unownedService) - f.RunExpectError() - }, - ) -} diff --git a/pkg/controllers/cassandra/service/util/util.go b/pkg/controllers/cassandra/service/util/util.go deleted file mode 100644 index f3b993767..000000000 --- a/pkg/controllers/cassandra/service/util/util.go +++ /dev/null @@ -1,55 +0,0 @@ -package util - -import ( - k8sErrors "k8s.io/apimachinery/pkg/api/errors" - corelisters "k8s.io/client-go/listers/core/v1" - - apiv1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - - v1alpha1 "github.com/jetstack/navigator/pkg/apis/navigator/v1alpha1" - "github.com/jetstack/navigator/pkg/controllers/cassandra/util" -) - -func SetStandardServiceAttributes( - cluster *v1alpha1.CassandraCluster, - service *apiv1.Service, -) *apiv1.Service { - service.SetNamespace(cluster.Namespace) - service.SetLabels(util.ClusterLabels(cluster)) - service.SetOwnerReferences([]metav1.OwnerReference{ - util.NewControllerRef(cluster), - }) - service.Spec.Selector = util.NodePoolLabels(cluster, "") - return service -} - -type ServiceCreator func(cluster *v1alpha1.CassandraCluster) *apiv1.Service -type ServiceUpdater func(cluster *v1alpha1.CassandraCluster, service *apiv1.Service) *apiv1.Service - -func SyncService( - cluster *v1alpha1.CassandraCluster, - kubeClient kubernetes.Interface, - serviceLister corelisters.ServiceLister, - createService ServiceCreator, - updateService ServiceUpdater, -) error { - svc := createService(cluster) - client := kubeClient.CoreV1().Services(svc.Namespace) - existingSvc, err := serviceLister.Services(svc.Namespace).Get(svc.Name) - if k8sErrors.IsNotFound(err) { - _, err = client.Create(svc) - return err - } - if err != nil { - return err - } - err = util.OwnerCheck(existingSvc, cluster) - if err != nil { - return err - } - updatedService := updateService(cluster, existingSvc) - _, err = client.Update(updatedService) - return err -} diff --git a/pkg/controllers/cassandra/testing/testing.go b/pkg/controllers/cassandra/testing/testing.go index bd0967bc3..91e2c4be3 100644 --- a/pkg/controllers/cassandra/testing/testing.go +++ b/pkg/controllers/cassandra/testing/testing.go @@ -14,8 +14,7 @@ import ( "github.com/jetstack/navigator/pkg/controllers/cassandra/role" "github.com/jetstack/navigator/pkg/controllers/cassandra/rolebinding" "github.com/jetstack/navigator/pkg/controllers/cassandra/seedlabeller" - servicecql "github.com/jetstack/navigator/pkg/controllers/cassandra/service/cql" - serviceseedprovider "github.com/jetstack/navigator/pkg/controllers/cassandra/service/seedprovider" + "github.com/jetstack/navigator/pkg/controllers/cassandra/service" "github.com/jetstack/navigator/pkg/controllers/cassandra/serviceaccount" "k8s.io/api/core/v1" @@ -34,7 +33,6 @@ import ( func ClusterForTest() *v1alpha1.CassandraCluster { c := &v1alpha1.CassandraCluster{ Spec: v1alpha1.CassandraClusterSpec{ - CqlPort: 9042, NodePools: []v1alpha1.CassandraClusterNodePool{ v1alpha1.CassandraClusterNodePool{ Name: "RingNodes", @@ -51,8 +49,8 @@ func ClusterForTest() *v1alpha1.CassandraCluster { type Fixture struct { t *testing.T Cluster *v1alpha1.CassandraCluster - SeedProviderServiceControl serviceseedprovider.Interface - CqlServiceControl servicecql.Interface + SeedProviderServiceControl cassandra.ControlInterface + NodesServiceControl cassandra.ControlInterface NodepoolControl nodepool.Interface PilotControl pilot.Interface ServiceAccountControl serviceaccount.Interface @@ -98,12 +96,21 @@ func (f *Fixture) setupAndSync() error { services := k8sFactory.Core().V1().Services().Lister() if f.SeedProviderServiceControl == nil { - f.SeedProviderServiceControl = serviceseedprovider.NewControl(f.k8sClient, services, recorder) + f.SeedProviderServiceControl = service.NewControl( + f.k8sClient, + services, + recorder, + service.SeedsServiceForCluster, + ) } - if f.CqlServiceControl == nil { - f.CqlServiceControl = servicecql.NewControl(f.k8sClient, services, recorder) + if f.NodesServiceControl == nil { + f.NodesServiceControl = service.NewControl( + f.k8sClient, + services, + recorder, + service.NodesServiceForCluster, + ) } - statefulSets := k8sFactory.Apps().V1beta1().StatefulSets().Lister() pods := k8sFactory.Core().V1().Pods().Lister() if f.NodepoolControl == nil { @@ -163,7 +170,7 @@ func (f *Fixture) setupAndSync() error { c := cassandra.NewControl( f.SeedProviderServiceControl, - f.CqlServiceControl, + f.NodesServiceControl, f.NodepoolControl, f.PilotControl, f.ServiceAccountControl, diff --git a/pkg/controllers/cassandra/util/util.go b/pkg/controllers/cassandra/util/util.go index 3956ae341..b425e898b 100644 --- a/pkg/controllers/cassandra/util/util.go +++ b/pkg/controllers/cassandra/util/util.go @@ -19,7 +19,6 @@ const ( kindName = "CassandraCluster" ClusterNameLabelKey = "navigator.jetstack.io/cassandra-cluster-name" NodePoolNameLabelKey = "navigator.jetstack.io/cassandra-node-pool-name" - DefaultCqlPort = 9042 ) func NewControllerRef(c *v1alpha1.CassandraCluster) metav1.OwnerReference { @@ -38,12 +37,8 @@ func NodePoolResourceName(c *v1alpha1.CassandraCluster, np *v1alpha1.CassandraCl return fmt.Sprintf("%s-%s", ResourceBaseName(c), np.Name) } -func SeedProviderServiceName(c *v1alpha1.CassandraCluster) string { - return fmt.Sprintf("%s-seedprovider", ResourceBaseName(c)) -} - -func CqlServiceName(c *v1alpha1.CassandraCluster) string { - return fmt.Sprintf("%s-cql", ResourceBaseName(c)) +func SeedsServiceName(c *v1alpha1.CassandraCluster) string { + return fmt.Sprintf("%s-seeds", ResourceBaseName(c)) } func ServiceAccountName(c *v1alpha1.CassandraCluster) string { @@ -78,9 +73,7 @@ func NodePoolLabels( poolName string, ) map[string]string { labels := ClusterLabels(c) - if poolName != "" { - labels[NodePoolNameLabelKey] = poolName - } + labels[NodePoolNameLabelKey] = poolName return labels } diff --git a/pkg/controllers/executor.go b/pkg/controllers/executor.go index 2db2eb952..cc931daba 100644 --- a/pkg/controllers/executor.go +++ b/pkg/controllers/executor.go @@ -23,6 +23,7 @@ type State struct { ConfigMapLister corelisters.ConfigMapLister PilotLister listers.PilotLister PodLister corelisters.PodLister + ServiceLister corelisters.ServiceLister } type Action interface {