Skip to content

Commit

Permalink
test: fix a flaky podlogstream test (#4402)
Browse files Browse the repository at this point in the history
I think this is a good example of how having a public status
field will make this a bit easier to test in general
  • Loading branch information
nicks committed Apr 6, 2021
1 parent 92bf9b6 commit c48c42a
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 26 deletions.
26 changes: 20 additions & 6 deletions internal/engine/runtimelog/podlogmanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,7 @@ func TestLogsFailed(t *testing.T) {
assert.Contains(t, f.out.String(), "my-error")

// Check to make sure the status has an error.
stream := &PodLogStream{}
streamNN := types.NamespacedName{Name: fmt.Sprintf("default-%s", podID)}
err := f.client.Get(f.ctx, streamNN, stream)
require.NoError(t, err)
stream := f.getPodLogStream(podID)
assert.Equal(t, stream.Status, PodLogStreamStatus{
ContainerStatuses: []ContainerLogStreamStatus{
ContainerLogStreamStatus{
Expand Down Expand Up @@ -131,8 +128,17 @@ func TestLogsCanceledUnexpectedly(t *testing.T) {
f.onChange(podID)
f.AssertOutputContains("hello world!\n")

// Previous log stream has finished, so the first pod watch has been canceled,
// but not cleaned up; check that we start a new watch .OnChange
// Wait until the previous log stream finishes.
assert.Eventually(f.T(), func() bool {
stream := f.getPodLogStream(podID)
statuses := stream.Status.ContainerStatuses
if len(statuses) != 1 {
return false
}
return !statuses[0].Active
}, time.Second, 5*time.Millisecond)

// Set new logs, as if the pod restarted.
f.kClient.SetLogsForPodContainer(podID, cName, "goodbye world!\n")
f.onChange(podID)
f.AssertOutputContains("goodbye world!\n")
Expand Down Expand Up @@ -496,6 +502,14 @@ func (f *plmFixture) onChange(podID k8s.PodID) {
f.store.clearSummary()
}

func (f *plmFixture) getPodLogStream(id k8s.PodID) *PodLogStream {
stream := &PodLogStream{}
streamNN := types.NamespacedName{Name: fmt.Sprintf("default-%s", id)}
err := f.client.Get(f.ctx, streamNN, stream)
require.NoError(f.T(), err)
return stream
}

func (f *plmFixture) ConsumeLogActionsUntil(expected string) {
start := time.Now()
for time.Since(start) < time.Second {
Expand Down
44 changes: 24 additions & 20 deletions internal/engine/runtimelog/podlogstreamcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,16 +263,35 @@ func (c *PodLogStreamController) deleteStreams(streamName types.NamespacedName)
}

func (m *PodLogStreamController) consumeLogs(watch PodLogWatch, st store.RStore) {
pID := watch.podID
ctx := watch.ctx
containerName := watch.cName
var exitError error

defer func() {
// When the log streaming ends, log it and report the status change to the
// apiserver.
m.mutateStatus(watch.streamName, containerName, func(cs *ContainerLogStreamStatus) {
cs.Active = false
if exitError == nil {
cs.Error = ""
} else {
cs.Error = exitError.Error()
}
})
m.updateStatus(watch.streamName)

if exitError != nil {
// TODO(nick): Should this be Warnf/Errorf?
logger.Get(ctx).Infof("Error streaming %s logs: %v", pID, exitError)
}

watch.terminationTime <- m.now()
watch.cancel()
}()

pID := watch.podID
containerName := watch.cName
ns := watch.namespace
startReadTime := watch.startWatchTime
ctx := watch.ctx
if watch.shouldPrefix {
prefix := fmt.Sprintf("[%s] ", watch.cName)
ctx = logger.WithLogger(ctx, logger.NewPrefixedLogger(prefix, logger.Get(ctx)))
Expand All @@ -285,15 +304,7 @@ func (m *PodLogStreamController) consumeLogs(watch PodLogWatch, st store.RStore)
readCloser, err := m.kClient.ContainerLogs(ctx, pID, containerName, ns, startReadTime)
if err != nil {
cancel()

m.mutateStatus(watch.streamName, containerName, func(cs *ContainerLogStreamStatus) {
cs.Active = false
cs.Error = err.Error()
})
m.updateStatus(watch.streamName)

// TODO(nick): Should this be Warnf/Errorf?
logger.Get(ctx).Infof("Error streaming %s logs: %v", pID, err)
exitError = err
return
}

Expand Down Expand Up @@ -347,14 +358,7 @@ func (m *PodLogStreamController) consumeLogs(watch PodLogWatch, st store.RStore)
cancel()

if !retry && err != nil && ctx.Err() == nil {
m.mutateStatus(watch.streamName, containerName, func(cs *ContainerLogStreamStatus) {
cs.Active = false
cs.Error = err.Error()
})
m.updateStatus(watch.streamName)

// TODO(nick): Should this be Warnf/Errorf?
logger.Get(ctx).Infof("Error streaming %s logs: %v", pID, err)
exitError = err
return
}
}
Expand Down

0 comments on commit c48c42a

Please sign in to comment.