diff --git a/deployments/common/crds/k8s.nginx.org_transportservers.yaml b/deployments/common/crds/k8s.nginx.org_transportservers.yaml index dfd6e4aed2..9f5302e0b7 100644 --- a/deployments/common/crds/k8s.nginx.org_transportservers.yaml +++ b/deployments/common/crds/k8s.nginx.org_transportservers.yaml @@ -115,6 +115,8 @@ spec: type: integer timeout: type: string + maxConns: + type: integer maxFails: type: integer name: diff --git a/deployments/helm-chart/crds/k8s.nginx.org_transportservers.yaml b/deployments/helm-chart/crds/k8s.nginx.org_transportservers.yaml index dfd6e4aed2..9f5302e0b7 100644 --- a/deployments/helm-chart/crds/k8s.nginx.org_transportservers.yaml +++ b/deployments/helm-chart/crds/k8s.nginx.org_transportservers.yaml @@ -115,6 +115,8 @@ spec: type: integer timeout: type: string + maxConns: + type: integer maxFails: type: integer name: diff --git a/docs-web/configuration/transportserver-resource.md b/docs-web/configuration/transportserver-resource.md index c06f05eee3..8fdf255839 100644 --- a/docs-web/configuration/transportserver-resource.md +++ b/docs-web/configuration/transportserver-resource.md @@ -172,6 +172,7 @@ name: secure-app service: secure-app port: 8443 maxFails: 3 +maxConns: 100 failTimeout: 30s ``` @@ -199,6 +200,10 @@ failTimeout: 30s - Sets the `number `_ of unsuccessful attempts to communicate with the server that should happen in the duration set by the failTimeout parameter to consider the server unavailable. The default ``1``. - ``int`` - No + * - ``maxConns`` + - Sets the `number `_ of maximum connections to the proxied server. Default value is zero, meaning there is no limit. The default is ``0``. + - ``int`` + - No * - ``failTimeout`` - Sets the `time `_ during which the specified number of unsuccessful attempts to communicate with the server should happen to consider the server unavailable and the period of time the server will be considered unavailable. The default is ``10s``. - ``string`` diff --git a/internal/configs/transportserver.go b/internal/configs/transportserver.go index 87afd00be7..9629b5c78e 100644 --- a/internal/configs/transportserver.go +++ b/internal/configs/transportserver.go @@ -172,13 +172,15 @@ func generateStreamUpstream(upstream *conf_v1alpha1.Upstream, upstreamNamer *ups name := upstreamNamer.GetNameForUpstream(upstream.Name) maxFails := generateIntFromPointer(upstream.MaxFails, 1) + maxConns := generateIntFromPointer(upstream.MaxConns, 0) failTimeout := generateTimeWithDefault(upstream.FailTimeout, "10s") for _, e := range endpoints { s := version2.StreamUpstreamServer{ - Address: e, - MaxFails: maxFails, - FailTimeout: failTimeout, + Address: e, + MaxFails: maxFails, + FailTimeout: failTimeout, + MaxConnections: maxConns, } upsServers = append(upsServers, s) diff --git a/internal/configs/transportserver_test.go b/internal/configs/transportserver_test.go index faa0ec9ba9..570b2dd7f9 100644 --- a/internal/configs/transportserver_test.go +++ b/internal/configs/transportserver_test.go @@ -225,6 +225,94 @@ func TestGenerateTransportServerConfigForTCP(t *testing.T) { } +func TestGenerateTransportServerConfigForTCPMaxConnections(t *testing.T) { + transportServerEx := TransportServerEx{ + TransportServer: &conf_v1alpha1.TransportServer{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: "tcp-server", + Namespace: "default", + }, + Spec: conf_v1alpha1.TransportServerSpec{ + Listener: conf_v1alpha1.TransportServerListener{ + Name: "tcp-listener", + Protocol: "TCP", + }, + Upstreams: []conf_v1alpha1.Upstream{ + { + Name: "tcp-app", + Service: "tcp-app-svc", + Port: 5001, + MaxFails: intPointer(3), + MaxConns: intPointer(3), + FailTimeout: "40s", + }, + }, + UpstreamParameters: &conf_v1alpha1.UpstreamParameters{ + ConnectTimeout: "30s", + NextUpstream: false, + }, + SessionParameters: &conf_v1alpha1.SessionParameters{ + Timeout: "50s", + }, + Action: &conf_v1alpha1.Action{ + Pass: "tcp-app", + }, + }, + }, + Endpoints: map[string][]string{ + "default/tcp-app-svc:5001": { + "10.0.0.20:5001", + }, + }, + } + + listenerPort := 2020 + + expected := &version2.TransportServerConfig{ + Upstreams: []version2.StreamUpstream{ + { + Name: "ts_default_tcp-server_tcp-app", + Servers: []version2.StreamUpstreamServer{ + { + Address: "10.0.0.20:5001", + MaxFails: 3, + FailTimeout: "40s", + MaxConnections: 3, + }, + }, + UpstreamLabels: version2.UpstreamLabels{ + ResourceName: "tcp-server", + ResourceType: "transportserver", + ResourceNamespace: "default", + Service: "tcp-app-svc", + }, + }, + }, + Server: version2.StreamServer{ + Port: 2020, + UDP: false, + StatusZone: "tcp-listener", + ProxyPass: "ts_default_tcp-server_tcp-app", + Name: "tcp-server", + Namespace: "default", + ProxyConnectTimeout: "30s", + ProxyNextUpstream: false, + ProxyNextUpstreamTries: 0, + ProxyNextUpstreamTimeout: "0s", + ProxyTimeout: "50s", + HealthCheck: nil, + ServerSnippets: []string{}, + }, + StreamSnippets: []string{}, + } + + result := generateTransportServerConfig(&transportServerEx, listenerPort, true) + if diff := cmp.Diff(expected, result); diff != "" { + t.Errorf("generateTransportServerConfig() mismatch (-want +got):\n%s", diff) + } + +} + func TestGenerateTransportServerConfigForTLSPasstrhough(t *testing.T) { transportServerEx := TransportServerEx{ TransportServer: &conf_v1alpha1.TransportServer{ diff --git a/internal/configs/version2/nginx-plus.transportserver.tmpl b/internal/configs/version2/nginx-plus.transportserver.tmpl index bd66d43c81..4c4110ade5 100644 --- a/internal/configs/version2/nginx-plus.transportserver.tmpl +++ b/internal/configs/version2/nginx-plus.transportserver.tmpl @@ -5,7 +5,7 @@ upstream {{ $u.Name }} { random two least_conn; {{ range $s := $u.Servers }} - server {{ $s.Address }} max_fails={{ $s.MaxFails }} fail_timeout={{ $s.FailTimeout }}; + server {{ $s.Address }} max_fails={{ $s.MaxFails }} fail_timeout={{ $s.FailTimeout }} max_conns={{ $s.MaxConnections }}; {{ end }} } {{ end }} diff --git a/internal/configs/version2/nginx.transportserver.tmpl b/internal/configs/version2/nginx.transportserver.tmpl index d197ee2740..fae5c9f7db 100644 --- a/internal/configs/version2/nginx.transportserver.tmpl +++ b/internal/configs/version2/nginx.transportserver.tmpl @@ -5,7 +5,7 @@ upstream {{ $u.Name }} { random two least_conn; {{ range $s := $u.Servers }} - server {{ $s.Address }} max_fails={{ $s.MaxFails }} fail_timeout={{ $s.FailTimeout }}; + server {{ $s.Address }} max_fails={{ $s.MaxFails }} fail_timeout={{ $s.FailTimeout }} max_conns={{ $s.MaxConnections }}; {{ end }} } {{ end }} diff --git a/internal/configs/version2/stream.go b/internal/configs/version2/stream.go index d7e58c4508..a7641411d1 100644 --- a/internal/configs/version2/stream.go +++ b/internal/configs/version2/stream.go @@ -16,9 +16,10 @@ type StreamUpstream struct { // StreamUpstreamServer defines a stream upstream server. type StreamUpstreamServer struct { - Address string - MaxFails int - FailTimeout string + Address string + MaxFails int + FailTimeout string + MaxConnections int } // StreamServer defines a server in the stream module. diff --git a/pkg/apis/configuration/v1alpha1/types.go b/pkg/apis/configuration/v1alpha1/types.go index 10a8588eb8..a11796c186 100644 --- a/pkg/apis/configuration/v1alpha1/types.go +++ b/pkg/apis/configuration/v1alpha1/types.go @@ -90,6 +90,7 @@ type Upstream struct { Port int `json:"port"` FailTimeout string `json:"failTimeout"` MaxFails *int `json:"maxFails"` + MaxConns *int `json:"maxConns"` HealthCheck *HealthCheck `json:"healthCheck"` } diff --git a/pkg/apis/configuration/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/configuration/v1alpha1/zz_generated.deepcopy.go index 7ed79b9065..3f30d7a114 100644 --- a/pkg/apis/configuration/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/configuration/v1alpha1/zz_generated.deepcopy.go @@ -293,6 +293,11 @@ func (in *Upstream) DeepCopyInto(out *Upstream) { *out = new(int) **out = **in } + if in.MaxConns != nil { + in, out := &in.MaxConns, &out.MaxConns + *out = new(int) + **out = **in + } if in.HealthCheck != nil { in, out := &in.HealthCheck, &out.HealthCheck *out = new(HealthCheck) diff --git a/pkg/apis/configuration/validation/transportserver.go b/pkg/apis/configuration/validation/transportserver.go index ed622a001d..8245c40682 100644 --- a/pkg/apis/configuration/validation/transportserver.go +++ b/pkg/apis/configuration/validation/transportserver.go @@ -164,7 +164,8 @@ func validateTransportServerUpstreams(upstreams []v1alpha1.Upstream, fieldPath * allErrs = append(allErrs, validateServiceName(u.Service, idxPath.Child("service"))...) allErrs = append(allErrs, validatePositiveIntOrZeroFromPointer(u.MaxFails, idxPath.Child("maxFails"))...) - allErrs = append(allErrs, validateTime((u.FailTimeout), idxPath.Child("failTimeout"))...) + allErrs = append(allErrs, validatePositiveIntOrZeroFromPointer(u.MaxFails, idxPath.Child("maxConns"))...) + allErrs = append(allErrs, validateTime(u.FailTimeout, idxPath.Child("failTimeout"))...) for _, msg := range validation.IsValidPortNum(u.Port) { allErrs = append(allErrs, field.Invalid(idxPath.Child("port"), u.Port, msg)) diff --git a/tests/data/transport-server-tcp-load-balance/max-connections-transport-server.yaml b/tests/data/transport-server-tcp-load-balance/max-connections-transport-server.yaml new file mode 100644 index 0000000000..da2e1a4a34 --- /dev/null +++ b/tests/data/transport-server-tcp-load-balance/max-connections-transport-server.yaml @@ -0,0 +1,15 @@ +apiVersion: k8s.nginx.org/v1alpha1 +kind: TransportServer +metadata: + name: transport-server +spec: + listener: + name: tcp-server + protocol: TCP + upstreams: + - name: tcp-app + service: tcp-service + port: 3333 + maxConns: 2 + action: + pass: tcp-app diff --git a/tests/data/transport-server-tcp-load-balance/standard/service_deployment.yaml b/tests/data/transport-server-tcp-load-balance/standard/service_deployment.yaml index d8af18dc92..eb0d182901 100644 --- a/tests/data/transport-server-tcp-load-balance/standard/service_deployment.yaml +++ b/tests/data/transport-server-tcp-load-balance/standard/service_deployment.yaml @@ -14,7 +14,7 @@ spec: spec: containers: - name: tcp-service - image: seanoneillf5/tcp-server:1.0 + image: seanoneillf5/tcp-server:1.1 ports: - containerPort: 3333 name: tcp-server diff --git a/tests/suite/test_transport_server_tcp_load_balance.py b/tests/suite/test_transport_server_tcp_load_balance.py index 609bfe307d..45ccaccb53 100644 --- a/tests/suite/test_transport_server_tcp_load_balance.py +++ b/tests/suite/test_transport_server_tcp_load_balance.py @@ -1,6 +1,7 @@ import pytest import re import socket +import time from suite.resources_utils import ( wait_before_test, @@ -39,13 +40,14 @@ def restore_ts(self, kube_apis, transport_server_setup) -> None: """ Function to revert a TransportServer resource to a valid state. """ - patch_src = f"{TEST_DATA}/transport-server-status/standard/transport-server.yaml" + patch_src = f"{TEST_DATA}/transport-server-tcp-load-balance/standard/transport-server.yaml" patch_ts( kube_apis.custom_objects, transport_server_setup.name, patch_src, transport_server_setup.namespace, ) + wait_before_test() def test_number_of_replicas( self, kube_apis, crd_ingress_controller, transport_server_setup, ingress_controller_prerequisites @@ -88,6 +90,7 @@ def test_tcp_request_load_balanced( for i in range(20): client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect((host, port)) + client.sendall(b'connect') response = client.recv(4096) endpoint = response.decode() print(f' req number {i}; response: {endpoint}') @@ -129,6 +132,7 @@ def test_tcp_request_load_balanced_multiple( print(f"sending tcp requests to: {host}:{port}") client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect((host, port)) + client.sendall(b'connect') response = client.recv(4096) endpoint = response.decode() print(f'response: {endpoint}') @@ -174,6 +178,7 @@ def test_tcp_request_load_balanced_multiple( print(f"sending tcp requests to: {host}:{port}") client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect((host, port)) + client.sendall(b'connect') response = client.recv(4096) endpoint = response.decode() print(f'response: {endpoint}') @@ -210,12 +215,12 @@ def test_tcp_request_load_balanced_wrong_port( print(f"sending tcp requests to: {host}:{port}") for i in range(3): - client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - client.connect((host, port)) - response = client.recv(4096) - endpoint = response.decode() - assert endpoint == "" - client.close() + try: + client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + client.connect((host, port)) + client.sendall(b'connect') + except ConnectionResetError as E: + print("The expected exception occurred:", E) self.restore_ts(kube_apis, transport_server_setup) @@ -241,11 +246,91 @@ def test_tcp_request_load_balanced_missing_service( print(f"sending tcp requests to: {host}:{port}") for i in range(3): - client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - client.connect((host, port)) - response = client.recv(4096) - endpoint = response.decode() - assert endpoint == "" - client.close() + try: + client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + client.connect((host, port)) + client.sendall(b'connect') + except ConnectionResetError as E: + print("The expected exception occurred:", E) self.restore_ts(kube_apis, transport_server_setup) + + def make_holding_connection(self, host, port): + print(f"sending tcp requests to: {host}:{port}") + client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + client.connect((host, port)) + client.sendall(b'hold') + response = client.recv(4096) + endpoint = response.decode() + print(f'response: {endpoint}') + return client + + def test_tcp_request_max_connections( + self, kube_apis, crd_ingress_controller, transport_server_setup, ingress_controller_prerequisites + ): + """ + The config, maxConns, should limit the number of open TCP connections. + 3 replicas of max 2 connections is 6, so making the 7th connection will fail. + """ + + # step 1 - set max connections to 2 with 1 replica + patch_src = f"{TEST_DATA}/transport-server-tcp-load-balance/max-connections-transport-server.yaml" + patch_ts( + kube_apis.custom_objects, + transport_server_setup.name, + patch_src, + transport_server_setup.namespace, + ) + wait_before_test() + + result_conf = get_ts_nginx_template_conf( + kube_apis.v1, + transport_server_setup.namespace, + transport_server_setup.name, + transport_server_setup.ingress_pod_name, + ingress_controller_prerequisites.namespace + ) + + pattern = 'max_conns=2' + configs = re.findall(pattern, result_conf) + + assert len(configs) is 3 + + # step 2 - make the number of allowed connections + port = transport_server_setup.public_endpoint.tcp_server_port + host = transport_server_setup.public_endpoint.public_ip + + clients = [] + for i in range(6): + c = self.make_holding_connection(host, port) + clients.append(c) + + # step 3 - assert the next connection fails + try: + c = self.make_holding_connection(host, port) + # making a connection should fail and throw an exception + assert c is None + except ConnectionResetError as E: + print("The expected exception occurred:", E) + + for c in clients: + c.close() + + # step 4 - revert to config with no max connections + patch_src = f"{TEST_DATA}/transport-server-tcp-load-balance/standard/transport-server.yaml" + patch_ts( + kube_apis.custom_objects, + transport_server_setup.name, + patch_src, + transport_server_setup.namespace, + ) + wait_before_test() + + # step 5 - confirm making lots of connections doesn't cause an error + clients = [] + for i in range(24): + c = self.make_holding_connection(host, port) + clients.append(c) + + for c in clients: + c.close() diff --git a/tests/tcp-server/README.md b/tests/tcp-server/README.md index f5adcdec2f..b2e3e28573 100644 --- a/tests/tcp-server/README.md +++ b/tests/tcp-server/README.md @@ -21,8 +21,8 @@ If you make changes to the server: local version ```-> imagePullPolicy: Never``` * Test the changes * Include the change as part of the commit that requires the tcp-server change - * Build the docker image with an increased version number ```docker build -t tcp-server:v2``` - * Push the docker image to the public repo + * Build the docker image with an increased version number ```docker build -t tcp-server:2.1``` + * Push the docker image to the public repo ```docker push seanoneillf5/tcp-server:2.1``` * Update the tag [service yaml](../data/transport-server-tcp-load-balance/standard/service_deployment.yaml) to match the new tag * Commit the tag change as part of the commit that requires the tcp-server change diff --git a/tests/tcp-server/main.go b/tests/tcp-server/main.go index 59e5ad1567..b8d58a0925 100644 --- a/tests/tcp-server/main.go +++ b/tests/tcp-server/main.go @@ -31,11 +31,24 @@ func main() { func handleRequest(conn net.Conn) { log.Println("accepted new connection") - defer conn.Close() - defer log.Println("closed connection") + + buf := make([]byte, 1024) + n, err := conn.Read(buf) + if err != nil { + log.Println("Error reading:", err.Error()) + conn.Close() + return + } + instruction := string(buf[:n]) + log.Printf("instruction:%v\n", instruction) + if instruction != "hold" { + defer conn.Close() + defer log.Println("closed connection") + } + address := conn.LocalAddr().String() log.Printf("write data to connection: %v\n", address) - _, err := conn.Write([]byte(address)) + _, err = conn.Write([]byte(address)) if err != nil { log.Printf("error writing to connection: %v", err) return