controllers + apiserver: enhance context support #122148

Merged
merged 2 commits into from Apr 30, 2024
8 changes: 5 additions & 3 deletions cmd/kube-apiserver/app/server.go
@@ -20,6 +20,7 @@ limitations under the License.
package app

import (
+"context"
"crypto/tls"
"fmt"
"net/http"
@@ -114,7 +115,7 @@ cluster's shared state through which all other components interact.`,
}
// add feature enablement metrics
utilfeature.DefaultMutableFeatureGate.AddMetrics()
-return Run(completedOptions, genericapiserver.SetupSignalHandler())
+return Run(cmd.Context(), completedOptions)
},
Args: func(cmd *cobra.Command, args []string) error {
for _, arg := range args {
@@ -125,6 +126,7 @@ cluster's shared state through which all other components interact.`,
return nil
},
}
+cmd.SetContext(genericapiserver.SetupSignalContext())

Contributor

🤔 – some Go proverb: don't store contexts.

Contributor Author

I know, but sometimes the alternative is breaking the API badly. I bet that was the reason for cobra's decision to add a command context. As that is now part of their API, we should use it.

Contributor

I understand why they do it. It's always about late initialization of a context that is wired in constructor code. So you have to cut the link, and that only works by accessing a stored context.

Contributor

Question: is the context from the root command automatically propagated to the sub commands? From a quick glance, it looks like it is up to the author to propagate the ctx to the sub commands.
I am also wondering whether genericapiserver.SetupSignalContext() can get called more than once for a kube-apiserver process.

Contributor Author

I need to check how Cobra handles sub-commands.

Regarding genericapiserver.SetupSignalContext: it can be called more than once because it uses signal.Notify under the hood, and that can be called more than once:

It is allowed to call Notify multiple times with different channels
and the same signals: each channel receives copies of incoming signals independently.

Contributor Author

Perhaps because that code not only maps the signal to the context, it also invokes os.Exit. IMHO that would still be safe to set up more than once, just not desirable.

@stts: you wrote that code, do you remember the rationale?

Contributor Author

> is the context from the root command automatically propagated to the sub commands?

Yes. I checked with this patch:

diff --git a/cmd/kubectl/kubectl.go b/cmd/kubectl/kubectl.go
index 09c18cfa209..5143366839a 100644
--- a/cmd/kubectl/kubectl.go
+++ b/cmd/kubectl/kubectl.go
@@ -21,12 +21,15 @@ import (
        "k8s.io/kubectl/pkg/cmd"
        "k8s.io/kubectl/pkg/cmd/util"
 
+       genericapiserver "k8s.io/apiserver/pkg/server"
+
        // Import to initialize client auth plugins.
        _ "k8s.io/client-go/plugin/pkg/client/auth"
 )
 
 func main() {
        command := cmd.NewDefaultKubectlCommand()
+       command.SetContext(genericapiserver.SetupSignalContext())
        if err := cli.RunNoErrOutput(command); err != nil {
                // Pretty-print the error and exit with an error.
                util.CheckErr(err)
diff --git a/staging/src/k8s.io/apiserver/pkg/server/signal.go b/staging/src/k8s.io/apiserver/pkg/server/signal.go
index e5334ae4c15..24883d02bfe 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/signal.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/signal.go
@@ -51,7 +51,7 @@ func SetupSignalContext() context.Context {
                os.Exit(1) // second signal. Exit directly.
        }()
 
-       return ctx
+       return context.WithValue(ctx, "foo", "bar")
 }
 
 // RequestShutdown emulates a received event that is considered as shutdown signal (SIGTERM/SIGINT)

Then in a sub-command, at a line like

cmdutil.CheckErr(o.Complete(f, cmd))

dlv confirms that cmd.ctx is indeed the context prepared by SetupSignalContext:

> k8s.io/kubectl/pkg/cmd/run.NewCmdRun.func1() ./staging/src/k8s.io/kubectl/pkg/cmd/run/run.go:157 (hits goroutine(1):1 total:1) (PC: 0x3890707)
...
(dlv) p cmd.ctx
context.Context(*context.valueCtx) *{
	Context: context.Context(*context.cancelCtx) *{
		Context: context.Context(context.backgroundCtx) *(*context.Context)(0xc000522550),
		mu: (*sync.Mutex)(0xc000522560),
		done: (*"sync/atomic.Value")(0xc000522568),
		children: map[context.canceler]struct {} nil,
		err: error nil,
		cause: error nil,},
	key: interface {}(string) "foo",
	val: interface {}(string) "bar",}


fs := cmd.Flags()
namedFlagSets := s.Flags()
@@ -142,7 +144,7 @@ cluster's shared state through which all other components interact.`,
}

