Skip to content

Commit

Permalink
Merge pull request #110000 from wojtek-t/fix_shutdown_sequence
Browse files Browse the repository at this point in the history
Fix apiserver shutdown in integration tests
  • Loading branch information
k8s-ci-robot committed May 26, 2022
2 parents 9ad4181 + 6f70677 commit eb37a5d
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 67 deletions.
32 changes: 23 additions & 9 deletions cmd/kube-apiserver/app/testing/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import (
"k8s.io/kubernetes/cmd/kube-apiserver/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
testutil "k8s.io/kubernetes/test/utils"

"k8s.io/klog/v2"
)

// This key is for testing purposes only and is not considered secure.
Expand Down Expand Up @@ -99,26 +101,34 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
instanceOptions = NewDefaultTestServerOptions()
}

result.TmpDir, err = os.MkdirTemp("", "kubernetes-kube-apiserver")
if err != nil {
return result, fmt.Errorf("failed to create temp dir: %v", err)
}

stopCh := make(chan struct{})
var errCh chan error
tearDown := func() {
// Closing stopCh is stopping apiserver and cleaning up
// after itself, including shutting down its storage layer.
close(stopCh)
if len(result.TmpDir) != 0 {
os.RemoveAll(result.TmpDir)

// If the apiserver was started, let's wait for it to
// shutdown clearly.
if errCh != nil {
err, ok := <-errCh
if ok && err != nil {
klog.Errorf("Failed to shutdown test server clearly: %v", err)
}
}
os.RemoveAll(result.TmpDir)
}
defer func() {
if result.TearDownFn == nil {
tearDown()
}
}()

result.TmpDir, err = os.MkdirTemp("", "kubernetes-kube-apiserver")
if err != nil {
return result, fmt.Errorf("failed to create temp dir: %v", err)
}

fs := pflag.NewFlagSet("test", pflag.PanicOnError)

s := options.NewServerRunOptions()
Expand Down Expand Up @@ -209,8 +219,9 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
server.GenericAPIServer.StorageVersionManager = instanceOptions.StorageVersionWrapFunc(server.GenericAPIServer.StorageVersionManager)
}

