Skip to content

Commit

Permalink
add back tkn cli style per log line prefixing
Browse files Browse the repository at this point in the history
rh-pre-commit.version: 2.2.0
rh-pre-commit.check-secrets: ENABLED
  • Loading branch information
gabemontero committed Feb 26, 2024
1 parent 038781b commit 769afbd
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 7 deletions.
59 changes: 53 additions & 6 deletions pkg/watcher/reconciler/dynamic/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
package dynamic

import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -346,7 +348,8 @@ func (r *Reconciler) sendLog(ctx context.Context, o results.Object) error {
return nil
}

func (r *Reconciler) getPodLogs(ctx context.Context, ns, pod, container string) ([]byte, error) {
func (r *Reconciler) getPodLogs(ctx context.Context, ns, pod, container, labelKey, task string) ([]byte, error) {
logger := logging.FromContext(ctx)
podLogOpts := corev1.PodLogOptions{
Container: container,
}
Expand All @@ -365,9 +368,43 @@ func (r *Reconciler) getPodLogs(ctx context.Context, ns, pod, container string)
if err != nil {
return nil, err
}
logger.Infow("GGM starting new logging",
zap.String("ns", ns),
zap.String("pod", pod),
zap.String("container", container),
zap.String("labelKey", labelKey),
zap.String("task", task),
)
rdr := bufio.NewReader(podLogs)
buf := new(bytes.Buffer)
_, err = io.Copy(buf, podLogs)
return buf.Bytes(), err
for {
var line []byte
line, _, err = rdr.ReadLine()
logger.Infow("GGM readline return line",
zap.String("line", string(line)))
if err != nil && err == io.EOF {
logger.Infow("GGM readline eof")
break
}
if err != nil {
logger.Infow("GGM readline unexpect err",
zap.String("err", err.Error()))
return nil, err
}
s := ""
stepName := strings.TrimPrefix(container, "step-")
s = fmt.Sprintf("[%s : %s] %s", task, stepName, string(line))
if labelKey == pipeline.TaskRunLabelKey {
s = fmt.Sprintf("[%s] %s", stepName, string(line))
}
logger.Infow("GGM readLine formatted line",
zap.String("line", s))
_, err = buf.Write([]byte(s))
if err != nil {
return nil, err
}
}
return buf.Bytes(), nil
}

func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, labelKey, logName string) error {
Expand Down Expand Up @@ -401,12 +438,22 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, labelKey,
copy(containers, pod.Spec.InitContainers)
containers = append(containers, pod.Spec.Containers...)
for _, container := range containers {
ba, podLogsErr := r.getPodLogs(ctx, o.GetNamespace(), pod.Name, container.Name)
pipelineName := pod.Labels[pipeline.PipelineLabelKey]
pipelineTaskName := pod.Labels[pipeline.PipelineTaskLabelKey]
taskName := pod.Labels[pipeline.TaskLabelKey]
logger.Infow("GGMGGM labels",
zap.String("pipeline", pipelineName),
zap.String("pipelineTask", pipelineTaskName),
zap.String("task", taskName))

task := taskName
if len(task) == 0 {
task = pipelineTaskName
}
ba, podLogsErr := r.getPodLogs(ctx, o.GetNamespace(), pod.Name, container.Name, labelKey, task)
if podLogsErr != nil {
return podLogsErr
}
hdr := fmt.Sprintf("*** Logs for pod %s container %s ***\n", pod.Name, container.Name)
inMemWriteBufferStdout.Write([]byte(hdr))
inMemWriteBufferStdout.Write(ba)
}
}
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/e2e_gcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestGCSLog(t *testing.T) {
t.Logf("Error getting Log for %s: %v", logName, cerr)
return false, nil
}
want := "hello world!"
want := "[hello : hello] hello world!"
if log == nil {
t.Logf("Nil return from logClient.Recv()")
return false, nil
Expand Down

0 comments on commit 769afbd

Please sign in to comment.