Skip to content

Commit

Permalink
UPSTREAM: <carry>: create termination events
Browse files Browse the repository at this point in the history
Origin-commit: a869af0c97e3d97bddedcd76af8a62da6c879c02

UPSTREAM: <carry>: apiserver: log new connections during termination

Origin-commit: 89d1c3ceeb91755aae9099cd5f76c42a22de18c5

UPSTREAM: <carry>: apiserver: create LateConnections events on events in the last 20% of graceful termination time

Origin-commit: 91bc33b6ddf9e1d80906717db5bd9096183e8795

UPSTREAM: <carry>: apiserver: log source in LateConnections event

Origin-commit: 575e54740eb7c2ba635c73f24c22ad77cb5a6e70

UPSTREAM: <carry>: apiserver: skip local IPs and probes for LateConnections

Origin-commit: 2109b95866e81b84a290f34f0806becc2cbd83e9

UPSTREAM: <carry>: only create valid LateConnections/GracefulTermination events

UPSTREAM: <carry>: kube-apiserver: log non-probe requests before ready

UPSTREAM: <carry>: apiserver: create hasBeenReadyCh channel

UPSTREAM: <carry>: kube-apiserver: log non-probe requests before ready

UPSTREAM: <carry>: kube-apiserver: log non-probe requests before ready

UPSTREAM: <carry>: fix termination event(s) validation failures

UPSTREAM: <carry>: during the rebase collapse to create termination event

It makes recording termination events a non-blocking operation.
Previously, closing delayedStopCh might have been delayed by persisting data to the storage.
The delayedStopCh is important because it signals the HTTP server to start the shutdown procedure.

It also sets a hard timeout of 3 seconds for the storage layer, since we are bypassing the API layer.

openshift-rebase(v1.24):source=7b9aa03e491

UPSTREAM: <carry>: rename termination events to use lifecycleSignals

openshift-rebase(v1.24):source=e90b78a9199
  • Loading branch information
sttts authored and soltysh committed Aug 19, 2022
1 parent 72f4012 commit 21435ee
Show file tree
Hide file tree
Showing 4 changed files with 350 additions and 2 deletions.
49 changes: 49 additions & 0 deletions cmd/kube-apiserver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ limitations under the License.
package app

