From 97634f48cd6403c2e1238269c1fc6f9aa04c4024 Mon Sep 17 00:00:00 2001 From: gabemontero Date: Thu, 1 Feb 2024 16:07:06 -0500 Subject: [PATCH] switch from tkn client to k8s pod log retrieval; fix panic, add debug, to e2e_gcs_test rh-pre-commit.version: 2.2.0 rh-pre-commit.check-secrets: ENABLED --- pkg/watcher/reconciler/dynamic/dynamic.go | 134 ++++++++++++++++------ test/e2e/e2e_gcs_test.go | 93 +++++++++++++-- 2 files changed, 178 insertions(+), 49 deletions(-) diff --git a/pkg/watcher/reconciler/dynamic/dynamic.go b/pkg/watcher/reconciler/dynamic/dynamic.go index 1eddb2683..d5471c9a3 100644 --- a/pkg/watcher/reconciler/dynamic/dynamic.go +++ b/pkg/watcher/reconciler/dynamic/dynamic.go @@ -15,15 +15,20 @@ package dynamic import ( + "bytes" "context" "fmt" + "io" "time" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + "github.com/fatih/color" "github.com/jonboulle/clockwork" "github.com/tektoncd/cli/pkg/cli" tknlog "github.com/tektoncd/cli/pkg/log" - tknopts "github.com/tektoncd/cli/pkg/options" + "github.com/tektoncd/pipeline/pkg/apis/pipeline" pipelinev1beta1 "github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1" "github.com/tektoncd/results/pkg/api/server/v1alpha2/log" "github.com/tektoncd/results/pkg/api/server/v1alpha2/record" @@ -353,10 +358,29 @@ func (r *Reconciler) sendLog(ctx context.Context, o results.Object) error { return nil } +func getPodLogs(ctx context.Context, client kubernetes.Interface, ns, pod, container string) ([]byte, error) { + podLogOpts := corev1.PodLogOptions{ + Container: container, + } + req := client.CoreV1().Pods(ns).GetLogs(pod, &podLogOpts) + podLogs, err := req.Stream(ctx) + if err != nil { + return nil, err + } + defer podLogs.Close() + + if err != nil { + msg := fmt.Sprintf("error getting logs for pod %s container %s: %s", pod, container, err.Error()) + msgBytes := []byte(msg) + return msgBytes, nil + } + buf := new(bytes.Buffer) + _, err = io.Copy(buf, podLogs) + return buf.Bytes(), err +} + func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType, logName string) error { logger := logging.FromContext(ctx) - ctx, cancel := context.WithCancel(ctx) - defer cancel() logsClient, err := r.resultsClient.UpdateLog(ctx) if err != nil { return fmt.Errorf("failed to create UpdateLog client: %w", err) @@ -364,51 +388,85 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType, writer := logs.NewBufferedWriter(logsClient, logName, logs.DefaultBufferSize) + inMemWriteBufferStdout := bytes.NewBuffer(make([]byte, 0)) + tknParams := &cli.TektonParams{} tknParams.SetNamespace(o.GetNamespace()) - // KLUGE: tkn reader.Read() will raise an error if a step in the TaskRun failed and there is no - // Err writer in the Stream object. This will result in some "error" messages being written to - // the log. - - reader, err := tknlog.NewReader(logType, &tknopts.LogOptions{ - AllSteps: true, - Params: tknParams, - PipelineRunName: o.GetName(), - TaskrunName: o.GetName(), - Stream: &cli.Stream{ - Out: writer, - Err: writer, - }, - }) + k8sClient, err := tknParams.KubeClient() if err != nil { - return fmt.Errorf("failed to create tkn reader: %w", err) + return err + } + + labelType := pipeline.PipelineRunLabelKey + if logType == tknlog.LogTypeTask { + labelType = pipeline.TaskRunLabelKey + } + lo := metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s", labelType, o.GetName()), } - logChan, errChan, err := reader.Read() + var pods *corev1.PodList + pods, err = k8sClient.CoreV1().Pods(o.GetNamespace()).List(ctx, lo) if err != nil { - return fmt.Errorf("error reading from tkn reader: %w", err) + return err } - errChanRepeater := make(chan error) - go func(echan <-chan error, o metav1.Object) { - writeErr := <-echan - errChanRepeater <- writeErr - - _, err := writer.Flush() - if err != nil { - logger.Error(err) - } - if err = logsClient.CloseSend(); err != nil { - logger.Error(err) + for _, pod := range pods.Items { + containers := []corev1.Container{} + copy(containers, pod.Spec.InitContainers) + containers = append(containers, pod.Spec.Containers...) + for _, container := range containers { + ba, podLogsErr := getPodLogs(ctx, k8sClient, o.GetNamespace(), pod.Name, container.Name) + if podLogsErr != nil { + return podLogsErr + } + hdr := fmt.Sprintf("*** Logs for pod %s container %s ***\n", pod.Name, container.Name) + inMemWriteBufferStdout.Write([]byte(hdr)) + inMemWriteBufferStdout.Write(ba) } - }(errChan, o) + } - // errChanRepeater receives stderr from the TaskRun containers. - // This will be forwarded as combined output (stdout and stderr) + bufStdout := inMemWriteBufferStdout.Bytes() + cntStdout, writeStdOutErr := writer.Write(bufStdout) + if writeStdOutErr != nil { + logger.Warnw("streamLogs in mem bufStdout write err", + zap.String("error", writeStdOutErr.Error()), + zap.String("namespace", o.GetNamespace()), + zap.String("name", o.GetName()), + ) + return writeStdOutErr + } + if cntStdout != len(bufStdout) { + logger.Warnw("streamLogs bufStdout write len inconsistent", + zap.Int("in", len(bufStdout)), + zap.Int("out", cntStdout), + zap.String("namespace", o.GetNamespace()), + zap.String("name", o.GetName()), + ) + + } + + flushCount, flushErr := writer.Flush() + logger.Warnw("flush ret count", + zap.String("name", o.GetName()), + zap.Int("flushCount", flushCount)) + if flushErr != nil { + logger.Warnw("flush ret err", + zap.String("error", flushErr.Error())) + logger.Error(flushErr) + return flushErr + } + if closeErr := logsClient.CloseSend(); closeErr != nil { + logger.Warnw("CloseSend ret err", + zap.String("name", o.GetName()), + zap.String("error", closeErr.Error())) + logger.Error(closeErr) + return closeErr + } - tknlog.NewWriter(logType, true).Write(&cli.Stream{ - Out: writer, - Err: writer, - }, logChan, errChanRepeater) + logger.Debugw("Exiting streamLogs", + zap.String("namespace", o.GetNamespace()), + zap.String("name", o.GetName()), + ) return nil } diff --git a/test/e2e/e2e_gcs_test.go b/test/e2e/e2e_gcs_test.go index 4e4670e4b..1614ff780 100644 --- a/test/e2e/e2e_gcs_test.go +++ b/test/e2e/e2e_gcs_test.go @@ -18,10 +18,15 @@ package e2e import ( + "bytes" "context" + "io" + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" "testing" resultsv1alpha2 "github.com/tektoncd/results/proto/v1alpha2/results_go_proto" + "google.golang.org/genproto/googleapis/api/httpbody" "strings" "time" @@ -75,6 +80,10 @@ func TestGCSLog(t *testing.T) { } return false, nil }); err != nil { + t.Log("dumping watcher logs") + podLogs(t, "tekton-pipelines", "watcher") + t.Log("dumping api logs") + podLogs(t, "tekton-pipelines", "api") t.Fatalf("Error waiting for PipelineRun creation: %v", err) } }) @@ -93,17 +102,79 @@ func TestGCSLog(t *testing.T) { if logName == "" { t.Skip("log name not found") } - logClient, err := gc.GetLog(context.Background(), &resultsv1alpha2.GetLogRequest{Name: logName}) - if err != nil { - t.Errorf("Error getting Log Client: %v", err) - } - log, err := logClient.Recv() - if err != nil { - t.Errorf("Error getting Log: %v", err) - } - want := "[hello : hello] hello world!" - if !strings.Contains(string(log.Data), want) { - t.Errorf("Log Data inconsistent got: %s, doesn't have: %s", string(log.Data), want) + if err := wait.PollImmediate(1*time.Second, 10*time.Second, func() (done bool, err error) { + logClient, err := gc.GetLog(context.Background(), &resultsv1alpha2.GetLogRequest{Name: logName}) + if err != nil { + t.Logf("Error getting Log Client: %v", err) + return false, nil + } + var log *httpbody.HttpBody + var cerr error + log, cerr = logClient.Recv() + if cerr != nil { + t.Logf("Error getting Log for %s: %v", logName, cerr) + return false, nil + } + want := "hello world!" + if log == nil { + t.Logf("Nil return from logClient.Recv()") + return false, nil + } + if !strings.Contains(string(log.Data), want) { + t.Logf("Log Data inconsistent for %s got: %s, doesn't have: %s", logName, string(log.Data), want) + return false, nil + } + return true, nil + + }); err != nil { + t.Log("dumping watcher logs") + podLogs(t, "tekton-pipelines", "watcher") + t.Log("dumping api logs") + podLogs(t, "tekton-pipelines", "api") + t.Fatalf("Error waiting for check log: %v", err) } }) } + +func podLogs(t *testing.T, ns string, name string) { + t.Logf("getting pod logs for the pattern %s", name) + clientset := kubernetes.NewForConfigOrDie(clientConfig(t)) + ctx := context.Background() + list, err := clientset.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{}) + if err != nil { + t.Errorf("pod list error %s", err) + } + for _, pod := range list.Items { + if strings.Contains(pod.Name, name) { + t.Logf("found pod %s matcher pattern %s", pod.Name, name) + for _, c := range pod.Spec.Containers { + containerLogs(t, ctx, ns, pod.Name, c.Name) + } + break + } + } +} + +func containerLogs(t *testing.T, ctx context.Context, ns, podName, containerName string) { + podLogOpts := corev1.PodLogOptions{} + podLogOpts.Container = containerName + t.Logf("print container %s from pod %s:", containerName, podName) + clientset := kubernetes.NewForConfigOrDie(clientConfig(t)) + req := clientset.CoreV1().Pods(ns).GetLogs(podName, &podLogOpts) + logs, err := req.Stream(ctx) + if err != nil { + t.Errorf("error streaming pod logs %s", err.Error()) + return + } + defer logs.Close() + + buf := new(bytes.Buffer) + _, err = io.Copy(buf, logs) + if err != nil { + t.Errorf("error copying pod logs %s", err.Error()) + return + } + str := buf.String() + t.Logf("%s", str) + +}