Skip to content

Commit

Permalink
eliminate the closed channel race in pod_log_streamer
Browse files Browse the repository at this point in the history
  • Loading branch information
deads2k committed Nov 13, 2023
1 parent 0766e37 commit 31cd1c0
Showing 1 changed file with 7 additions and 16 deletions.
23 changes: 7 additions & 16 deletions pkg/monitortestlibrary/podaccess/pod_log_streamer.go
Expand Up @@ -95,23 +95,13 @@ func (s *PodStreamer) streamLogs(ctx context.Context) (bool, error) {
return false, nil
}

exit := make(chan struct{})
go func(ctx context.Context, currPod *corev1.Pod) {
defer utilruntime.HandleCrash()
defer close(exit)
s.streamLogsReader(ctx, currPod)
}(ctx, currPod)

select {
case <-exit:
return false, nil
case <-ctx.Done():
// wait just a moment so that the streamLogsReader doesn't write to a closed channel
time.Sleep(100 * time.Millisecond)
return false, nil
}
s.streamLogsReader(ctx, currPod)
return false, nil
}

// streamLogsReader will run and block until
// 1. server closes the http connection
// 2. context is closed.
func (s *PodStreamer) streamLogsReader(ctx context.Context, currPod *corev1.Pod) {
caughtUpToLastLine := false
if currPod.UID != s.lastUID {
Expand All @@ -131,7 +121,7 @@ func (s *PodStreamer) streamLogsReader(ctx context.Context, currPod *corev1.Pod)
Container: s.containerName,
Follow: true,
Timestamps: true,
}).Stream(context.Background())
}).Stream(ctx)
if err != nil {
s.errs <- LogError{
Pod: currPod,
Expand All @@ -140,6 +130,7 @@ func (s *PodStreamer) streamLogsReader(ctx context.Context, currPod *corev1.Pod)
}
return
}

scan := bufio.NewScanner(reader)
for scan.Scan() {
// exit if we have been stopped.
Expand Down

0 comments on commit 31cd1c0

Please sign in to comment.