From 60a8dd179c71fff330015eb550ff343e1d16a381 Mon Sep 17 00:00:00 2001 From: gabemontero Date: Fri, 1 Mar 2024 09:12:07 -0500 Subject: [PATCH] reintroduce child context with cancel along with CloseAndRecv to force goroutine cleanup without interrupting the UpdateLog call before it finishes rh-pre-commit.version: 2.2.0 rh-pre-commit.check-secrets: ENABLED --- pkg/watcher/reconciler/dynamic/dynamic.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/pkg/watcher/reconciler/dynamic/dynamic.go b/pkg/watcher/reconciler/dynamic/dynamic.go index 0873c279d..f3d75d682 100644 --- a/pkg/watcher/reconciler/dynamic/dynamic.go +++ b/pkg/watcher/reconciler/dynamic/dynamic.go @@ -393,13 +393,16 @@ func (r *Reconciler) getPodLogs(ctx context.Context, ns, pod, container, labelKe } func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, labelKey, logName string) error { - logger := logging.FromContext(ctx) + // TODO consider making configurable after we get some real world usage feedback + streamCtx, streamCancel := context.WithTimeout(ctx, 5*time.Minute) + defer streamCancel() + logger := logging.FromContext(streamCtx) logger.Debugw("Streaming log started", zap.String("namespace", o.GetNamespace()), zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind), zap.String("name", o.GetName()), ) - logsClient, err := r.resultsClient.UpdateLog(ctx) + logsClient, err := r.resultsClient.UpdateLog(streamCtx) if err != nil { return fmt.Errorf("failed to create UpdateLog client: %w", err) } @@ -412,7 +415,7 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, labelKey, LabelSelector: fmt.Sprintf("%s=%s", labelKey, o.GetName()), } var pods *corev1.PodList - pods, err = r.kubernetesClientset.CoreV1().Pods(o.GetNamespace()).List(ctx, lo) + pods, err = r.kubernetesClientset.CoreV1().Pods(o.GetNamespace()).List(streamCtx, lo) if err != nil { return err } @@ -430,7 +433,7 @@ func (r *Reconciler) streamLogs(ctx context.Context, o 
results.Object, labelKey, if len(task) == 0 { task = pipelineTaskName } - ba, podLogsErr := r.getPodLogs(ctx, o.GetNamespace(), pod.Name, container.Name, labelKey, task) + ba, podLogsErr := r.getPodLogs(streamCtx, o.GetNamespace(), pod.Name, container.Name, labelKey, task) if podLogsErr != nil { return podLogsErr } @@ -468,6 +471,14 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, labelKey, logger.Error(flushErr) return flushErr } + // so we use CloseAndRecv vs. just CloseSend to achieve a few things: + // 1) CloseAndRecv calls CloseSend under the covers, followed by a Recv call to obtain a LogSummary + // 2) LogSummary appears to have some stats on the state of operations + // 3) It also appears to be the best form of "confirmation" that the asynchronous operation of UpdateLog on the api + // server side has reached a terminal state + // 4) Hence, creating a child context which we cancel hopefully does not interrupt the UpdateLog call when this method exits, + // 5) However, we need the context cancel to close out the last goroutine launched in newClientStreamWithParams that does + // the final cleanup, otherwise we end up with our now familiar goroutine leak, which in the end is a memory leak if logSummary, closeErr := logsClient.CloseAndRecv(); closeErr != nil && !strings.Contains(closeErr.Error(), "EOF") { logger.Warnw("CloseAndRecv ret err", zap.String("name", o.GetName()),