diff --git a/config/200-bootstrap.yaml b/config/200-bootstrap.yaml index a10ad8864..069b03569 100644 --- a/config/200-bootstrap.yaml +++ b/config/200-bootstrap.yaml @@ -72,6 +72,15 @@ data: exact: GET route: cluster: service_stats + - match: + safe_regex: + regex: '/drain_listeners' + headers: + - name: ':method' + string_match: + exact: POST + route: + cluster: service_stats clusters: - name: service_stats connect_timeout: 0.250s @@ -82,8 +91,9 @@ data: lb_endpoints: endpoint: address: - pipe: - path: /tmp/envoy.admin + socket_address: + address: 127.0.0.1 + port_value: 9901 - name: xds_cluster # This keepalive is recommended by envoy docs. # https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol @@ -112,5 +122,6 @@ data: typed_config: "@type": type.googleapis.com/envoy.extensions.access_loggers.stream.v3.StdoutAccessLog address: - pipe: - path: /tmp/envoy.admin + socket_address: + address: 127.0.0.1 + port_value: 9901 diff --git a/config/300-gateway.yaml b/config/300-gateway.yaml index 4ecb4d921..5d8b42527 100644 --- a/config/300-gateway.yaml +++ b/config/300-gateway.yaml @@ -48,8 +48,13 @@ spec: - --base-id 1 - -c /tmp/config/envoy-bootstrap.yaml - --log-level info + - --drain-time-s $(DRAIN_TIME_SECONDS) + - --drain-strategy immediate command: - /usr/local/bin/envoy + env: + - name: DRAIN_TIME_SECONDS + value: "15" image: docker.io/envoyproxy/envoy:v1.26-latest name: kourier-gateway ports: @@ -85,7 +90,7 @@ spec: lifecycle: preStop: exec: - command: ["/bin/sh","-c","curl -X POST --unix /tmp/envoy.admin http://localhost/healthcheck/fail; sleep 15"] + command: ["/bin/sh","-c","curl -X POST http://localhost:9901/drain_listeners?graceful; sleep $DRAIN_TIME_SECONDS"] readinessProbe: httpGet: httpHeaders: @@ -115,6 +120,8 @@ spec: limits: cpu: "1" memory: 800Mi + # to ensure a graceful drain, terminationGracePeriodSeconds must be greater than DRAIN_TIME_SECONDS environment variable + terminationGracePeriodSeconds: 30 volumes: - name: config-volume 
configMap: diff --git a/go.mod b/go.mod index 32917f44f..e51c42d70 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/patrickmn/go-cache v2.1.0+incompatible github.com/pires/go-proxyproto v0.6.1 go.uber.org/zap v1.27.0 + golang.org/x/sync v0.7.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237 google.golang.org/grpc v1.63.2 google.golang.org/protobuf v1.33.0 @@ -79,7 +80,6 @@ require ( golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.24.0 // indirect golang.org/x/oauth2 v0.19.0 // indirect - golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.19.0 // indirect golang.org/x/term v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/test/e2e-kind.sh b/test/e2e-kind.sh index ea07652ab..a5487bf95 100755 --- a/test/e2e-kind.sh +++ b/test/e2e-kind.sh @@ -190,6 +190,57 @@ kubectl -n "${KOURIER_CONTROL_NAMESPACE}" patch configmap/config-kourier --type kubectl delete -f test/config/tracing unset TRACING_COLLECTOR_FULL_ENDPOINT +echo ">> Change DRAIN_TIME_SECONDS and terminationGracePeriodSeconds for graceful shutdown tests" +kubectl -n "${KOURIER_GATEWAY_NAMESPACE}" patch deployment/3scale-kourier-gateway -p '{ + "spec": { + "template": { + "spec": { + "containers": [ + { + "name": "kourier-gateway", + "env": [ + { + "name": "DRAIN_TIME_SECONDS", + "value": "30" + } + ] + } + ], + "terminationGracePeriodSeconds": 60 + } + } + } +}' +kubectl -n "${KOURIER_GATEWAY_NAMESPACE}" rollout status deployment/3scale-kourier-gateway --timeout=300s + +echo ">> Running graceful shutdown tests" +DRAIN_TIME_SECONDS=30 go test -race -count=1 -timeout=20m -tags=e2e ./test/gracefulshutdown \ + --ingressendpoint="${IPS[0]}" \ + --ingressClass=kourier.ingress.networking.knative.dev \ + --cluster-suffix="$CLUSTER_SUFFIX" + +kubectl -n "${KOURIER_GATEWAY_NAMESPACE}" patch deployment/3scale-kourier-gateway -p '{ + "spec": { + "template": { + "spec": { + "containers": [ + { + "name": "kourier-gateway", + "env": [ + { + 
"name": "DRAIN_TIME_SECONDS", + "value": "15" + } + ] + } + ], + "terminationGracePeriodSeconds": null + } + } + } +}' +kubectl -n "${KOURIER_GATEWAY_NAMESPACE}" rollout status deployment/3scale-kourier-gateway --timeout=300s + echo ">> Set IdleTimeout to 50s" kubectl -n "${KOURIER_CONTROL_NAMESPACE}" patch configmap/config-kourier --type merge -p '{"data":{"stream-idle-timeout":"50s"}}' diff --git a/test/gracefulshutdown/gracefulshutdown_test.go b/test/gracefulshutdown/gracefulshutdown_test.go new file mode 100644 index 000000000..946ac7ae2 --- /dev/null +++ b/test/gracefulshutdown/gracefulshutdown_test.go @@ -0,0 +1,173 @@ +//go:build e2e +// +build e2e + +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package gracefulshutdown
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"net/http"
+	"net/url"
+	"os"
+	"sync"
+	"testing"
+	"time"
+
+	"golang.org/x/sync/errgroup"
+	"gotest.tools/v3/assert"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/intstr"
+	"knative.dev/networking/pkg/apis/networking/v1alpha1"
+	"knative.dev/networking/test"
+	"knative.dev/networking/test/conformance/ingress"
+)
+
+const (
+	kourierGatewayNamespace = "kourier-system"
+	kourierGatewayLabel     = "app=3scale-kourier-gateway"
+)
+
+// TestGracefulShutdown checks how in-flight requests are treated when the
+// gateway pods are deleted.
+//
+// It starts two concurrent requests through the ingress, one lasting 3s less
+// and one lasting 3s more than DRAIN_TIME_SECONDS, deletes every pod matching
+// kourierGatewayLabel, and then expects the shorter request to end with
+// 200 OK and the longer one to be cut off (reported by sendRequest as status 0).
+//
+// Environment:
+//   - DRAIN_TIME_SECONDS (required): drain time in seconds, must be > 5.
+//   - GATEWAY_NAMESPACE_OVERRIDE (optional): replaces kourierGatewayNamespace.
+func TestGracefulShutdown(t *testing.T) {
+	gatewayNs := kourierGatewayNamespace
+	if gatewayNsOverride := os.Getenv("GATEWAY_NAMESPACE_OVERRIDE"); gatewayNsOverride != "" {
+		gatewayNs = gatewayNsOverride
+	}
+
+	// Retrieve drain time from environment
+	drainTimeSeconds := os.Getenv("DRAIN_TIME_SECONDS")
+	if drainTimeSeconds == "" {
+		t.Fatal("DRAIN_TIME_SECONDS environment variable must be set")
+	}
+
+	drainTime, err := time.ParseDuration(drainTimeSeconds + "s")
+	if err != nil {
+		t.Fatal("DRAIN_TIME_SECONDS is an invalid duration:", err)
+	}
+	// The shorter request below lasts drainTime-3s, so the drain time needs
+	// some headroom for that duration to stay meaningful.
+	if drainTime <= 5*time.Second {
+		t.Fatal("DRAIN_TIME_SECONDS must be greater than 5")
+	}
+
+	clients := test.Setup(t)
+	ctx := context.Background()
+
+	// Create a service and an ingress
+	name, port, _ := ingress.CreateTimeoutService(ctx, t, clients)
+	_, client, _ := ingress.CreateIngressReady(ctx, t, clients, v1alpha1.IngressSpec{
+		Rules: []v1alpha1.IngressRule{{
+			Hosts:      []string{name + ".example.com"},
+			Visibility: v1alpha1.IngressVisibilityExternalIP,
+			HTTP: &v1alpha1.HTTPIngressRuleValue{
+				Paths: []v1alpha1.HTTPIngressPath{{
+					Splits: []v1alpha1.IngressBackendSplit{{
+						IngressBackend: v1alpha1.IngressBackend{
+							ServiceName:      name,
+							ServiceNamespace: test.ServingNamespace,
+							ServicePort:      intstr.FromInt(port),
+						},
+					}},
+				}},
+			},
+		}},
+	})
+
+	tests := []struct {
+		name            string
+		requestDuration time.Duration
+		wantStatusCode  int
+	}{
+		{
+			name:            fmt.Sprintf("do a request taking slightly less than the drain time: %s", drainTime),
+			requestDuration: drainTime - (3 * time.Second),
+			wantStatusCode:  http.StatusOK,
+		},
+		{
+			// sendRequest reports a connection closed by the gateway as status 0.
+			name:            fmt.Sprintf("do a request taking slightly more than the drain time: %s", drainTime),
+			requestDuration: drainTime + (3 * time.Second),
+			wantStatusCode:  0,
+		},
+	}
+
+	g := new(errgroup.Group)
+	var statusCodes sync.Map
+
+	// Run all requests asynchronously at the same time, and collect the results in statusCodes map
+	for i := range tests {
+		// Named tc (not test) so the imported knative "test" package is not shadowed.
+		tc := tests[i]
+
+		g.Go(func() error {
+			statusCode, err := sendRequest(client, name, tc.requestDuration)
+			statusCodes.Store(tc.name, statusCode)
+			if err != nil {
+				// Identify which request failed; g.Wait only surfaces the first error.
+				return fmt.Errorf("%s: %w", tc.name, err)
+			}
+			return nil
+		})
+	}
+
+	// Ensures the requests sent by the goroutines above are in-flight
+	time.Sleep(1 * time.Second)
+
+	// Retrieve and delete all gateway pods
+	gatewayPods, err := clients.KubeClient.CoreV1().Pods(gatewayNs).List(ctx, metav1.ListOptions{
+		LabelSelector: kourierGatewayLabel,
+	})
+	if err != nil {
+		t.Fatal("Failed to get Gateway pods:", err)
+	}
+
+	for _, gatewayPod := range gatewayPods.Items {
+		if err := clients.KubeClient.CoreV1().Pods(gatewayNs).Delete(ctx, gatewayPod.Name, metav1.DeleteOptions{}); err != nil {
+			t.Fatalf("Failed to delete pod %s: %v", gatewayPod.Name, err)
+		}
+	}
+
+	// Wait until we get responses from the asynchronous requests
+	if err := g.Wait(); err != nil {
+		t.Fatal(err)
+	}
+
+	for _, tc := range tests {
+		stored, _ := statusCodes.Load(tc.name)
+		// Checked assertion: a missing or mistyped entry fails the test with a
+		// clear message instead of panicking.
+		statusCode, ok := stored.(int)
+		if !ok {
+			t.Fatalf("no status code recorded for %q", tc.name)
+		}
+
+		assert.Equal(t, statusCode, tc.wantStatusCode, fmt.Sprintf("%s has failed: expected %d, got %d",
+			tc.name, tc.wantStatusCode, statusCode))
+	}
+}
+
+// sendRequest issues a GET to http://<name>.example.com with the
+// initialTimeout query parameter set to requestTimeout in milliseconds, and
+// returns the HTTP status code of the response.
+//
+// When the client fails with io.EOF, the result is (0, nil): this is the
+// outcome the test expects once the gateway has closed the connection.
+// Any other error is returned together with a zero status code.
+func sendRequest(client *http.Client, name string, requestTimeout time.Duration) (statusCode int, err error) {
+	reqURL := fmt.Sprintf("http://%s.example.com?initialTimeout=%d", name, requestTimeout.Milliseconds())
+	req, err := http.NewRequest(http.MethodGet, reqURL, nil)
+	if err != nil {
+		return 0, fmt.Errorf("error making GET request: %w", err)
+	}
+
+	resp, err := client.Do(req)
+	if err != nil {
+		var errURL *url.Error
+		// When the gateway cuts the connection (after completing the drain process), an EOF error is returned to the client
+		if errors.As(err, &errURL) && errors.Is(errURL, io.EOF) {
+			return 0, nil
+		}
+
+		return 0, err
+	}
+	defer resp.Body.Close()
+
+	return resp.StatusCode, nil
+}