diff --git a/charts/postgres-operator/crds/operatorconfigurations.yaml b/charts/postgres-operator/crds/operatorconfigurations.yaml index 9725c2708..7e3b607c0 100644 --- a/charts/postgres-operator/crds/operatorconfigurations.yaml +++ b/charts/postgres-operator/crds/operatorconfigurations.yaml @@ -318,6 +318,47 @@ spec: pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' scalyr_server_url: type: string + connection_pool: + type: object + properties: + connection_pool_schema: + type: string + #default: "pooler" + connection_pool_user: + type: string + #default: "pooler" + connection_pool_image: + type: string + #default: "registry.opensource.zalan.do/acid/pgbouncer" + connection_pool_max_db_connections: + type: integer + #default: 60 + connection_pool_mode: + type: string + enum: + - "session" + - "transaction" + #default: "transaction" + connection_pool_number_of_instances: + type: integer + minimum: 2 + #default: 2 + connection_pool_default_cpu_limit: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + #default: "1" + connection_pool_default_cpu_request: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + #default: "500m" + connection_pool_default_memory_limit: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + #default: "100Mi" + connection_pool_default_memory_request: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + #default: "100Mi" status: type: object additionalProperties: diff --git a/charts/postgres-operator/crds/postgresqls.yaml b/charts/postgres-operator/crds/postgresqls.yaml index af535e2c8..a4c0e4f3a 100644 --- a/charts/postgres-operator/crds/postgresqls.yaml +++ b/charts/postgres-operator/crds/postgresqls.yaml @@ -106,6 +106,55 @@ spec: uid: format: uuid type: string + connectionPool: + type: object + properties: + dockerImage: + type: string + maxDBConnections: + type: integer + mode: + type: string + enum: + - "session" + - "transaction" + numberOfInstances: + type: integer + minimum: 2 + resources: + type: object + required: + - requests + - limits + properties: + limits: + type: object + required: + - cpu + - memory + properties: + cpu: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + memory: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + requests: + type: object + required: + - cpu + - memory + properties: + cpu: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + memory: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + schema: + type: string + user: + type: string databases: type: object additionalProperties: @@ -113,6 +162,8 @@ spec: # Note: usernames specified here as database owners must be declared in the users key of the spec key. 
dockerImage: type: string + enableConnectionPool: + type: boolean enableLogicalBackup: type: boolean enableMasterLoadBalancer: diff --git a/charts/postgres-operator/templates/clusterrole.yaml b/charts/postgres-operator/templates/clusterrole.yaml index 7b3dd462d..38ce85e7a 100644 --- a/charts/postgres-operator/templates/clusterrole.yaml +++ b/charts/postgres-operator/templates/clusterrole.yaml @@ -128,6 +128,7 @@ rules: - apps resources: - statefulsets + - deployments verbs: - create - delete diff --git a/charts/postgres-operator/templates/configmap.yaml b/charts/postgres-operator/templates/configmap.yaml index 0b976294e..39de1767f 100644 --- a/charts/postgres-operator/templates/configmap.yaml +++ b/charts/postgres-operator/templates/configmap.yaml @@ -19,4 +19,5 @@ data: {{ toYaml .Values.configDebug | indent 2 }} {{ toYaml .Values.configLoggingRestApi | indent 2 }} {{ toYaml .Values.configTeamsApi | indent 2 }} +{{ toYaml .Values.configConnectionPool | indent 2 }} {{- end }} diff --git a/charts/postgres-operator/templates/operatorconfiguration.yaml b/charts/postgres-operator/templates/operatorconfiguration.yaml index 06e9c7605..a3e06feba 100644 --- a/charts/postgres-operator/templates/operatorconfiguration.yaml +++ b/charts/postgres-operator/templates/operatorconfiguration.yaml @@ -33,4 +33,6 @@ configuration: {{ toYaml .Values.configLoggingRestApi | indent 4 }} scalyr: {{ toYaml .Values.configScalyr | indent 4 }} + connection_pool: +{{ toYaml .Values.configConnectionPool | indent 4 }} {{- end }} diff --git a/charts/postgres-operator/values-crd.yaml b/charts/postgres-operator/values-crd.yaml index d170e0b77..7fd1b5098 100644 --- a/charts/postgres-operator/values-crd.yaml +++ b/charts/postgres-operator/values-crd.yaml @@ -269,6 +269,25 @@ configScalyr: # Memory request value for the Scalyr sidecar scalyr_memory_request: 50Mi +configConnectionPool: + # db schema to install lookup function into + connection_pool_schema: "pooler" + # db user for pooler to use + connection_pool_user: "pooler" + # docker image + connection_pool_image: "registry.opensource.zalan.do/acid/pgbouncer" + # max db connections the pooler should hold + connection_pool_max_db_connections: 60 + # default pooling mode + connection_pool_mode: "transaction" + # number of pooler instances + connection_pool_number_of_instances: 2 + # default resources + connection_pool_default_cpu_request: 500m + connection_pool_default_memory_request: 100Mi + connection_pool_default_cpu_limit: "1" + connection_pool_default_memory_limit: 100Mi + rbac: # Specifies whether RBAC resources should be created create: true diff --git a/charts/postgres-operator/values.yaml b/charts/postgres-operator/values.yaml index b6f18f305..2a1a53fca 100644 --- a/charts/postgres-operator/values.yaml +++ b/charts/postgres-operator/values.yaml @@ -245,6 +245,26 @@ configTeamsApi: # URL of the Teams API service # teams_api_url: http://fake-teams-api.default.svc.cluster.local +# configure connection pooler deployment created by the operator +configConnectionPool: + # db schema to install lookup function into + connection_pool_schema: "pooler" + # db user for pooler to use + connection_pool_user: "pooler" + # docker image + connection_pool_image: "registry.opensource.zalan.do/acid/pgbouncer" + # max db connections the pooler should hold + connection_pool_max_db_connections: 60 + # default pooling mode + connection_pool_mode: "transaction" + # number of pooler instances + connection_pool_number_of_instances: 2 + # default resources + connection_pool_default_cpu_request: 
500m + connection_pool_default_memory_request: 100Mi + connection_pool_default_cpu_limit: "1" + connection_pool_default_memory_limit: 100Mi + rbac: # Specifies whether RBAC resources should be created create: true diff --git a/docker/DebugDockerfile b/docker/DebugDockerfile index 76dadf6df..0c11fe3b4 100644 --- a/docker/DebugDockerfile +++ b/docker/DebugDockerfile @@ -3,8 +3,17 @@ MAINTAINER Team ACID @ Zalando # We need root certificates to deal with teams api over https RUN apk --no-cache add ca-certificates go git musl-dev -RUN go get github.com/derekparker/delve/cmd/dlv COPY build/* / -CMD ["/root/go/bin/dlv", "--listen=:7777", "--headless=true", "--api-version=2", "exec", "/postgres-operator"] +RUN addgroup -g 1000 pgo +RUN adduser -D -u 1000 -G pgo -g 'Postgres Operator' pgo + +RUN go get github.com/derekparker/delve/cmd/dlv +RUN cp /root/go/bin/dlv /dlv +RUN chown -R pgo:pgo /dlv + +USER pgo:pgo +RUN ls -l / + +CMD ["/dlv", "--listen=:7777", "--headless=true", "--api-version=2", "exec", "/postgres-operator"] diff --git a/docs/reference/cluster_manifest.md b/docs/reference/cluster_manifest.md index 92e457d7e..d1fb212ee 100644 --- a/docs/reference/cluster_manifest.md +++ b/docs/reference/cluster_manifest.md @@ -140,6 +140,11 @@ These parameters are grouped directly under the `spec` key in the manifest. is `false`, then no volume will be mounted no matter how operator was configured (so you can override the operator configuration). Optional. +* **enableConnectionPool** + Tells the operator to create a connection pool with a database. If this + field is true, a connection pool deployment will be created even if + `connectionPool` section is empty. Optional, not set by default. + * **enableLogicalBackup** Determines if the logical backup of this cluster should be taken and uploaded to S3. Default: false. Optional. @@ -360,6 +365,35 @@ CPU and memory limits for the sidecar container. memory limits for the sidecar container. Optional, overrides the `default_memory_limits` operator configuration parameter. Optional. +## Connection pool + +Parameters are grouped under the `connectionPool` top-level key and specify +configuration for connection pool. If this section is not empty, a connection +pool will be created for a database even if `enableConnectionPool` is not +present. + +* **numberOfInstances** + How many instances of connection pool to create. + +* **schema** + Schema to create for credentials lookup function. + +* **user** + User to create for connection pool to be able to connect to a database. + +* **dockerImage** + Which docker image to use for connection pool deployment. + +* **maxDBConnections** + How many connections the pooler can max hold. This value is divided among the + pooler pods. + +* **mode** + In which mode to run connection pool, transaction or session. + +* **resources** + Resource configuration for connection pool deployment. + ## Custom TLS certificates Those parameters are grouped under the `tls` top-level key. diff --git a/docs/reference/operator_parameters.md b/docs/reference/operator_parameters.md index ba8e73cf8..53cd617c4 100644 --- a/docs/reference/operator_parameters.md +++ b/docs/reference/operator_parameters.md @@ -595,3 +595,40 @@ scalyr sidecar. In the CRD-based configuration they are grouped under the * **scalyr_memory_limit** Memory limit value for the Scalyr sidecar. The default is `500Mi`. 
+
+## Connection pool configuration
+
+Parameters are grouped under the `connection_pool` top-level key and specify
+the default configuration for the connection pool, used when a postgres
+manifest requests one but does not specify some of the parameters. All of them
+are optional, and the operator provides reasonable defaults.
+
+* **connection_pool_number_of_instances**
+  How many instances of the connection pool to create. The default is 2, which
+  is also the required minimum.
+
+* **connection_pool_schema**
+  Schema to create for the credentials lookup function. The default is
+  `pooler`.
+
+* **connection_pool_user**
+  User the operator creates so the connection pool can connect to the database.
+  The default is `pooler`.
+
+* **connection_pool_image**
+  Docker image to use for the connection pool deployment.
+  Default: "registry.opensource.zalan.do/acid/pgbouncer"
+
+* **connection_pool_max_db_connections**
+  Maximum number of connections the pooler can hold. This value is divided
+  among the pooler pods. The default is 60, which amounts to 30 connections per
+  pod for the default setup with two instances.
+
+* **connection_pool_mode**
+  Default pool mode, `session` or `transaction`. The default is `transaction`.
+
+* **connection_pool_default_cpu_request**
+  **connection_pool_default_memory_request**
+  **connection_pool_default_cpu_limit**
+  **connection_pool_default_memory_limit**
+  Default resource configuration for the connection pool deployment. The
+  internal defaults for the memory request and limit are `100Mi`; for CPU they
+  are `500m` (request) and `1` (limit).
diff --git a/docs/user.md b/docs/user.md
index 6e71d0404..acfcb7b54 100644
--- a/docs/user.md
+++ b/docs/user.md
@@ -512,6 +512,59 @@ monitoring is outside the scope of operator responsibilities. See
 [administrator documentation](administrator.md) for details on how backups are
 executed.
 
+## Connection pool
+
+The operator can create a database-side connection pool for applications where
+an application-side pool is not feasible but the number of connections is
+high. To create a connection pool together with a database, modify the
+manifest:
+
+```yaml
+spec:
+  enableConnectionPool: true
+```
+
+This tells the operator to create a connection pool with the default
+configuration, through which one can access the master via a separate service
+`{cluster-name}-pooler`. In most cases the default configuration should be
+good enough.
+
+To customize the connection pool, specify:
+
+```yaml
+spec:
+  connectionPool:
+    # how many instances of the connection pool to create
+    numberOfInstances: 2
+
+    # in which mode to run, session or transaction
+    mode: "transaction"
+
+    # schema, which the operator will create to install the credentials
+    # lookup function
+    schema: "pooler"
+
+    # user, which the operator will create for the connection pool
+    user: "pooler"
+
+    # resources for each instance
+    resources:
+      requests:
+        cpu: 500m
+        memory: 100Mi
+      limits:
+        cpu: "1"
+        memory: 100Mi
+```
+
+By default `pgbouncer` is used as the connection pool. To find out more about
+pool modes, see the [PgBouncer docs](https://www.pgbouncer.org/config.html#pool_mode)
+(the general approach is similar across different implementations).
+
+Note that with `pgbouncer` a meaningful CPU resource limit should be less than
+one core (there is a way to utilize more than one, but in K8s it is easier to
+just spin up more instances).
+
 ## Custom TLS certificates
 
 By default, the spilo image generates its own TLS certificate during startup.
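As a usage illustration for the connection pool section above, here is a minimal Go sketch of an in-cluster application connecting through the `{cluster-name}-pooler` service. The cluster name `acid-minimal-cluster`, namespace `default`, database `foo`, role `foo_user`, password and `sslmode` are assumptions for the example only; the `-pooler` service name comes from this patch, and port 5432 is assumed to match the container port the pooler exposes.

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // Postgres driver, already used by the operator itself
)

func main() {
	// Assumed example values: adjust cluster name, namespace, database, role
	// and sslmode to match your setup.
	dsn := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=require",
		"acid-minimal-cluster-pooler.default.svc.cluster.local",
		5432, "foo_user", "example-password", "foo")

	db, err := sql.Open("postgres", dsn)
	if err != nil {
		log.Fatalf("could not open connection: %v", err)
	}
	defer db.Close()

	// With the default transaction pooling mode each statement may run on a
	// different server connection, so avoid relying on session state.
	var result int
	if err := db.QueryRow("SELECT 1").Scan(&result); err != nil {
		log.Fatalf("query through the pooler failed: %v", err)
	}
	fmt.Println("connected through the pooler, SELECT 1 =", result)
}
```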
diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index f6be8a600..7772ed005 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -9,6 +9,10 @@ from kubernetes import client, config +def to_selector(labels): + return ",".join(["=".join(l) for l in labels.items()]) + + class EndToEndTestCase(unittest.TestCase): ''' Test interaction of the operator with multiple K8s components. @@ -47,7 +51,8 @@ def setUpClass(cls): for filename in ["operator-service-account-rbac.yaml", "configmap.yaml", "postgres-operator.yaml"]: - k8s.create_with_kubectl("manifests/" + filename) + result = k8s.create_with_kubectl("manifests/" + filename) + print("stdout: {}, stderr: {}".format(result.stdout, result.stderr)) k8s.wait_for_operator_pod_start() @@ -55,9 +60,14 @@ def setUpClass(cls): 'default', label_selector='name=postgres-operator').items[0].spec.containers[0].image print("Tested operator image: {}".format(actual_operator_image)) # shows up after tests finish - k8s.create_with_kubectl("manifests/minimal-postgres-manifest.yaml") - k8s.wait_for_pod_start('spilo-role=master') - k8s.wait_for_pod_start('spilo-role=replica') + result = k8s.create_with_kubectl("manifests/minimal-postgres-manifest.yaml") + print('stdout: {}, stderr: {}'.format(result.stdout, result.stderr)) + try: + k8s.wait_for_pod_start('spilo-role=master') + k8s.wait_for_pod_start('spilo-role=replica') + except timeout_decorator.TimeoutError: + print('Operator log: {}'.format(k8s.get_operator_log())) + raise @timeout_decorator.timeout(TEST_TIMEOUT_SEC) def test_enable_load_balancer(self): @@ -66,7 +76,7 @@ def test_enable_load_balancer(self): ''' k8s = self.k8s - cluster_label = 'cluster-name=acid-minimal-cluster' + cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' # enable load balancer services pg_patch_enable_lbs = { @@ -178,7 +188,7 @@ def test_min_resource_limits(self): Lower resource limits below configured minimum and let operator fix it ''' k8s = self.k8s - cluster_label = 'cluster-name=acid-minimal-cluster' + cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' labels = 'spilo-role=master,' + cluster_label _, failover_targets = k8s.get_pg_nodes(cluster_label) @@ -247,7 +257,7 @@ def test_node_readiness_label(self): Remove node readiness label from master node. This must cause a failover. ''' k8s = self.k8s - cluster_label = 'cluster-name=acid-minimal-cluster' + cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' labels = 'spilo-role=master,' + cluster_label readiness_label = 'lifecycle-status' readiness_value = 'ready' @@ -289,7 +299,7 @@ def test_scaling(self): Scale up from 2 to 3 and back to 2 pods by updating the Postgres manifest at runtime. ''' k8s = self.k8s - labels = "cluster-name=acid-minimal-cluster" + labels = "application=spilo,cluster-name=acid-minimal-cluster" k8s.wait_for_pg_to_scale(3) self.assertEqual(3, k8s.count_pods_with_label(labels)) @@ -339,13 +349,99 @@ def test_service_annotations(self): } k8s.update_config(unpatch_custom_service_annotations) + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) + def test_enable_disable_connection_pool(self): + ''' + For a database without connection pool, then turns it on, scale up, + turn off and on again. Test with different ways of doing this (via + enableConnectionPool or connectionPool configuration section). At the + end turn the connection pool off to not interfere with other tests. 
+ ''' + k8s = self.k8s + service_labels = { + 'cluster-name': 'acid-minimal-cluster', + } + pod_labels = dict({ + 'connection-pool': 'acid-minimal-cluster-pooler', + }) + + pod_selector = to_selector(pod_labels) + service_selector = to_selector(service_labels) + + try: + # enable connection pool + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', + 'postgresqls', 'acid-minimal-cluster', + { + 'spec': { + 'enableConnectionPool': True, + } + }) + k8s.wait_for_pod_start(pod_selector) + + pods = k8s.api.core_v1.list_namespaced_pod( + 'default', label_selector=pod_selector + ).items + + self.assertTrue(pods, 'No connection pool pods') + + k8s.wait_for_service(service_selector) + services = k8s.api.core_v1.list_namespaced_service( + 'default', label_selector=service_selector + ).items + services = [ + s for s in services + if s.metadata.name.endswith('pooler') + ] + + self.assertTrue(services, 'No connection pool service') + + # scale up connection pool deployment + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', + 'postgresqls', 'acid-minimal-cluster', + { + 'spec': { + 'connectionPool': { + 'numberOfInstances': 2, + }, + } + }) + + k8s.wait_for_running_pods(pod_selector, 2) + + # turn it off, keeping configuration section + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', + 'postgresqls', 'acid-minimal-cluster', + { + 'spec': { + 'enableConnectionPool': False, + } + }) + k8s.wait_for_pods_to_stop(pod_selector) + + k8s.api.custom_objects_api.patch_namespaced_custom_object( + 'acid.zalan.do', 'v1', 'default', + 'postgresqls', 'acid-minimal-cluster', + { + 'spec': { + 'enableConnectionPool': True, + } + }) + k8s.wait_for_pod_start(pod_selector) + except timeout_decorator.TimeoutError: + print('Operator log: {}'.format(k8s.get_operator_log())) + raise + @timeout_decorator.timeout(TEST_TIMEOUT_SEC) def test_taint_based_eviction(self): ''' Add taint "postgres=:NoExecute" to node with master. This must cause a failover. 
''' k8s = self.k8s - cluster_label = 'cluster-name=acid-minimal-cluster' + cluster_label = 'application=spilo,cluster-name=acid-minimal-cluster' # get nodes of master and replica(s) (expected target of new master) current_master_node, current_replica_nodes = k8s.get_pg_nodes(cluster_label) @@ -496,12 +592,39 @@ def wait_for_operator_pod_start(self): # for local execution ~ 10 seconds suffices time.sleep(60) + def get_operator_pod(self): + pods = self.api.core_v1.list_namespaced_pod( + 'default', label_selector='name=postgres-operator' + ).items + + if pods: + return pods[0] + + return None + + def get_operator_log(self): + operator_pod = self.get_operator_pod() + pod_name = operator_pod.metadata.name + return self.api.core_v1.read_namespaced_pod_log( + name=pod_name, + namespace='default' + ) + def wait_for_pod_start(self, pod_labels, namespace='default'): pod_phase = 'No pod running' while pod_phase != 'Running': pods = self.api.core_v1.list_namespaced_pod(namespace, label_selector=pod_labels).items if pods: pod_phase = pods[0].status.phase + + if pods and pod_phase != 'Running': + pod_name = pods[0].metadata.name + response = self.api.core_v1.read_namespaced_pod( + name=pod_name, + namespace=namespace + ) + print("Pod description {}".format(response)) + time.sleep(self.RETRY_TIMEOUT_SEC) def get_service_type(self, svc_labels, namespace='default'): @@ -531,10 +654,27 @@ def wait_for_pg_to_scale(self, number_of_instances, namespace='default'): _ = self.api.custom_objects_api.patch_namespaced_custom_object( "acid.zalan.do", "v1", namespace, "postgresqls", "acid-minimal-cluster", body) - labels = 'cluster-name=acid-minimal-cluster' + labels = 'application=spilo,cluster-name=acid-minimal-cluster' while self.count_pods_with_label(labels) != number_of_instances: time.sleep(self.RETRY_TIMEOUT_SEC) + def wait_for_running_pods(self, labels, number, namespace=''): + while self.count_pods_with_label(labels) != number: + time.sleep(self.RETRY_TIMEOUT_SEC) + + def wait_for_pods_to_stop(self, labels, namespace=''): + while self.count_pods_with_label(labels) != 0: + time.sleep(self.RETRY_TIMEOUT_SEC) + + def wait_for_service(self, labels, namespace='default'): + def get_services(): + return self.api.core_v1.list_namespaced_service( + namespace, label_selector=labels + ).items + + while not get_services(): + time.sleep(self.RETRY_TIMEOUT_SEC) + def count_pods_with_label(self, labels, namespace='default'): return len(self.api.core_v1.list_namespaced_pod(namespace, label_selector=labels).items) @@ -571,7 +711,10 @@ def update_config(self, config_map_patch): self.wait_for_operator_pod_start() def create_with_kubectl(self, path): - subprocess.run(["kubectl", "create", "-f", path]) + return subprocess.run( + ["kubectl", "create", "-f", path], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) if __name__ == '__main__': diff --git a/manifests/configmap.yaml b/manifests/configmap.yaml index 0300b5495..fdc2d5d56 100644 --- a/manifests/configmap.yaml +++ b/manifests/configmap.yaml @@ -11,6 +11,16 @@ data: cluster_history_entries: "1000" cluster_labels: application:spilo cluster_name_label: cluster-name + # connection_pool_default_cpu_limit: "1" + # connection_pool_default_cpu_request: "500m" + # connection_pool_default_memory_limit: 100Mi + # connection_pool_default_memory_request: 100Mi + connection_pool_image: "registry.opensource.zalan.do/acid/pgbouncer:master-5" + # connection_pool_max_db_connections: 60 + # connection_pool_mode: "transaction" + # connection_pool_number_of_instances: 2 + # 
connection_pool_schema: "pooler" + # connection_pool_user: "pooler" # custom_service_annotations: "keyx:valuez,keya:valuea" # custom_pod_annotations: "keya:valuea,keyb:valueb" db_hosted_zone: db.example.com diff --git a/manifests/operator-service-account-rbac.yaml b/manifests/operator-service-account-rbac.yaml index e5bc49f83..83cd721e7 100644 --- a/manifests/operator-service-account-rbac.yaml +++ b/manifests/operator-service-account-rbac.yaml @@ -129,6 +129,7 @@ rules: - apps resources: - statefulsets + - deployments verbs: - create - delete diff --git a/manifests/operatorconfiguration.crd.yaml b/manifests/operatorconfiguration.crd.yaml index 7bd5c529c..4e6858af8 100644 --- a/manifests/operatorconfiguration.crd.yaml +++ b/manifests/operatorconfiguration.crd.yaml @@ -294,6 +294,47 @@ spec: pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' scalyr_server_url: type: string + connection_pool: + type: object + properties: + connection_pool_schema: + type: string + #default: "pooler" + connection_pool_user: + type: string + #default: "pooler" + connection_pool_image: + type: string + #default: "registry.opensource.zalan.do/acid/pgbouncer" + connection_pool_max_db_connections: + type: integer + #default: 60 + connection_pool_mode: + type: string + enum: + - "session" + - "transaction" + #default: "transaction" + connection_pool_number_of_instances: + type: integer + minimum: 2 + #default: 2 + connection_pool_default_cpu_limit: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + #default: "1" + connection_pool_default_cpu_request: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + #default: "500m" + connection_pool_default_memory_limit: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + #default: "100Mi" + connection_pool_default_memory_request: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + #default: "100Mi" status: type: object additionalProperties: diff --git a/manifests/postgresql-operator-default-configuration.yaml b/manifests/postgresql-operator-default-configuration.yaml index 33838b2a9..d4c9b518f 100644 --- a/manifests/postgresql-operator-default-configuration.yaml +++ b/manifests/postgresql-operator-default-configuration.yaml @@ -121,3 +121,14 @@ configuration: scalyr_memory_limit: 500Mi scalyr_memory_request: 50Mi # scalyr_server_url: "" + connection_pool: + connection_pool_default_cpu_limit: "1" + connection_pool_default_cpu_request: "500m" + connection_pool_default_memory_limit: 100Mi + connection_pool_default_memory_request: 100Mi + connection_pool_image: "registry.opensource.zalan.do/acid/pgbouncer:master-5" + # connection_pool_max_db_connections: 60 + connection_pool_mode: "transaction" + connection_pool_number_of_instances: 2 + # connection_pool_schema: "pooler" + # connection_pool_user: "pooler" diff --git a/manifests/postgresql.crd.yaml b/manifests/postgresql.crd.yaml index 04c789fb9..06434da14 100644 --- a/manifests/postgresql.crd.yaml +++ b/manifests/postgresql.crd.yaml @@ -70,6 +70,55 @@ spec: uid: format: uuid type: string + connectionPool: + type: object + properties: + dockerImage: + type: string + maxDBConnections: + type: integer + mode: + type: string + enum: + - "session" + - "transaction" + numberOfInstances: + type: integer + minimum: 2 + resources: + type: object + required: + - requests + - limits + properties: + limits: + type: object + required: + - cpu + - memory + properties: + cpu: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + memory: + type: string + pattern: 
'^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + requests: + type: object + required: + - cpu + - memory + properties: + cpu: + type: string + pattern: '^(\d+m|\d+(\.\d{1,3})?)$' + memory: + type: string + pattern: '^(\d+(e\d+)?|\d+(\.\d+)?(e\d+)?[EPTGMK]i?)$' + schema: + type: string + user: + type: string databases: type: object additionalProperties: @@ -77,6 +126,8 @@ spec: # Note: usernames specified here as database owners must be declared in the users key of the spec key. dockerImage: type: string + enableConnectionPool: + type: boolean enableLogicalBackup: type: boolean enableMasterLoadBalancer: diff --git a/pkg/apis/acid.zalan.do/v1/crds.go b/pkg/apis/acid.zalan.do/v1/crds.go index eff47c255..dc552d3f4 100644 --- a/pkg/apis/acid.zalan.do/v1/crds.go +++ b/pkg/apis/acid.zalan.do/v1/crds.go @@ -105,6 +105,7 @@ var OperatorConfigCRDResourceColumns = []apiextv1beta1.CustomResourceColumnDefin var min0 = 0.0 var min1 = 1.0 +var min2 = 2.0 var minDisable = -1.0 // PostgresCRDResourceValidation to check applied manifest parameters @@ -176,6 +177,76 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{ }, }, }, + "connectionPool": { + Type: "object", + Properties: map[string]apiextv1beta1.JSONSchemaProps{ + "dockerImage": { + Type: "string", + }, + "maxDBConnections": { + Type: "integer", + }, + "mode": { + Type: "string", + Enum: []apiextv1beta1.JSON{ + { + Raw: []byte(`"session"`), + }, + { + Raw: []byte(`"transaction"`), + }, + }, + }, + "numberOfInstances": { + Type: "integer", + Minimum: &min2, + }, + "resources": { + Type: "object", + Required: []string{"requests", "limits"}, + Properties: map[string]apiextv1beta1.JSONSchemaProps{ + "limits": { + Type: "object", + Required: []string{"cpu", "memory"}, + Properties: map[string]apiextv1beta1.JSONSchemaProps{ + "cpu": { + Type: "string", + Description: "Decimal natural followed by m, or decimal natural followed by dot followed by up to three decimal digits (precision used by Kubernetes). Must be greater than 0", + Pattern: "^(\\d+m|\\d+(\\.\\d{1,3})?)$", + }, + "memory": { + Type: "string", + Description: "Plain integer or fixed-point integer using one of these suffixes: E, P, T, G, M, k (with or without a tailing i). Must be greater than 0", + Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$", + }, + }, + }, + "requests": { + Type: "object", + Required: []string{"cpu", "memory"}, + Properties: map[string]apiextv1beta1.JSONSchemaProps{ + "cpu": { + Type: "string", + Description: "Decimal natural followed by m, or decimal natural followed by dot followed by up to three decimal digits (precision used by Kubernetes). Must be greater than 0", + Pattern: "^(\\d+m|\\d+(\\.\\d{1,3})?)$", + }, + "memory": { + Type: "string", + Description: "Plain integer or fixed-point integer using one of these suffixes: E, P, T, G, M, k (with or without a tailing i). 
Must be greater than 0", + Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$", + }, + }, + }, + }, + }, + "schema": { + Type: "string", + }, + "user": { + Type: "string", + }, + }, + }, "databases": { Type: "object", AdditionalProperties: &apiextv1beta1.JSONSchemaPropsOrBool{ @@ -188,6 +259,9 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{ "dockerImage": { Type: "string", }, + "enableConnectionPool": { + Type: "boolean", + }, "enableLogicalBackup": { Type: "boolean", }, @@ -418,7 +492,7 @@ var PostgresCRDResourceValidation = apiextv1beta1.CustomResourceValidation{ Type: "string", }, "tls": { - Type: "object", + Type: "object", Required: []string{"secretName"}, Properties: map[string]apiextv1beta1.JSONSchemaProps{ "secretName": { @@ -1055,6 +1129,54 @@ var OperatorConfigCRDResourceValidation = apiextv1beta1.CustomResourceValidation }, }, }, + "connection_pool": { + Type: "object", + Properties: map[string]apiextv1beta1.JSONSchemaProps{ + "connection_pool_default_cpu_limit": { + Type: "string", + Pattern: "^(\\d+m|\\d+(\\.\\d{1,3})?)$", + }, + "connection_pool_default_cpu_request": { + Type: "string", + Pattern: "^(\\d+m|\\d+(\\.\\d{1,3})?)$", + }, + "connection_pool_default_memory_limit": { + Type: "string", + Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$", + }, + "connection_pool_default_memory_request": { + Type: "string", + Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$", + }, + "connection_pool_image": { + Type: "string", + }, + "connection_pool_max_db_connections": { + Type: "integer", + }, + "connection_pool_mode": { + Type: "string", + Enum: []apiextv1beta1.JSON{ + { + Raw: []byte(`"session"`), + }, + { + Raw: []byte(`"transaction"`), + }, + }, + }, + "connection_pool_number_of_instances": { + Type: "integer", + Minimum: &min2, + }, + "connection_pool_schema": { + Type: "string", + }, + "connection_pool_user": { + Type: "string", + }, + }, + }, }, }, "status": { diff --git a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go index 8463be6bd..7c1d2b7e8 100644 --- a/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go +++ b/pkg/apis/acid.zalan.do/v1/operator_configuration_type.go @@ -1,5 +1,7 @@ package v1 +// Operator configuration CRD definition, please use snake_case for field names. + import ( "github.com/zalando/postgres-operator/pkg/util/config" @@ -65,12 +67,12 @@ type KubernetesMetaConfiguration struct { // TODO: use a proper toleration structure? 
PodToleration map[string]string `json:"toleration,omitempty"` // TODO: use namespacedname - PodEnvironmentConfigMap string `json:"pod_environment_configmap,omitempty"` - PodPriorityClassName string `json:"pod_priority_class_name,omitempty"` - MasterPodMoveTimeout Duration `json:"master_pod_move_timeout,omitempty"` - EnablePodAntiAffinity bool `json:"enable_pod_antiaffinity,omitempty"` - PodAntiAffinityTopologyKey string `json:"pod_antiaffinity_topology_key,omitempty"` - PodManagementPolicy string `json:"pod_management_policy,omitempty"` + PodEnvironmentConfigMap string `json:"pod_environment_configmap,omitempty"` + PodPriorityClassName string `json:"pod_priority_class_name,omitempty"` + MasterPodMoveTimeout Duration `json:"master_pod_move_timeout,omitempty"` + EnablePodAntiAffinity bool `json:"enable_pod_antiaffinity,omitempty"` + PodAntiAffinityTopologyKey string `json:"pod_antiaffinity_topology_key,omitempty"` + PodManagementPolicy string `json:"pod_management_policy,omitempty"` } // PostgresPodResourcesDefaults defines the spec of default resources @@ -152,6 +154,20 @@ type ScalyrConfiguration struct { ScalyrMemoryLimit string `json:"scalyr_memory_limit,omitempty"` } +// Defines default configuration for connection pool +type ConnectionPoolConfiguration struct { + NumberOfInstances *int32 `json:"connection_pool_number_of_instances,omitempty"` + Schema string `json:"connection_pool_schema,omitempty"` + User string `json:"connection_pool_user,omitempty"` + Image string `json:"connection_pool_image,omitempty"` + Mode string `json:"connection_pool_mode,omitempty"` + MaxDBConnections *int32 `json:"connection_pool_max_db_connections,omitempty"` + DefaultCPURequest string `json:"connection_pool_default_cpu_request,omitempty"` + DefaultMemoryRequest string `json:"connection_pool_default_memory_request,omitempty"` + DefaultCPULimit string `json:"connection_pool_default_cpu_limit,omitempty"` + DefaultMemoryLimit string `json:"connection_pool_default_memory_limit,omitempty"` +} + // OperatorLogicalBackupConfiguration defines configuration for logical backup type OperatorLogicalBackupConfiguration struct { Schedule string `json:"logical_backup_schedule,omitempty"` @@ -188,6 +204,7 @@ type OperatorConfigurationData struct { LoggingRESTAPI LoggingRESTAPIConfiguration `json:"logging_rest_api"` Scalyr ScalyrConfiguration `json:"scalyr"` LogicalBackup OperatorLogicalBackupConfiguration `json:"logical_backup"` + ConnectionPool ConnectionPoolConfiguration `json:"connection_pool"` } //Duration shortens this frequently used name diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index 862db6a4e..1784f8235 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -1,5 +1,7 @@ package v1 +// Postgres CRD definition, please use CamelCase for field names. + import ( "time" @@ -27,6 +29,9 @@ type PostgresSpec struct { Patroni `json:"patroni,omitempty"` Resources `json:"resources,omitempty"` + EnableConnectionPool *bool `json:"enableConnectionPool,omitempty"` + ConnectionPool *ConnectionPool `json:"connectionPool,omitempty"` + TeamID string `json:"teamId"` DockerImage string `json:"dockerImage,omitempty"` @@ -162,3 +167,24 @@ type UserFlags []string type PostgresStatus struct { PostgresClusterStatus string `json:"PostgresClusterStatus"` } + +// Options for connection pooler +// +// TODO: prepared snippets of configuration, one can choose via type, e.g. 
+// pgbouncer-large (with higher resources) or odyssey-small (with smaller +// resources) +// Type string `json:"type,omitempty"` +// +// TODO: figure out what other important parameters of the connection pool it +// makes sense to expose. E.g. pool size (min/max boundaries), max client +// connections etc. +type ConnectionPool struct { + NumberOfInstances *int32 `json:"numberOfInstances,omitempty"` + Schema string `json:"schema,omitempty"` + User string `json:"user,omitempty"` + Mode string `json:"mode,omitempty"` + DockerImage string `json:"dockerImage,omitempty"` + MaxDBConnections *int32 `json:"maxDBConnections,omitempty"` + + Resources `json:"resources,omitempty"` +} diff --git a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go index 753b0490c..fcab394ca 100644 --- a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go +++ b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go @@ -68,6 +68,59 @@ func (in *CloneDescription) DeepCopy() *CloneDescription { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ConnectionPool) DeepCopyInto(out *ConnectionPool) { + *out = *in + if in.NumberOfInstances != nil { + in, out := &in.NumberOfInstances, &out.NumberOfInstances + *out = new(int32) + **out = **in + } + if in.MaxDBConnections != nil { + in, out := &in.MaxDBConnections, &out.MaxDBConnections + *out = new(int32) + **out = **in + } + out.Resources = in.Resources + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionPool. +func (in *ConnectionPool) DeepCopy() *ConnectionPool { + if in == nil { + return nil + } + out := new(ConnectionPool) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ConnectionPoolConfiguration) DeepCopyInto(out *ConnectionPoolConfiguration) { + *out = *in + if in.NumberOfInstances != nil { + in, out := &in.NumberOfInstances, &out.NumberOfInstances + *out = new(int32) + **out = **in + } + if in.MaxDBConnections != nil { + in, out := &in.MaxDBConnections, &out.MaxDBConnections + *out = new(int32) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ConnectionPoolConfiguration. +func (in *ConnectionPoolConfiguration) DeepCopy() *ConnectionPoolConfiguration { + if in == nil { + return nil + } + out := new(ConnectionPoolConfiguration) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *KubernetesMetaConfiguration) DeepCopyInto(out *KubernetesMetaConfiguration) { *out = *in @@ -254,6 +307,7 @@ func (in *OperatorConfigurationData) DeepCopyInto(out *OperatorConfigurationData out.LoggingRESTAPI = in.LoggingRESTAPI out.Scalyr = in.Scalyr out.LogicalBackup = in.LogicalBackup + in.ConnectionPool.DeepCopyInto(&out.ConnectionPool) return } @@ -416,6 +470,16 @@ func (in *PostgresSpec) DeepCopyInto(out *PostgresSpec) { out.Volume = in.Volume in.Patroni.DeepCopyInto(&out.Patroni) out.Resources = in.Resources + if in.EnableConnectionPool != nil { + in, out := &in.EnableConnectionPool, &out.EnableConnectionPool + *out = new(bool) + **out = **in + } + if in.ConnectionPool != nil { + in, out := &in.ConnectionPool, &out.ConnectionPool + *out = new(ConnectionPool) + (*in).DeepCopyInto(*out) + } if in.SpiloFSGroup != nil { in, out := &in.SpiloFSGroup, &out.SpiloFSGroup *out = new(int64) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index d740260d2..dba67c142 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/r3labs/diff" "github.com/sirupsen/logrus" appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" @@ -48,11 +49,26 @@ type Config struct { PodServiceAccountRoleBinding *rbacv1.RoleBinding } +// K8S objects that are belongs to a connection pool +type ConnectionPoolObjects struct { + Deployment *appsv1.Deployment + Service *v1.Service + + // It could happen that a connection pool was enabled, but the operator was + // not able to properly process a corresponding event or was restarted. In + // this case we will miss missing/require situation and a lookup function + // will not be installed. To avoid synchronizing it all the time to prevent + // this, we can remember the result in memory at least until the next + // restart. + LookupFunction bool +} + type kubeResources struct { Services map[PostgresRole]*v1.Service Endpoints map[PostgresRole]*v1.Endpoints Secrets map[types.UID]*v1.Secret Statefulset *appsv1.StatefulSet + ConnectionPool *ConnectionPoolObjects PodDisruptionBudget *policybeta1.PodDisruptionBudget //Pods are treated separately //PVCs are treated separately @@ -184,7 +200,8 @@ func (c *Cluster) isNewCluster() bool { func (c *Cluster) initUsers() error { c.setProcessName("initializing users") - // clear our the previous state of the cluster users (in case we are running a sync). + // clear our the previous state of the cluster users (in case we are + // running a sync). c.systemUsers = map[string]spec.PgUser{} c.pgUsers = map[string]spec.PgUser{} @@ -292,8 +309,10 @@ func (c *Cluster) Create() error { } c.logger.Infof("pods are ready") - // create database objects unless we are running without pods or disabled that feature explicitly + // create database objects unless we are running without pods or disabled + // that feature explicitly if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&c.Spec) <= 0 || c.Spec.StandbyCluster != nil) { + c.logger.Infof("Create roles") if err = c.createRoles(); err != nil { return fmt.Errorf("could not create users: %v", err) } @@ -316,6 +335,26 @@ func (c *Cluster) Create() error { c.logger.Errorf("could not list resources: %v", err) } + // Create connection pool deployment and services if necessary. Since we + // need to peform some operations with the database itself (e.g. install + // lookup function), do it as the last step, when everything is available. 
+ // + // Do not consider connection pool as a strict requirement, and if + // something fails, report warning + if c.needConnectionPool() { + if c.ConnectionPool != nil { + c.logger.Warning("Connection pool already exists in the cluster") + return nil + } + connPool, err := c.createConnectionPool(c.installLookupFunction) + if err != nil { + c.logger.Warningf("could not create connection pool: %v", err) + return nil + } + c.logger.Infof("connection pool %q has been successfully created", + util.NameFromMeta(connPool.Deployment.ObjectMeta)) + } + return nil } @@ -571,7 +610,11 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { } } - if !reflect.DeepEqual(oldSpec.Spec.Users, newSpec.Spec.Users) { + // connection pool needs one system user created, which is done in + // initUsers. Check if it needs to be called. + sameUsers := reflect.DeepEqual(oldSpec.Spec.Users, newSpec.Spec.Users) + needConnPool := c.needConnectionPoolWorker(&newSpec.Spec) + if !sameUsers || needConnPool { c.logger.Debugf("syncing secrets") if err := c.initUsers(); err != nil { c.logger.Errorf("could not init users: %v", err) @@ -695,6 +738,11 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { } } + // sync connection pool + if err := c.syncConnectionPool(oldSpec, newSpec, c.installLookupFunction); err != nil { + return fmt.Errorf("could not sync connection pool: %v", err) + } + return nil } @@ -746,6 +794,12 @@ func (c *Cluster) Delete() { c.logger.Warningf("could not remove leftover patroni objects; %v", err) } + // Delete connection pool objects anyway, even if it's not mentioned in the + // manifest, just to not keep orphaned components in case if something went + // wrong + if err := c.deleteConnectionPool(); err != nil { + c.logger.Warningf("could not remove connection pool: %v", err) + } } //NeedsRepair returns true if the cluster should be included in the repair scan (based on its in-memory status). @@ -812,6 +866,35 @@ func (c *Cluster) initSystemUsers() { Name: c.OpConfig.ReplicationUsername, Password: util.RandomPassword(constants.PasswordLength), } + + // Connection pool user is an exception, if requested it's going to be + // created by operator as a normal pgUser + if c.needConnectionPool() { + // initialize empty connection pool if not done yet + if c.Spec.ConnectionPool == nil { + c.Spec.ConnectionPool = &acidv1.ConnectionPool{} + } + + username := util.Coalesce( + c.Spec.ConnectionPool.User, + c.OpConfig.ConnectionPool.User) + + // connection pooler application should be able to login with this role + connPoolUser := spec.PgUser{ + Origin: spec.RoleConnectionPool, + Name: username, + Flags: []string{constants.RoleFlagLogin}, + Password: util.RandomPassword(constants.PasswordLength), + } + + if _, exists := c.pgUsers[username]; !exists { + c.pgUsers[username] = connPoolUser + } + + if _, exists := c.systemUsers[constants.ConnectionPoolUserKeyName]; !exists { + c.systemUsers[constants.ConnectionPoolUserKeyName] = connPoolUser + } + } } func (c *Cluster) initRobotUsers() error { @@ -1138,3 +1221,119 @@ func (c *Cluster) deletePatroniClusterConfigMaps() error { return c.deleteClusterObject(get, deleteConfigMapFn, "configmap") } + +// Test if two connection pool configuration needs to be synced. For simplicity +// compare not the actual K8S objects, but the configuration itself and request +// sync if there is any difference. 
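+//
+// Illustrative example (an assumed manifest change, not taken from the
+// tests): switching connectionPool.mode from "transaction" to "session"
+// yields a single changelog entry and is reported as a reason to sync the
+// connection pool deployment.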
+func (c *Cluster) needSyncConnPoolSpecs(oldSpec, newSpec *acidv1.ConnectionPool) (sync bool, reasons []string) { + reasons = []string{} + sync = false + + changelog, err := diff.Diff(oldSpec, newSpec) + if err != nil { + c.logger.Infof("Cannot get diff, do not do anything, %+v", err) + return false, reasons + } + + if len(changelog) > 0 { + sync = true + } + + for _, change := range changelog { + msg := fmt.Sprintf("%s %+v from '%+v' to '%+v'", + change.Type, change.Path, change.From, change.To) + reasons = append(reasons, msg) + } + + return sync, reasons +} + +func syncResources(a, b *v1.ResourceRequirements) bool { + for _, res := range []v1.ResourceName{ + v1.ResourceCPU, + v1.ResourceMemory, + } { + if !a.Limits[res].Equal(b.Limits[res]) || + !a.Requests[res].Equal(b.Requests[res]) { + return true + } + } + + return false +} + +// Check if we need to synchronize connection pool deployment due to new +// defaults, that are different from what we see in the DeploymentSpec +func (c *Cluster) needSyncConnPoolDefaults( + spec *acidv1.ConnectionPool, + deployment *appsv1.Deployment) (sync bool, reasons []string) { + + reasons = []string{} + sync = false + + config := c.OpConfig.ConnectionPool + podTemplate := deployment.Spec.Template + poolContainer := podTemplate.Spec.Containers[constants.ConnPoolContainer] + + if spec == nil { + spec = &acidv1.ConnectionPool{} + } + + if spec.NumberOfInstances == nil && + *deployment.Spec.Replicas != *config.NumberOfInstances { + + sync = true + msg := fmt.Sprintf("NumberOfInstances is different (having %d, required %d)", + *deployment.Spec.Replicas, *config.NumberOfInstances) + reasons = append(reasons, msg) + } + + if spec.DockerImage == "" && + poolContainer.Image != config.Image { + + sync = true + msg := fmt.Sprintf("DockerImage is different (having %s, required %s)", + poolContainer.Image, config.Image) + reasons = append(reasons, msg) + } + + expectedResources, err := generateResourceRequirements(spec.Resources, + c.makeDefaultConnPoolResources()) + + // An error to generate expected resources means something is not quite + // right, but for the purpose of robustness do not panic here, just report + // and ignore resources comparison (in the worst case there will be no + // updates for new resource values). 
+ if err == nil && syncResources(&poolContainer.Resources, expectedResources) { + sync = true + msg := fmt.Sprintf("Resources are different (having %+v, required %+v)", + poolContainer.Resources, expectedResources) + reasons = append(reasons, msg) + } + + if err != nil { + c.logger.Warningf("Cannot generate expected resources, %v", err) + } + + for _, env := range poolContainer.Env { + if spec.User == "" && env.Name == "PGUSER" { + ref := env.ValueFrom.SecretKeyRef.LocalObjectReference + + if ref.Name != c.credentialSecretName(config.User) { + sync = true + msg := fmt.Sprintf("Pool user is different (having %s, required %s)", + ref.Name, config.User) + reasons = append(reasons, msg) + } + } + + if spec.Schema == "" && env.Name == "PGSCHEMA" && env.Value != config.Schema { + sync = true + msg := fmt.Sprintf("Pool schema is different (having %s, required %s)", + env.Value, config.Schema) + reasons = append(reasons, msg) + } + } + + return sync, reasons +} diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go index 9efbc51c6..a1b361642 100644 --- a/pkg/cluster/cluster_test.go +++ b/pkg/cluster/cluster_test.go @@ -9,6 +9,7 @@ import ( acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" "github.com/zalando/postgres-operator/pkg/spec" "github.com/zalando/postgres-operator/pkg/util/config" + "github.com/zalando/postgres-operator/pkg/util/constants" "github.com/zalando/postgres-operator/pkg/util/k8sutil" "github.com/zalando/postgres-operator/pkg/util/teams" v1 "k8s.io/api/core/v1" @@ -704,3 +705,20 @@ func TestServiceAnnotations(t *testing.T) { }) } } + +func TestInitSystemUsers(t *testing.T) { + testName := "Test system users initialization" + + // default cluster without connection pool + cl.initSystemUsers() + if _, exist := cl.systemUsers[constants.ConnectionPoolUserKeyName]; exist { + t.Errorf("%s, connection pool user is present", testName) + } + + // cluster with connection pool + cl.Spec.EnableConnectionPool = boolToPointer(true) + cl.initSystemUsers() + if _, exist := cl.systemUsers[constants.ConnectionPoolUserKeyName]; !exist { + t.Errorf("%s, connection pool user is not present", testName) + } +} diff --git a/pkg/cluster/database.go b/pkg/cluster/database.go index 07ea011a6..bca68c188 100644 --- a/pkg/cluster/database.go +++ b/pkg/cluster/database.go @@ -1,10 +1,12 @@ package cluster import ( + "bytes" "database/sql" "fmt" "net" "strings" + "text/template" "time" "github.com/lib/pq" @@ -28,13 +30,37 @@ const ( getDatabasesSQL = `SELECT datname, pg_get_userbyid(datdba) AS owner FROM pg_database;` createDatabaseSQL = `CREATE DATABASE "%s" OWNER "%s";` alterDatabaseOwnerSQL = `ALTER DATABASE "%s" OWNER TO "%s";` + connectionPoolLookup = ` + CREATE SCHEMA IF NOT EXISTS {{.pool_schema}}; + + CREATE OR REPLACE FUNCTION {{.pool_schema}}.user_lookup( + in i_username text, out uname text, out phash text) + RETURNS record AS $$ + BEGIN + SELECT usename, passwd FROM pg_catalog.pg_shadow + WHERE usename = i_username INTO uname, phash; + RETURN; + END; + $$ LANGUAGE plpgsql SECURITY DEFINER; + + REVOKE ALL ON FUNCTION {{.pool_schema}}.user_lookup(text) + FROM public, {{.pool_user}}; + GRANT EXECUTE ON FUNCTION {{.pool_schema}}.user_lookup(text) + TO {{.pool_user}}; + GRANT USAGE ON SCHEMA {{.pool_schema}} TO {{.pool_user}}; + ` ) -func (c *Cluster) pgConnectionString() string { +func (c *Cluster) pgConnectionString(dbname string) string { password := c.systemUsers[constants.SuperuserKeyName].Password - return fmt.Sprintf("host='%s' dbname=postgres sslmode=require 
user='%s' password='%s' connect_timeout='%d'", + if dbname == "" { + dbname = "postgres" + } + + return fmt.Sprintf("host='%s' dbname='%s' sslmode=require user='%s' password='%s' connect_timeout='%d'", fmt.Sprintf("%s.%s.svc.%s", c.Name, c.Namespace, c.OpConfig.ClusterDomain), + dbname, c.systemUsers[constants.SuperuserKeyName].Name, strings.Replace(password, "$", "\\$", -1), constants.PostgresConnectTimeout/time.Second) @@ -49,13 +75,17 @@ func (c *Cluster) databaseAccessDisabled() bool { } func (c *Cluster) initDbConn() error { + return c.initDbConnWithName("") +} + +func (c *Cluster) initDbConnWithName(dbname string) error { c.setProcessName("initializing db connection") if c.pgDb != nil { return nil } var conn *sql.DB - connstring := c.pgConnectionString() + connstring := c.pgConnectionString(dbname) finalerr := retryutil.Retry(constants.PostgresConnectTimeout, constants.PostgresConnectRetryTimeout, func() (bool, error) { @@ -94,6 +124,10 @@ func (c *Cluster) initDbConn() error { return nil } +func (c *Cluster) connectionIsClosed() bool { + return c.pgDb == nil +} + func (c *Cluster) closeDbConn() (err error) { c.setProcessName("closing db connection") if c.pgDb != nil { @@ -243,3 +277,88 @@ func makeUserFlags(rolsuper, rolinherit, rolcreaterole, rolcreatedb, rolcanlogin return result } + +// Creates a connection pool credentials lookup function in every database to +// perform remote authentification. +func (c *Cluster) installLookupFunction(poolSchema, poolUser string) error { + var stmtBytes bytes.Buffer + c.logger.Info("Installing lookup function") + + if err := c.initDbConn(); err != nil { + return fmt.Errorf("could not init database connection") + } + defer func() { + if c.connectionIsClosed() { + return + } + + if err := c.closeDbConn(); err != nil { + c.logger.Errorf("could not close database connection: %v", err) + } + }() + + currentDatabases, err := c.getDatabases() + if err != nil { + msg := "could not get databases to install pool lookup function: %v" + return fmt.Errorf(msg, err) + } + + templater := template.Must(template.New("sql").Parse(connectionPoolLookup)) + + for dbname, _ := range currentDatabases { + if dbname == "template0" || dbname == "template1" { + continue + } + + if err := c.initDbConnWithName(dbname); err != nil { + return fmt.Errorf("could not init database connection to %s", dbname) + } + + c.logger.Infof("Install pool lookup function into %s", dbname) + + params := TemplateParams{ + "pool_schema": poolSchema, + "pool_user": poolUser, + } + + if err := templater.Execute(&stmtBytes, params); err != nil { + c.logger.Errorf("could not prepare sql statement %+v: %v", + params, err) + // process other databases + continue + } + + // golang sql will do retries couple of times if pq driver reports + // connections issues (driver.ErrBadConn), but since our query is + // idempotent, we can retry in a view of other errors (e.g. due to + // failover a db is temporary in a read-only mode or so) to make sure + // it was applied. 
+ execErr := retryutil.Retry( + constants.PostgresConnectTimeout, + constants.PostgresConnectRetryTimeout, + func() (bool, error) { + if _, err := c.pgDb.Exec(stmtBytes.String()); err != nil { + msg := fmt.Errorf("could not execute sql statement %s: %v", + stmtBytes.String(), err) + return false, msg + } + + return true, nil + }) + + if execErr != nil { + c.logger.Errorf("could not execute after retries %s: %v", + stmtBytes.String(), err) + // process other databases + continue + } + + c.logger.Infof("Pool lookup function installed into %s", dbname) + if err := c.closeDbConn(); err != nil { + c.logger.Errorf("could not close database connection: %v", err) + } + } + + c.ConnectionPool.LookupFunction = true + return nil +} diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index aaa27384a..800429930 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -21,6 +21,7 @@ import ( "github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util/config" "github.com/zalando/postgres-operator/pkg/util/constants" + "github.com/zalando/postgres-operator/pkg/util/k8sutil" batchv1 "k8s.io/api/batch/v1" batchv1beta1 "k8s.io/api/batch/v1beta1" "k8s.io/apimachinery/pkg/labels" @@ -31,10 +32,12 @@ const ( patroniPGBinariesParameterName = "bin_dir" patroniPGParametersParameterName = "parameters" patroniPGHBAConfParameterName = "pg_hba" + localHost = "127.0.0.1/32" + connectionPoolContainer = "connection-pool" + pgPort = 5432 // the gid of the postgres user in the default spilo image spiloPostgresGID = 103 - localHost = "127.0.0.1/32" ) type pgUser struct { @@ -70,6 +73,10 @@ func (c *Cluster) statefulSetName() string { return c.Name } +func (c *Cluster) connPoolName() string { + return c.Name + "-pooler" +} + func (c *Cluster) endpointName(role PostgresRole) string { name := c.Name if role == Replica { @@ -88,6 +95,28 @@ func (c *Cluster) serviceName(role PostgresRole) string { return name } +func (c *Cluster) serviceAddress(role PostgresRole) string { + service, exist := c.Services[role] + + if exist { + return service.ObjectMeta.Name + } + + c.logger.Warningf("No service for role %s", role) + return "" +} + +func (c *Cluster) servicePort(role PostgresRole) string { + service, exist := c.Services[role] + + if exist { + return fmt.Sprint(service.Spec.Ports[0].Port) + } + + c.logger.Warningf("No service for role %s", role) + return "" +} + func (c *Cluster) podDisruptionBudgetName() string { return c.OpConfig.PDBNameFormat.Format("cluster", c.Name) } @@ -96,10 +125,39 @@ func (c *Cluster) makeDefaultResources() acidv1.Resources { config := c.OpConfig - defaultRequests := acidv1.ResourceDescription{CPU: config.DefaultCPURequest, Memory: config.DefaultMemoryRequest} - defaultLimits := acidv1.ResourceDescription{CPU: config.DefaultCPULimit, Memory: config.DefaultMemoryLimit} + defaultRequests := acidv1.ResourceDescription{ + CPU: config.Resources.DefaultCPURequest, + Memory: config.Resources.DefaultMemoryRequest, + } + defaultLimits := acidv1.ResourceDescription{ + CPU: config.Resources.DefaultCPULimit, + Memory: config.Resources.DefaultMemoryLimit, + } - return acidv1.Resources{ResourceRequests: defaultRequests, ResourceLimits: defaultLimits} + return acidv1.Resources{ + ResourceRequests: defaultRequests, + ResourceLimits: defaultLimits, + } +} + +// Generate default resource section for connection pool deployment, to be used +// if nothing custom is specified in the manifest +func (c *Cluster) makeDefaultConnPoolResources() acidv1.Resources { + config := 
c.OpConfig + + defaultRequests := acidv1.ResourceDescription{ + CPU: config.ConnectionPool.ConnPoolDefaultCPURequest, + Memory: config.ConnectionPool.ConnPoolDefaultMemoryRequest, + } + defaultLimits := acidv1.ResourceDescription{ + CPU: config.ConnectionPool.ConnPoolDefaultCPULimit, + Memory: config.ConnectionPool.ConnPoolDefaultMemoryLimit, + } + + return acidv1.Resources{ + ResourceRequests: defaultRequests, + ResourceLimits: defaultLimits, + } } func generateResourceRequirements(resources acidv1.Resources, defaultResources acidv1.Resources) (*v1.ResourceRequirements, error) { @@ -319,7 +377,11 @@ func tolerations(tolerationsSpec *[]v1.Toleration, podToleration map[string]stri return *tolerationsSpec } - if len(podToleration["key"]) > 0 || len(podToleration["operator"]) > 0 || len(podToleration["value"]) > 0 || len(podToleration["effect"]) > 0 { + if len(podToleration["key"]) > 0 || + len(podToleration["operator"]) > 0 || + len(podToleration["value"]) > 0 || + len(podToleration["effect"]) > 0 { + return []v1.Toleration{ { Key: podToleration["key"], @@ -784,12 +846,12 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef request := spec.Resources.ResourceRequests.Memory if request == "" { - request = c.OpConfig.DefaultMemoryRequest + request = c.OpConfig.Resources.DefaultMemoryRequest } limit := spec.Resources.ResourceLimits.Memory if limit == "" { - limit = c.OpConfig.DefaultMemoryLimit + limit = c.OpConfig.Resources.DefaultMemoryLimit } isSmaller, err := util.IsSmallerQuantity(request, limit) @@ -811,12 +873,12 @@ func (c *Cluster) generateStatefulSet(spec *acidv1.PostgresSpec) (*appsv1.Statef // TODO #413 sidecarRequest := sidecar.Resources.ResourceRequests.Memory if request == "" { - request = c.OpConfig.DefaultMemoryRequest + request = c.OpConfig.Resources.DefaultMemoryRequest } sidecarLimit := sidecar.Resources.ResourceLimits.Memory if limit == "" { - limit = c.OpConfig.DefaultMemoryLimit + limit = c.OpConfig.Resources.DefaultMemoryLimit } isSmaller, err := util.IsSmallerQuantity(sidecarRequest, sidecarLimit) @@ -1772,6 +1834,306 @@ func (c *Cluster) getLogicalBackupJobName() (jobName string) { return "logical-backup-" + c.clusterName().Name } +// Generate pool size related environment variables. +// +// MAX_DB_CONN would specify the global maximum for connections to a target +// database. +// +// MAX_CLIENT_CONN is not configurable at the moment, just set it high enough. +// +// DEFAULT_SIZE is a pool size per db/user (having in mind the use case when +// most of the queries coming through a connection pooler are from the same +// user to the same db). In case if we want to spin up more connection pool +// instances, take this into account and maintain the same number of +// connections. +// +// MIN_SIZE is a pool minimal size, to prevent situation when sudden workload +// have to wait for spinning up a new connections. +// +// RESERVE_SIZE is how many additional connections to allow for a pool. 
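+//
+// Worked example (values assumed for illustration, matching the documented
+// defaults): with MAX_DB_CONN=60 and two pooler instances, each instance
+// gets maxDBConn=30, DEFAULT_SIZE=15, MIN_SIZE=7 and RESERVE_SIZE=7
+// (integer division).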
+func (c *Cluster) getConnPoolEnvVars(spec *acidv1.PostgresSpec) []v1.EnvVar { + effectiveMode := util.Coalesce( + spec.ConnectionPool.Mode, + c.OpConfig.ConnectionPool.Mode) + + numberOfInstances := spec.ConnectionPool.NumberOfInstances + if numberOfInstances == nil { + numberOfInstances = util.CoalesceInt32( + c.OpConfig.ConnectionPool.NumberOfInstances, + k8sutil.Int32ToPointer(1)) + } + + effectiveMaxDBConn := util.CoalesceInt32( + spec.ConnectionPool.MaxDBConnections, + c.OpConfig.ConnectionPool.MaxDBConnections) + + if effectiveMaxDBConn == nil { + effectiveMaxDBConn = k8sutil.Int32ToPointer( + constants.ConnPoolMaxDBConnections) + } + + maxDBConn := *effectiveMaxDBConn / *numberOfInstances + + defaultSize := maxDBConn / 2 + minSize := defaultSize / 2 + reserveSize := minSize + + return []v1.EnvVar{ + { + Name: "CONNECTION_POOL_PORT", + Value: fmt.Sprint(pgPort), + }, + { + Name: "CONNECTION_POOL_MODE", + Value: effectiveMode, + }, + { + Name: "CONNECTION_POOL_DEFAULT_SIZE", + Value: fmt.Sprint(defaultSize), + }, + { + Name: "CONNECTION_POOL_MIN_SIZE", + Value: fmt.Sprint(minSize), + }, + { + Name: "CONNECTION_POOL_RESERVE_SIZE", + Value: fmt.Sprint(reserveSize), + }, + { + Name: "CONNECTION_POOL_MAX_CLIENT_CONN", + Value: fmt.Sprint(constants.ConnPoolMaxClientConnections), + }, + { + Name: "CONNECTION_POOL_MAX_DB_CONN", + Value: fmt.Sprint(maxDBConn), + }, + } +} + +func (c *Cluster) generateConnPoolPodTemplate(spec *acidv1.PostgresSpec) ( + *v1.PodTemplateSpec, error) { + + gracePeriod := int64(c.OpConfig.PodTerminateGracePeriod.Seconds()) + resources, err := generateResourceRequirements( + spec.ConnectionPool.Resources, + c.makeDefaultConnPoolResources()) + + effectiveDockerImage := util.Coalesce( + spec.ConnectionPool.DockerImage, + c.OpConfig.ConnectionPool.Image) + + effectiveSchema := util.Coalesce( + spec.ConnectionPool.Schema, + c.OpConfig.ConnectionPool.Schema) + + if err != nil { + return nil, fmt.Errorf("could not generate resource requirements: %v", err) + } + + secretSelector := func(key string) *v1.SecretKeySelector { + effectiveUser := util.Coalesce( + spec.ConnectionPool.User, + c.OpConfig.ConnectionPool.User) + + return &v1.SecretKeySelector{ + LocalObjectReference: v1.LocalObjectReference{ + Name: c.credentialSecretName(effectiveUser), + }, + Key: key, + } + } + + envVars := []v1.EnvVar{ + { + Name: "PGHOST", + Value: c.serviceAddress(Master), + }, + { + Name: "PGPORT", + Value: c.servicePort(Master), + }, + { + Name: "PGUSER", + ValueFrom: &v1.EnvVarSource{ + SecretKeyRef: secretSelector("username"), + }, + }, + // the convention is to use the same schema name as + // connection pool username + { + Name: "PGSCHEMA", + Value: effectiveSchema, + }, + { + Name: "PGPASSWORD", + ValueFrom: &v1.EnvVarSource{ + SecretKeyRef: secretSelector("password"), + }, + }, + } + + envVars = append(envVars, c.getConnPoolEnvVars(spec)...) 
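+	// At this point envVars holds both the connection credentials (PGHOST,
+	// PGPORT, PGUSER, PGSCHEMA, PGPASSWORD) and the pool sizing variables
+	// produced by getConnPoolEnvVars above; all of them are handed to the
+	// pooler container defined below.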
+
+	poolerContainer := v1.Container{
+		Name:            connectionPoolContainer,
+		Image:           effectiveDockerImage,
+		ImagePullPolicy: v1.PullIfNotPresent,
+		Resources:       *resources,
+		Ports: []v1.ContainerPort{
+			{
+				ContainerPort: pgPort,
+				Protocol:      v1.ProtocolTCP,
+			},
+		},
+		Env: envVars,
+	}
+
+	podTemplate := &v1.PodTemplateSpec{
+		ObjectMeta: metav1.ObjectMeta{
+			Labels:      c.connPoolLabelsSelector().MatchLabels,
+			Namespace:   c.Namespace,
+			Annotations: c.generatePodAnnotations(spec),
+		},
+		Spec: v1.PodSpec{
+			ServiceAccountName:            c.OpConfig.PodServiceAccountName,
+			TerminationGracePeriodSeconds: &gracePeriod,
+			Containers:                    []v1.Container{poolerContainer},
+			// TODO: add tolerations to schedule the pooler on the same node
+			// as the database
+			//Tolerations: *tolerationsSpec,
+		},
+	}
+
+	return podTemplate, nil
+}
+
+// Return an array of ownerReferences to make an arbitrary object dependent on
+// the StatefulSet. The dependency is on the StatefulSet instead of the
+// PostgreSQL CRD because the former represents the actual state, and only its
+// deletion means we delete the cluster (e.g. if the CRD was deleted but the
+// StatefulSet somehow survived, we can't delete an object because it would
+// affect the functioning cluster).
+func (c *Cluster) ownerReferences() []metav1.OwnerReference {
+	controller := true
+
+	if c.Statefulset == nil {
+		c.logger.Warning("Cannot get owner reference, no statefulset")
+		return []metav1.OwnerReference{}
+	}
+
+	return []metav1.OwnerReference{
+		{
+			UID:        c.Statefulset.ObjectMeta.UID,
+			APIVersion: "apps/v1",
+			Kind:       "StatefulSet",
+			Name:       c.Statefulset.ObjectMeta.Name,
+			Controller: &controller,
+		},
+	}
+}
+
+func (c *Cluster) generateConnPoolDeployment(spec *acidv1.PostgresSpec) (
+	*appsv1.Deployment, error) {
+
+	// there are two ways to enable the connection pooler: either to specify a
+	// connectionPool section or to set enableConnectionPool. In the second case
+	// spec.connectionPool will be nil, so to make it easier to calculate
+	// default values, initialize it to an empty structure. It could be done
+	// anywhere, but here is the earliest common entry point between sync and
+	// create code, so init here.
+	if spec.ConnectionPool == nil {
+		spec.ConnectionPool = &acidv1.ConnectionPool{}
+	}
+
+	podTemplate, err := c.generateConnPoolPodTemplate(spec)
+	numberOfInstances := spec.ConnectionPool.NumberOfInstances
+	if numberOfInstances == nil {
+		numberOfInstances = util.CoalesceInt32(
+			c.OpConfig.ConnectionPool.NumberOfInstances,
+			k8sutil.Int32ToPointer(1))
+	}
+
+	if *numberOfInstances < constants.ConnPoolMinInstances {
+		msg := "Adjusted number of connection pool instances from %d to %d"
+		c.logger.Warningf(msg, *numberOfInstances, constants.ConnPoolMinInstances)
+
+		*numberOfInstances = constants.ConnPoolMinInstances
+	}
+
+	if err != nil {
+		return nil, err
+	}
+
+	deployment := &appsv1.Deployment{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:        c.connPoolName(),
+			Namespace:   c.Namespace,
+			Labels:      c.connPoolLabelsSelector().MatchLabels,
+			Annotations: map[string]string{},
+			// make the StatefulSet object its owner to represent the dependency.
+			// The StatefulSet itself is deleted with the "Orphaned" propagation
+			// policy, which means that its deletion will not clean up this
+			// deployment, but there is hope that this object will be garbage
+			// collected if something went wrong and the operator didn't
+			// delete it.
+ OwnerReferences: c.ownerReferences(), + }, + Spec: appsv1.DeploymentSpec{ + Replicas: numberOfInstances, + Selector: c.connPoolLabelsSelector(), + Template: *podTemplate, + }, + } + + return deployment, nil +} + +func (c *Cluster) generateConnPoolService(spec *acidv1.PostgresSpec) *v1.Service { + + // there are two ways to enable connection pooler, either to specify a + // connectionPool section or enableConnectionPool. In the second case + // spec.connectionPool will be nil, so to make it easier to calculate + // default values, initialize it to an empty structure. It could be done + // anywhere, but here is the earliest common entry point between sync and + // create code, so init here. + if spec.ConnectionPool == nil { + spec.ConnectionPool = &acidv1.ConnectionPool{} + } + + serviceSpec := v1.ServiceSpec{ + Ports: []v1.ServicePort{ + { + Name: c.connPoolName(), + Port: pgPort, + TargetPort: intstr.IntOrString{StrVal: c.servicePort(Master)}, + }, + }, + Type: v1.ServiceTypeClusterIP, + Selector: map[string]string{ + "connection-pool": c.connPoolName(), + }, + } + + service := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: c.connPoolName(), + Namespace: c.Namespace, + Labels: c.connPoolLabelsSelector().MatchLabels, + Annotations: map[string]string{}, + // make StatefulSet object its owner to represent the dependency. + // By itself StatefulSet is being deleted with "Orphaned" + // propagation policy, which means that it's deletion will not + // clean up this service, but there is a hope that this object will + // be garbage collected if something went wrong and operator didn't + // deleted it. + OwnerReferences: c.ownerReferences(), + }, + Spec: serviceSpec, + } + + return service +} + func ensurePath(file string, defaultDir string, defaultFile string) string { if file == "" { return path.Join(defaultDir, defaultFile) diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go index 25e0f7af4..e04b281ba 100644 --- a/pkg/cluster/k8sres_test.go +++ b/pkg/cluster/k8sres_test.go @@ -1,6 +1,8 @@ package cluster import ( + "errors" + "fmt" "reflect" "testing" @@ -13,6 +15,7 @@ import ( "github.com/zalando/postgres-operator/pkg/util/constants" "github.com/zalando/postgres-operator/pkg/util/k8sutil" + appsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" policyv1beta1 "k8s.io/api/policy/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -582,6 +585,375 @@ func TestSecretVolume(t *testing.T) { } } +func testResources(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { + cpuReq := podSpec.Spec.Containers[0].Resources.Requests["cpu"] + if cpuReq.String() != cluster.OpConfig.ConnectionPool.ConnPoolDefaultCPURequest { + return fmt.Errorf("CPU request doesn't match, got %s, expected %s", + cpuReq.String(), cluster.OpConfig.ConnectionPool.ConnPoolDefaultCPURequest) + } + + memReq := podSpec.Spec.Containers[0].Resources.Requests["memory"] + if memReq.String() != cluster.OpConfig.ConnectionPool.ConnPoolDefaultMemoryRequest { + return fmt.Errorf("Memory request doesn't match, got %s, expected %s", + memReq.String(), cluster.OpConfig.ConnectionPool.ConnPoolDefaultMemoryRequest) + } + + cpuLim := podSpec.Spec.Containers[0].Resources.Limits["cpu"] + if cpuLim.String() != cluster.OpConfig.ConnectionPool.ConnPoolDefaultCPULimit { + return fmt.Errorf("CPU limit doesn't match, got %s, expected %s", + cpuLim.String(), cluster.OpConfig.ConnectionPool.ConnPoolDefaultCPULimit) + } + + memLim := podSpec.Spec.Containers[0].Resources.Limits["memory"] + if memLim.String() != 
cluster.OpConfig.ConnectionPool.ConnPoolDefaultMemoryLimit { + return fmt.Errorf("Memory limit doesn't match, got %s, expected %s", + memLim.String(), cluster.OpConfig.ConnectionPool.ConnPoolDefaultMemoryLimit) + } + + return nil +} + +func testLabels(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { + poolLabels := podSpec.ObjectMeta.Labels["connection-pool"] + + if poolLabels != cluster.connPoolLabelsSelector().MatchLabels["connection-pool"] { + return fmt.Errorf("Pod labels do not match, got %+v, expected %+v", + podSpec.ObjectMeta.Labels, cluster.connPoolLabelsSelector().MatchLabels) + } + + return nil +} + +func testEnvs(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { + required := map[string]bool{ + "PGHOST": false, + "PGPORT": false, + "PGUSER": false, + "PGSCHEMA": false, + "PGPASSWORD": false, + "CONNECTION_POOL_MODE": false, + "CONNECTION_POOL_PORT": false, + } + + envs := podSpec.Spec.Containers[0].Env + for _, env := range envs { + required[env.Name] = true + } + + for env, value := range required { + if !value { + return fmt.Errorf("Environment variable %s is not present", env) + } + } + + return nil +} + +func testCustomPodTemplate(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { + if podSpec.ObjectMeta.Name != "test-pod-template" { + return fmt.Errorf("Custom pod template is not used, current spec %+v", + podSpec) + } + + return nil +} + +func TestConnPoolPodSpec(t *testing.T) { + testName := "Test connection pool pod template generation" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{ + MaxDBConnections: int32ToPointer(60), + ConnPoolDefaultCPURequest: "100m", + ConnPoolDefaultCPULimit: "100m", + ConnPoolDefaultMemoryRequest: "100Mi", + ConnPoolDefaultMemoryLimit: "100Mi", + }, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + + var clusterNoDefaultRes = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{}, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + + noCheck := func(cluster *Cluster, podSpec *v1.PodTemplateSpec) error { return nil } + + tests := []struct { + subTest string + spec *acidv1.PostgresSpec + expected error + cluster *Cluster + check func(cluster *Cluster, podSpec *v1.PodTemplateSpec) error + }{ + { + subTest: "default configuration", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: noCheck, + }, + { + subTest: "no default resources", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: errors.New(`could not generate resource requirements: could not fill resource requests: could not parse default CPU quantity: quantities must match the regular expression '^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$'`), + cluster: clusterNoDefaultRes, + check: noCheck, + }, + { + subTest: "default resources are set", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: testResources, + }, + { + subTest: "labels for service", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: testLabels, + }, + { + subTest: 
"required envs", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: testEnvs, + }, + } + for _, tt := range tests { + podSpec, err := tt.cluster.generateConnPoolPodTemplate(tt.spec) + + if err != tt.expected && err.Error() != tt.expected.Error() { + t.Errorf("%s [%s]: Could not generate pod template,\n %+v, expected\n %+v", + testName, tt.subTest, err, tt.expected) + } + + err = tt.check(cluster, podSpec) + if err != nil { + t.Errorf("%s [%s]: Pod spec is incorrect, %+v", + testName, tt.subTest, err) + } + } +} + +func testDeploymentOwnwerReference(cluster *Cluster, deployment *appsv1.Deployment) error { + owner := deployment.ObjectMeta.OwnerReferences[0] + + if owner.Name != cluster.Statefulset.ObjectMeta.Name { + return fmt.Errorf("Ownere reference is incorrect, got %s, expected %s", + owner.Name, cluster.Statefulset.ObjectMeta.Name) + } + + return nil +} + +func testSelector(cluster *Cluster, deployment *appsv1.Deployment) error { + labels := deployment.Spec.Selector.MatchLabels + expected := cluster.connPoolLabelsSelector().MatchLabels + + if labels["connection-pool"] != expected["connection-pool"] { + return fmt.Errorf("Labels are incorrect, got %+v, expected %+v", + labels, expected) + } + + return nil +} + +func TestConnPoolDeploymentSpec(t *testing.T) { + testName := "Test connection pool deployment spec generation" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{ + ConnPoolDefaultCPURequest: "100m", + ConnPoolDefaultCPULimit: "100m", + ConnPoolDefaultMemoryRequest: "100Mi", + ConnPoolDefaultMemoryLimit: "100Mi", + }, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + cluster.Statefulset = &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sts", + }, + } + + noCheck := func(cluster *Cluster, deployment *appsv1.Deployment) error { + return nil + } + + tests := []struct { + subTest string + spec *acidv1.PostgresSpec + expected error + cluster *Cluster + check func(cluster *Cluster, deployment *appsv1.Deployment) error + }{ + { + subTest: "default configuration", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: noCheck, + }, + { + subTest: "owner reference", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: testDeploymentOwnwerReference, + }, + { + subTest: "selector", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + expected: nil, + cluster: cluster, + check: testSelector, + }, + } + for _, tt := range tests { + deployment, err := tt.cluster.generateConnPoolDeployment(tt.spec) + + if err != tt.expected && err.Error() != tt.expected.Error() { + t.Errorf("%s [%s]: Could not generate deployment spec,\n %+v, expected\n %+v", + testName, tt.subTest, err, tt.expected) + } + + err = tt.check(cluster, deployment) + if err != nil { + t.Errorf("%s [%s]: Deployment spec is incorrect, %+v", + testName, tt.subTest, err) + } + } +} + +func testServiceOwnwerReference(cluster *Cluster, service *v1.Service) error { + owner := service.ObjectMeta.OwnerReferences[0] + + if owner.Name != cluster.Statefulset.ObjectMeta.Name { + return fmt.Errorf("Ownere reference is incorrect, got %s, expected %s", + owner.Name, 
cluster.Statefulset.ObjectMeta.Name) + } + + return nil +} + +func testServiceSelector(cluster *Cluster, service *v1.Service) error { + selector := service.Spec.Selector + + if selector["connection-pool"] != cluster.connPoolName() { + return fmt.Errorf("Selector is incorrect, got %s, expected %s", + selector["connection-pool"], cluster.connPoolName()) + } + + return nil +} + +func TestConnPoolServiceSpec(t *testing.T) { + testName := "Test connection pool service spec generation" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{ + ConnPoolDefaultCPURequest: "100m", + ConnPoolDefaultCPULimit: "100m", + ConnPoolDefaultMemoryRequest: "100Mi", + ConnPoolDefaultMemoryLimit: "100Mi", + }, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + cluster.Statefulset = &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sts", + }, + } + + noCheck := func(cluster *Cluster, deployment *v1.Service) error { + return nil + } + + tests := []struct { + subTest string + spec *acidv1.PostgresSpec + cluster *Cluster + check func(cluster *Cluster, deployment *v1.Service) error + }{ + { + subTest: "default configuration", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + cluster: cluster, + check: noCheck, + }, + { + subTest: "owner reference", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + cluster: cluster, + check: testServiceOwnwerReference, + }, + { + subTest: "selector", + spec: &acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + cluster: cluster, + check: testServiceSelector, + }, + } + for _, tt := range tests { + service := tt.cluster.generateConnPoolService(tt.spec) + + if err := tt.check(cluster, service); err != nil { + t.Errorf("%s [%s]: Service spec is incorrect, %+v", + testName, tt.subTest, err) + } + } +} + func TestTLS(t *testing.T) { var err error var spec acidv1.PostgresSpec diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index d6c2149bf..2e02a9a83 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -90,6 +90,132 @@ func (c *Cluster) createStatefulSet() (*appsv1.StatefulSet, error) { return statefulSet, nil } +// Prepare the database for connection pool to be used, i.e. install lookup +// function (do it first, because it should be fast and if it didn't succeed, +// it doesn't makes sense to create more K8S objects. At this moment we assume +// that necessary connection pool user exists. +// +// After that create all the objects for connection pool, namely a deployment +// with a chosen pooler and a service to expose it. 
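+//
+// Both objects are named after the cluster with a "-pooler" suffix (see
+// connPoolName()), so e.g. a cluster named "acid-test" ends up with a
+// deployment and a service called "acid-test-pooler".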
+func (c *Cluster) createConnectionPool(lookup InstallFunction) (*ConnectionPoolObjects, error) { + var msg string + c.setProcessName("creating connection pool") + + schema := c.Spec.ConnectionPool.Schema + if schema == "" { + schema = c.OpConfig.ConnectionPool.Schema + } + + user := c.Spec.ConnectionPool.User + if user == "" { + user = c.OpConfig.ConnectionPool.User + } + + err := lookup(schema, user) + + if err != nil { + msg = "could not prepare database for connection pool: %v" + return nil, fmt.Errorf(msg, err) + } + + deploymentSpec, err := c.generateConnPoolDeployment(&c.Spec) + if err != nil { + msg = "could not generate deployment for connection pool: %v" + return nil, fmt.Errorf(msg, err) + } + + // client-go does retry 10 times (with NoBackoff by default) when the API + // believe a request can be retried and returns Retry-After header. This + // should be good enough to not think about it here. + deployment, err := c.KubeClient. + Deployments(deploymentSpec.Namespace). + Create(deploymentSpec) + + if err != nil { + return nil, err + } + + serviceSpec := c.generateConnPoolService(&c.Spec) + service, err := c.KubeClient. + Services(serviceSpec.Namespace). + Create(serviceSpec) + + if err != nil { + return nil, err + } + + c.ConnectionPool = &ConnectionPoolObjects{ + Deployment: deployment, + Service: service, + } + c.logger.Debugf("created new connection pool %q, uid: %q", + util.NameFromMeta(deployment.ObjectMeta), deployment.UID) + + return c.ConnectionPool, nil +} + +func (c *Cluster) deleteConnectionPool() (err error) { + c.setProcessName("deleting connection pool") + c.logger.Debugln("deleting connection pool") + + // Lack of connection pooler objects is not a fatal error, just log it if + // it was present before in the manifest + if c.ConnectionPool == nil { + c.logger.Infof("No connection pool to delete") + return nil + } + + // Clean up the deployment object. If deployment resource we've remembered + // is somehow empty, try to delete based on what would we generate + deploymentName := c.connPoolName() + deployment := c.ConnectionPool.Deployment + + if deployment != nil { + deploymentName = deployment.Name + } + + // set delete propagation policy to foreground, so that replica set will be + // also deleted. + policy := metav1.DeletePropagationForeground + options := metav1.DeleteOptions{PropagationPolicy: &policy} + err = c.KubeClient. + Deployments(c.Namespace). + Delete(deploymentName, &options) + + if !k8sutil.ResourceNotFound(err) { + c.logger.Debugf("Connection pool deployment was already deleted") + } else if err != nil { + return fmt.Errorf("could not delete deployment: %v", err) + } + + c.logger.Infof("Connection pool deployment %q has been deleted", deploymentName) + + // Repeat the same for the service object + service := c.ConnectionPool.Service + serviceName := c.connPoolName() + + if service != nil { + serviceName = service.Name + } + + // set delete propagation policy to foreground, so that all the dependant + // will be deleted. + err = c.KubeClient. + Services(c.Namespace). 
+ Delete(serviceName, &options) + + if !k8sutil.ResourceNotFound(err) { + c.logger.Debugf("Connection pool service was already deleted") + } else if err != nil { + return fmt.Errorf("could not delete service: %v", err) + } + + c.logger.Infof("Connection pool service %q has been deleted", serviceName) + + c.ConnectionPool = nil + return nil +} + func getPodIndex(podName string) (int32, error) { parts := strings.Split(podName, "-") if len(parts) == 0 { @@ -674,3 +800,34 @@ func (c *Cluster) GetStatefulSet() *appsv1.StatefulSet { func (c *Cluster) GetPodDisruptionBudget() *policybeta1.PodDisruptionBudget { return c.PodDisruptionBudget } + +// Perform actual patching of a connection pool deployment, assuming that all +// the check were already done before. +func (c *Cluster) updateConnPoolDeployment(oldDeploymentSpec, newDeployment *appsv1.Deployment) (*appsv1.Deployment, error) { + c.setProcessName("updating connection pool") + if c.ConnectionPool == nil || c.ConnectionPool.Deployment == nil { + return nil, fmt.Errorf("there is no connection pool in the cluster") + } + + patchData, err := specPatch(newDeployment.Spec) + if err != nil { + return nil, fmt.Errorf("could not form patch for the deployment: %v", err) + } + + // An update probably requires RetryOnConflict, but since only one operator + // worker at one time will try to update it chances of conflicts are + // minimal. + deployment, err := c.KubeClient. + Deployments(c.ConnectionPool.Deployment.Namespace). + Patch( + c.ConnectionPool.Deployment.Name, + types.MergePatchType, + patchData, "") + if err != nil { + return nil, fmt.Errorf("could not patch deployment: %v", err) + } + + c.ConnectionPool.Deployment = deployment + + return deployment, nil +} diff --git a/pkg/cluster/resources_test.go b/pkg/cluster/resources_test.go new file mode 100644 index 000000000..f06e96e65 --- /dev/null +++ b/pkg/cluster/resources_test.go @@ -0,0 +1,127 @@ +package cluster + +import ( + "testing" + + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + "github.com/zalando/postgres-operator/pkg/util/config" + "github.com/zalando/postgres-operator/pkg/util/k8sutil" + + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func mockInstallLookupFunction(schema string, user string) error { + return nil +} + +func boolToPointer(value bool) *bool { + return &value +} + +func TestConnPoolCreationAndDeletion(t *testing.T) { + testName := "Test connection pool creation" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{ + ConnPoolDefaultCPURequest: "100m", + ConnPoolDefaultCPULimit: "100m", + ConnPoolDefaultMemoryRequest: "100Mi", + ConnPoolDefaultMemoryLimit: "100Mi", + }, + }, + }, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger) + + cluster.Statefulset = &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sts", + }, + } + + cluster.Spec = acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + } + poolResources, err := cluster.createConnectionPool(mockInstallLookupFunction) + + if err != nil { + t.Errorf("%s: Cannot create connection pool, %s, %+v", + testName, err, poolResources) + } + + if poolResources.Deployment == nil { + t.Errorf("%s: Connection pool deployment is empty", testName) + } + + if poolResources.Service == nil { + t.Errorf("%s: Connection pool service is empty", testName) 
+ } + + err = cluster.deleteConnectionPool() + if err != nil { + t.Errorf("%s: Cannot delete connection pool, %s", testName, err) + } +} + +func TestNeedConnPool(t *testing.T) { + testName := "Test how connection pool can be enabled" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{ + ConnPoolDefaultCPURequest: "100m", + ConnPoolDefaultCPULimit: "100m", + ConnPoolDefaultMemoryRequest: "100Mi", + ConnPoolDefaultMemoryLimit: "100Mi", + }, + }, + }, k8sutil.NewMockKubernetesClient(), acidv1.Postgresql{}, logger) + + cluster.Spec = acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + } + + if !cluster.needConnectionPool() { + t.Errorf("%s: Connection pool is not enabled with full definition", + testName) + } + + cluster.Spec = acidv1.PostgresSpec{ + EnableConnectionPool: boolToPointer(true), + } + + if !cluster.needConnectionPool() { + t.Errorf("%s: Connection pool is not enabled with flag", + testName) + } + + cluster.Spec = acidv1.PostgresSpec{ + EnableConnectionPool: boolToPointer(false), + ConnectionPool: &acidv1.ConnectionPool{}, + } + + if cluster.needConnectionPool() { + t.Errorf("%s: Connection pool is still enabled with flag being false", + testName) + } + + cluster.Spec = acidv1.PostgresSpec{ + EnableConnectionPool: boolToPointer(true), + ConnectionPool: &acidv1.ConnectionPool{}, + } + + if !cluster.needConnectionPool() { + t.Errorf("%s: Connection pool is not enabled with flag and full", + testName) + } +} diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index b04ff863b..a7c933ae7 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -23,6 +23,7 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { c.mu.Lock() defer c.mu.Unlock() + oldSpec := c.Postgresql c.setSpec(newSpec) defer func() { @@ -108,6 +109,11 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { } } + // sync connection pool + if err = c.syncConnectionPool(&oldSpec, newSpec, c.installLookupFunction); err != nil { + return fmt.Errorf("could not sync connection pool: %v", err) + } + return err } @@ -424,7 +430,9 @@ func (c *Cluster) syncSecrets() error { } pwdUser := userMap[secretUsername] // if this secret belongs to the infrastructure role and the password has changed - replace it in the secret - if pwdUser.Password != string(secret.Data["password"]) && pwdUser.Origin == spec.RoleOriginInfrastructure { + if pwdUser.Password != string(secret.Data["password"]) && + pwdUser.Origin == spec.RoleOriginInfrastructure { + c.logger.Debugf("updating the secret %q from the infrastructure roles", secretSpec.Name) if _, err = c.KubeClient.Secrets(secretSpec.Namespace).Update(secretSpec); err != nil { return fmt.Errorf("could not update infrastructure role secret for role %q: %v", secretUsername, err) @@ -468,6 +476,16 @@ func (c *Cluster) syncRoles() (err error) { for _, u := range c.pgUsers { userNames = append(userNames, u.Name) } + + if c.needConnectionPool() { + connPoolUser := c.systemUsers[constants.ConnectionPoolUserKeyName] + userNames = append(userNames, connPoolUser.Name) + + if _, exists := c.pgUsers[connPoolUser.Name]; !exists { + c.pgUsers[connPoolUser.Name] = connPoolUser + } + } + dbUsers, err = c.readPgUsersFromDatabase(userNames) if err != nil { return fmt.Errorf("error getting users from the database: %v", err) @@ -600,3 +618,165 @@ func (c *Cluster) syncLogicalBackupJob() error { 
 	return nil
 }
+
+func (c *Cluster) syncConnectionPool(oldSpec, newSpec *acidv1.Postgresql, lookup InstallFunction) error {
+	if c.ConnectionPool == nil {
+		c.ConnectionPool = &ConnectionPoolObjects{}
+	}
+
+	newNeedConnPool := c.needConnectionPoolWorker(&newSpec.Spec)
+	oldNeedConnPool := c.needConnectionPoolWorker(&oldSpec.Spec)
+
+	if newNeedConnPool {
+		// Try to sync in any case. If we didn't need a connection pool before,
+		// it means we want to create it. If it was already present, still sync,
+		// since it could happen that there is no difference in specs, all the
+		// resources are remembered, but the deployment was manually deleted in
+		// between.
+		c.logger.Debug("syncing connection pool")
+
+		// in this case also do not forget to install the lookup function, as
+		// when creating the cluster
+		if !oldNeedConnPool || !c.ConnectionPool.LookupFunction {
+			newConnPool := newSpec.Spec.ConnectionPool
+
+			specSchema := ""
+			specUser := ""
+
+			if newConnPool != nil {
+				specSchema = newConnPool.Schema
+				specUser = newConnPool.User
+			}
+
+			schema := util.Coalesce(
+				specSchema,
+				c.OpConfig.ConnectionPool.Schema)
+
+			user := util.Coalesce(
+				specUser,
+				c.OpConfig.ConnectionPool.User)
+
+			if err := lookup(schema, user); err != nil {
+				return err
+			}
+		}
+
+		if err := c.syncConnectionPoolWorker(oldSpec, newSpec); err != nil {
+			c.logger.Errorf("could not sync connection pool: %v", err)
+			return err
+		}
+	}
+
+	if oldNeedConnPool && !newNeedConnPool {
+		// delete and cleanup resources
+		if err := c.deleteConnectionPool(); err != nil {
+			c.logger.Warningf("could not remove connection pool: %v", err)
+		}
+	}
+
+	if !oldNeedConnPool && !newNeedConnPool {
+		// delete and cleanup resources if not empty
+		if c.ConnectionPool != nil &&
+			(c.ConnectionPool.Deployment != nil ||
+				c.ConnectionPool.Service != nil) {
+
+			if err := c.deleteConnectionPool(); err != nil {
+				c.logger.Warningf("could not remove connection pool: %v", err)
+			}
+		}
+	}
+
+	return nil
+}
+
+// Synchronize connection pool resources. Effectively we're interested only in
+// synchronizing the corresponding deployment, but in case the deployment or
+// the service is missing, create it. After checking, also remember the objects
+// for future reference.
+func (c *Cluster) syncConnectionPoolWorker(oldSpec, newSpec *acidv1.Postgresql) error {
+	deployment, err := c.KubeClient.
+		Deployments(c.Namespace).
+		Get(c.connPoolName(), metav1.GetOptions{})
+
+	if err != nil && k8sutil.ResourceNotFound(err) {
+		msg := "Deployment %s for connection pool synchronization is not found, create it"
+		c.logger.Warningf(msg, c.connPoolName())
+
+		deploymentSpec, err := c.generateConnPoolDeployment(&newSpec.Spec)
+		if err != nil {
+			msg = "could not generate deployment for connection pool: %v"
+			return fmt.Errorf(msg, err)
+		}
+
+		deployment, err := c.KubeClient.
+			Deployments(deploymentSpec.Namespace).
+			Create(deploymentSpec)
+
+		if err != nil {
+			return err
+		}
+
+		c.ConnectionPool.Deployment = deployment
+	} else if err != nil {
+		return fmt.Errorf("could not get connection pool deployment to sync: %v", err)
+	} else {
+		c.ConnectionPool.Deployment = deployment
+
+		// actual synchronization
+		oldConnPool := oldSpec.Spec.ConnectionPool
+		newConnPool := newSpec.Spec.ConnectionPool
+		specSync, specReason := c.needSyncConnPoolSpecs(oldConnPool, newConnPool)
+		defaultsSync, defaultsReason := c.needSyncConnPoolDefaults(newConnPool, deployment)
+		reason := append(specReason, defaultsReason...)
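+
+		// A sketch of the decision below: specSync flags a difference between
+		// the old and the new manifest pooler section, defaultsSync flags a
+		// live deployment generated from operator defaults that have changed
+		// since (e.g. a new pooler image); either one triggers a patch.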
+ if specSync || defaultsSync { + c.logger.Infof("Update connection pool deployment %s, reason: %+v", + c.connPoolName(), reason) + + newDeploymentSpec, err := c.generateConnPoolDeployment(&newSpec.Spec) + if err != nil { + msg := "could not generate deployment for connection pool: %v" + return fmt.Errorf(msg, err) + } + + oldDeploymentSpec := c.ConnectionPool.Deployment + + deployment, err := c.updateConnPoolDeployment( + oldDeploymentSpec, + newDeploymentSpec) + + if err != nil { + return err + } + + c.ConnectionPool.Deployment = deployment + return nil + } + } + + service, err := c.KubeClient. + Services(c.Namespace). + Get(c.connPoolName(), metav1.GetOptions{}) + + if err != nil && k8sutil.ResourceNotFound(err) { + msg := "Service %s for connection pool synchronization is not found, create it" + c.logger.Warningf(msg, c.connPoolName()) + + serviceSpec := c.generateConnPoolService(&newSpec.Spec) + service, err := c.KubeClient. + Services(serviceSpec.Namespace). + Create(serviceSpec) + + if err != nil { + return err + } + + c.ConnectionPool.Service = service + } else if err != nil { + return fmt.Errorf("could not get connection pool service to sync: %v", err) + } else { + // Service updates are not supported and probably not that useful anyway + c.ConnectionPool.Service = service + } + + return nil +} diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go new file mode 100644 index 000000000..483d4ba58 --- /dev/null +++ b/pkg/cluster/sync_test.go @@ -0,0 +1,212 @@ +package cluster + +import ( + "fmt" + "testing" + + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + "github.com/zalando/postgres-operator/pkg/util/config" + "github.com/zalando/postgres-operator/pkg/util/k8sutil" + + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func int32ToPointer(value int32) *int32 { + return &value +} + +func deploymentUpdated(cluster *Cluster, err error) error { + if cluster.ConnectionPool.Deployment.Spec.Replicas == nil || + *cluster.ConnectionPool.Deployment.Spec.Replicas != 2 { + return fmt.Errorf("Wrong nubmer of instances") + } + + return nil +} + +func objectsAreSaved(cluster *Cluster, err error) error { + if cluster.ConnectionPool == nil { + return fmt.Errorf("Connection pool resources are empty") + } + + if cluster.ConnectionPool.Deployment == nil { + return fmt.Errorf("Deployment was not saved") + } + + if cluster.ConnectionPool.Service == nil { + return fmt.Errorf("Service was not saved") + } + + return nil +} + +func objectsAreDeleted(cluster *Cluster, err error) error { + if cluster.ConnectionPool != nil { + return fmt.Errorf("Connection pool was not deleted") + } + + return nil +} + +func TestConnPoolSynchronization(t *testing.T) { + testName := "Test connection pool synchronization" + var cluster = New( + Config{ + OpConfig: config.Config{ + ProtectedRoles: []string{"admin"}, + Auth: config.Auth{ + SuperUsername: superUserName, + ReplicationUsername: replicationUserName, + }, + ConnectionPool: config.ConnectionPool{ + ConnPoolDefaultCPURequest: "100m", + ConnPoolDefaultCPULimit: "100m", + ConnPoolDefaultMemoryRequest: "100Mi", + ConnPoolDefaultMemoryLimit: "100Mi", + NumberOfInstances: int32ToPointer(1), + }, + }, + }, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger) + + cluster.Statefulset = &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-sts", + }, + } + + clusterMissingObjects := *cluster + clusterMissingObjects.KubeClient = k8sutil.ClientMissingObjects() + + 
clusterMock := *cluster + clusterMock.KubeClient = k8sutil.NewMockKubernetesClient() + + clusterDirtyMock := *cluster + clusterDirtyMock.KubeClient = k8sutil.NewMockKubernetesClient() + clusterDirtyMock.ConnectionPool = &ConnectionPoolObjects{ + Deployment: &appsv1.Deployment{}, + Service: &v1.Service{}, + } + + clusterNewDefaultsMock := *cluster + clusterNewDefaultsMock.KubeClient = k8sutil.NewMockKubernetesClient() + cluster.OpConfig.ConnectionPool.Image = "pooler:2.0" + cluster.OpConfig.ConnectionPool.NumberOfInstances = int32ToPointer(2) + + tests := []struct { + subTest string + oldSpec *acidv1.Postgresql + newSpec *acidv1.Postgresql + cluster *Cluster + check func(cluster *Cluster, err error) error + }{ + { + subTest: "create if doesn't exist", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + }, + cluster: &clusterMissingObjects, + check: objectsAreSaved, + }, + { + subTest: "create if doesn't exist with a flag", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + EnableConnectionPool: boolToPointer(true), + }, + }, + cluster: &clusterMissingObjects, + check: objectsAreSaved, + }, + { + subTest: "create from scratch", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + }, + cluster: &clusterMissingObjects, + check: objectsAreSaved, + }, + { + subTest: "delete if not needed", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + cluster: &clusterMock, + check: objectsAreDeleted, + }, + { + subTest: "cleanup if still there", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{}, + }, + cluster: &clusterDirtyMock, + check: objectsAreDeleted, + }, + { + subTest: "update deployment", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{ + NumberOfInstances: int32ToPointer(1), + }, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{ + NumberOfInstances: int32ToPointer(2), + }, + }, + }, + cluster: &clusterMock, + check: deploymentUpdated, + }, + { + subTest: "update image from changed defaults", + oldSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + }, + newSpec: &acidv1.Postgresql{ + Spec: acidv1.PostgresSpec{ + ConnectionPool: &acidv1.ConnectionPool{}, + }, + }, + cluster: &clusterNewDefaultsMock, + check: deploymentUpdated, + }, + } + for _, tt := range tests { + err := tt.cluster.syncConnectionPool(tt.oldSpec, tt.newSpec, mockInstallLookupFunction) + + if err := tt.check(tt.cluster, err); err != nil { + t.Errorf("%s [%s]: Could not synchronize, %+v", + testName, tt.subTest, err) + } + } +} diff --git a/pkg/cluster/types.go b/pkg/cluster/types.go index 138b7015c..04d00cb58 100644 --- a/pkg/cluster/types.go +++ b/pkg/cluster/types.go @@ -69,3 +69,7 @@ type ClusterStatus struct { Spec acidv1.PostgresSpec Error error } + +type TemplateParams map[string]interface{} + +type InstallFunction func(schema string, user string) error diff --git a/pkg/cluster/util.go 
b/pkg/cluster/util.go index 8c02fed2e..dc1e93954 100644 --- a/pkg/cluster/util.go +++ b/pkg/cluster/util.go @@ -408,7 +408,32 @@ func (c *Cluster) labelsSet(shouldAddExtraLabels bool) labels.Set { } func (c *Cluster) labelsSelector() *metav1.LabelSelector { - return &metav1.LabelSelector{MatchLabels: c.labelsSet(false), MatchExpressions: nil} + return &metav1.LabelSelector{ + MatchLabels: c.labelsSet(false), + MatchExpressions: nil, + } +} + +// Return connection pool labels selector, which should from one point of view +// inherit most of the labels from the cluster itself, but at the same time +// have e.g. different `application` label, so that recreatePod operation will +// not interfere with it (it lists all the pods via labels, and if there would +// be no difference, it will recreate also pooler pods). +func (c *Cluster) connPoolLabelsSelector() *metav1.LabelSelector { + connPoolLabels := labels.Set(map[string]string{}) + + extraLabels := labels.Set(map[string]string{ + "connection-pool": c.connPoolName(), + "application": "db-connection-pool", + }) + + connPoolLabels = labels.Merge(connPoolLabels, c.labelsSet(false)) + connPoolLabels = labels.Merge(connPoolLabels, extraLabels) + + return &metav1.LabelSelector{ + MatchLabels: connPoolLabels, + MatchExpressions: nil, + } } func (c *Cluster) roleLabelsSet(shouldAddExtraLabels bool, role PostgresRole) labels.Set { @@ -483,3 +508,15 @@ func (c *Cluster) GetSpec() (*acidv1.Postgresql, error) { func (c *Cluster) patroniUsesKubernetes() bool { return c.OpConfig.EtcdHost == "" } + +func (c *Cluster) needConnectionPoolWorker(spec *acidv1.PostgresSpec) bool { + if spec.EnableConnectionPool == nil { + return spec.ConnectionPool != nil + } else { + return *spec.EnableConnectionPool + } +} + +func (c *Cluster) needConnectionPool() bool { + return c.needConnectionPoolWorker(&c.Spec) +} diff --git a/pkg/controller/operator_config.go b/pkg/controller/operator_config.go index 03602c3bd..970eef701 100644 --- a/pkg/controller/operator_config.go +++ b/pkg/controller/operator_config.go @@ -8,6 +8,7 @@ import ( acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" "github.com/zalando/postgres-operator/pkg/util" "github.com/zalando/postgres-operator/pkg/util/config" + "github.com/zalando/postgres-operator/pkg/util/constants" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -21,6 +22,10 @@ func (c *Controller) readOperatorConfigurationFromCRD(configObjectNamespace, con return config, nil } +func int32ToPointer(value int32) *int32 { + return &value +} + // importConfigurationFromCRD is a transitional function that converts CRD configuration to the one based on the configmap func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigurationData) *config.Config { result := &config.Config{} @@ -143,5 +148,51 @@ func (c *Controller) importConfigurationFromCRD(fromCRD *acidv1.OperatorConfigur result.ScalyrCPULimit = fromCRD.Scalyr.ScalyrCPULimit result.ScalyrMemoryLimit = fromCRD.Scalyr.ScalyrMemoryLimit + // Connection pool. Looks like we can't use defaulting in CRD before 1.17, + // so ensure default values here. 
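+	// The defaults below mirror the constants in pkg/util/constants/pooler.go
+	// and the struct tags in pkg/util/config/config.go: at least 2 pooler
+	// instances, the "pooler" schema and user, the pgbouncer image and
+	// transaction pooling mode.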
+ result.ConnectionPool.NumberOfInstances = util.CoalesceInt32( + fromCRD.ConnectionPool.NumberOfInstances, + int32ToPointer(2)) + + result.ConnectionPool.NumberOfInstances = util.MaxInt32( + result.ConnectionPool.NumberOfInstances, + int32ToPointer(2)) + + result.ConnectionPool.Schema = util.Coalesce( + fromCRD.ConnectionPool.Schema, + constants.ConnectionPoolSchemaName) + + result.ConnectionPool.User = util.Coalesce( + fromCRD.ConnectionPool.User, + constants.ConnectionPoolUserName) + + result.ConnectionPool.Image = util.Coalesce( + fromCRD.ConnectionPool.Image, + "registry.opensource.zalan.do/acid/pgbouncer") + + result.ConnectionPool.Mode = util.Coalesce( + fromCRD.ConnectionPool.Mode, + constants.ConnectionPoolDefaultMode) + + result.ConnectionPool.ConnPoolDefaultCPURequest = util.Coalesce( + fromCRD.ConnectionPool.DefaultCPURequest, + constants.ConnectionPoolDefaultCpuRequest) + + result.ConnectionPool.ConnPoolDefaultMemoryRequest = util.Coalesce( + fromCRD.ConnectionPool.DefaultMemoryRequest, + constants.ConnectionPoolDefaultMemoryRequest) + + result.ConnectionPool.ConnPoolDefaultCPULimit = util.Coalesce( + fromCRD.ConnectionPool.DefaultCPULimit, + constants.ConnectionPoolDefaultCpuLimit) + + result.ConnectionPool.ConnPoolDefaultMemoryLimit = util.Coalesce( + fromCRD.ConnectionPool.DefaultMemoryLimit, + constants.ConnectionPoolDefaultMemoryLimit) + + result.ConnectionPool.MaxDBConnections = util.CoalesceInt32( + fromCRD.ConnectionPool.MaxDBConnections, + int32ToPointer(constants.ConnPoolMaxDBConnections)) + return result } diff --git a/pkg/spec/types.go b/pkg/spec/types.go index 3e6bec8db..36783204d 100644 --- a/pkg/spec/types.go +++ b/pkg/spec/types.go @@ -23,13 +23,15 @@ const fileWithNamespace = "/var/run/secrets/kubernetes.io/serviceaccount/namespa // RoleOrigin contains the code of the origin of a role type RoleOrigin int -// The rolesOrigin constant values must be sorted by the role priority for resolveNameConflict(...) to work. +// The rolesOrigin constant values must be sorted by the role priority for +// resolveNameConflict(...) to work. 
const ( RoleOriginUnknown RoleOrigin = iota RoleOriginManifest RoleOriginInfrastructure RoleOriginTeamsAPI RoleOriginSystem + RoleConnectionPool ) type syncUserOperation int @@ -178,6 +180,8 @@ func (r RoleOrigin) String() string { return "teams API role" case RoleOriginSystem: return "system role" + case RoleConnectionPool: + return "connection pool role" default: panic(fmt.Sprintf("bogus role origin value %d", r)) } diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index fee65be81..e0e095617 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -8,6 +8,7 @@ import ( "fmt" "github.com/zalando/postgres-operator/pkg/spec" + "github.com/zalando/postgres-operator/pkg/util/constants" ) // CRD describes CustomResourceDefinition specific configuration parameters @@ -83,6 +84,20 @@ type LogicalBackup struct { LogicalBackupS3SSE string `name:"logical_backup_s3_sse" default:"AES256"` } +// Operator options for connection pooler +type ConnectionPool struct { + NumberOfInstances *int32 `name:"connection_pool_number_of_instances" default:"2"` + Schema string `name:"connection_pool_schema" default:"pooler"` + User string `name:"connection_pool_user" default:"pooler"` + Image string `name:"connection_pool_image" default:"registry.opensource.zalan.do/acid/pgbouncer"` + Mode string `name:"connection_pool_mode" default:"transaction"` + MaxDBConnections *int32 `name:"connection_pool_max_db_connections" default:"60"` + ConnPoolDefaultCPURequest string `name:"connection_pool_default_cpu_request" default:"500m"` + ConnPoolDefaultMemoryRequest string `name:"connection_pool_default_memory_request" default:"100Mi"` + ConnPoolDefaultCPULimit string `name:"connection_pool_default_cpu_limit" default:"1"` + ConnPoolDefaultMemoryLimit string `name:"connection_pool_default_memory_limit" default:"100Mi"` +} + // Config describes operator config type Config struct { CRD @@ -90,6 +105,7 @@ type Config struct { Auth Scalyr LogicalBackup + ConnectionPool WatchedNamespace string `name:"watched_namespace"` // special values: "*" means 'watch all namespaces', the empty string "" means 'watch a namespace where operator is deployed to' EtcdHost string `name:"etcd_host" default:""` // special values: the empty string "" means Patroni will use K8s as a DCS @@ -196,5 +212,10 @@ func validate(cfg *Config) (err error) { if cfg.Workers == 0 { err = fmt.Errorf("number of workers should be higher than 0") } + + if *cfg.ConnectionPool.NumberOfInstances < constants.ConnPoolMinInstances { + msg := "number of connection pool instances should be higher than %d" + err = fmt.Errorf(msg, constants.ConnPoolMinInstances) + } return } diff --git a/pkg/util/constants/pooler.go b/pkg/util/constants/pooler.go new file mode 100644 index 000000000..540d64e2c --- /dev/null +++ b/pkg/util/constants/pooler.go @@ -0,0 +1,18 @@ +package constants + +// Connection pool specific constants +const ( + ConnectionPoolUserName = "pooler" + ConnectionPoolSchemaName = "pooler" + ConnectionPoolDefaultType = "pgbouncer" + ConnectionPoolDefaultMode = "transaction" + ConnectionPoolDefaultCpuRequest = "500m" + ConnectionPoolDefaultCpuLimit = "1" + ConnectionPoolDefaultMemoryRequest = "100Mi" + ConnectionPoolDefaultMemoryLimit = "100Mi" + + ConnPoolContainer = 0 + ConnPoolMaxDBConnections = 60 + ConnPoolMaxClientConnections = 10000 + ConnPoolMinInstances = 2 +) diff --git a/pkg/util/constants/roles.go b/pkg/util/constants/roles.go index 2c20d69db..3d201142c 100644 --- a/pkg/util/constants/roles.go +++ b/pkg/util/constants/roles.go @@ 
-2,15 +2,16 @@ package constants // Roles specific constants const ( - PasswordLength = 64 - SuperuserKeyName = "superuser" - ReplicationUserKeyName = "replication" - RoleFlagSuperuser = "SUPERUSER" - RoleFlagInherit = "INHERIT" - RoleFlagLogin = "LOGIN" - RoleFlagNoLogin = "NOLOGIN" - RoleFlagCreateRole = "CREATEROLE" - RoleFlagCreateDB = "CREATEDB" - RoleFlagReplication = "REPLICATION" - RoleFlagByPassRLS = "BYPASSRLS" + PasswordLength = 64 + SuperuserKeyName = "superuser" + ConnectionPoolUserKeyName = "pooler" + ReplicationUserKeyName = "replication" + RoleFlagSuperuser = "SUPERUSER" + RoleFlagInherit = "INHERIT" + RoleFlagLogin = "LOGIN" + RoleFlagNoLogin = "NOLOGIN" + RoleFlagCreateRole = "CREATEROLE" + RoleFlagCreateDB = "CREATEDB" + RoleFlagReplication = "REPLICATION" + RoleFlagByPassRLS = "BYPASSRLS" ) diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index 509b12c19..75b99ec7c 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -9,11 +9,13 @@ import ( batchv1beta1 "k8s.io/api/batch/v1beta1" clientbatchv1beta1 "k8s.io/client-go/kubernetes/typed/batch/v1beta1" + apiappsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" policybeta1 "k8s.io/api/policy/v1beta1" apiextclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" apiextbeta1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" appsv1 "k8s.io/client-go/kubernetes/typed/apps/v1" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -26,6 +28,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +func Int32ToPointer(value int32) *int32 { + return &value +} + // KubernetesClient describes getters for Kubernetes objects type KubernetesClient struct { corev1.SecretsGetter @@ -39,6 +45,7 @@ type KubernetesClient struct { corev1.NamespacesGetter corev1.ServiceAccountsGetter appsv1.StatefulSetsGetter + appsv1.DeploymentsGetter rbacv1.RoleBindingsGetter policyv1beta1.PodDisruptionBudgetsGetter apiextbeta1.CustomResourceDefinitionsGetter @@ -55,6 +62,34 @@ type mockSecret struct { type MockSecretGetter struct { } +type mockDeployment struct { + appsv1.DeploymentInterface +} + +type mockDeploymentNotExist struct { + appsv1.DeploymentInterface +} + +type MockDeploymentGetter struct { +} + +type MockDeploymentNotExistGetter struct { +} + +type mockService struct { + corev1.ServiceInterface +} + +type mockServiceNotExist struct { + corev1.ServiceInterface +} + +type MockServiceGetter struct { +} + +type MockServiceNotExistGetter struct { +} + type mockConfigMap struct { corev1.ConfigMapInterface } @@ -101,6 +136,7 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) { kubeClient.NodesGetter = client.CoreV1() kubeClient.NamespacesGetter = client.CoreV1() kubeClient.StatefulSetsGetter = client.AppsV1() + kubeClient.DeploymentsGetter = client.AppsV1() kubeClient.PodDisruptionBudgetsGetter = client.PolicyV1beta1() kubeClient.RESTClient = client.CoreV1().RESTClient() kubeClient.RoleBindingsGetter = client.RbacV1() @@ -230,19 +266,145 @@ func (c *mockConfigMap) Get(name string, options metav1.GetOptions) (*v1.ConfigM } // Secrets to be mocked -func (c *MockSecretGetter) Secrets(namespace string) corev1.SecretInterface { +func (mock *MockSecretGetter) Secrets(namespace string) corev1.SecretInterface { return &mockSecret{} } // ConfigMaps to be mocked -func (c *MockConfigMapsGetter) ConfigMaps(namespace string) 
corev1.ConfigMapInterface { +func (mock *MockConfigMapsGetter) ConfigMaps(namespace string) corev1.ConfigMapInterface { return &mockConfigMap{} } +func (mock *MockDeploymentGetter) Deployments(namespace string) appsv1.DeploymentInterface { + return &mockDeployment{} +} + +func (mock *MockDeploymentNotExistGetter) Deployments(namespace string) appsv1.DeploymentInterface { + return &mockDeploymentNotExist{} +} + +func (mock *mockDeployment) Create(*apiappsv1.Deployment) (*apiappsv1.Deployment, error) { + return &apiappsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment", + }, + Spec: apiappsv1.DeploymentSpec{ + Replicas: Int32ToPointer(1), + }, + }, nil +} + +func (mock *mockDeployment) Delete(name string, opts *metav1.DeleteOptions) error { + return nil +} + +func (mock *mockDeployment) Get(name string, opts metav1.GetOptions) (*apiappsv1.Deployment, error) { + return &apiappsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment", + }, + Spec: apiappsv1.DeploymentSpec{ + Replicas: Int32ToPointer(1), + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + v1.Container{ + Image: "pooler:1.0", + }, + }, + }, + }, + }, + }, nil +} + +func (mock *mockDeployment) Patch(name string, t types.PatchType, data []byte, subres ...string) (*apiappsv1.Deployment, error) { + return &apiappsv1.Deployment{ + Spec: apiappsv1.DeploymentSpec{ + Replicas: Int32ToPointer(2), + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment", + }, + }, nil +} + +func (mock *mockDeploymentNotExist) Get(name string, opts metav1.GetOptions) (*apiappsv1.Deployment, error) { + return nil, &apierrors.StatusError{ + ErrStatus: metav1.Status{ + Reason: metav1.StatusReasonNotFound, + }, + } +} + +func (mock *mockDeploymentNotExist) Create(*apiappsv1.Deployment) (*apiappsv1.Deployment, error) { + return &apiappsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment", + }, + Spec: apiappsv1.DeploymentSpec{ + Replicas: Int32ToPointer(1), + }, + }, nil +} + +func (mock *MockServiceGetter) Services(namespace string) corev1.ServiceInterface { + return &mockService{} +} + +func (mock *MockServiceNotExistGetter) Services(namespace string) corev1.ServiceInterface { + return &mockServiceNotExist{} +} + +func (mock *mockService) Create(*v1.Service) (*v1.Service, error) { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + }, + }, nil +} + +func (mock *mockService) Delete(name string, opts *metav1.DeleteOptions) error { + return nil +} + +func (mock *mockService) Get(name string, opts metav1.GetOptions) (*v1.Service, error) { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + }, + }, nil +} + +func (mock *mockServiceNotExist) Create(*v1.Service) (*v1.Service, error) { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-service", + }, + }, nil +} + +func (mock *mockServiceNotExist) Get(name string, opts metav1.GetOptions) (*v1.Service, error) { + return nil, &apierrors.StatusError{ + ErrStatus: metav1.Status{ + Reason: metav1.StatusReasonNotFound, + }, + } +} + // NewMockKubernetesClient for other tests func NewMockKubernetesClient() KubernetesClient { return KubernetesClient{ - SecretsGetter: &MockSecretGetter{}, - ConfigMapsGetter: &MockConfigMapsGetter{}, + SecretsGetter: &MockSecretGetter{}, + ConfigMapsGetter: &MockConfigMapsGetter{}, + DeploymentsGetter: &MockDeploymentGetter{}, + ServicesGetter: &MockServiceGetter{}, + } +} + +func ClientMissingObjects() KubernetesClient 
{
+	return KubernetesClient{
+		DeploymentsGetter: &MockDeploymentNotExistGetter{},
+		ServicesGetter:    &MockServiceNotExistGetter{},
 	}
 }
diff --git a/pkg/util/util.go b/pkg/util/util.go
index ad6de14a2..8e3b7cd21 100644
--- a/pkg/util/util.go
+++ b/pkg/util/util.go
@@ -141,6 +141,40 @@ func Coalesce(val, defaultVal string) string {
 	return val
 }
 
+// CoalesceInt32 works like Coalesce, but for *int32 values
+func CoalesceInt32(val, defaultVal *int32) *int32 {
+	if val == nil {
+		return defaultVal
+	}
+	return val
+}
+
+// testNil returns true if any of the provided values is nil
+func testNil(values ...*int32) bool {
+	for _, v := range values {
+		if v == nil {
+			return true
+		}
+	}
+
+	return false
+}
+
+// MaxInt32 returns the maximum of two integers provided via pointers. If
+// either value is nil, the result is nil as well, and the caller needs to
+// check for that.
+func MaxInt32(a, b *int32) *int32 {
+	if testNil(a, b) {
+		return nil
+	}
+
+	if *a > *b {
+		return a
+	}
+
+	return b
+}
+
 // IsSmallerQuantity : checks if first resource is of a smaller quantity than the second
 func IsSmallerQuantity(requestStr, limitStr string) (bool, error) {