Skip to content

Commit

Permalink
Gracefully drain connections when stopping the gateway (#1203)
Browse files Browse the repository at this point in the history
* Fix gateway's preStop hook: curl does not exist (anymore?) in envoy image

* Before stopping the gateway, wait until requests are finished on all public listeners (and exit anyway if it exceeds terminationGracePeriodSeconds)

* Drain listeners with appropriate endpoint

* Simpler drain + sleep

* Remove PARENT_SHUTDOWN_TIME_SECONDS and terminationGracePeriodSeconds

* Use a perl script (no need to open the admin HTTP interface!)

* Use bash instead of perl in preStop hook

* Review @skonto comments: use socket address for admin cluster

* [WIP] add graceful shutdown test and tweak CI to just run that test

* [WIP] Fix gracefulshutdown_test.go

* [WIP] try to fix race condition and lint

* [WIP] use initialTimeout + debug

* [WIP] fix gracefulshutdown_test.go logic

* [WIP] refacto and add some comments to clarify

* [WIP] fix lint

* [WIP] reintroduce kind-e2e-upgrade.yaml

* [WIP] add test case when request takes a little longer than the drain time

* [WIP] fix compilation issue

* [WIP] FIx compilation issue (again)

* [WIP] hopefully fix data race

* [WIP] refacto and hopefully fix race condition (use sync.Map)

* [WIP] fix compilation issue

* [WIP] Handle EOF

* [WIP] check gateway pod has been removed + manual debugging

* [WIP] debugging

* [WIP] more debugging

* [WIP] more debugging

* [WIP] increase livenessProbe failure threshold as I'm not sure it should return EOF

* [WIP] remove debugging related stuff

* Revert all unnecessary changes made for testing

* Revert unnecessary change (livenessProbe)

* Scale to 1 replica

* Typo

* Run gracefulshutdown test first (speed up feedback loop)

* Add a comment for terminationGracePeriodSeconds

* Don't update deployment twice

Patch env and terminationGracePeriodSeconds at the same time

* Fix bad patch

* Run gracefulshutdown test at the end

- avoids conflicts with other tests
- change gracefulshutdown test to delete all gateway pods

* Fix gracefulshutdown test

* Fix gracefulshutdown test

* Lint
  • Loading branch information
norbjd committed Apr 29, 2024
1 parent 874233c commit cc30bec
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 6 deletions.
19 changes: 15 additions & 4 deletions config/200-bootstrap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,15 @@ data:
exact: GET
route:
cluster: service_stats
- match:
safe_regex:
regex: '/drain_listeners'
headers:
- name: ':method'
string_match:
exact: POST
route:
cluster: service_stats
clusters:
- name: service_stats
connect_timeout: 0.250s
Expand All @@ -82,8 +91,9 @@ data:
lb_endpoints:
endpoint:
address:
pipe:
path: /tmp/envoy.admin
socket_address:
address: 127.0.0.1
port_value: 9901
- name: xds_cluster
# This keepalive is recommended by envoy docs.
# https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol
Expand Down Expand Up @@ -112,5 +122,6 @@ data:
typed_config:
"@type": type.googleapis.com/envoy.extensions.access_loggers.stream.v3.StdoutAccessLog
address:
pipe:
path: /tmp/envoy.admin
socket_address:
address: 127.0.0.1
port_value: 9901
9 changes: 8 additions & 1 deletion config/300-gateway.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,13 @@ spec:
- --base-id 1
- -c /tmp/config/envoy-bootstrap.yaml
- --log-level info
- --drain-time-s $(DRAIN_TIME_SECONDS)
- --drain-strategy immediate
command:
- /usr/local/bin/envoy
env:
- name: DRAIN_TIME_SECONDS
value: "15"
image: docker.io/envoyproxy/envoy:v1.26-latest
name: kourier-gateway
ports:
Expand Down Expand Up @@ -85,7 +90,7 @@ spec:
lifecycle:
preStop:
exec:
command: ["/bin/sh","-c","curl -X POST --unix /tmp/envoy.admin http://localhost/healthcheck/fail; sleep 15"]
command: ["/bin/sh","-c","curl -X POST http://localhost:9901/drain_listeners?graceful; sleep $DRAIN_TIME_SECONDS"]
readinessProbe:
httpGet:
httpHeaders:
Expand Down Expand Up @@ -115,6 +120,8 @@ spec:
limits:
cpu: "1"
memory: 800Mi
# to ensure a graceful drain, terminationGracePeriodSeconds must be greater than DRAIN_TIME_SECONDS environment variable
terminationGracePeriodSeconds: 30
volumes:
- name: config-volume
configMap:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pires/go-proxyproto v0.6.1
go.uber.org/zap v1.27.0
golang.org/x/sync v0.7.0
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237
google.golang.org/grpc v1.63.2
google.golang.org/protobuf v1.33.0
Expand Down Expand Up @@ -79,7 +80,6 @@ require (
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/oauth2 v0.19.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/term v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
Expand Down
51 changes: 51 additions & 0 deletions test/e2e-kind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,57 @@ kubectl -n "${KOURIER_CONTROL_NAMESPACE}" patch configmap/config-kourier --type
kubectl delete -f test/config/tracing
unset TRACING_COLLECTOR_FULL_ENDPOINT

echo ">> Change DRAIN_TIME_SECONDS and terminationGracePeriodSeconds for graceful shutdown tests"
kubectl -n "${KOURIER_GATEWAY_NAMESPACE}" patch deployment/3scale-kourier-gateway -p '{
"spec": {
"template": {
"spec": {
"containers": [
{
"name": "kourier-gateway",
"env": [
{
"name": "DRAIN_TIME_SECONDS",
"value": "30"
}
]
}
],
"terminationGracePeriodSeconds": 60
}
}
}
}'
kubectl -n "${KOURIER_GATEWAY_NAMESPACE}" rollout status deployment/3scale-kourier-gateway --timeout=300s

echo ">> Running graceful shutdown tests"
DRAIN_TIME_SECONDS=30 go test -race -count=1 -timeout=20m -tags=e2e ./test/gracefulshutdown \
--ingressendpoint="${IPS[0]}" \
--ingressClass=kourier.ingress.networking.knative.dev \
--cluster-suffix="$CLUSTER_SUFFIX"

kubectl -n "${KOURIER_GATEWAY_NAMESPACE}" patch deployment/3scale-kourier-gateway -p '{
"spec": {
"template": {
"spec": {
"containers": [
{
"name": "kourier-gateway",
"env": [
{
"name": "DRAIN_TIME_SECONDS",
"value": "15"
}
]
}
],
"terminationGracePeriodSeconds": null
}
}
}
}'
kubectl -n "${KOURIER_GATEWAY_NAMESPACE}" rollout status deployment/3scale-kourier-gateway --timeout=300s

