Add Support for Evented PLEG #111384
pkg/kubelet/container/cache.go:

@@ -21,6 +21,8 @@ import (
 	"time"

 	"k8s.io/apimachinery/pkg/types"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	"k8s.io/kubernetes/pkg/features"
 )

 // Cache stores the PodStatus for the pods. It represents *all* the visible
@@ -36,7 +38,10 @@ import (
 // cache entries.
 type Cache interface {
 	Get(types.UID) (*PodStatus, error)
-	Set(types.UID, *PodStatus, error, time.Time)
+	// Set updates the cache by setting the PodStatus for the pod only
+	// if the data is newer than the cache based on the provided
+	// time stamp. Returns whether the cache was updated.
+	Set(types.UID, *PodStatus, error, time.Time) (updated bool)
 	// GetNewerThan is a blocking call that only returns the status
 	// when it is newer than the given time.
 	GetNewerThan(types.UID, time.Time) (*PodStatus, error)
@@ -93,12 +98,22 @@ func (c *cache) GetNewerThan(id types.UID, minTime time.Time) (*PodStatus, error
 	return d.status, d.err
 }

-// Set sets the PodStatus for the pod.
-func (c *cache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) {
+// Set sets the PodStatus for the pod only if the data is newer than the cache.
+func (c *cache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) (updated bool) {
 	c.lock.Lock()
 	defer c.lock.Unlock()
-	defer c.notify(id, timestamp)

+	if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
+		// Set the value in the cache only if it's not present already
+		// or the timestamp in the cache is older than the current update timestamp.
+		if cachedVal, ok := c.pods[id]; ok && cachedVal.modified.After(timestamp) {
+			return false
+		}
+	}

 	c.pods[id] = &data{status: status, err: err, modified: timestamp}
+	c.notify(id, timestamp)
+	return true
 }

 // Delete removes the entry of the pod.
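The ordering guard in Set is what lets two status producers (periodic relists and CRI container events) write to the same cache without an older relist result clobbering a newer event. A minimal, self-contained sketch of the same idea, with simplified stand-in types (none of these names are the kubelet's):

package main

import (
	"fmt"
	"sync"
	"time"
)

// entry and timestampedCache are simplified stand-ins for the kubelet's
// pod status cache; names here are illustrative, not the kubelet's.
type entry struct {
	status   string
	modified time.Time
}

type timestampedCache struct {
	mu   sync.Mutex
	pods map[string]entry
}

// set stores status only if it does not carry an older timestamp than
// the cached entry, mirroring the guarded Set above.
func (c *timestampedCache) set(id, status string, ts time.Time) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	if cur, ok := c.pods[id]; ok && cur.modified.After(ts) {
		return false // stale write: a newer status is already cached
	}
	c.pods[id] = entry{status: status, modified: ts}
	return true
}

func main() {
	c := &timestampedCache{pods: map[string]entry{}}
	now := time.Now()
	fmt.Println(c.set("pod-a", "Running", now))                   // true: accepted
	fmt.Println(c.set("pod-a", "Pending", now.Add(-time.Second))) // false: rejected as stale
}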
@@ -142,6 +157,29 @@ func (c *cache) get(id types.UID) *data {
 // Otherwise, it returns nil. The caller should acquire the lock.
 func (c *cache) getIfNewerThan(id types.UID, minTime time.Time) *data {
 	d, ok := c.pods[id]
+	if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
+		// Evented PLEG has CREATED, STARTED, STOPPED and DELETED events.
+		// However, if container creation fails for some reason there is no
+		// CRI event received by the kubelet, and that pod will get stuck in a
+		// GetNewerThan call in the pod workers. This is reproducible with
+		// the node e2e test,
+		// https://github.com/kubernetes/kubernetes/blob/83415e5c9e6e59a3d60a148160490560af2178a1/test/e2e_node/pod_hostnamefqdn_test.go#L161
+		// which forces failure during pod creation. This issue also exists in
+		// the Generic PLEG, but since it updates the global timestamp
+		// periodically, the GetNewerThan call gets unstuck.
+
+		// During node e2e tests, it was observed this change does not have any
+		// adverse impact on the behaviour of the Generic PLEG as well.
+		switch {
+		case !ok:
+			return makeDefaultData(id)
+		case ok && (d.modified.After(minTime) || (c.timestamp != nil && c.timestamp.After(minTime))):
+			return d
+		default:
+			return nil
+		}
+	}

 	globalTimestampIsNewer := (c.timestamp != nil && c.timestamp.After(minTime))
 	if !ok && globalTimestampIsNewer {
 		// Status is not cached, but the global timestamp is newer than

Reviewer: So the difference is: when EventedPLEG is enabled and the pod ID is not found in the cache, default data is always returned; when EventedPLEG is disabled and the pod ID is not found in the cache, default data is returned only if the global timestamp is newer. Why the difference? I see that evented.go updates the global timestamp every 5s. Is that too infrequent, so that pod workers might get stuck for 5 seconds?

Author: Actually, the change works fine even when Evented PLEG is not enabled; I didn't observe any different behaviour in the tests. However, I thought it would be safer to keep the existing behaviour as is and change the behaviour only when Evented PLEG is enabled, out of an abundance of caution. We could let this change go through some rigorous testing before making it the default without gating it on Evented PLEG.

Reviewer: Yes, I understand that we want to keep the existing behavior as is. But I wanted to understand why Evented PLEG always returns default data when the pod ID is not found, as opposed to the behavior of the existing PLEG, which only returns default data when the global timestamp is newer.

Author: While using Evented PLEG there can be a case where the CRI fails to create a container and the pod ID is unknown in the cache, which is similar to the node e2e test https://bit.ly/3yJur4P; essentially, we noticed that without this change the pod workers get stuck and the test fails. The default data returned from this function […]. Noticeably, changing this behavior also doesn't have any adverse impact while just using the Generic PLEG; however, as @harche suggested, we could let this change go through some rigorous testing before taking it out of the feature gate.

Reviewer: Just to clarify: do you mean that if we change this to the following […] (in other words, the same as the Generic PLEG, https://github.com/kubernetes/kubernetes/pull/111384/files#diff-b010d1c9ff51c8975875be97aeebd3c8c6206128f520115752f9dc007e8afcd1R187), then the Generic PLEG mitigates this problem by periodically updating the global timestamp? That part I understand. But doesn't Evented PLEG update the global timestamp every 5 seconds too (https://github.com/kubernetes/kubernetes/pull/111384/files#diff-465aae123f6877816ff0428968fc74f503fe15b27b434480bf57b79e62c7fa2fR114)? So […]. This change looks OK to me, because in theory the cache entry is always up to date with Evented PLEG; I'm just trying to understand why the issue happens.

Author: Yes, that correctly describes the scenario.

Reviewer: I had the same question when I was reviewing this; I was also trying to understand the reason for the difference here, since it is not immediately clear :). Can we maybe summarize the reasoning for this change as a comment in the code?

Author: We tried to capture this in the comment above the code, but feel free to suggest any amendments.

Reviewer: Thanks for the comment; I had the same question as @bobbypage when catching up on the latest.
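To make the difference discussed above concrete, here is a small stand-alone model of the two lookup policies (illustrative names, not the kubelet's types). Returning default data for an unknown pod ID gives a blocked GetNewerThan caller something to consume immediately; returning nil leaves it waiting until a relist bumps the global timestamp.

package main

import (
	"fmt"
	"time"
)

// lookup models cache.getIfNewerThan for a pod ID that may be missing.
// A nil result means the GetNewerThan caller keeps blocking.
func lookup(cached map[string]time.Time, id string, minTime time.Time, defaultForUnknown bool) *time.Time {
	mod, ok := cached[id]
	switch {
	case !ok && defaultForUnknown:
		t := time.Time{} // "default data": unblocks the waiter right away
		return &t
	case ok && mod.After(minTime):
		return &mod
	default:
		return nil // keep waiting (until a relist updates the global timestamp)
	}
}

func main() {
	cached := map[string]time.Time{} // the failed pod never made it into the cache
	minTime := time.Now()
	fmt.Println(lookup(cached, "pod-x", minTime, true) != nil)  // true: evented policy unblocks
	fmt.Println(lookup(cached, "pod-x", minTime, false) != nil) // false: generic policy waits
}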
pkg/kubelet/cri/remote/remote_runtime.go:

@@ -20,6 +20,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"io"
 	"strings"
 	"time"

@@ -792,5 +793,25 @@ func (r *remoteRuntimeService) CheckpointContainer(ctx context.Context, options
 }
 func (r *remoteRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse) error {
-	return nil
+	containerEventsStreamingClient, err := r.runtimeClient.GetContainerEvents(context.Background(), &runtimeapi.GetEventsRequest{})
+	if err != nil {
+		klog.ErrorS(err, "GetContainerEvents failed to get streaming client")
+		return err
+	}
+
+	for {
+		resp, err := containerEventsStreamingClient.Recv()
+		if err == io.EOF {
+			klog.ErrorS(err, "container events stream is closed")
+			return err
+		}
+		if err != nil {
+			klog.ErrorS(err, "failed to receive streaming container event")
+			return err
+		}
+		if resp != nil {
+			containerEventsCh <- resp
+			klog.V(4).InfoS("container event received", "resp", resp)
+		}
+	}
 }

Reviewer: What happens when this times out? Isn't this subject to the max time of an open gRPC connection?

Author: Added code to retry on error and force a relisting to update the cache and the (pod and container running) metric: https://github.com/kubernetes/kubernetes/pull/111384/files#diff-465aae123f6877816ff0428968fc74f503fe15b27b434480bf57b79e62c7fa2fR118

Reviewer: This is an area where being able to trigger this in a test (ideally above unit level) will expose low-frequency errors sooner. Does the event stream include heartbeats? There are lots of failure modes for event streams that involve not explicitly getting events even though the connection is still open. If we detect an anomaly during relist (a missed event), we may want to force this connection closed and reopen it.

Reviewer: Maybe I should have asked this before, but how are you synchronizing this with the "List" action so that you don't lose events between the time of List and when you invoke GetContainerEvents? You need an equivalent to the […]

Author: This is an interesting idea.

Author: We are getting the gRPC streaming client at kubelet startup and then keep receiving the events using Recv().

Author: We are simply using the gRPC streaming client and depend on its mechanism to maintain and verify a healthy stream. We do get an error back from that client should it detect anything wrong.

Reviewer: Make this error more descriptive.

Author: Added.

Reviewer: The resp != nil check can be removed, since resp can never be nil.

Author: Yeah, the […]
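The exchange above centers on what happens when the stream dies or silently stalls. The sketch below illustrates the retry-plus-relist shape the authors describe; it is written under stated assumptions (eventStreamer, relister, and watchEvents are invented stand-ins, not the PR's evented.go code; only the retry bound of 5 mirrors the eventedPlegMaxStreamRetries constant introduced later in this diff):

package main

import (
	"errors"
	"fmt"
	"time"
)

type event struct{ podID string }

// eventStreamer and relister are illustrative stand-ins for the remote
// runtime service and the Generic PLEG.
type eventStreamer interface {
	GetContainerEvents(ch chan *event) error // blocks until the stream breaks
}

type relister interface {
	Relist() // full resync; repairs anything the stream missed
}

const maxStreamRetries = 5

// watchEvents retries the stream a bounded number of times, forcing a
// relist after each failure so the cache catches up on missed events.
func watchEvents(rt eventStreamer, gp relister, ch chan *event) error {
	for attempt := 0; attempt < maxStreamRetries; attempt++ {
		err := rt.GetContainerEvents(ch)
		fmt.Println("event stream broke, relisting before retry:", err)
		gp.Relist()
		time.Sleep(10 * time.Millisecond) // brief backoff before reconnecting
	}
	return errors.New("container event stream failed repeatedly; fall back to Generic PLEG relisting")
}

type fakeRuntime struct{}

func (fakeRuntime) GetContainerEvents(ch chan *event) error {
	return errors.New("stream closed") // simulate a broken connection
}

type fakeLister struct{}

func (fakeLister) Relist() { fmt.Println("relist: full resync") }

func main() {
	ch := make(chan *event, 16)
	fmt.Println(watchEvents(fakeRuntime{}, fakeLister{}, ch))
}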
pkg/kubelet/kubelet.go:

@@ -162,7 +162,13 @@ const (
 	// Note that even though we set the period to 1s, the relisting itself can
 	// take more than 1s to finish if the container runtime responds slowly
 	// and/or when there are many container changes in one cycle.
-	plegRelistPeriod = time.Second * 1
+	genericPlegRelistPeriod    = time.Second * 1
+	genericPlegRelistThreshold = time.Minute * 3
+
+	// Generic PLEG relist period and threshold when used with Evented PLEG.
+	eventedPlegRelistPeriod     = time.Second * 300
+	eventedPlegRelistThreshold  = time.Minute * 10
+	eventedPlegMaxStreamRetries = 5

 	// backOffPeriod is the period to back off when pod syncing results in an
 	// error. It is also used as the base period for the exponential backoff

Reviewer: Just noting to myself as I read the implementation, but it's not clear what value this field takes when the existing Generic PLEG is used.

Author: If the feature gate […]

Reviewer: This 3m value is the same value that used to live in the Generic PLEG; it has moved here unchanged and is passed down, so it's not a behavior change.

Reviewer: Nit: naming on this would be clearer if it were the following: […]

Author: Thanks, will rename this variable.
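The threshold has to grow with the period because it feeds the PLEG health check: a Generic PLEG relisting every 300s under the old 3m threshold would be declared unhealthy by construction. A sketch of that interaction (illustrative; the real check is the PLEG's Healthy method, registered in the kubelet wiring below):

package main

import (
	"fmt"
	"time"
)

// healthy reports whether the PLEG relisted within the threshold.
// Illustrative stand-in for the PLEG's Healthy method.
func healthy(lastRelist time.Time, threshold time.Duration, now time.Time) (bool, error) {
	if elapsed := now.Sub(lastRelist); elapsed > threshold {
		return false, fmt.Errorf("pleg was last seen active %v ago; threshold is %v", elapsed, threshold)
	}
	return true, nil
}

func main() {
	now := time.Now()
	// Generic PLEG alone: 1s period under a 3m threshold.
	fmt.Println(healthy(now.Add(-2*time.Minute), 3*time.Minute, now)) // true
	// With Evented PLEG the backstop relist runs only every 300s, so the
	// threshold must widen to 10m or the relist would look unhealthy.
	fmt.Println(healthy(now.Add(-5*time.Minute), 3*time.Minute, now))  // false under the old threshold
	fmt.Println(healthy(now.Add(-5*time.Minute), 10*time.Minute, now)) // true under the new one
}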
@@ -699,9 +705,37 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 			utilfeature.DefaultFeatureGate.Enabled(features.PodAndContainerStatsFromCRI))
 	}

-	klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
+	eventChannel := make(chan *pleg.PodLifecycleEvent, plegChannelCapacity)

+	if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
+		// Adjust the Generic PLEG relisting period and threshold to higher values when Evented PLEG is turned on.
+		genericRelistDuration := &pleg.RelistDuration{
+			RelistPeriod:    eventedPlegRelistPeriod,
+			RelistThreshold: eventedPlegRelistThreshold,
+		}
+		klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
+		// In case Evented PLEG has to fall back on Generic PLEG due to an error,
+		// Evented PLEG should be able to reset the Generic PLEG relisting duration
+		// to the default value.
+		eventedRelistDuration := &pleg.RelistDuration{
+			RelistPeriod:    genericPlegRelistPeriod,
+			RelistThreshold: genericPlegRelistThreshold,
+		}
+		klet.eventedPleg = pleg.NewEventedPLEG(klet.containerRuntime, klet.runtimeService, eventChannel,
+			klet.podCache, klet.pleg, eventedPlegMaxStreamRetries, eventedRelistDuration, clock.RealClock{})
+	} else {
+		genericRelistDuration := &pleg.RelistDuration{
+			RelistPeriod:    genericPlegRelistPeriod,
+			RelistThreshold: genericPlegRelistThreshold,
+		}
+		klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, eventChannel, genericRelistDuration, klet.podCache, clock.RealClock{})
+	}

 	klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
 	klet.runtimeState.addHealthCheck("PLEG", klet.pleg.Healthy)
+	if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
+		klet.runtimeState.addHealthCheck("EventedPLEG", klet.eventedPleg.Healthy)
+	}
 	if _, err := klet.updatePodCIDR(ctx, kubeCfg.PodCIDR); err != nil {
 		klog.ErrorS(err, "Pod CIDR update failed")
 	}

Reviewer: Before beta, we should clarify how we respond to these error conditions; notably, when is the Evented PLEG re-established versus just falling back to generic behavior?

Author: I agree. Thanks.
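Note the inversion in the wiring above: the Generic PLEG is constructed with the slow cadence, while the Evented PLEG is handed the fast generic defaults, because those defaults are what it must restore if it ever gives up on the event stream. A sketch of that fallback under stated assumptions (RelistDuration mirrors the struct used above; Stop, Update, Start, and fallBackToGenericPLEG are illustrative names, not necessarily the PR's):

package main

import (
	"fmt"
	"time"
)

// RelistDuration mirrors the struct introduced in this PR.
type RelistDuration struct {
	RelistPeriod    time.Duration
	RelistThreshold time.Duration
}

// genericPLEG is a stand-in with just enough surface for the sketch.
type genericPLEG struct{ d RelistDuration }

func (g *genericPLEG) Stop() { fmt.Println("generic PLEG stopped") }

func (g *genericPLEG) Update(d RelistDuration) { g.d = d }

func (g *genericPLEG) Start() {
	fmt.Printf("generic PLEG started: period=%v threshold=%v\n", g.d.RelistPeriod, g.d.RelistThreshold)
}

type eventedPLEG struct {
	generic        *genericPLEG
	relistDuration RelistDuration // generic defaults, restored on fallback
}

// fallBackToGenericPLEG models the recovery path discussed in the review:
// stop the slow backstop relisting and restore the generic defaults.
func (e *eventedPLEG) fallBackToGenericPLEG() {
	e.generic.Stop()
	e.generic.Update(e.relistDuration)
	e.generic.Start()
}

func main() {
	g := &genericPLEG{d: RelistDuration{RelistPeriod: 300 * time.Second, RelistThreshold: 10 * time.Minute}}
	e := &eventedPLEG{generic: g, relistDuration: RelistDuration{RelistPeriod: time.Second, RelistThreshold: 3 * time.Minute}}
	e.fallBackToGenericPLEG()
}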
@@ -1062,6 +1096,9 @@ type Kubelet struct {
 	// Generates pod events.
 	pleg pleg.PodLifecycleEventGenerator

+	// Evented PLEG
+	eventedPleg pleg.PodLifecycleEventGenerator

 	// Store kubecontainer.PodStatus for all pods.
 	podCache kubecontainer.Cache
@@ -1485,6 +1522,12 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {

 	// Start the pod lifecycle event generator.
 	kl.pleg.Start()

+	// Start eventedPLEG only if EventedPLEG feature gate is enabled.
+	if utilfeature.DefaultFeatureGate.Enabled(features.EventedPLEG) {
+		kl.eventedPleg.Start()
+	}

 	kl.syncLoop(ctx, updates, kl)
 }
pkg/kubelet/kuberuntime/kuberuntime_container.go:

@@ -490,6 +490,29 @@ func (m *kubeGenericRuntimeManager) readLastStringFromContainerLogs(path string)
 	return buf.String()
 }

+func (m *kubeGenericRuntimeManager) convertToKubeContainerStatus(status *runtimeapi.ContainerStatus) (cStatus *kubecontainer.Status) {
+	cStatus = toKubeContainerStatus(status, m.runtimeName)
+	if status.State == runtimeapi.ContainerState_CONTAINER_EXITED {
+		// Populate the termination message if needed.
+		annotatedInfo := getContainerInfoFromAnnotations(status.Annotations)
+		// If a container cannot even be started, it certainly does not have logs, so no need to fallbackToLogs.
+		fallbackToLogs := annotatedInfo.TerminationMessagePolicy == v1.TerminationMessageFallbackToLogsOnError &&
+			cStatus.ExitCode != 0 && cStatus.Reason != "ContainerCannotRun"
+		tMessage, checkLogs := getTerminationMessage(status, annotatedInfo.TerminationMessagePath, fallbackToLogs)
+		if checkLogs {
+			tMessage = m.readLastStringFromContainerLogs(status.GetLogPath())
+		}
+		// Enrich the termination message if the message written by the application is not empty.
+		if len(tMessage) != 0 {
+			if len(cStatus.Message) != 0 {
+				cStatus.Message += ": "
+			}
+			cStatus.Message += tMessage
+		}
+	}
+	return cStatus
+}

Reviewer: Just noting to myself, this is the same code as elsewhere, just extracted for reuse.

Author: Yes, that's correct.
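The interesting part of the extracted helper is the guard that decides when to fall back to reading the termination message from the container logs. Reduced to a predicate over plain values (string stand-ins replace the real v1 policy and reason types, purely for illustration):

package main

import "fmt"

// shouldFallBackToLogs mirrors the condition in convertToKubeContainerStatus:
// only read logs for the termination message when the policy asks for it, the
// container actually failed, and the failure wasn't "could not even start".
func shouldFallBackToLogs(policy string, exitCode int, reason string) bool {
	return policy == "FallbackToLogsOnError" && exitCode != 0 && reason != "ContainerCannotRun"
}

func main() {
	fmt.Println(shouldFallBackToLogs("FallbackToLogsOnError", 1, "Error"))              // true
	fmt.Println(shouldFallBackToLogs("FallbackToLogsOnError", 0, "Completed"))          // false: succeeded
	fmt.Println(shouldFallBackToLogs("FallbackToLogsOnError", 1, "ContainerCannotRun")) // false: never started, no logs
	fmt.Println(shouldFallBackToLogs("File", 1, "Error"))                               // false: policy is file-only
}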
 // getPodContainerStatuses gets all containers' statuses for the pod.
 func (m *kubeGenericRuntimeManager) getPodContainerStatuses(ctx context.Context, uid kubetypes.UID, name, namespace string) ([]*kubecontainer.Status, error) {
 	// Select all containers of the given pod.

@@ -521,25 +544,7 @@ func (m *kubeGenericRuntimeManager) getPodContainerStatuses(ctx context.Context,
 		if status == nil {
 			return nil, remote.ErrContainerStatusNil
 		}
-		cStatus := toKubeContainerStatus(status, m.runtimeName)
-		if status.State == runtimeapi.ContainerState_CONTAINER_EXITED {
-			// Populate the termination message if needed.
-			annotatedInfo := getContainerInfoFromAnnotations(status.Annotations)
-			// If a container cannot even be started, it certainly does not have logs, so no need to fallbackToLogs.
-			fallbackToLogs := annotatedInfo.TerminationMessagePolicy == v1.TerminationMessageFallbackToLogsOnError &&
-				cStatus.ExitCode != 0 && cStatus.Reason != "ContainerCannotRun"
-			tMessage, checkLogs := getTerminationMessage(status, annotatedInfo.TerminationMessagePath, fallbackToLogs)
-			if checkLogs {
-				tMessage = m.readLastStringFromContainerLogs(status.GetLogPath())
-			}
-			// Enrich the termination message written by the application is not empty
-			if len(tMessage) != 0 {
-				if len(cStatus.Message) != 0 {
-					cStatus.Message += ": "
-				}
-				cStatus.Message += tMessage
-			}
-		}
+		cStatus := m.convertToKubeContainerStatus(status)
 		statuses = append(statuses, cStatus)
 	}
Reviewer: Nit: we can return early if no update is needed, to save a couple of lines.

Author: This looks better, thanks.

Reviewer: Are we going to take this suggestion? I am still seeing the old code here.

Author: Oh sorry, my bad, I missed that somehow. I will update the code.