Skip to content

Commit

Permalink
fix: simplify logs collector by removing goroutine which had a concurrency bug (#1155)
Browse files Browse the repository at this point in the history

* fix: logs collector timeout prevent concurrent map iteration and map write

* remove goroutine

* log pod logs timeout
  • Loading branch information
emosbaugh committed May 18, 2023
1 parent 6de79af commit af4cc8a
Showing 1 changed file with 51 additions and 69 deletions.
120 changes: 51 additions & 69 deletions pkg/collect/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,90 +52,72 @@ func (c *CollectLogs) Collect(progressChan chan<- interface{}) (CollectorResult,
// client for collectors or leave the implementation as is.
// Ref: https://github.com/replicatedhq/troubleshoot/pull/821#discussion_r1026258904
func (c *CollectLogs) CollectWithClient(progressChan chan<- interface{}, client kubernetes.Interface) (CollectorResult, error) {
out := NewResult()
output := NewResult()

ctx, cancel := context.WithTimeout(c.Context, constants.DEFAULT_LOGS_COLLECTOR_TIMEOUT)
defer cancel()

errCh := make(chan error, 1)
done := make(chan struct{}, 1)

// Collect logs in a go routine to allow timing out of long running operations
// If a timeout occurs, the passed in collector result will contain logs collected
// prior. We want this to be the case so as to have some logs in the support bundle
// even if not from all expected pods.
// TODO: In future all collectors will have a timeout. This will be implemented in the
// framework level (caller of Collect function). Remove this code when we get there.
go func(output CollectorResult) {
if c.SinceTime != nil {
if c.Collector.Limits == nil {
c.Collector.Limits = new(troubleshootv1beta2.LogLimits)
}
c.Collector.Limits.SinceTime = metav1.NewTime(*c.SinceTime)
if c.SinceTime != nil {
if c.Collector.Limits == nil {
c.Collector.Limits = new(troubleshootv1beta2.LogLimits)
}
c.Collector.Limits.SinceTime = metav1.NewTime(*c.SinceTime)
}

pods, podsErrors := listPodsInSelectors(ctx, client, c.Collector.Namespace, c.Collector.Selector)
if len(podsErrors) > 0 {
output.SaveResult(c.BundlePath, getLogsErrorsFileName(c.Collector), marshalErrors(podsErrors))
}
pods, podsErrors := listPodsInSelectors(ctx, client, c.Collector.Namespace, c.Collector.Selector)
if len(podsErrors) > 0 {
output.SaveResult(c.BundlePath, getLogsErrorsFileName(c.Collector), marshalErrors(podsErrors))
}

for _, pod := range pods {
if len(c.Collector.ContainerNames) == 0 {
// make a list of all the containers in the pod, so that we can get logs from all of them
containerNames := []string{}
for _, container := range pod.Spec.Containers {
containerNames = append(containerNames, container.Name)
}
for _, container := range pod.Spec.InitContainers {
containerNames = append(containerNames, container.Name)
}

if len(pods) > 0 {
for _, pod := range pods {
if len(c.Collector.ContainerNames) == 0 {
// make a list of all the containers in the pod, so that we can get logs from all of them
containerNames := []string{}
for _, container := range pod.Spec.Containers {
containerNames = append(containerNames, container.Name)
for _, containerName := range containerNames {
podLogs, err := savePodLogs(ctx, c.BundlePath, client, &pod, c.Collector.Name, containerName, c.Collector.Limits, false, true)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
klog.Errorf("Pod logs timed out for pod %s and container %s: %v", pod.Name, containerName, err)
}
for _, container := range pod.Spec.InitContainers {
containerNames = append(containerNames, container.Name)
key := fmt.Sprintf("%s/%s-errors.json", c.Collector.Name, pod.Name)
if containerName != "" {
key = fmt.Sprintf("%s/%s/%s-errors.json", c.Collector.Name, pod.Name, containerName)
}

for _, containerName := range containerNames {
podLogs, err := savePodLogs(ctx, c.BundlePath, client, &pod, c.Collector.Name, containerName, c.Collector.Limits, false, true)
if err != nil {
key := fmt.Sprintf("%s/%s-errors.json", c.Collector.Name, pod.Name)
if containerName != "" {
key = fmt.Sprintf("%s/%s/%s-errors.json", c.Collector.Name, pod.Name, containerName)
}
err := output.SaveResult(c.BundlePath, key, marshalErrors([]string{err.Error()}))
if err != nil {
errCh <- err
}
continue
}
output.AddResult(podLogs)
err := output.SaveResult(c.BundlePath, key, marshalErrors([]string{err.Error()}))
if err != nil {
klog.Errorf("Failed to save pod logs result for pod %s and container %s: %v", pod.Name, containerName, err)
}
continue
}
output.AddResult(podLogs)
}
} else {
for _, containerName := range c.Collector.ContainerNames {
containerLogs, err := savePodLogs(ctx, c.BundlePath, client, &pod, c.Collector.Name, containerName, c.Collector.Limits, false, true)
if err != nil {
if errors.Is(err, context.DeadlineExceeded) {
klog.Errorf("Pod logs timed out for pod %s and container %s: %v", pod.Name, containerName, err)
}
} else {
for _, container := range c.Collector.ContainerNames {
containerLogs, err := savePodLogs(ctx, c.BundlePath, client, &pod, c.Collector.Name, container, c.Collector.Limits, false, true)
if err != nil {
key := fmt.Sprintf("%s/%s/%s-errors.json", c.Collector.Name, pod.Name, container)
err := output.SaveResult(c.BundlePath, key, marshalErrors([]string{err.Error()}))
if err != nil {
errCh <- err
}
continue
}
output.AddResult(containerLogs)
key := fmt.Sprintf("%s/%s/%s-errors.json", c.Collector.Name, pod.Name, containerName)
err := output.SaveResult(c.BundlePath, key, marshalErrors([]string{err.Error()}))
if err != nil {
klog.Errorf("Failed to save pod logs result for pod %s and container %s: %v", pod.Name, containerName, err)
}
continue
}
output.AddResult(containerLogs)
}
}

// Send a signal to indicate that we are done collecting logs
done <- struct{}{}
}(out)

select {
case <-ctx.Done():
// When we timeout, return the logs we have collected so far
return out, fmt.Errorf("%s (%s) collector timeout exceeded", c.Title(), c.Collector.CollectorName)
case <-done:
return out, nil
case err := <-errCh:
return nil, err
}

return output, nil
}

func listPodsInSelectors(ctx context.Context, client kubernetes.Interface, namespace string, selector []string) ([]corev1.Pod, []string) {
Expand Down

0 comments on commit af4cc8a

Please sign in to comment.