echo ">> Set IdleTimeout to 50s"
kubectl -n "${KOURIER_CONTROL_NAMESPACE}" patch configmap/config-kourier --type merge -p '{"data":{"stream-idle-timeout":"50s"}}'

Expand Down
173 changes: 173 additions & 0 deletions test/gracefulshutdown/gracefulshutdown_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
//go:build e2e
// +build e2e

/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package gracefulshutdown

import (
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
"sync"
"testing"
"time"

"golang.org/x/sync/errgroup"
"gotest.tools/v3/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"knative.dev/networking/pkg/apis/networking/v1alpha1"
"knative.dev/networking/test"
"knative.dev/networking/test/conformance/ingress"
)

const (
kourierGatewayNamespace = "kourier-system"
kourierGatewayLabel = "app=3scale-kourier-gateway"
)

func TestGracefulShutdown(t *testing.T) {
gatewayNs := kourierGatewayNamespace
if gatewayNsOverride := os.Getenv("GATEWAY_NAMESPACE_OVERRIDE"); gatewayNsOverride != "" {
gatewayNs = gatewayNsOverride
}

// Retrieve drain time from environment
var drainTime time.Duration
drainTimeSeconds := os.Getenv("DRAIN_TIME_SECONDS")
if drainTimeSeconds == "" {
t.Fatal("DRAIN_TIME_SECONDS environment variable must be set")
}

drainTime, err := time.ParseDuration(drainTimeSeconds + "s")
if err != nil {
t.Fatal("DRAIN_TIME_SECONDS is an invalid duration:", err)
}
if drainTime <= 5*time.Second {
t.Fatal("DRAIN_TIME_SECONDS must be greater than 5")
}

clients := test.Setup(t)
ctx := context.Background()

// Create a service and an ingress
name, port, _ := ingress.CreateTimeoutService(ctx, t, clients)
_, client, _ := ingress.CreateIngressReady(ctx, t, clients, v1alpha1.IngressSpec{
Rules: []v1alpha1.IngressRule{{
Hosts: []string{name + ".example.com"},
Visibility: v1alpha1.IngressVisibilityExternalIP,
HTTP: &v1alpha1.HTTPIngressRuleValue{
Paths: []v1alpha1.HTTPIngressPath{{
Splits: []v1alpha1.IngressBackendSplit{{
IngressBackend: v1alpha1.IngressBackend{
ServiceName: name,
ServiceNamespace: test.ServingNamespace,
ServicePort: intstr.FromInt(port),
},
}},
}},
},
}},
})

tests := []struct {
name string
requestDuration time.Duration
wantStatusCode int
}{
{
name: fmt.Sprintf("do a request taking slightly less than the drain time: %s", drainTime),
requestDuration: drainTime - (3 * time.Second),
wantStatusCode: http.StatusOK,
},
{
name: fmt.Sprintf("do a request taking slightly more than the drain time: %s", drainTime),
requestDuration: drainTime + (3 * time.Second),
wantStatusCode: 0,
},
}

g := new(errgroup.Group)
var statusCodes sync.Map

// Run all requests asynchronously at the same time, and collect the results in statusCodes map
for i := range tests {
test := tests[i]

g.Go(func() error {
statusCode, err := sendRequest(client, name, test.requestDuration)
statusCodes.Store(test.name, statusCode)
return err
})
}

// Ensures the requests sent by the goroutines above are in-flight
time.Sleep(1 * time.Second)

// Retrieve and delete all gateway pods
gatewayPods, err := clients.KubeClient.CoreV1().Pods(gatewayNs).List(ctx, metav1.ListOptions{
LabelSelector: kourierGatewayLabel,
})
if err != nil {
t.Fatal("Failed to get Gateway pods:", err)
}

for _, gatewayPod := range gatewayPods.Items {
if err := clients.KubeClient.CoreV1().Pods(gatewayNs).Delete(ctx, gatewayPod.Name, metav1.DeleteOptions{}); err != nil {
t.Fatalf("Failed to delete pod %s: %v", gatewayPod.Name, err)
}
}

// Wait until we get responses from the asynchronous requests
if err := g.Wait(); err != nil {
t.Fatal(err)
}

for _, test := range tests {
statusCode, _ := statusCodes.Load(test.name)

assert.Equal(t, statusCode.(int), test.wantStatusCode, fmt.Sprintf("%s has failed: expected %d, got %s",
test.name, test.wantStatusCode, statusCode))
}
}

func sendRequest(client *http.Client, name string, requestTimeout time.Duration) (statusCode int, err error) {
reqURL := fmt.Sprintf("http://%s.example.com?initialTimeout=%d", name, requestTimeout.Milliseconds())
req, err := http.NewRequest("GET", reqURL, nil)
if err != nil {
return 0, fmt.Errorf("error making GET request: %w", err)
}

resp, err := client.Do(req)
if err != nil {
var errURL *url.Error
// When the gateway cuts the connection (after completing the drain process), an EOF error is returned to the client
if errors.As(err, &errURL) && errors.Is(errURL, io.EOF) {
return 0, nil
}

return 0, err
}
defer resp.Body.Close()

return resp.StatusCode, nil
}

0 comments on commit cc30bec

Please sign in to comment.