Skip to content

Commit

Permalink
apiserver + controllers: enhance context support
Browse files Browse the repository at this point in the history
27a68ae introduced context support for events. Creating an event
broadcaster with context makes tests more resilient against leaking goroutines
when that context gets canceled at the end of a test and enables per-test
output via ktesting.

The context could get passed to the constructor. A cleaner solution is to
enhance context support for the apiserver and then pass the context into the
controller's run method. This ripples up the call stack to all places which
start an apiserver.
  • Loading branch information
pohly committed Apr 24, 2024
1 parent cfe9104 commit cd457e1
Show file tree
Hide file tree
Showing 26 changed files with 200 additions and 129 deletions.
8 changes: 5 additions & 3 deletions cmd/kube-apiserver/app/server.go
Expand Up @@ -20,6 +20,7 @@ limitations under the License.
package app

import (
"context"
"crypto/tls"
"fmt"
"net/http"
Expand Down Expand Up @@ -115,7 +116,7 @@ cluster's shared state through which all other components interact.`,
}
// add feature enablement metrics
utilfeature.DefaultMutableFeatureGate.AddMetrics()
return Run(completedOptions, genericapiserver.SetupSignalHandler())
return Run(cmd.Context(), completedOptions)
},
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
Expand All @@ -126,6 +127,7 @@ cluster's shared state through which all other components interact.`,
return nil
},
}
cmd.SetContext(genericapiserver.SetupSignalContext())

fs := cmd.Flags()
namedFlagSets := s.Flags()
Expand All @@ -143,7 +145,7 @@ cluster's shared state through which all other components interact.`,
}

