When pleg channel is full, discard events and record its count #72709

Merged · 3 commits · Feb 13, 2019
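This PR makes the send into the PLEG event channel non-blocking: when the buffered channel is full, GenericPLEG.relist() now drops the event, increments a new kubelet counter, and logs an error instead of stalling. A rough, self-contained sketch of that pattern follows (illustrative names only, not the kubelet's actual types; it assumes the github.com/prometheus/client_golang dependency):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// discardedEvents counts events dropped because the consumer fell behind.
var discardedEvents = prometheus.NewCounter(prometheus.CounterOpts{
	Subsystem: "demo",
	Name:      "pleg_discard_events",
	Help:      "The number of discarded events in the demo PLEG.",
})

type event struct{ podID, data string }

// emit tries a non-blocking send; if the buffer is full, the event is
// dropped and counted instead of stalling the producer.
func emit(ch chan event, ev event) {
	select {
	case ch <- ev:
	default:
		discardedEvents.Inc()
		fmt.Printf("event channel is full, discarding %+v\n", ev)
	}
}

func main() {
	prometheus.MustRegister(discardedEvents)
	ch := make(chan event, 2) // small buffer on purpose, to force a discard
	for _, ev := range []event{{"p1", "c1"}, {"p1", "c2"}, {"p2", "c1"}} {
		emit(ch, ev)
	}
	fmt.Println("queued events:", len(ch)) // 2; the third event was discarded
}

The first two events fit in the buffer; the third is counted and reported as discarded.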
Changes from 2 commits
10 changes: 10 additions & 0 deletions pkg/kubelet/metrics/metrics.go
@@ -36,6 +36,7 @@ const (
CgroupManagerOperationsKey = "cgroup_manager_latency_microseconds"
PodWorkerStartLatencyKey = "pod_worker_start_latency_microseconds"
PLEGRelistLatencyKey = "pleg_relist_latency_microseconds"
PLEGDiscardEventsKey = "pleg_discard_events"
PLEGRelistIntervalKey = "pleg_relist_interval_microseconds"
EvictionStatsAgeKey = "eviction_stats_age_microseconds"
VolumeStatsCapacityBytesKey = "volume_stats_capacity_bytes"
@@ -109,6 +110,14 @@ var (
Help: "Latency in microseconds for relisting pods in PLEG.",
},
)
PLEGDiscardEvents = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: KubeletSubsystem,
Name: PLEGDiscardEventsKey,
Help: "The number of discard events in PLEG.",
},
[]string{},
)
PLEGRelistInterval = prometheus.NewSummary(
prometheus.SummaryOpts{
Subsystem: KubeletSubsystem,
@@ -214,6 +223,7 @@ func Register(containerCache kubecontainer.RuntimeCache, collectors ...prometheu
prometheus.MustRegister(ContainersPerPodCount)
prometheus.MustRegister(newPodAndContainerCollector(containerCache))
prometheus.MustRegister(PLEGRelistLatency)
prometheus.MustRegister(PLEGDiscardEvents)
prometheus.MustRegister(PLEGRelistInterval)
prometheus.MustRegister(RuntimeOperations)
prometheus.MustRegister(RuntimeOperationsLatency)
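Since KubeletSubsystem is "kubelet", the new counter should be exposed on the kubelet's /metrics endpoint as kubelet_pleg_discard_events; a rising value indicates that the PLEG event channel is saturating and events are being dropped.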
7 changes: 6 additions & 1 deletion pkg/kubelet/pleg/generic.go
@@ -265,7 +265,12 @@ func (g *GenericPLEG) relist() {
if events[i].Type == ContainerChanged {
continue
}
g.eventChannel <- events[i]
select {
case g.eventChannel <- events[i]:
default:
metrics.PLEGDiscardEvents.WithLabelValues().Inc()
klog.Error("event channel is full, discard this relist() cycle event")
}
}
}

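In the hunk above, the unconditional g.eventChannel <- events[i] becomes a select with a default case, so relist() never blocks on a full channel: the event is dropped, PLEGDiscardEvents is incremented, and an error is logged. The trade-off is losing an event rather than stalling relist(); since PLEG health is judged by how recently relist() completed (see TestHealthy in the test file below), a blocked send could otherwise leave the node reporting an unhealthy PLEG.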
79 changes: 72 additions & 7 deletions pkg/kubelet/pleg/generic_test.go
@@ -34,6 +34,8 @@ import (

const (
testContainerRuntimeType = "fooRuntime"
// largeChannelCap is a large enough capacity to hold all events in a single test.
largeChannelCap = 100
)

type TestGenericPLEG struct {
@@ -42,15 +44,19 @@ type TestGenericPLEG struct {
clock *clock.FakeClock
}

func newTestGenericPLEG() *TestGenericPLEG {
func newTestGenericPLEGWithLargeChannel() *TestGenericPLEG {
return newTestGenericPLEG(largeChannelCap)
}

func newTestGenericPLEG(eventChannelCap int) *TestGenericPLEG {
fakeRuntime := &containertest.FakeRuntime{}
clock := clock.NewFakeClock(time.Time{})
// The channel capacity should be large enough to hold all events in a
// single test.
pleg := &GenericPLEG{
relistPeriod: time.Hour,
runtime: fakeRuntime,
eventChannel: make(chan *PodLifecycleEvent, 100),
eventChannel: make(chan *PodLifecycleEvent, eventChannelCap),
podRecords: make(podRecords),
clock: clock,
}
@@ -93,7 +99,7 @@ func verifyEvents(t *testing.T, expected, actual []*PodLifecycleEvent) {
}

func TestRelisting(t *testing.T) {
testPleg := newTestGenericPLEG()
testPleg := newTestGenericPLEGWithLargeChannel()
pleg, runtime := testPleg.pleg, testPleg.runtime
ch := pleg.Watch()
// The first relist should send a PodSync event to each pod.
@@ -157,6 +163,65 @@
verifyEvents(t, expected, actual)
}

// TestEventChannelFull tests that events are discarded when the event channel is full.
func TestEventChannelFull(t *testing.T) {
testPleg := newTestGenericPLEG(4)
pleg, runtime := testPleg.pleg, testPleg.runtime
ch := pleg.Watch()
// The first relist should send a PodSync event to each pod.
runtime.AllPodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "1234",
Containers: []*kubecontainer.Container{
createTestContainer("c1", kubecontainer.ContainerStateExited),
createTestContainer("c2", kubecontainer.ContainerStateRunning),
createTestContainer("c3", kubecontainer.ContainerStateUnknown),
},
}},
{Pod: &kubecontainer.Pod{
ID: "4567",
Containers: []*kubecontainer.Container{
createTestContainer("c1", kubecontainer.ContainerStateExited),
},
}},
}
pleg.relist()
// Report every running/exited container if we see them for the first time.
expected := []*PodLifecycleEvent{
{ID: "1234", Type: ContainerStarted, Data: "c2"},
{ID: "4567", Type: ContainerDied, Data: "c1"},
{ID: "1234", Type: ContainerDied, Data: "c1"},
}
actual := getEventsFromChannel(ch)
verifyEvents(t, expected, actual)

runtime.AllPodList = []*containertest.FakePod{
{Pod: &kubecontainer.Pod{
ID: "1234",
Containers: []*kubecontainer.Container{
createTestContainer("c2", kubecontainer.ContainerStateExited),
createTestContainer("c3", kubecontainer.ContainerStateRunning),
},
}},
{Pod: &kubecontainer.Pod{
ID: "4567",
Containers: []*kubecontainer.Container{
createTestContainer("c4", kubecontainer.ContainerStateRunning),
},
}},
}
pleg.relist()
// The event channel (capacity 4) is full, so one event is discarded.
expected = []*PodLifecycleEvent{
{ID: "1234", Type: ContainerRemoved, Data: "c1"},
{ID: "1234", Type: ContainerDied, Data: "c2"},
{ID: "1234", Type: ContainerStarted, Data: "c3"},
{ID: "4567", Type: ContainerRemoved, Data: "c1"},
}
actual = getEventsFromChannel(ch)
verifyEvents(t, expected, actual)
}

func TestDetectingContainerDeaths(t *testing.T) {
// Vary the number of relists after the container started and before the
// container died to account for the changes in pleg's internal states.
@@ -168,7 +233,7 @@
}

func testReportMissingContainers(t *testing.T, numRelists int) {
testPleg := newTestGenericPLEG()
testPleg := newTestGenericPLEGWithLargeChannel()
pleg, runtime := testPleg.pleg, testPleg.runtime
ch := pleg.Watch()
runtime.AllPodList = []*containertest.FakePod{
@@ -209,7 +274,7 @@ func testReportMissingContainers(t *testing.T, numRelists int) {
}

func testReportMissingPods(t *testing.T, numRelists int) {
testPleg := newTestGenericPLEG()
testPleg := newTestGenericPLEGWithLargeChannel()
pleg, runtime := testPleg.pleg, testPleg.runtime
ch := pleg.Watch()
runtime.AllPodList = []*containertest.FakePod{
@@ -345,7 +410,7 @@ func TestRemoveCacheEntry(t *testing.T) {
}

func TestHealthy(t *testing.T) {
testPleg := newTestGenericPLEG()
testPleg := newTestGenericPLEGWithLargeChannel()

// pleg should initially be unhealthy
pleg, _, clock := testPleg.pleg, testPleg.runtime, testPleg.clock
@@ -441,7 +506,7 @@ func TestRelistWithReinspection(t *testing.T) {

// Test detecting sandbox state changes.
func TestRelistingWithSandboxes(t *testing.T) {
testPleg := newTestGenericPLEG()
testPleg := newTestGenericPLEGWithLargeChannel()
pleg, runtime := testPleg.pleg, testPleg.runtime
ch := pleg.Watch()
// The first relist should send a PodSync event to each pod.