// Run runs the specified APIServer. This should never exit.
-func Run(opts options.CompletedOptions, stopCh <-chan struct{}) error {
+func Run(ctx context.Context, opts options.CompletedOptions) error {
// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())

@@ -166,7 +168,7 @@ func Run(opts options.CompletedOptions, stopCh <-chan struct{}) error {
return err
}

-return prepared.Run(stopCh)
+return prepared.Run(ctx)
}

// CreateServerChain creates the apiservers connected via delegation.
21 changes: 12 additions & 9 deletions cmd/kube-apiserver/app/testing/testserver.go
@@ -32,6 +32,7 @@ import (
"os"
"path/filepath"
"runtime"
+"testing"
"time"

"github.com/spf13/pflag"
@@ -56,10 +57,11 @@ import (
"k8s.io/klog/v2"
"k8s.io/kube-aggregator/pkg/apiserver"
"k8s.io/kubernetes/pkg/features"
+testutil "k8s.io/kubernetes/test/utils"
+"k8s.io/kubernetes/test/utils/ktesting"

"k8s.io/kubernetes/cmd/kube-apiserver/app"
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
-testutil "k8s.io/kubernetes/test/utils"
)

func init() {
@@ -139,7 +141,9 @@ func NewDefaultTestServerOptions() *TestServerInstanceOptions {
// Note: we return a tear-down func instead of a stop channel because the later will leak temporary
// files that because Golang testing's call to os.Exit will not give a stop channel go routine
// enough time to remove temporary files.
-func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) {
+func StartTestServer(t ktesting.TB, instanceOptions *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) {
+tCtx := ktesting.Init(t)
+
if instanceOptions == nil {
instanceOptions = NewDefaultTestServerOptions()
}
@@ -149,12 +153,11 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
return result, fmt.Errorf("failed to create temp dir: %v", err)
}

-stopCh := make(chan struct{})
var errCh chan error
tearDown := func() {
-// Closing stopCh is stopping apiserver and cleaning up
+// Cancel is stopping apiserver and cleaning up
// after itself, including shutting down its storage layer.
-close(stopCh)
+tCtx.Cancel("tearing down")

// If the apiserver was started, let's wait for it to
// shutdown clearly.
@@ -359,15 +362,15 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
}

errCh = make(chan error)
-go func(stopCh <-chan struct{}) {
+go func() {
defer close(errCh)
prepared, err := server.PrepareRun()
if err != nil {
errCh <- err
-} else if err := prepared.Run(stopCh); err != nil {
+} else if err := prepared.Run(tCtx); err != nil {
errCh <- err
}
-}(stopCh)
+}()

client, err := kubernetes.NewForConfig(server.GenericAPIServer.LoopbackClientConfig)
if err != nil {
@@ -465,7 +468,7 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
}

// StartTestServerOrDie calls StartTestServer t.Fatal if it does not succeed.
-func StartTestServerOrDie(t Logger, instanceOptions *TestServerInstanceOptions, flags []string, storageConfig *storagebackend.Config) *TestServer {
+func StartTestServerOrDie(t testing.TB, instanceOptions *TestServerInstanceOptions, flags []string, storageConfig *storagebackend.Config) *TestServer {
result, err := StartTestServer(t, instanceOptions, flags, storageConfig)
if err == nil {
return &result
@@ -55,9 +55,6 @@ func NewController(
secondaryRange net.IPNet,
client clientset.Interface,
) *Controller {
-broadcaster := record.NewBroadcaster()
-recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName})
-
c := &Controller{
client: client,
interval: 10 * time.Second, // same as DefaultEndpointReconcilerInterval
@@ -79,9 +76,6 @@ func NewController(
c.serviceCIDRLister = networkingv1alpha1listers.NewServiceCIDRLister(c.serviceCIDRInformer.GetIndexer())
c.serviceCIDRsSynced = c.serviceCIDRInformer.HasSynced

-c.eventBroadcaster = broadcaster
-c.eventRecorder = recorder
-
return c
}

@@ -101,9 +95,12 @@ type Controller struct {
}

// Start will not return until the default ServiceCIDR exists or stopCh is closed.
-func (c *Controller) Start(stopCh <-chan struct{}) {
+func (c *Controller) Start(ctx context.Context) {
defer utilruntime.HandleCrash()
+stopCh := ctx.Done()

+c.eventBroadcaster = record.NewBroadcaster(record.WithContext(ctx))
+c.eventRecorder = c.eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName})
c.eventBroadcaster.StartStructuredLogging(0)
c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")})
defer c.eventBroadcaster.Shutdown()
@@ -116,8 +113,6 @@ func (c *Controller) Start(stopCh <-chan struct{}) {
return
}

-// derive a context from the stopCh so we can cancel the poll loop
-ctx := wait.ContextForChannel(stopCh)
// wait until first successfully sync
// this blocks apiserver startup so poll with a short interval
err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {
2 changes: 1 addition & 1 deletion pkg/controlplane/instance.go
@@ -420,7 +420,7 @@ func (c CompletedConfig) New(delegationTarget genericapiserver.DelegationTarget)
)
// The default serviceCIDR must exist before the apiserver is healthy
// otherwise the allocators for Services will not work.
-controller.Start(hookContext.StopCh)
+controller.Start(hookContext)
return nil
})
}
4 changes: 2 additions & 2 deletions staging/src/k8s.io/apiextensions-apiserver/main.go
@@ -25,8 +25,8 @@ import (
)

func main() {
-stopCh := genericapiserver.SetupSignalHandler()
-cmd := server.NewServerCommand(os.Stdout, os.Stderr, stopCh)
+ctx := genericapiserver.SetupSignalContext()
+cmd := server.NewServerCommand(ctx, os.Stdout, os.Stderr)
code := cli.Run(cmd)
os.Exit(code)
}
@@ -17,6 +17,7 @@ limitations under the License.
package server

import (
+"context"
"io"

"github.com/spf13/cobra"
@@ -25,7 +26,7 @@ import (
genericapiserver "k8s.io/apiserver/pkg/server"
)

-func NewServerCommand(out, errOut io.Writer, stopCh <-chan struct{}) *cobra.Command {
+func NewServerCommand(ctx context.Context, out, errOut io.Writer) *cobra.Command {
o := options.NewCustomResourceDefinitionsServerOptions(out, errOut)

cmd := &cobra.Command{
@@ -38,19 +39,20 @@ func NewServerCommand(out, errOut io.Writer, stopCh <-chan struct{}) *cobra.Comm
if err := o.Validate(); err != nil {
return err
}
-if err := Run(o, stopCh); err != nil {
+if err := Run(c.Context(), o); err != nil {
return err
}
return nil
},
}
+cmd.SetContext(ctx)

fs := cmd.Flags()
o.AddFlags(fs)
return cmd
}

-func Run(o *options.CustomResourceDefinitionsServerOptions, stopCh <-chan struct{}) error {
+func Run(ctx context.Context, o *options.CustomResourceDefinitionsServerOptions) error {
config, err := o.Config()
if err != nil {
return err
@@ -60,5 +62,5 @@ func Run(o *options.CustomResourceDefinitionsServerOptions, stopCh <-chan struct
if err != nil {
return err
}
-return server.GenericAPIServer.PrepareRun().Run(stopCh)
+return server.GenericAPIServer.PrepareRun().RunWithContext(ctx)
}
@@ -18,6 +18,7 @@ package testing

import (
"context"
+"errors"
"fmt"
"net"
"os"
@@ -83,13 +84,15 @@ func NewDefaultTestServerOptions() *TestServerInstanceOptions {
// files that because Golang testing's call to os.Exit will not give a stop channel go routine
// enough time to remove temporary files.
func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []string, storageConfig *storagebackend.Config) (result TestServer, err error) {
-stopCh := make(chan struct{})
+// TODO: this is a candidate for using what is now test/utils/ktesting,
+// should that become a staging repo.
+ctx, cancel := context.WithCancelCause(context.Background())

Contributor Author
@pohly, Mar 28, 2024

We cannot use test/utils/ktesting here because it is not a staging repo (yet). We can use k8s.io/klog/v2/ktesting (done below) but that only provides per-unit-test output.

Contributor

Add TODO?

Contributor Author

Added.

var errCh chan error
tearDown := func() {
-// Closing stopCh is stopping apiextensions apiserver and its
+// Cancel is stopping apiextensions apiserver and its
// delegates, which itself is cleaning up after itself,
// including shutting down its storage layer.
-close(stopCh)
+cancel(errors.New("tearing down"))

// If the apiextensions apiserver was started, let's wait for
// it to shutdown clearly.
@@ -166,13 +169,13 @@ func StartTestServer(t Logger, _ *TestServerInstanceOptions, customFlags []strin
}

errCh = make(chan error)
-go func(stopCh <-chan struct{}) {
+go func() {
defer close(errCh)

-if err := server.GenericAPIServer.PrepareRun().Run(stopCh); err != nil {
+if err := server.GenericAPIServer.PrepareRun().RunWithContext(ctx); err != nil {
errCh <- err
}
-}(stopCh)
+}()

t.Logf("Waiting for /healthz to be ok...")

10 changes: 7 additions & 3 deletions staging/src/k8s.io/apiserver/pkg/server/config_test.go
@@ -17,6 +17,8 @@ limitations under the License.
package server

import (
+"context"
+"errors"
"fmt"
"io"
"net/http"
@@ -42,6 +44,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
"k8s.io/component-base/tracing"
+"k8s.io/klog/v2/ktesting"
netutils "k8s.io/utils/net"
)

@@ -79,6 +82,9 @@ func TestAuthorizeClientBearerTokenNoops(t *testing.T) {
}

func TestNewWithDelegate(t *testing.T) {
+_, ctx := ktesting.NewTestContext(t)
+ctx, cancel := context.WithCancelCause(ctx)
+defer cancel(errors.New("test is done"))

Contributor

For the ctx derived from context.WithCancelCause, it looks like we are not using the cause or reporting it anywhere.

Contributor Author

That would have to happen at the place where some code currently reports the ctx.Err() value: instead, it should use context.Cause(ctx). That will return the error set here instead of the uninformative "context canceled".

We don't take much advantage of this relatively new Go feature yet. I'm adopting it for all new code that I am writing because it is so damn hard to figure out why a context is canceled during debugging.

delegateConfig := NewConfig(codecs)
delegateConfig.ExternalAddress = "192.168.10.4:443"
delegateConfig.PublicAddress = netutils.ParseIPSloppy("192.168.10.4")
@@ -136,10 +142,8 @@ func TestNewWithDelegate(t *testing.T) {
return nil
})

-stopCh := make(chan struct{})
-defer close(stopCh)
wrappingServer.PrepareRun()
-wrappingServer.RunPostStartHooks(stopCh)
+wrappingServer.RunPostStartHooks(ctx)

server := httptest.NewServer(wrappingServer.Handler)
defer server.Close()