errCh := make(chan error)
errCh = make(chan error)
go func(stopCh <-chan struct{}) {
defer close(errCh)
prepared, err := server.PrepareRun()
if err != nil {
errCh <- err
Expand Down Expand Up @@ -302,7 +313,10 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
result.ClientConfig.QPS = 1000
result.ClientConfig.Burst = 10000
result.ServerOpts = s
result.TearDownFn = tearDown
result.TearDownFn = func() {
tearDown()
etcdClient.Close()
}
result.EtcdClient = etcdClient
result.EtcdStoragePrefix = storageConfig.Prefix

Expand Down
18 changes: 6 additions & 12 deletions test/integration/apiserver/certreload/certreload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,6 @@ func TestClientCARecreate(t *testing.T) {
}

func testClientCA(t *testing.T, recreate bool) {
stopCh := make(chan struct{})
defer close(stopCh)

frontProxyCA, err := newTestCAWithClient(
pkix.Name{
CommonName: "test-front-proxy-ca",
Expand Down Expand Up @@ -173,14 +170,15 @@ func testClientCA(t *testing.T, recreate bool) {
clientCAFilename := ""
frontProxyCAFilename := ""

kubeClient, kubeconfig := framework.StartTestServer(t, stopCh, framework.TestServerSetup{
kubeClient, kubeconfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.GenericServerRunOptions.MaxRequestBodyBytes = 1024 * 1024
clientCAFilename = opts.Authentication.ClientCert.ClientCA
frontProxyCAFilename = opts.Authentication.RequestHeader.ClientCAFile
opts.Authentication.RequestHeader.AllowedNames = append(opts.Authentication.RequestHeader.AllowedNames, "test-aggregated-apiserver")
},
})
defer tearDownFn()

// wait for request header info
err = wait.PollImmediate(100*time.Millisecond, 30*time.Second, waitForConfigMapCAContent(t, kubeClient, "requestheader-client-ca-file", "-----BEGIN CERTIFICATE-----", 1))
Expand Down Expand Up @@ -470,17 +468,15 @@ func TestServingCertRecreate(t *testing.T) {
}

func testServingCert(t *testing.T, recreate bool) {
stopCh := make(chan struct{})
defer close(stopCh)

var servingCertPath string

_, kubeconfig := framework.StartTestServer(t, stopCh, framework.TestServerSetup{
_, kubeconfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.GenericServerRunOptions.MaxRequestBodyBytes = 1024 * 1024
servingCertPath = opts.SecureServing.ServerCert.CertDirectory
},
})
defer tearDownFn()

if recreate {
if err := os.Remove(path.Join(servingCertPath, "apiserver.key")); err != nil {
Expand Down Expand Up @@ -511,12 +507,9 @@ func testServingCert(t *testing.T, recreate bool) {
}

func TestSNICert(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)

var servingCertPath string

_, kubeconfig := framework.StartTestServer(t, stopCh, framework.TestServerSetup{
_, kubeconfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.GenericServerRunOptions.MaxRequestBodyBytes = 1024 * 1024
servingCertPath = opts.SecureServing.ServerCert.CertDirectory
Expand All @@ -535,6 +528,7 @@ func TestSNICert(t *testing.T) {
}}
},
})
defer tearDownFn()

// When we run this the second time, we know which one we are expecting.
_, actualCerts, err := cert.GetServingCertificatesForURL(kubeconfig.Host, "foo")
Expand Down
5 changes: 2 additions & 3 deletions test/integration/apiserver/max_json_patch_operations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,12 @@ import (

// Tests that the apiserver limits the number of operations in a json patch.
func TestMaxJSONPatchOperations(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
clientSet, _ := framework.StartTestServer(t, stopCh, framework.TestServerSetup{
clientSet, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.GenericServerRunOptions.MaxRequestBodyBytes = 1024 * 1024
},
})
defer tearDownFn()

p := `{"op":"add","path":"/x","value":"y"}`
// maxJSONPatchOperations = 10000
Expand Down
5 changes: 2 additions & 3 deletions test/integration/apiserver/max_request_body_bytes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ import (

// Tests that the apiserver limits the resource size in write operations.
func TestMaxResourceSize(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
clientSet, _ := framework.StartTestServer(t, stopCh, framework.TestServerSetup{})
clientSet, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{})
defer tearDownFn()

hugeData := []byte(strings.Repeat("x", 3*1024*1024+1))

Expand Down
5 changes: 2 additions & 3 deletions test/integration/apiserver/podlogs/podlogs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ import (
)

func TestInsecurePodLogs(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)
clientSet, _ := framework.StartTestServer(t, stopCh, framework.TestServerSetup{
clientSet, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.GenericServerRunOptions.MaxRequestBodyBytes = 1024 * 1024
// I have no idea what this cert is, but it doesn't matter, we just want something that always fails validation
Expand Down Expand Up @@ -63,6 +61,7 @@ Bgqc+dJN9xS9Ah5gLiGQJ6C4niUA11piCpvMsy+j/LQ1Erx47KMar5fuMXYk7iPq
`)
},
})
defer tearDownFn()

fakeKubeletServer := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.Write([]byte("fake-log"))
Expand Down
6 changes: 2 additions & 4 deletions test/integration/auth/dynamic_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,7 @@ func TestDynamicClientBuilder(t *testing.T) {
t.Fatalf("parse duration failed: %v", err)
}

stopCh := make(chan struct{})
defer close(stopCh)

baseClient, baseConfig := framework.StartTestServer(t, stopCh, framework.TestServerSetup{
baseClient, baseConfig, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
opts.ServiceAccountSigningKeyFile = tmpfile.Name()
opts.ServiceAccountTokenMaxExpiration = maxExpirationDuration
Expand All @@ -75,6 +72,7 @@ func TestDynamicClientBuilder(t *testing.T) {
config.GenericConfig.Authorization.Authorizer = authorizerfactory.NewAlwaysAllowAuthorizer()
},
})
defer tearDownFn()

// We want to test if the token rotation works fine here.
// To minimize the time this test would consume, we use the minimial token expiration.
Expand Down
11 changes: 10 additions & 1 deletion test/integration/controlplane/graceful_shutdown_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,14 @@ func TestGracefulShutdown(t *testing.T) {
resp.Body.Close()

t.Logf("shutting down server")
tearDownOnce.Do(server.TearDownFn)
// We tear it down in the background to ensure that
// pending requests should work fine.
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
tearDownOnce.Do(server.TearDownFn)
}()

t.Logf("server should fail new requests")
if err := wait.Poll(time.Millisecond*100, wait.ForeverTestTimeout, func() (done bool, err error) {
Expand Down Expand Up @@ -100,6 +107,8 @@ func TestGracefulShutdown(t *testing.T) {
t.Fatal(err)
}
t.Logf("response: code %d, body: %s", respErr.resp.StatusCode, string(bs))

wg.Wait()
}

type responseErrorPair struct {
Expand Down
44 changes: 24 additions & 20 deletions test/integration/etcd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"time"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
Expand Down Expand Up @@ -118,24 +117,11 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu
t.Fatal(err)
}

// get a leased session
session, err := concurrency.NewSession(rawClient)
if err != nil {
t.Fatal(err)
}

// then build and use an etcd lock
// this prevents more than one of these api servers from running at the same time
lock := concurrency.NewLocker(session, "kube_integration_etcd_raw")
lock.Lock()

// make sure we start with a clean slate
if _, err := kvClient.Delete(context.Background(), "/registry/", clientv3.WithPrefix()); err != nil {
t.Fatal(err)
}

stopCh := make(chan struct{})

kubeAPIServer, err := app.CreateServerChain(completedOptions)
if err != nil {
t.Fatal(err)
Expand All @@ -152,26 +138,38 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu

kubeClient := clientset.NewForConfigOrDie(kubeClientConfig)

stopCh := make(chan struct{})
errCh := make(chan error)
go func() {
// Catch panics that occur in this go routine so we get a comprehensible failure
defer func() {
if err := recover(); err != nil {
t.Errorf("Unexpected panic trying to start API server: %#v", err)
}
}()
defer close(errCh)

prepared, err := kubeAPIServer.PrepareRun()
if err != nil {
t.Error(err)
errCh <- err
return
}
if err := prepared.Run(stopCh); err != nil {
errCh <- err
t.Error(err)
return
}
}()

lastHealth := ""
attempt := 0
if err := wait.PollImmediate(time.Second, time.Minute, func() (done bool, err error) {
select {
case err := <-errCh:
return false, err
default:
}

// wait for the server to be healthy
result := kubeClient.RESTClient().Get().AbsPath("/healthz").Do(context.TODO())
content, _ := result.Raw()
Expand Down Expand Up @@ -207,12 +205,18 @@ func StartRealAPIServerOrDie(t *testing.T, configFuncs ...func(*options.ServerRu
}

cleanup := func() {
if err := os.RemoveAll(certDir); err != nil {
t.Log(err)
}
// Closing stopCh is stopping apiserver and cleaning up
// after itself, including shutting down its storage layer.
close(stopCh)
lock.Unlock()
if err := session.Close(); err != nil {

// If the apiserver was started, let's wait for it to
// shutdown clearly.
err, ok := <-errCh
if ok && err != nil {
t.Error(err)
}
rawClient.Close()
if err := os.RemoveAll(certDir); err != nil {
t.Log(err)
}
}
Expand Down
6 changes: 2 additions & 4 deletions test/integration/examples/webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,11 @@ import (
)

func TestWebhookLoopback(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)

webhookPath := "/webhook-test"

called := int32(0)

client, _ := framework.StartTestServer(t, stopCh, framework.TestServerSetup{
client, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
},
ModifyServerConfig: func(config *controlplane.Config) {
Expand All @@ -66,6 +63,7 @@ func TestWebhookLoopback(t *testing.T) {
})
},
})
defer tearDownFn()

fail := admissionregistrationv1.Fail
noSideEffects := admissionregistrationv1.SideEffectClassNone
Expand Down

0 comments on commit eb37a5d

Please sign in to comment.