import (
"context"
"crypto/tls"
"fmt"
"net"
Expand All @@ -37,7 +38,9 @@ import (

oteltrace "go.opentelemetry.io/otel/trace"

corev1 "k8s.io/api/core/v1"
extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilnet "k8s.io/apimachinery/pkg/util/net"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
Expand All @@ -46,6 +49,7 @@ import (
"k8s.io/apiserver/pkg/authorization/authorizer"
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
"k8s.io/apiserver/pkg/endpoints/request"
genericfeatures "k8s.io/apiserver/pkg/features"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/egressselector"
Expand Down Expand Up @@ -76,6 +80,8 @@ import (

"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/apis/core"
v1 "k8s.io/kubernetes/pkg/apis/core/v1"
"k8s.io/kubernetes/pkg/capabilities"
"k8s.io/kubernetes/pkg/controlplane"
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
Expand All @@ -84,6 +90,7 @@ import (
kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
kubeauthenticator "k8s.io/kubernetes/pkg/kubeapiserver/authenticator"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
eventstorage "k8s.io/kubernetes/pkg/registry/core/event/storage"
rbacrest "k8s.io/kubernetes/pkg/registry/rbac/rest"
"k8s.io/kubernetes/pkg/serviceaccount"
)
Expand Down Expand Up @@ -297,6 +304,13 @@ func CreateKubeAPIServerConfig(s completedServerRunOptions) (
s.Metrics.Apply()
serviceaccount.RegisterMetrics()

var eventStorage *eventstorage.REST
eventStorage, err = eventstorage.NewREST(genericConfig.RESTOptionsGetter, uint64(s.EventTTL.Seconds()))
if err != nil {
return nil, nil, nil, err
}
genericConfig.EventSink = eventRegistrySink{eventStorage}

config := &controlplane.Config{
GenericConfig: genericConfig,
ExtraConfig: controlplane.ExtraConfig{
Expand Down Expand Up @@ -744,3 +758,38 @@ func getServiceIPAndRanges(serviceClusterIPRanges string) (net.IP, net.IPNet, ne
}
return apiServerServiceIP, primaryServiceIPRange, secondaryServiceIPRange, nil
}

// eventRegistrySink wraps an event registry so that it can be used as a direct
// event sink, without going through the API.
type eventRegistrySink struct {
// REST is the embedded event storage that Create writes to.
*eventstorage.REST
}

// Compile-time check that eventRegistrySink satisfies genericapiserver.EventSink.
var _ genericapiserver.EventSink = eventRegistrySink{}

// Create stores v1event directly through the wrapped event storage, bypassing
// the API layer, and returns the stored event converted back to its v1 form.
//
// The v1 event is converted to the internal core.Event type before it is handed
// to the storage, and the stored object is converted back afterwards. An error
// is returned when v1event is nil, when either conversion fails, when the
// storage rejects the object, or when the storage returns an unexpected type.
func (s eventRegistrySink) Create(v1event *corev1.Event) (*corev1.Event, error) {
	// Guard against a nil event: the namespace lookup below would otherwise panic.
	if v1event == nil {
		return nil, fmt.Errorf("cannot create a nil event")
	}

	ctx := request.WithNamespace(request.WithRequestInfo(request.NewContext(), &request.RequestInfo{APIVersion: "v1"}), v1event.Namespace)
	// since we are bypassing the API set a hard timeout for the storage layer
	ctx, cancel := context.WithTimeout(ctx, 3*time.Second)
	defer cancel()

	var event core.Event
	if err := v1.Convert_v1_Event_To_core_Event(v1event, &event, nil); err != nil {
		return nil, err
	}

	obj, err := s.REST.Create(ctx, &event, nil, &metav1.CreateOptions{})
	if err != nil {
		return nil, err
	}
	ret, ok := obj.(*core.Event)
	if !ok {
		// The storage works on the internal type, so that is what is asserted
		// above and what the message has to name.
		return nil, fmt.Errorf("expected *core.Event, got %T", obj)
	}

	var v1ret corev1.Event
	if err := v1.Convert_core_Event_To_v1_Event(ret, &v1ret, nil); err != nil {
		return nil, err
	}

	return &v1ret, nil
}
83 changes: 83 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package server

import (
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
goruntime "runtime"
"runtime/debug"
"sort"
Expand All @@ -32,6 +34,7 @@ import (
"github.com/google/uuid"
oteltrace "go.opentelemetry.io/otel/trace"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
Expand Down Expand Up @@ -65,6 +68,8 @@ import (
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/component-base/logs"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -239,6 +244,9 @@ type Config struct {
// rejected with a 429 status code and a 'Retry-After' response.
ShutdownSendRetryAfter bool

// EventSink receives events about the life cycle of the API server, e.g. readiness, serving, signals and termination.
EventSink EventSink

//===========================================================================
// values below here are targets for removal
//===========================================================================
Expand All @@ -262,6 +270,11 @@ type Config struct {
CompressionDisabledFunc genericapifilters.CompressionDisabledFunc
}

// EventSink allows creating events. It is the destination for the events the
// server emits about its own life cycle.
type EventSink interface {
// Create submits the given event and returns the resulting event or an error.
// Implementations may return a nil event together with a nil error.
Create(event *corev1.Event) (*corev1.Event, error)
}

type RecommendedConfig struct {
Config

Expand Down Expand Up @@ -565,6 +578,10 @@ func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedCo
c.DiscoveryAddresses = discovery.DefaultAddresses{DefaultAddress: c.ExternalAddress}
}

if c.EventSink == nil {
c.EventSink = nullEventSink{}
}

AuthorizeClientBearerToken(c.LoopbackClientConfig, &c.Authentication, &c.Authorization)

if c.RequestInfoResolver == nil {
Expand Down Expand Up @@ -592,9 +609,58 @@ func (c *Config) Complete(informers informers.SharedInformerFactory) CompletedCo
// Complete fills in any fields not set that are required to have valid data and can be derived
// from other fields. If you're going to `ApplyOptions`, do that first. It's mutating the receiver.
//
// When a ClientConfig is present, the EventSink is replaced: with a client-backed sink
// writing to the namespace of the derived event reference ("default" if that namespace
// is empty), or with a no-op sink if the event reference cannot be derived.
func (c *RecommendedConfig) Complete() CompletedConfig {
	// Without a client config there is nothing to wire up; leave EventSink untouched.
	if c.ClientConfig == nil {
		return c.Config.Complete(c.SharedInformerFactory)
	}

	ref, err := eventReference()
	if err != nil {
		klog.Warningf("Failed to derive event reference, won't create events: %v", err)
		c.EventSink = nullEventSink{}
		return c.Config.Complete(c.SharedInformerFactory)
	}

	// References without a namespace get their events in "default".
	eventNamespace := ref.Namespace
	if eventNamespace == "" {
		eventNamespace = "default"
	}
	events := kubernetes.NewForConfigOrDie(c.ClientConfig).CoreV1().Events(eventNamespace)
	c.EventSink = &v1.EventSinkImpl{Interface: events}

	return c.Config.Complete(c.SharedInformerFactory)
}

// eventReference builds the object reference that events are attached to, based on
// the POD_NAMESPACE and POD_NAME environment variables.
//
// If only POD_NAME is set, the namespace is read from the service account namespace
// file when that file exists. If no namespace can be determined, the pod name is
// discarded and the "kube-system" namespace is used. The result references a Pod when
// a pod name is known, and the Namespace itself otherwise. An error is returned only
// when the namespace file exists but cannot be read.
func eventReference() (*corev1.ObjectReference, error) {
	const serviceAccountNamespaceFile = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"

	podNamespace, podName := os.Getenv("POD_NAMESPACE"), os.Getenv("POD_NAME")

	// A pod name without a namespace: fall back to the mounted service account namespace.
	if podNamespace == "" && podName != "" {
		if _, statErr := os.Stat(serviceAccountNamespaceFile); statErr == nil {
			contents, err := ioutil.ReadFile(serviceAccountNamespaceFile)
			if err != nil {
				return nil, err
			}
			// NOTE(review): the file content is used verbatim; any surrounding
			// whitespace would end up in the namespace — confirm this is intended.
			podNamespace = string(contents)
		}
	}

	// Still no namespace: a pod reference would be incomplete, so drop the pod name.
	if podNamespace == "" {
		podName = ""
		podNamespace = "kube-system"
	}

	ref := &corev1.ObjectReference{APIVersion: "v1"}
	switch {
	case podName == "":
		ref.Kind = "Namespace"
		ref.Name = podNamespace
	default:
		ref.Kind = "Pod"
		ref.Namespace = podNamespace
		ref.Name = podName
	}
	return ref, nil
}

// New creates a new server which logically combines the handling chain with the passed server.
// name is used to differentiate for logging. The handler chain in particular can be difficult as it starts delegating.
// delegationTarget may not be nil.
Expand Down Expand Up @@ -663,7 +729,16 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
Version: c.Version,

muxAndDiscoveryCompleteSignals: map[string]<-chan struct{}{},

eventSink: c.EventSink,
}

ref, err := eventReference()
if err != nil {
klog.Warningf("Failed to derive event reference, won't create events: %v", err)
c.EventSink = nullEventSink{}
}
s.eventRef = ref

for {
if c.JSONPatchMaxCopyBytes <= 0 {
Expand Down Expand Up @@ -849,6 +924,8 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = genericapifilters.WithRequestDeadline(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator,
c.LongRunningFunc, c.Serializer, c.RequestTimeout)
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
handler = WithNonReadyRequestLogging(handler, c.lifecycleSignals.HasBeenReady)
handler = WithLateConnectionFilter(handler)
if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
}
Expand Down Expand Up @@ -969,3 +1046,9 @@ func AuthorizeClientBearerToken(loopback *restclient.Config, authn *Authenticati
authz.Authorizer = authorizerunion.New(tokenAuthorizer, authz.Authorizer)
}
}

// nullEventSink is an EventSink that discards every event.
type nullEventSink struct{}

// Create ignores the given event and returns a nil event and a nil error.
func (nullEventSink) Create(event *corev1.Event) (*corev1.Event, error) {
return nil, nil
}
68 changes: 66 additions & 2 deletions staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ package server
import (
"fmt"
"net/http"
"os"
gpath "path"
"strings"
"sync"
"time"

systemd "github.com/coreos/go-systemd/v22/daemon"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -247,6 +249,10 @@ type GenericAPIServer struct {
// If enabled, after ShutdownDelayDuration elapses, any incoming request is
// rejected with a 429 status code and a 'Retry-After' response.
ShutdownSendRetryAfter bool

// EventSink creates events.
eventSink EventSink
eventRef *corev1.ObjectReference
}

// DelegationTarget is an interface which allows for composition of API servers with top level handling that works
Expand Down Expand Up @@ -482,7 +488,10 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {

go func() {
defer delayedStopCh.Signal()
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", delayedStopCh.Name())
defer func() {
klog.V(1).InfoS("[graceful-termination] shutdown event", "name", delayedStopCh.Name())
s.Eventf(corev1.EventTypeNormal, delayedStopCh.Name(), "The minimal shutdown duration of %v finished", s.ShutdownDelayDuration)
}()

<-stopCh

Expand All @@ -491,10 +500,28 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
// and stop sending traffic to this server.
shutdownInitiatedCh.Signal()
klog.V(1).InfoS("[graceful-termination] shutdown event", "name", shutdownInitiatedCh.Name())
s.Eventf(corev1.EventTypeNormal, shutdownInitiatedCh.Name(), "Received signal to terminate, becoming unready, but keeping serving")

time.Sleep(s.ShutdownDelayDuration)
}()

lateStopCh := make(chan struct{})
if s.ShutdownDelayDuration > 0 {
go func() {
defer close(lateStopCh)

<-stopCh

time.Sleep(s.ShutdownDelayDuration * 8 / 10)
}()
}

s.SecureServingInfo.Listener = &terminationLoggingListener{
Listener: s.SecureServingInfo.Listener,
lateStopCh: lateStopCh,
}
unexpectedRequestsEventf.Store(s.Eventf)

// close socket after delayed stopCh
shutdownTimeout := s.ShutdownTimeout
if s.ShutdownSendRetryAfter {
Expand Down Expand Up @@ -541,13 +568,17 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
<-listenerStoppedCh
httpServerStoppedListeningCh.Signal()
klog.V(1).InfoS("[graceful-termination] shutdown event", "name", httpServerStoppedListeningCh.Name())
s.Eventf(corev1.EventTypeNormal, httpServerStoppedListeningCh.Name(), "HTTP Server has stopped listening")
}()

// we don't accept new request as soon as both ShutdownDelayDuration has
// elapsed and preshutdown hooks have completed.
preShutdownHooksHasStoppedCh := s.lifecycleSignals.PreShutdownHooksStopped
go func() {
defer klog.V(1).InfoS("[graceful-termination] shutdown event", "name", notAcceptingNewRequestCh.Name())
defer func() {
klog.V(1).InfoS("[graceful-termination] shutdown event", "name", notAcceptingNewRequestCh.Name())
s.Eventf(corev1.EventTypeNormal, drainedCh.Name(), "All non long-running request(s) in-flight have drained")
}()
defer notAcceptingNewRequestCh.Signal()

// wait for the delayed stopCh before closing the handler chain
Expand Down Expand Up @@ -591,6 +622,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
defer func() {
preShutdownHooksHasStoppedCh.Signal()
klog.V(1).InfoS("[graceful-termination] pre-shutdown hooks completed", "name", preShutdownHooksHasStoppedCh.Name())
s.Eventf(corev1.EventTypeNormal, "TerminationPreShutdownHooksFinished", "All pre-shutdown hooks have been finished")
}()
err = s.RunPreShutdownHooks()
}()
Expand All @@ -611,6 +643,8 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
<-stoppedCh

klog.V(1).Info("[graceful-termination] apiserver is exiting")
s.Eventf(corev1.EventTypeNormal, "TerminationGracefulTerminationFinished", "All pending requests processed")

return nil
}

Expand Down Expand Up @@ -883,3 +917,33 @@ func getResourceNamesForGroup(apiPrefix string, apiGroupInfo *APIGroupInfo, path

return resourceNames, nil
}

// Eventf creates an event with the API server as source and hands it to the
// server's event sink. The event is recorded against the server's event
// reference and is created in that reference's namespace, or in "default" when
// the reference has no namespace.
//
// eventType and reason are copied into the event as-is; messageFmt and args are
// formatted with fmt.Sprintf to build the message. Failures of the sink are only
// logged. If the server has no event reference or no event sink, the event is
// skipped and only logged.
func (s *GenericAPIServer) Eventf(eventType, reason, messageFmt string, args ...interface{}) {
	// The event reference is nil when it could not be derived at construction
	// time; dereferencing it below would panic, so skip the event instead.
	if s.eventRef == nil || s.eventSink == nil {
		klog.V(2).Infof("Skipping event, no event reference or sink: type: '%v' reason: '%v' %v", eventType, reason, fmt.Sprintf(messageFmt, args...))
		return
	}

	t := metav1.Time{Time: time.Now()}
	host, _ := os.Hostname() // explicitly ignore error. Empty host is fine

	// Work on a copy so the namespace defaulting does not modify the shared reference.
	ref := *s.eventRef
	if len(ref.Namespace) == 0 {
		ref.Namespace = "default" // TODO: event broadcaster sets event ns to default. We have to match. Odd.
	}

	e := &corev1.Event{
		ObjectMeta: metav1.ObjectMeta{
			// The hex-encoded timestamp makes the name unique per emitted event.
			Name:      fmt.Sprintf("%v.%x", ref.Name, t.UnixNano()),
			Namespace: ref.Namespace,
		},
		InvolvedObject: ref,
		Reason:         reason,
		Message:        fmt.Sprintf(messageFmt, args...),
		Type:           eventType,
		Source:         corev1.EventSource{Component: "apiserver", Host: host},
	}

	klog.V(2).Infof("Event(%#v): type: '%v' reason: '%v' %v", e.InvolvedObject, e.Type, e.Reason, e.Message)

	if _, err := s.eventSink.Create(e); err != nil {
		klog.Warningf("failed to create event %s/%s: %v", e.Namespace, e.Name, err)
	}
}

0 comments on commit 21435ee

Please sign in to comment.