// Run runs the specified APIServer. This should never exit.
func Run(opts options.CompletedOptions, stopCh <-chan struct{}) error {
func Run(ctx context.Context, opts options.CompletedOptions) error {
// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())

Expand All @@ -167,7 +169,7 @@ func Run(opts options.CompletedOptions, stopCh <-chan struct{}) error {
return err
}

return prepared.Run(stopCh)
return prepared.Run(ctx)
}

// CreateServerChain creates the apiservers connected via delegation.
Expand Down
21 changes: 12 additions & 9 deletions cmd/kube-apiserver/app/testing/testserver.go
Expand Up @@ -32,6 +32,7 @@ import (
"os"
"path/filepath"
"runtime"
"testing"
"time"

"github.com/spf13/pflag"
Expand All @@ -56,10 +57,11 @@ import (
"k8s.io/klog/v2"
"k8s.io/kube-aggregator/pkg/apiserver"
"k8s.io/kubernetes/pkg/features"
testutil "k8s.io/kubernetes/test/utils"
"k8s.io/kubernetes/test/utils/ktesting"

"k8s.io/kubernetes/cmd/kube-apiserver/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
testutil "k8s.io/kubernetes/test/utils"
)

func init() {
Expand Down Expand Up @@ -139,7 +141,9 @@ func NewDefaultTestServerOptions() *TestServerInstanceOptions {
// Note: we return a tear-down func instead of a stop channel because the later will leak temporary
// files that because Golang testing's call to os.Exit will not give a stop channel go routine
// enough time to remove temporary files.
func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) {
func StartTestServer(t ktesting.TB, instanceOptions *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) {
tCtx := ktesting.Init(t)

if instanceOptions == nil {
instanceOptions = NewDefaultTestServerOptions()
}
Expand All @@ -149,12 +153,11 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
return result, fmt.Errorf("failed to create temp dir: %v", err)
}

stopCh := make(chan struct{})
var errCh chan error
tearDown := func() {
// Closing stopCh is stopping apiserver and cleaning up
// Cancel is stopping apiserver and cleaning up
// after itself, including shutting down its storage layer.
close(stopCh)
tCtx.Cancel("tearing down")

// If the apiserver was started, let's wait for it to
// shutdown clearly.
Expand Down Expand Up @@ -359,15 +362,15 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
}

errCh = make(chan error)
go func(stopCh <-chan struct{}) {
go func() {
defer close(errCh)
prepared, err := server.PrepareRun()
if err != nil {
errCh <- err
} else if err := prepared.Run(stopCh); err != nil {
} else if err := prepared.Run(tCtx); err != nil {
errCh <- err
}
}(stopCh)
}()

client, err := kubernetes.NewForConfig(server.GenericAPIServer.LoopbackClientConfig)
if err != nil {
Expand Down Expand Up @@ -465,7 +468,7 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
}

// StartTestServerOrDie calls StartTestServer t.Fatal if it does not succeed.
func StartTestServerOrDie(t Logger, instanceOptions *TestServerInstanceOptions, flags []string, storageConfig *storagebackend.Config) *TestServer {
func StartTestServerOrDie(t testing.TB, instanceOptions *TestServerInstanceOptions, flags []string, storageConfig *storagebackend.Config) *TestServer {
result, err := StartTestServer(t, instanceOptions, flags, storageConfig)
if err == nil {
return &result
Expand Down
Expand Up @@ -55,9 +55,6 @@ func NewController(
secondaryRange net.IPNet,
client clientset.Interface,
) *Controller {
broadcaster := record.NewBroadcaster()
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName})

c := &Controller{
client: client,
interval: 10 * time.Second, // same as DefaultEndpointReconcilerInterval
Expand All @@ -79,9 +76,6 @@ func NewController(
c.serviceCIDRLister = networkingv1alpha1listers.NewServiceCIDRLister(c.serviceCIDRInformer.GetIndexer())
c.serviceCIDRsSynced = c.serviceCIDRInformer.HasSynced

c.eventBroadcaster = broadcaster
c.eventRecorder = recorder

return c
}

Expand All @@ -101,9 +95,12 @@ type Controller struct {
}

// Start will not return until the default ServiceCIDR exists or stopCh is closed.
func (c *Controller) Start(stopCh <-chan struct{}) {
func (c *Controller) Start(ctx context.Context) {
defer utilruntime.HandleCrash()
stopCh := ctx.Done()

c.eventBroadcaster = record.NewBroadcaster(record.WithContext(ctx))
c.eventRecorder = c.eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName})
c.eventBroadcaster.StartStructuredLogging(0)
c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")})
defer c.eventBroadcaster.Shutdown()
Expand All @@ -116,8 +113,6 @@ func (c *Controller) Start(stopCh <-chan struct{}) {
return
}

// derive a context from the stopCh so we can cancel the poll loop
ctx := wait.ContextForChannel(stopCh)
// wait until first successfully sync
// this blocks apiserver startup so poll with a short interval
err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controlplane/instance.go
Expand Up @@ -524,7 +524,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
)
// The default serviceCIDR must exist before the apiserver is healthy
// otherwise the allocators for Services will not work.
controller.Start(hookContext.StopCh)
controller.Start(hookContext)
return nil
})
}
Expand Down
4 changes: 2 additions & 2 deletions staging/src/k8s.io/apiextensions-apiserver/main.go
Expand Up @@ -25,8 +25,8 @@ import (
)

func main() {
stopCh := genericapiserver.SetupSignalHandler()
cmd := server.NewServerCommand(os.Stdout, os.Stderr, stopCh)
ctx := genericapiserver.SetupSignalContext()
cmd := server.NewServerCommand(ctx, os.Stdout, os.Stderr)
code := cli.Run(cmd)
os.Exit(code)
}
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package server

import (
"context"
"io"

"github.com/spf13/cobra"
Expand All @@ -25,7 +26,7 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
)

func NewServerCommand(out, errOut io.Writer, stopCh <-chan struct{}) *cobra.Command {
func NewServerCommand(ctx context.Context, out, errOut io.Writer) *cobra.Command {
o := options.NewCustomResourceDefinitionsServerOptions(out, errOut)

cmd := &cobra.Command{
Expand All @@ -38,19 +39,20 @@ func NewServerCommand(out, errOut io.Writer, stopCh <-chan struct{}) *cobra.Comm
if err := o.Validate(); err != nil {
return err
}
if err := Run(o, stopCh); err != nil {
if err := Run(c.Context(), o); err != nil {
return err
}
return nil
},
}
cmd.SetContext(ctx)

fs := cmd.Flags()
o.AddFlags(fs)
return cmd
}

func Run(o *options.CustomResourceDefinitionsServerOptions, stopCh <-chan struct{}) error {
func Run(ctx context.Context, o *options.CustomResourceDefinitionsServerOptions) error {
config, err := o.Config()
if err != nil {
return err
Expand All @@ -60,5 +62,5 @@ func Run(o *options.CustomResourceDefinitionsServerOptions, stopCh <-chan struct
if err != nil {
return err
}
return server.GenericAPIServer.PrepareRun().Run(stopCh)
return server.GenericAPIServer.PrepareRun().RunWithContext(ctx)
}
Expand Up @@ -18,6 +18,7 @@ package testing

import (
"context"
"errors"
"fmt"
"net"
"os"
Expand Down Expand Up @@ -83,13 +84,15 @@ func NewDefaultTestServerOptions() *TestServerInstanceOptions {
// files that because Golang testing's call to os.Exit will not give a stop channel go routine
// enough time to remove temporary files.
func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) {
stopCh := make(chan struct{})
// TODO: this is a candidate for using what is now test/utils/ktesting,
// should that become a staging repo.
ctx, cancel := context.WithCancelCause(context.Background())
var errCh chan error
tearDown := func() {
// Closing stopCh is stopping apiextensions apiserver and its
// Cancel is stopping apiextensions apiserver and its
// delegates, which itself is cleaning up after itself,
// including shutting down its storage layer.
close(stopCh)
cancel(errors.New("tearing down"))

// If the apiextensions apiserver was started, let's wait for
// it to shutdown clearly.
Expand Down Expand Up @@ -166,13 +169,13 @@ func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []strin
}

errCh = make(chan error)
go func(stopCh <-chan struct{}) {
go func() {
defer close(errCh)

if err := server.GenericAPIServer.PrepareRun().Run(stopCh); err != nil {
if err := server.GenericAPIServer.PrepareRun().RunWithContext(ctx); err != nil {
errCh <- err
}
}(stopCh)
}()

t.Logf("Waiting for /healthz to be ok...")

Expand Down
10 changes: 7 additions & 3 deletions staging/src/k8s.io/apiserver/pkg/server/config_test.go
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package server

import (
"context"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -42,6 +44,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2/ktesting"
netutils "k8s.io/utils/net"
)

Expand Down Expand Up @@ -79,6 +82,9 @@ func TestAuthorizeClientBearerTokenNoops(t *testing.T) {
}

func TestNewWithDelegate(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(errors.New("test is done"))
delegateConfig := NewConfig(codecs)
delegateConfig.ExternalAddress = "192.168.10.4:443"
delegateConfig.PublicAddress = netutils.ParseIPSloppy("192.168.10.4")
Expand Down Expand Up @@ -136,10 +142,8 @@ func TestNewWithDelegate(t *testing.T) {
return nil
})

stopCh := make(chan struct{})
defer close(stopCh)
wrappingServer.PrepareRun()
wrappingServer.RunPostStartHooks(stopCh)
wrappingServer.RunPostStartHooks(ctx)

server := httptest.NewServer(wrappingServer.Handler)
defer server.Close()
Expand Down

0 comments on commit cd457e1

Please sign in to comment.