From 72d996c38e2e6b5b05f77c332ea97447414affbe Mon Sep 17 00:00:00 2001 From: Ripta Pasay Date: Sun, 22 Feb 2026 19:53:27 -0800 Subject: [PATCH 01/10] pkg/cg: add command guard tool for annotated child process output MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wraps a child process and prefixes each stdout / stderr line with a timestamp and stream indicator (O/E/I), à la annotate-output. Adds tests lol. --- cmd/cg/main.go | 28 +++ hypercmd/rt/main.go | 2 + pkg/cg/cg.go | 32 ++++ pkg/cg/exitcode.go | 48 ++++++ pkg/cg/helpers.go | 68 ++++++++ pkg/cg/runner.go | 99 +++++++++++ pkg/cg/runner_test.go | 385 ++++++++++++++++++++++++++++++++++++++++++ pkg/cg/writer.go | 89 ++++++++++ pkg/cg/writer_test.go | 135 +++++++++++++++ 9 files changed, 886 insertions(+) create mode 100644 cmd/cg/main.go create mode 100644 pkg/cg/cg.go create mode 100644 pkg/cg/exitcode.go create mode 100644 pkg/cg/helpers.go create mode 100644 pkg/cg/runner.go create mode 100644 pkg/cg/runner_test.go create mode 100644 pkg/cg/writer.go create mode 100644 pkg/cg/writer_test.go diff --git a/cmd/cg/main.go b/cmd/cg/main.go new file mode 100644 index 0000000..6a8fa95 --- /dev/null +++ b/cmd/cg/main.go @@ -0,0 +1,28 @@ +package main + +import ( + "errors" + "fmt" + "os" + + "github.com/ripta/rt/pkg/cg" + "github.com/ripta/rt/pkg/version" +) + +func main() { + cmd := cg.NewCommand() + cmd.AddCommand(version.NewCommand()) + + err := cmd.Execute() + if err == nil { + return + } + + var exitErr *cg.ExitError + if errors.As(err, &exitErr) { + os.Exit(exitErr.Code) + } + + fmt.Fprintf(os.Stderr, "Error: %+v\n", err) + os.Exit(1) +} diff --git a/hypercmd/rt/main.go b/hypercmd/rt/main.go index 0f78619..e7ad8f7 100644 --- a/hypercmd/rt/main.go +++ b/hypercmd/rt/main.go @@ -7,6 +7,7 @@ import ( "github.com/ripta/hypercmd/pkg/hypercmd" "github.com/ripta/rt/pkg/calc" + "github.com/ripta/rt/pkg/cg" "github.com/ripta/rt/pkg/enc" "github.com/ripta/rt/pkg/grpcto" "github.com/ripta/rt/pkg/hashsum" @@ -38,6 +39,7 @@ func main() { root.AddCommand(streamdiff.NewCommand()) root.AddCommand(calc.NewCommand()) + root.AddCommand(cg.NewCommand()) v := version.NewCommand() root.Root().AddCommand(v) diff --git a/pkg/cg/cg.go b/pkg/cg/cg.go new file mode 100644 index 0000000..6646287 --- /dev/null +++ b/pkg/cg/cg.go @@ -0,0 +1,32 @@ +package cg + +import ( + "github.com/spf13/cobra" +) + +// DefaultFormat is the default time prefix format: short time. +const DefaultFormat = "15:04:05 " + +type Options struct { + Format string +} + +// NewCommand creates the cg cobra command. +func NewCommand() *cobra.Command { + opts := &Options{} + c := &cobra.Command{ + Use: "cg [flags] -- COMMAND [ARGS...]", + Short: "Execute a command and annotate its output", + Long: "Execute a child command, annotating each line of stdout and stderr with a timestamp prefix and stream indicator.", + + SilenceErrors: true, + SilenceUsage: true, + + Args: cobra.MinimumNArgs(1), + RunE: opts.run, + } + + c.Flags().StringVar(&opts.Format, "format", DefaultFormat, "time prefix format (Go time.Format layout)") + + return c +} diff --git a/pkg/cg/exitcode.go b/pkg/cg/exitcode.go new file mode 100644 index 0000000..68c06a6 --- /dev/null +++ b/pkg/cg/exitcode.go @@ -0,0 +1,48 @@ +package cg + +import ( + "errors" + "fmt" + "os/exec" + "syscall" +) + +// ExitError represents a non-zero exit from a child process. It satisfies the +// error interface and carries the exit code for use by the entry point. +type ExitError struct { + Code int +} + +func (e *ExitError) Error() string { + return fmt.Sprintf("exit code %d", e.Code) +} + +// ExitCodeFromError maps an error from exec.Cmd.Wait into a conventional exit +// code. Returns 127 for command-not-found, 126 for permission denied, and the +// child's actual exit code for exec.ExitError. +// +// For any other error it returns 1 as a generic failure code. +func ExitCodeFromError(err error) int { + if err == nil { + return 0 + } + + if errors.Is(err, exec.ErrNotFound) { + return 127 + } + + var pathErr *exec.Error + if errors.As(err, &pathErr) { + if errors.Is(pathErr.Err, syscall.EACCES) { + return 126 + } + return 127 + } + + var exitErr *exec.ExitError + if errors.As(err, &exitErr) { + return exitErr.ExitCode() + } + + return 1 +} diff --git a/pkg/cg/helpers.go b/pkg/cg/helpers.go new file mode 100644 index 0000000..ba071e1 --- /dev/null +++ b/pkg/cg/helpers.go @@ -0,0 +1,68 @@ +package cg + +import ( + "os/exec" + "strings" + "syscall" +) + +// escapeArgs formats a command and its arguments in a shell-readable style. +// Arguments containing spaces, quotes, or shell metacharacters are quoted. +func escapeArgs(args []string) string { + escaped := make([]string, len(args)) + for i, arg := range args { + escaped[i] = shellQuote(arg) + } + + return strings.Join(escaped, " ") +} + +func shellQuote(s string) string { + if s == "" { + return "''" + } + + safe := true + for _, c := range s { + if !isShellSafe(c) { + safe = false + break + } + } + + if safe { + return s + } + + return "'" + strings.ReplaceAll(s, "'", `'\''`) + "'" +} + +func isShellSafe(c rune) bool { + if c >= 'a' && c <= 'z' { + return true + } + if c >= 'A' && c <= 'Z' { + return true + } + if c >= '0' && c <= '9' { + return true + } + switch c { + case '-', '_', '.', '/', ':', '@', '+', ',', '=': + return true + } + return false +} + +func exitStatus(cmd *exec.Cmd) *syscall.WaitStatus { + if cmd.ProcessState == nil { + return nil + } + + ws, ok := cmd.ProcessState.Sys().(syscall.WaitStatus) + if !ok { + return nil + } + + return &ws +} diff --git a/pkg/cg/runner.go b/pkg/cg/runner.go new file mode 100644 index 0000000..f0be117 --- /dev/null +++ b/pkg/cg/runner.go @@ -0,0 +1,99 @@ +package cg + +import ( + "fmt" + "os" + "os/exec" + "os/signal" + "sync" + "syscall" + "time" + + "github.com/spf13/cobra" + + "github.com/ripta/rt/pkg/version" +) + +func (opts *Options) run(cmd *cobra.Command, args []string) error { + w := NewAnnotatedWriter(cmd.OutOrStdout(), func() string { + return time.Now().Format(opts.Format) + }) + + v := version.GetString() + if v == "" { + v = "unknown" + } + + if err := w.WriteLine(IndicatorInfo, fmt.Sprintf("cg %s", v)); err != nil { + return fmt.Errorf("writing version info: %w", err) + } + + if err := w.WriteLine(IndicatorInfo, fmt.Sprintf("prefix=%q", opts.Format)); err != nil { + return fmt.Errorf("writing prefix info: %w", err) + } + + if err := w.WriteLine(IndicatorInfo, fmt.Sprintf("Started %s", escapeArgs(args))); err != nil { + return fmt.Errorf("writing start info: %w", err) + } + + child := exec.CommandContext(cmd.Context(), args[0], args[1:]...) + child.SysProcAttr = &syscall.SysProcAttr{Setpgid: true} + + stdout, err := child.StdoutPipe() + if err != nil { + return fmt.Errorf("creating stdout pipe: %w", err) + } + + stderr, err := child.StderrPipe() + if err != nil { + return fmt.Errorf("creating stderr pipe: %w", err) + } + + if err := child.Start(); err != nil { + code := ExitCodeFromError(err) + _ = w.WriteLine(IndicatorInfo, fmt.Sprintf("Finished with exitcode %d", code)) + return &ExitError{Code: code} + } + + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + defer func() { + signal.Stop(sigCh) + close(sigCh) + }() + + go func() { + for sig := range sigCh { + // Forward signal to the child's process group + _ = syscall.Kill(-child.Process.Pid, sig.(syscall.Signal)) + } + }() + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + _ = w.WriteLines(stdout, IndicatorOut) + }() + go func() { + defer wg.Done() + _ = w.WriteLines(stderr, IndicatorErr) + }() + + wg.Wait() + + waitErr := child.Wait() + code := ExitCodeFromError(waitErr) + if ws := exitStatus(child); ws != nil && ws.Signaled() { + _ = w.WriteLine(IndicatorInfo, fmt.Sprintf("Finished with signal %d", ws.Signal())) + } else { + _ = w.WriteLine(IndicatorInfo, fmt.Sprintf("Finished with exitcode %d", code)) + } + + if code != 0 { + return &ExitError{Code: code} + } + + return nil +} diff --git a/pkg/cg/runner_test.go b/pkg/cg/runner_test.go new file mode 100644 index 0000000..b3e279a --- /dev/null +++ b/pkg/cg/runner_test.go @@ -0,0 +1,385 @@ +package cg + +import ( + "bytes" + "errors" + "os/exec" + "strings" + "syscall" + "testing" + "time" +) + +type exitCodeFromErrorTest struct { + name string + err error + want int +} + +var exitCodeFromErrorTests = []exitCodeFromErrorTest{ + { + name: "nil error", + err: nil, + want: 0, + }, + { + name: "command not found", + err: exec.ErrNotFound, + want: 127, + }, + { + name: "exec.Error wrapping ErrNotFound", + err: &exec.Error{Name: "nosuchcmd", Err: exec.ErrNotFound}, + want: 127, + }, + { + name: "exec.Error wrapping EACCES", + err: &exec.Error{Name: "/bin/noperm", Err: syscall.EACCES}, + want: 126, + }, + { + name: "generic error", + err: errors.New("something broke"), + want: 1, + }, +} + +func TestExitCodeFromError(t *testing.T) { + t.Parallel() + + for _, tt := range exitCodeFromErrorTests { + t.Run(tt.name, func(t *testing.T) { + got := ExitCodeFromError(tt.err) + if got != tt.want { + t.Errorf("ExitCodeFromError() = %d, want %d", got, tt.want) + } + }) + } +} + +type shellQuoteTest struct { + name string + input string + want string +} + +var shellQuoteTests = []shellQuoteTest{ + { + name: "simple word", + input: "hello", + want: "hello", + }, + { + name: "path", + input: "/usr/bin/echo", + want: "/usr/bin/echo", + }, + { + name: "empty string", + input: "", + want: "''", + }, + { + name: "contains space", + input: "hello world", + want: "'hello world'", + }, + { + name: "contains single quote", + input: "it's", + want: `'it'\''s'`, + }, + { + name: "contains semicolon", + input: "echo;rm", + want: "'echo;rm'", + }, +} + +func TestShellQuote(t *testing.T) { + t.Parallel() + + for _, tt := range shellQuoteTests { + t.Run(tt.name, func(t *testing.T) { + got := shellQuote(tt.input) + if got != tt.want { + t.Errorf("shellQuote(%q) = %q, want %q", tt.input, got, tt.want) + } + }) + } +} + +type escapeArgsTest struct { + name string + args []string + want string +} + +var escapeArgsTests = []escapeArgsTest{ + { + name: "simple command", + args: []string{"echo", "hello"}, + want: "echo hello", + }, + { + name: "command with spaces in arg", + args: []string{"echo", "hello world"}, + want: "echo 'hello world'", + }, + { + name: "sh -c with quoted arg", + args: []string{"sh", "-c", "echo out; echo err >&2"}, + want: "sh -c 'echo out; echo err >&2'", + }, +} + +func TestEscapeArgs(t *testing.T) { + t.Parallel() + + for _, tt := range escapeArgsTests { + t.Run(tt.name, func(t *testing.T) { + got := escapeArgs(tt.args) + if got != tt.want { + t.Errorf("escapeArgs() = %q, want %q", got, tt.want) + } + }) + } +} + +func TestIntegrationStdout(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + cmd := exec.Command("echo", "hello") + out, err := cmd.Output() + if err != nil { + t.Fatalf("echo command failed: %v", err) + } + if got := string(out); got != "hello\n" { + t.Errorf("echo output = %q, want %q", got, "hello\n") + } +} + +func TestIntegrationExitCode(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + cmd := exec.Command("sh", "-c", "exit 42") + err := cmd.Run() + if err == nil { + t.Fatal("expected error from exit 42") + } + + code := ExitCodeFromError(err) + if code != 42 { + t.Errorf("ExitCodeFromError() = %d, want 42", code) + } +} + +func TestIntegrationCommandNotFound(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + cmd := exec.Command("__nonexistent_command_for_cg_test__") + err := cmd.Run() + if err == nil { + t.Fatal("expected error for nonexistent command") + } + + code := ExitCodeFromError(err) + if code != 127 { + t.Errorf("ExitCodeFromError() = %d, want 127", code) + } +} + +// runCgCommand executes the cg cobra command with the given args and returns +// the captured output and error. +func runCgCommand(args ...string) (string, error) { + var buf bytes.Buffer + cmd := NewCommand() + cmd.SetOut(&buf) + cmd.SetErr(&buf) + cmd.SetArgs(args) + err := cmd.Execute() + return buf.String(), err +} + +func TestCommandLifecycleMessages(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + out, err := runCgCommand("--format", "T ", "--", "echo", "hello") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + lines := strings.Split(strings.TrimRight(out, "\n"), "\n") + if len(lines) < 4 { + t.Fatalf("expected at least 4 lines, got %d: %q", len(lines), out) + } + + // First line: version info + if !strings.HasPrefix(lines[0], "T I: cg ") { + t.Errorf("line 0 = %q, want prefix %q", lines[0], "T I: cg ") + } + + // Second line: prefix info + if !strings.HasPrefix(lines[1], "T I: prefix=") { + t.Errorf("line 1 = %q, want prefix %q", lines[1], "T I: prefix=") + } + if !strings.Contains(lines[1], `"T "`) { + t.Errorf("line 1 = %q, want to contain %q", lines[1], `"T "`) + } + + // Third line: Started + if !strings.HasPrefix(lines[2], "T I: Started echo hello") { + t.Errorf("line 2 = %q, want prefix %q", lines[2], "T I: Started echo hello") + } + + // Fourth line: child output + if lines[3] != "T O: hello" { + t.Errorf("line 3 = %q, want %q", lines[3], "T O: hello") + } + + // Last line: Finished + last := lines[len(lines)-1] + if last != "T I: Finished with exitcode 0" { + t.Errorf("last line = %q, want %q", last, "T I: Finished with exitcode 0") + } +} + +func TestCommandStderrOutput(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + out, err := runCgCommand("--format", "T ", "--", "sh", "-c", "echo out; echo err >&2") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if !strings.Contains(out, "T O: out\n") { + t.Errorf("output missing stdout line, got: %q", out) + } + if !strings.Contains(out, "T E: err\n") { + t.Errorf("output missing stderr line, got: %q", out) + } +} + +func TestCommandExitCodePropagation(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + out, err := runCgCommand("--format", "T ", "--", "sh", "-c", "exit 42") + if err == nil { + t.Fatal("expected error from exit 42") + } + + var exitErr *ExitError + if !errors.As(err, &exitErr) { + t.Fatalf("expected *ExitError, got %T: %v", err, err) + } + if exitErr.Code != 42 { + t.Errorf("exit code = %d, want 42", exitErr.Code) + } + + if !strings.Contains(out, "T I: Finished with exitcode 42") { + t.Errorf("output missing finish message, got: %q", out) + } +} + +func TestCommandPartialLine(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + out, err := runCgCommand("--format", "T ", "--", "sh", "-c", `printf "no newline"`) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if !strings.Contains(out, "T O: no newline") { + t.Errorf("output missing partial line, got: %q", out) + } +} + +func TestCommandCustomFormat(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + out, err := runCgCommand("--format", "2006-01-02 ", "--", "echo", "test") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // The prefix should be a date like "2026-02-22 " + lines := strings.Split(strings.TrimRight(out, "\n"), "\n") + for _, line := range lines { + // Each line should start with a date pattern + if len(line) < 11 { + t.Errorf("line too short: %q", line) + continue + } + // Rough check: starts with 4 digits + if line[0] < '0' || line[0] > '9' { + t.Errorf("line does not start with date: %q", line) + } + } +} + +func TestCommandSignalForwarding(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + // Run a child that traps SIGTERM and echoes it, then exits + script := `trap 'echo got_sigterm; exit 0' TERM; echo ready; sleep 10` + + var buf bytes.Buffer + cmd := NewCommand() + cmd.SetOut(&buf) + cmd.SetErr(&buf) + cmd.SetArgs([]string{"--format", "T ", "--", "sh", "-c", script}) + + done := make(chan error, 1) + go func() { + done <- cmd.Execute() + }() + + // Wait for the child to be ready + deadline := time.After(5 * time.Second) + for { + select { + case <-deadline: + t.Fatal("timed out waiting for child to become ready") + case err := <-done: + t.Fatalf("command finished before signal: %v, output: %q", err, buf.String()) + default: + } + if strings.Contains(buf.String(), "T O: ready") { + break + } + time.Sleep(10 * time.Millisecond) + } + + // Send SIGTERM to our own process group; the child should receive it via + // the signal forwarding goroutine + syscall.Kill(syscall.Getpid(), syscall.SIGTERM) + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for command to finish after signal") + } + + out := buf.String() + if !strings.Contains(out, "T O: got_sigterm") { + t.Errorf("child did not receive signal, output: %q", out) + } +} diff --git a/pkg/cg/writer.go b/pkg/cg/writer.go new file mode 100644 index 0000000..089049c --- /dev/null +++ b/pkg/cg/writer.go @@ -0,0 +1,89 @@ +package cg + +import ( + "bufio" + "fmt" + "io" + "sync" +) + +// Indicator identifies the source of an annotated output line. +type Indicator byte + +const ( + // IndicatorInfo marks informational messages from cg itself. + IndicatorInfo Indicator = 'I' + // IndicatorOut marks lines from the child's stdout. + IndicatorOut Indicator = 'O' + // IndicatorErr marks lines from the child's stderr. + IndicatorErr Indicator = 'E' +) + +// PrefixFunc returns the prefix string to prepend to each annotated line +type PrefixFunc func() string + +// AnnotatedWriter writes lines with a prefix and stream indicator to an +// underlying writer. All writes are mutex-protected to prevent interleaving. +type AnnotatedWriter struct { + mu sync.Mutex + dest io.Writer + prefix PrefixFunc +} + +// NewAnnotatedWriter cre ates an AnnotatedWriter that writes to dest, calling +// prefix before each line to obtain the current prefix string. +func NewAnnotatedWriter(dest io.Writer, prefix PrefixFunc) *AnnotatedWriter { + return &AnnotatedWriter{ + dest: dest, + prefix: prefix, + } +} + +// WriteLine writes a single annotated line to the destination. The line should +// not include a trailing newline; one will be appended. The write is atomic +// with respect to other WriteLine and WriteLines calls. +func (w *AnnotatedWriter) WriteLine(ind Indicator, line string) error { + w.mu.Lock() + defer w.mu.Unlock() + + _, err := fmt.Fprintf(w.dest, "%s%c: %s\n", w.prefix(), ind, line) + return err +} + +// WritePartialLine writes a single annotated line without a trailing newline. +// Used for the final line of output when it does not end with a newline. +func (w *AnnotatedWriter) WritePartialLine(ind Indicator, line string) error { + w.mu.Lock() + defer w.mu.Unlock() + + _, err := fmt.Fprintf(w.dest, "%s%c: %s", w.prefix(), ind, line) + return err +} + +// WriteLines reads linewise from r, and writes each as an annotated line. +// +// If the final line does not end with a newline, it is written without a +// trailing newline to preserve the child's exact output. +func (w *AnnotatedWriter) WriteLines(r io.Reader, ind Indicator) error { + br := bufio.NewReader(r) + for { + line, err := br.ReadBytes('\n') + if err != nil { + if err == io.EOF { + return nil + } + return err + } + + if line[len(line)-1] == '\n' { + if werr := w.WriteLine(ind, string(line[:len(line)-1])); werr != nil { + return werr + } + continue + } + + if werr := w.WritePartialLine(ind, string(line)); werr != nil { + return werr + } + } +} diff --git a/pkg/cg/writer_test.go b/pkg/cg/writer_test.go new file mode 100644 index 0000000..9f16e7f --- /dev/null +++ b/pkg/cg/writer_test.go @@ -0,0 +1,135 @@ +package cg + +import ( + "bytes" + "strings" + "testing" +) + +type writeLineTest struct { + name string + indicator Indicator + line string + want string +} + +var writeLineTests = []writeLineTest{ + { + name: "stdout line", + indicator: IndicatorOut, + line: "hello world", + want: "PFX O: hello world\n", + }, + { + name: "stderr line", + indicator: IndicatorErr, + line: "an error", + want: "PFX E: an error\n", + }, + { + name: "info line", + indicator: IndicatorInfo, + line: "cg v1.0", + want: "PFX I: cg v1.0\n", + }, + { + name: "empty line", + indicator: IndicatorOut, + line: "", + want: "PFX O: \n", + }, +} + +func TestWriteLine(t *testing.T) { + t.Parallel() + + for _, tt := range writeLineTests { + t.Run(tt.name, func(t *testing.T) { + var buf bytes.Buffer + w := NewAnnotatedWriter(&buf, func() string { return "PFX " }) + + if err := w.WriteLine(tt.indicator, tt.line); err != nil { + t.Fatalf("WriteLine() error = %v", err) + } + + if got := buf.String(); got != tt.want { + t.Errorf("WriteLine() = %q, want %q", got, tt.want) + } + }) + } +} + +type writeLinesTest struct { + name string + // indicator is the stream indicator to use. + indicator Indicator + // input is the data to feed to WriteLines. + input string + // want is the expected annotated output. + want string +} + +var writeLinesTests = []writeLinesTest{ + { + name: "single line with newline", + indicator: IndicatorOut, + input: "hello\n", + want: "T O: hello\n", + }, + { + name: "multiple lines", + indicator: IndicatorOut, + input: "line1\nline2\nline3\n", + want: "T O: line1\nT O: line2\nT O: line3\n", + }, + { + name: "partial final line", + indicator: IndicatorOut, + input: "hello\npartial", + want: "T O: hello\nT O: partial", + }, + { + name: "single partial line no newline", + indicator: IndicatorErr, + input: "no newline", + want: "T E: no newline", + }, + { + name: "empty input", + indicator: IndicatorOut, + input: "", + want: "", + }, + { + name: "stderr multi-line", + indicator: IndicatorErr, + input: "err1\nerr2\n", + want: "T E: err1\nT E: err2\n", + }, + { + name: "only newline", + indicator: IndicatorOut, + input: "\n", + want: "T O: \n", + }, +} + +func TestWriteLines(t *testing.T) { + t.Parallel() + + for _, tt := range writeLinesTests { + t.Run(tt.name, func(t *testing.T) { + var buf bytes.Buffer + w := NewAnnotatedWriter(&buf, func() string { return "T " }) + + err := w.WriteLines(strings.NewReader(tt.input), tt.indicator) + if err != nil { + t.Fatalf("WriteLines() error = %v", err) + } + + if got := buf.String(); got != tt.want { + t.Errorf("WriteLines() = %q, want %q", got, tt.want) + } + }) + } +} From 70afbd98d274d4c91f836568d9a13c2b2a52c80a Mon Sep 17 00:00:00 2001 From: Ripta Pasay Date: Sun, 22 Feb 2026 20:01:05 -0800 Subject: [PATCH 02/10] README: document `cg` --- README.md | 62 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/README.md b/README.md index fdbfacb..b8f53bc 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ go install github.com/ripta/rt/cmd/...@latest or pick-and-choose each tool to individually install: +* [cg](#cg) to run a command and annotate its output with timestamps * [enc](#enc) to encode and decode STDIN * [grpcto](#grpcto) to frame and unframe gRPC messages * [hs](#hs) to hash STDIN @@ -43,6 +44,67 @@ Pull requests welcome, though you should probably check first before sinking any +`cg` +---- + +Run a command and annotate each line of its stdout and stderr with a timestamp +and stream indicator (`O` for stdout, `E` for stderr, `I` for cg's own +lifecycle messages). + +Acts like `annotate-output` script. + +``` +go install github.com/ripta/rt/cmd/cg@latest +``` + +Basic usage: + +``` +❯ cg -- echo hello +19:02:59 I: cg v0.1.0 +19:02:59 I: prefix="15:04:05 " +19:02:59 I: Started echo hello +19:02:59 O: hello +19:02:59 I: Finished with exitcode 0 +``` + +Stdout and stderr are distinguished: + +``` +❯ cg -- sh -c 'echo out; echo err >&2' +19:03:04 I: cg v0.1.0 +19:03:04 I: prefix="15:04:05 " +19:03:04 I: Started sh -c 'echo out; echo err >&2' +19:03:04 O: out +19:03:04 E: err +19:03:04 I: Finished with exitcode 0 +``` + +The child's exit code is propagated: + +``` +❯ cg -- sh -c 'exit 42'; echo $? +19:20:35 I: cg v0.1.0 +19:20:35 I: prefix="15:04:05 " +19:20:35 I: Started sh -c 'exit 42' +19:20:35 I: Finished with exitcode 42 +42 +``` + +Use `--format` to change the timestamp prefix. It takes the golang `time.Format` layout: + +``` +❯ cg --format '2006-01-02T15:04:05 ' -- echo hello +2026-02-22T19:05:00 I: cg v0.1.0 +2026-02-22T19:05:00 I: prefix="2006-01-02T15:04:05 " +2026-02-22T19:05:00 I: Started echo hello +2026-02-22T19:05:00 O: hello +2026-02-22T19:05:00 I: Finished with exitcode 0 +``` + +Signals SIGINT and SIGTERM are forwarded to the child process. + + `enc` ---- From 6a63ea368971808d58926322196d7662f7b81499 Mon Sep 17 00:00:00 2001 From: Ripta Pasay Date: Sun, 22 Feb 2026 23:08:02 -0800 Subject: [PATCH 03/10] pkg/cg: fix partial line bug --- pkg/cg/writer.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/cg/writer.go b/pkg/cg/writer.go index 089049c..7f69d49 100644 --- a/pkg/cg/writer.go +++ b/pkg/cg/writer.go @@ -30,7 +30,7 @@ type AnnotatedWriter struct { prefix PrefixFunc } -// NewAnnotatedWriter cre ates an AnnotatedWriter that writes to dest, calling +// NewAnnotatedWriter creates an AnnotatedWriter that writes to dest, calling // prefix before each line to obtain the current prefix string. func NewAnnotatedWriter(dest io.Writer, prefix PrefixFunc) *AnnotatedWriter { return &AnnotatedWriter{ @@ -70,6 +70,9 @@ func (w *AnnotatedWriter) WriteLines(r io.Reader, ind Indicator) error { line, err := br.ReadBytes('\n') if err != nil { if err == io.EOF { + if len(line) > 0 { + return w.WritePartialLine(ind, string(line)) + } return nil } return err From f50f675c4afd8d15af0fe356cefb802e5fec51fe Mon Sep 17 00:00:00 2001 From: Ripta Pasay Date: Tue, 24 Feb 2026 21:34:25 -0800 Subject: [PATCH 04/10] pkg/cg: add --capture flag for file capture When `--capture` is active, stdout and stderr are written to separate temporary files containing raw, unannotated output. A separate lifecycle file captures all I: messages. --- pkg/cg/capture.go | 64 +++++++++ pkg/cg/capture_test.go | 300 +++++++++++++++++++++++++++++++++++++++++ pkg/cg/cg.go | 4 +- pkg/cg/runner.go | 93 ++++++++++--- 4 files changed, 444 insertions(+), 17 deletions(-) create mode 100644 pkg/cg/capture.go create mode 100644 pkg/cg/capture_test.go diff --git a/pkg/cg/capture.go b/pkg/cg/capture.go new file mode 100644 index 0000000..e966c2a --- /dev/null +++ b/pkg/cg/capture.go @@ -0,0 +1,64 @@ +package cg + +import ( + "fmt" + "os" + "path/filepath" +) + +// Capture manages temporary files for capturing raw output. +type Capture struct { + Stdout *os.File + Stderr *os.File + Lifecycle *os.File + prefix PrefixFunc +} + +// NewCapture creates three temporary files for capturing stdout, stderr, +// and lifecycle output. +func NewCapture(pid int, prefix PrefixFunc) (*Capture, error) { + dir := os.TempDir() + + stdout, err := os.Create(filepath.Join(dir, fmt.Sprintf("cg-%d-stdout", pid))) + if err != nil { + return nil, fmt.Errorf("creating stdout capture file: %w", err) + } + + stderr, err := os.Create(filepath.Join(dir, fmt.Sprintf("cg-%d-stderr", pid))) + if err != nil { + stdout.Close() + return nil, fmt.Errorf("creating stderr capture file: %w", err) + } + + lifecycle, err := os.Create(filepath.Join(dir, fmt.Sprintf("cg-%d-lifecycle", pid))) + if err != nil { + stdout.Close() + stderr.Close() + return nil, fmt.Errorf("creating lifecycle capture file: %w", err) + } + + return &Capture{ + Stdout: stdout, + Stderr: stderr, + Lifecycle: lifecycle, + prefix: prefix, + }, nil +} + +// WriteLifecycle writes an annotated lifecycle message. +func (c *Capture) WriteLifecycle(msg string) error { + _, err := fmt.Fprintf(c.Lifecycle, "%s%c: %s\n", c.prefix(), IndicatorInfo, msg) + return err +} + +// Close closes all three capture files. Returns the first error encountered. +func (c *Capture) Close() error { + first := c.Stdout.Close() + if err := c.Stderr.Close(); err != nil && first == nil { + first = err + } + if err := c.Lifecycle.Close(); err != nil && first == nil { + first = err + } + return first +} diff --git a/pkg/cg/capture_test.go b/pkg/cg/capture_test.go new file mode 100644 index 0000000..9d3468a --- /dev/null +++ b/pkg/cg/capture_test.go @@ -0,0 +1,300 @@ +package cg + +import ( + "fmt" + "os" + "path/filepath" + "strings" + "testing" +) + +func TestNewCapture(t *testing.T) { + prefix := func() string { return "T " } + pid := os.Getpid() + + cap, err := NewCapture(pid, prefix) + if err != nil { + t.Fatalf("NewCapture() error = %v", err) + } + defer cap.Close() + defer os.Remove(cap.Stdout.Name()) + defer os.Remove(cap.Stderr.Name()) + defer os.Remove(cap.Lifecycle.Name()) + + dir := os.TempDir() + wantStdout := filepath.Join(dir, fmt.Sprintf("cg-%d-stdout", pid)) + wantStderr := filepath.Join(dir, fmt.Sprintf("cg-%d-stderr", pid)) + wantLifecycle := filepath.Join(dir, fmt.Sprintf("cg-%d-lifecycle", pid)) + + if cap.Stdout.Name() != wantStdout { + t.Errorf("Stdout.Name() = %q, want %q", cap.Stdout.Name(), wantStdout) + } + if cap.Stderr.Name() != wantStderr { + t.Errorf("Stderr.Name() = %q, want %q", cap.Stderr.Name(), wantStderr) + } + if cap.Lifecycle.Name() != wantLifecycle { + t.Errorf("Lifecycle.Name() = %q, want %q", cap.Lifecycle.Name(), wantLifecycle) + } + + for _, f := range []*os.File{cap.Stdout, cap.Stderr, cap.Lifecycle} { + if _, err := os.Stat(f.Name()); err != nil { + t.Errorf("file %s does not exist: %v", f.Name(), err) + } + } +} + +func TestCaptureWriteLifecycle(t *testing.T) { + prefix := func() string { return "T " } + pid := os.Getpid() + 10000 // offset to avoid collision with other tests + + cap, err := NewCapture(pid, prefix) + if err != nil { + t.Fatalf("NewCapture() error = %v", err) + } + defer os.Remove(cap.Stdout.Name()) + defer os.Remove(cap.Stderr.Name()) + defer os.Remove(cap.Lifecycle.Name()) + + if err := cap.WriteLifecycle("Started echo hello"); err != nil { + t.Fatalf("WriteLifecycle() error = %v", err) + } + if err := cap.WriteLifecycle("Finished with exitcode 0"); err != nil { + t.Fatalf("WriteLifecycle() error = %v", err) + } + cap.Close() + + data, err := os.ReadFile(cap.Lifecycle.Name()) + if err != nil { + t.Fatalf("reading lifecycle file: %v", err) + } + + want := "T I: Started echo hello\nT I: Finished with exitcode 0\n" + if got := string(data); got != want { + t.Errorf("lifecycle file = %q, want %q", got, want) + } +} + +// extractCapturePath finds a capture path announcement in cg output. +func extractCapturePath(output, key string) string { + for _, line := range strings.Split(output, "\n") { + idx := strings.Index(line, key) + if idx >= 0 { + return line[idx+len(key):] + } + } + return "" +} + +// cleanupCaptureFiles removes capture files found in cg output. +func cleanupCaptureFiles(t *testing.T, output string) { + t.Helper() + for _, key := range []string{"capture.stdout=", "capture.stderr=", "capture.lifecycle="} { + path := extractCapturePath(output, key) + if path != "" { + os.Remove(path) + } + } +} + +func TestCommandCaptureCreatesFiles(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + out, err := runCgCommand("--format", "T ", "--capture", "--", "echo", "hello") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + defer cleanupCaptureFiles(t, out) + + stdoutPath := extractCapturePath(out, "capture.stdout=") + stderrPath := extractCapturePath(out, "capture.stderr=") + lifecyclePath := extractCapturePath(out, "capture.lifecycle=") + + if stdoutPath == "" { + t.Fatal("missing capture.stdout path in output") + } + if stderrPath == "" { + t.Fatal("missing capture.stderr path in output") + } + if lifecyclePath == "" { + t.Fatal("missing capture.lifecycle path in output") + } + + // Stdout file contains raw output + data, err := os.ReadFile(stdoutPath) + if err != nil { + t.Fatalf("reading stdout capture: %v", err) + } + if got := string(data); got != "hello\n" { + t.Errorf("stdout capture = %q, want %q", got, "hello\n") + } + + // Stderr file exists but is empty + data, err = os.ReadFile(stderrPath) + if err != nil { + t.Fatalf("reading stderr capture: %v", err) + } + if len(data) != 0 { + t.Errorf("stderr capture = %q, want empty", string(data)) + } + + // Lifecycle file contains Started and Finished + data, err = os.ReadFile(lifecyclePath) + if err != nil { + t.Fatalf("reading lifecycle capture: %v", err) + } + lifecycle := string(data) + if !strings.Contains(lifecycle, "Started echo hello") { + t.Errorf("lifecycle missing Started message: %q", lifecycle) + } + if !strings.Contains(lifecycle, "Finished with exitcode 0") { + t.Errorf("lifecycle missing Finished message: %q", lifecycle) + } +} + +func TestCommandCaptureSuppressesChildOutput(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + out, err := runCgCommand("--format", "T ", "--capture", "--", "echo", "hello") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + defer cleanupCaptureFiles(t, out) + + // Child output must not appear annotated on cg's stdout + if strings.Contains(out, "O: hello") { + t.Errorf("child stdout should not appear on cg output in capture mode, got: %q", out) + } + + // Lifecycle messages must appear + if !strings.Contains(out, "I: Started echo hello") { + t.Errorf("lifecycle messages should appear on stdout, got: %q", out) + } + if !strings.Contains(out, "I: Finished with exitcode 0") { + t.Errorf("finished message should appear on stdout, got: %q", out) + } +} + +func TestCommandCaptureSeparateStreams(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + out, err := runCgCommand("--format", "T ", "--capture", "--", "sh", "-c", "echo out; echo err >&2") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + defer cleanupCaptureFiles(t, out) + + stdoutPath := extractCapturePath(out, "capture.stdout=") + stderrPath := extractCapturePath(out, "capture.stderr=") + + data, err := os.ReadFile(stdoutPath) + if err != nil { + t.Fatalf("reading stdout capture: %v", err) + } + if got := string(data); got != "out\n" { + t.Errorf("stdout capture = %q, want %q", got, "out\n") + } + + data, err = os.ReadFile(stderrPath) + if err != nil { + t.Fatalf("reading stderr capture: %v", err) + } + if got := string(data); got != "err\n" { + t.Errorf("stderr capture = %q, want %q", got, "err\n") + } +} + +func TestCommandCaptureLifecycleFileContents(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + out, err := runCgCommand("--format", "T ", "--capture", "--", "echo", "hello") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + defer cleanupCaptureFiles(t, out) + + lifecyclePath := extractCapturePath(out, "capture.lifecycle=") + data, err := os.ReadFile(lifecyclePath) + if err != nil { + t.Fatalf("reading lifecycle capture: %v", err) + } + lifecycle := string(data) + + // Should contain version info + if !strings.Contains(lifecycle, "I: cg ") { + t.Errorf("lifecycle missing version info: %q", lifecycle) + } + + // Should contain prefix info + if !strings.Contains(lifecycle, "I: prefix=") { + t.Errorf("lifecycle missing prefix info: %q", lifecycle) + } + + // Should contain capture path announcements + if !strings.Contains(lifecycle, "I: capture.stdout=") { + t.Errorf("lifecycle missing capture.stdout path: %q", lifecycle) + } + + // Should contain Started and Finished + if !strings.Contains(lifecycle, "I: Started echo hello") { + t.Errorf("lifecycle missing Started: %q", lifecycle) + } + if !strings.Contains(lifecycle, "I: Finished with exitcode 0") { + t.Errorf("lifecycle missing Finished: %q", lifecycle) + } + + // Should NOT contain child output + if strings.Contains(lifecycle, "O: hello") { + t.Errorf("lifecycle should not contain child output: %q", lifecycle) + } +} + +func TestCommandCaptureEmptyStreams(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + out, err := runCgCommand("--format", "T ", "--capture", "--", "true") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + defer cleanupCaptureFiles(t, out) + + stdoutPath := extractCapturePath(out, "capture.stdout=") + stderrPath := extractCapturePath(out, "capture.stderr=") + lifecyclePath := extractCapturePath(out, "capture.lifecycle=") + + // All three files should exist + for _, path := range []string{stdoutPath, stderrPath, lifecyclePath} { + if _, err := os.Stat(path); err != nil { + t.Errorf("file %s should exist: %v", path, err) + } + } + + // Stdout and stderr should be empty + for _, path := range []string{stdoutPath, stderrPath} { + data, err := os.ReadFile(path) + if err != nil { + t.Fatalf("reading %s: %v", path, err) + } + if len(data) != 0 { + t.Errorf("%s should be empty, got %q", path, string(data)) + } + } + + // Lifecycle should still have content + data, err := os.ReadFile(lifecyclePath) + if err != nil { + t.Fatalf("reading lifecycle: %v", err) + } + if !strings.Contains(string(data), "Finished with exitcode 0") { + t.Errorf("lifecycle missing Finished message: %q", string(data)) + } +} diff --git a/pkg/cg/cg.go b/pkg/cg/cg.go index 6646287..bdd68ac 100644 --- a/pkg/cg/cg.go +++ b/pkg/cg/cg.go @@ -8,7 +8,8 @@ import ( const DefaultFormat = "15:04:05 " type Options struct { - Format string + Format string + Capture bool } // NewCommand creates the cg cobra command. @@ -27,6 +28,7 @@ func NewCommand() *cobra.Command { } c.Flags().StringVar(&opts.Format, "format", DefaultFormat, "time prefix format (Go time.Format layout)") + c.Flags().BoolVar(&opts.Capture, "capture", false, "capture child output to temporary files") return c } diff --git a/pkg/cg/runner.go b/pkg/cg/runner.go index f0be117..ce8a06f 100644 --- a/pkg/cg/runner.go +++ b/pkg/cg/runner.go @@ -2,6 +2,7 @@ package cg import ( "fmt" + "io" "os" "os/exec" "os/signal" @@ -15,24 +16,38 @@ import ( ) func (opts *Options) run(cmd *cobra.Command, args []string) error { - w := NewAnnotatedWriter(cmd.OutOrStdout(), func() string { + prefix := func() string { return time.Now().Format(opts.Format) - }) + } + w := NewAnnotatedWriter(cmd.OutOrStdout(), prefix) + + // writeInfo writes a lifecycle message to the annotated writer and + // optionally buffers it for later replay to the capture lifecycle file. + var preCaptureMsgs []string + writeInfo := func(msg string) error { + if err := w.WriteLine(IndicatorInfo, msg); err != nil { + return err + } + if opts.Capture { + preCaptureMsgs = append(preCaptureMsgs, msg) + } + return nil + } v := version.GetString() if v == "" { v = "unknown" } - if err := w.WriteLine(IndicatorInfo, fmt.Sprintf("cg %s", v)); err != nil { + if err := writeInfo(fmt.Sprintf("cg %s", v)); err != nil { return fmt.Errorf("writing version info: %w", err) } - if err := w.WriteLine(IndicatorInfo, fmt.Sprintf("prefix=%q", opts.Format)); err != nil { + if err := writeInfo(fmt.Sprintf("prefix=%q", opts.Format)); err != nil { return fmt.Errorf("writing prefix info: %w", err) } - if err := w.WriteLine(IndicatorInfo, fmt.Sprintf("Started %s", escapeArgs(args))); err != nil { + if err := writeInfo(fmt.Sprintf("Started %s", escapeArgs(args))); err != nil { return fmt.Errorf("writing start info: %w", err) } @@ -51,10 +66,45 @@ func (opts *Options) run(cmd *cobra.Command, args []string) error { if err := child.Start(); err != nil { code := ExitCodeFromError(err) - _ = w.WriteLine(IndicatorInfo, fmt.Sprintf("Finished with exitcode %d", code)) + _ = writeInfo(fmt.Sprintf("Finished with exitcode %d", code)) return &ExitError{Code: code} } + var cap *Capture + if opts.Capture { + cap, err = NewCapture(child.Process.Pid, prefix) + if err != nil { + _ = child.Process.Kill() + _, _ = child.Process.Wait() + return fmt.Errorf("creating capture files: %w", err) + } + defer cap.Close() + + for _, msg := range preCaptureMsgs { + if err := cap.WriteLifecycle(msg); err != nil { + return fmt.Errorf("writing buffered lifecycle: %w", err) + } + } + + // Rebind writeInfo to write to both destinations + writeInfo = func(msg string) error { + if err := w.WriteLine(IndicatorInfo, msg); err != nil { + return err + } + return cap.WriteLifecycle(msg) + } + + if err := writeInfo(fmt.Sprintf("capture.stdout=%s", cap.Stdout.Name())); err != nil { + return fmt.Errorf("writing capture stdout path: %w", err) + } + if err := writeInfo(fmt.Sprintf("capture.stderr=%s", cap.Stderr.Name())); err != nil { + return fmt.Errorf("writing capture stderr path: %w", err) + } + if err := writeInfo(fmt.Sprintf("capture.lifecycle=%s", cap.Lifecycle.Name())); err != nil { + return fmt.Errorf("writing capture lifecycle path: %w", err) + } + } + sigCh := make(chan os.Signal, 1) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) defer func() { @@ -72,23 +122,34 @@ func (opts *Options) run(cmd *cobra.Command, args []string) error { var wg sync.WaitGroup wg.Add(2) - go func() { - defer wg.Done() - _ = w.WriteLines(stdout, IndicatorOut) - }() - go func() { - defer wg.Done() - _ = w.WriteLines(stderr, IndicatorErr) - }() + if cap != nil { + go func() { + defer wg.Done() + _, _ = io.Copy(cap.Stdout, stdout) + }() + go func() { + defer wg.Done() + _, _ = io.Copy(cap.Stderr, stderr) + }() + } else { + go func() { + defer wg.Done() + _ = w.WriteLines(stdout, IndicatorOut) + }() + go func() { + defer wg.Done() + _ = w.WriteLines(stderr, IndicatorErr) + }() + } wg.Wait() waitErr := child.Wait() code := ExitCodeFromError(waitErr) if ws := exitStatus(child); ws != nil && ws.Signaled() { - _ = w.WriteLine(IndicatorInfo, fmt.Sprintf("Finished with signal %d", ws.Signal())) + _ = writeInfo(fmt.Sprintf("Finished with signal %d", ws.Signal())) } else { - _ = w.WriteLine(IndicatorInfo, fmt.Sprintf("Finished with exitcode %d", code)) + _ = writeInfo(fmt.Sprintf("Finished with exitcode %d", code)) } if code != 0 { From 483bcfcad3253a4f8e1e18291306cbcfa9ddc61b Mon Sep 17 00:00:00 2001 From: Ripta Pasay Date: Tue, 24 Feb 2026 22:15:03 -0800 Subject: [PATCH 05/10] pkg/cg: add `--buffered` flag for deferred output When set, stdout / stderr is accumulated in memory and replayed after exit preceded by a section header. When `--capture` is active, writes into the files happen immediately instead of deferred. --- pkg/cg/buffer.go | 100 +++++++++ pkg/cg/buffer_test.go | 462 ++++++++++++++++++++++++++++++++++++++++++ pkg/cg/cg.go | 6 +- pkg/cg/runner.go | 41 +++- pkg/cg/writer.go | 21 ++ pkg/cg/writer_test.go | 102 ++++++++++ 6 files changed, 728 insertions(+), 4 deletions(-) create mode 100644 pkg/cg/buffer.go create mode 100644 pkg/cg/buffer_test.go diff --git a/pkg/cg/buffer.go b/pkg/cg/buffer.go new file mode 100644 index 0000000..099a9b0 --- /dev/null +++ b/pkg/cg/buffer.go @@ -0,0 +1,100 @@ +package cg + +import ( + "bufio" + "io" + "sync" +) + +type bufferedLine struct { + prefix string + line string + partial bool +} + +// LineBuffer accumulates child output lines in memory, grouped by indicator. +// Lines are stored with the prefix captured at receive time so they can be +// replayed later with their original timestamps. +type LineBuffer struct { + mu sync.Mutex + prefix PrefixFunc + streams map[Indicator][]bufferedLine +} + +// NewLineBuffer creates a LineBuffer that calls prefix to obtain the prefix +// string for each line as it arrives. +func NewLineBuffer(prefix PrefixFunc) *LineBuffer { + return &LineBuffer{ + prefix: prefix, + streams: make(map[Indicator][]bufferedLine), + } +} + +// WriteLines reads line-by-line from r, capturing each line with its +// receive-time prefix under the given indicator. Partial final lines (no +// trailing newline) are recorded with partial set to true. +func (b *LineBuffer) WriteLines(r io.Reader, ind Indicator) error { + br := bufio.NewReader(r) + for { + line, err := br.ReadBytes('\n') + if err != nil { + if err == io.EOF { + if len(line) > 0 { + b.mu.Lock() + b.streams[ind] = append(b.streams[ind], bufferedLine{ + prefix: b.prefix(), + line: string(line), + partial: true, + }) + b.mu.Unlock() + } + return nil + } + return err + } + + p := b.prefix() + b.mu.Lock() + b.streams[ind] = append(b.streams[ind], bufferedLine{ + prefix: p, + line: string(line[:len(line)-1]), + }) + b.mu.Unlock() + } +} + +// Flush writes all buffered lines to w, grouped by stream. Stdout lines are +// written first, then stderr. Each non-empty stream is preceded by a section +// header. Lines are replayed with their original receive-time prefix. +func (b *LineBuffer) Flush(w *AnnotatedWriter) error { + b.mu.Lock() + defer b.mu.Unlock() + + for _, ind := range []Indicator{IndicatorOut, IndicatorErr} { + lines := b.streams[ind] + if len(lines) == 0 { + continue + } + + header := "--- stdout ---" + if ind == IndicatorErr { + header = "--- stderr ---" + } + if err := w.WriteLine(IndicatorInfo, header); err != nil { + return err + } + + for _, bl := range lines { + if bl.partial { + if err := w.WritePartialLineWithPrefix(bl.prefix, ind, bl.line); err != nil { + return err + } + } else { + if err := w.WriteLineWithPrefix(bl.prefix, ind, bl.line); err != nil { + return err + } + } + } + } + return nil +} diff --git a/pkg/cg/buffer_test.go b/pkg/cg/buffer_test.go new file mode 100644 index 0000000..8212b50 --- /dev/null +++ b/pkg/cg/buffer_test.go @@ -0,0 +1,462 @@ +package cg + +import ( + "bytes" + "errors" + "fmt" + "os" + "path/filepath" + "strings" + "sync" + "syscall" + "testing" + "time" +) + +type writeLinesBufferTest struct { + name string + indicator Indicator + input string + wantCount int + wantLines []bufferedLine +} + +var writeLinesBufferTests = []writeLinesBufferTest{ + { + name: "single line", + indicator: IndicatorOut, + input: "hello\n", + wantCount: 1, + wantLines: []bufferedLine{ + {prefix: "P ", line: "hello"}, + }, + }, + { + name: "multiple lines", + indicator: IndicatorOut, + input: "line1\nline2\nline3\n", + wantCount: 3, + wantLines: []bufferedLine{ + {prefix: "P ", line: "line1"}, + {prefix: "P ", line: "line2"}, + {prefix: "P ", line: "line3"}, + }, + }, + { + name: "partial final line", + indicator: IndicatorErr, + input: "hello\npartial", + wantCount: 2, + wantLines: []bufferedLine{ + {prefix: "P ", line: "hello"}, + {prefix: "P ", line: "partial", partial: true}, + }, + }, + { + name: "empty input", + indicator: IndicatorOut, + input: "", + wantCount: 0, + }, +} + +func TestLineBufferWriteLines(t *testing.T) { + t.Parallel() + + for _, tt := range writeLinesBufferTests { + t.Run(tt.name, func(t *testing.T) { + buf := NewLineBuffer(func() string { return "P " }) + + err := buf.WriteLines(strings.NewReader(tt.input), tt.indicator) + if err != nil { + t.Fatalf("WriteLines() error = %v", err) + } + + got := buf.streams[tt.indicator] + if len(got) != tt.wantCount { + t.Fatalf("WriteLines() stored %d lines, want %d", len(got), tt.wantCount) + } + + for i, want := range tt.wantLines { + if got[i].prefix != want.prefix { + t.Errorf("line[%d].prefix = %q, want %q", i, got[i].prefix, want.prefix) + } + if got[i].line != want.line { + t.Errorf("line[%d].line = %q, want %q", i, got[i].line, want.line) + } + if got[i].partial != want.partial { + t.Errorf("line[%d].partial = %v, want %v", i, got[i].partial, want.partial) + } + } + }) + } +} + +type flushTest struct { + name string + streams map[Indicator][]bufferedLine + want string +} + +var flushTests = []flushTest{ + { + name: "stdout only", + streams: map[Indicator][]bufferedLine{ + IndicatorOut: { + {prefix: "T1 ", line: "out1"}, + {prefix: "T2 ", line: "out2"}, + }, + }, + want: "F I: --- stdout ---\nT1 O: out1\nT2 O: out2\n", + }, + { + name: "stderr only", + streams: map[Indicator][]bufferedLine{ + IndicatorErr: { + {prefix: "T1 ", line: "err1"}, + }, + }, + want: "F I: --- stderr ---\nT1 E: err1\n", + }, + { + name: "both streams", + streams: map[Indicator][]bufferedLine{ + IndicatorOut: { + {prefix: "T1 ", line: "out1"}, + }, + IndicatorErr: { + {prefix: "T2 ", line: "err1"}, + }, + }, + want: "F I: --- stdout ---\nT1 O: out1\nF I: --- stderr ---\nT2 E: err1\n", + }, + { + name: "empty streams", + streams: map[Indicator][]bufferedLine{}, + want: "", + }, + { + name: "partial final line", + streams: map[Indicator][]bufferedLine{ + IndicatorOut: { + {prefix: "T1 ", line: "full"}, + {prefix: "T2 ", line: "partial", partial: true}, + }, + }, + want: "F I: --- stdout ---\nT1 O: full\nT2 O: partial", + }, + { + name: "prefix preservation", + streams: map[Indicator][]bufferedLine{ + IndicatorOut: { + {prefix: "00:00:01 ", line: "first"}, + {prefix: "00:00:02 ", line: "second"}, + {prefix: "00:00:03 ", line: "third"}, + }, + }, + want: "F I: --- stdout ---\n00:00:01 O: first\n00:00:02 O: second\n00:00:03 O: third\n", + }, +} + +func TestLineBufferFlush(t *testing.T) { + t.Parallel() + + for _, tt := range flushTests { + t.Run(tt.name, func(t *testing.T) { + lb := &LineBuffer{ + streams: tt.streams, + } + + var out bytes.Buffer + w := NewAnnotatedWriter(&out, func() string { return "F " }) + + if err := lb.Flush(w); err != nil { + t.Fatalf("Flush() error = %v", err) + } + + if got := out.String(); got != tt.want { + t.Errorf("Flush() = %q, want %q", got, tt.want) + } + }) + } +} + +func TestLineBufferConcurrentWrite(t *testing.T) { + t.Parallel() + + buf := NewLineBuffer(func() string { return "P " }) + + const linesPerWriter = 100 + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + var sb strings.Builder + for i := 0; i < linesPerWriter; i++ { + fmt.Fprintf(&sb, "stdout-%d\n", i) + } + _ = buf.WriteLines(strings.NewReader(sb.String()), IndicatorOut) + }() + + go func() { + defer wg.Done() + var sb strings.Builder + for i := 0; i < linesPerWriter; i++ { + fmt.Fprintf(&sb, "stderr-%d\n", i) + } + _ = buf.WriteLines(strings.NewReader(sb.String()), IndicatorErr) + }() + + wg.Wait() + + if got := len(buf.streams[IndicatorOut]); got != linesPerWriter { + t.Errorf("stdout line count = %d, want %d", got, linesPerWriter) + } + if got := len(buf.streams[IndicatorErr]); got != linesPerWriter { + t.Errorf("stderr line count = %d, want %d", got, linesPerWriter) + } +} + +func TestCommandBufferedGroupsOutput(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + out, err := runCgCommand("--format", "T ", "--buffered", "--", "sh", "-c", "echo out; echo err >&2") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + lines := strings.Split(strings.TrimRight(out, "\n"), "\n") + + // Find section headers and content + stdoutHeaderIdx := -1 + stderrHeaderIdx := -1 + stdoutLineIdx := -1 + stderrLineIdx := -1 + + for i, line := range lines { + switch { + case strings.Contains(line, "I: --- stdout ---"): + stdoutHeaderIdx = i + case strings.Contains(line, "I: --- stderr ---"): + stderrHeaderIdx = i + case strings.Contains(line, "O: out"): + stdoutLineIdx = i + case strings.Contains(line, "E: err"): + stderrLineIdx = i + } + } + + if stdoutHeaderIdx == -1 { + t.Fatalf("missing stdout header in output: %q", out) + } + if stderrHeaderIdx == -1 { + t.Fatalf("missing stderr header in output: %q", out) + } + if stdoutLineIdx == -1 { + t.Fatalf("missing stdout content in output: %q", out) + } + if stderrLineIdx == -1 { + t.Fatalf("missing stderr content in output: %q", out) + } + + // Stdout section comes before stderr section + if stdoutHeaderIdx >= stderrHeaderIdx { + t.Errorf("stdout header (%d) should come before stderr header (%d)", stdoutHeaderIdx, stderrHeaderIdx) + } + // Stdout content follows stdout header + if stdoutLineIdx <= stdoutHeaderIdx { + t.Errorf("stdout content (%d) should follow stdout header (%d)", stdoutLineIdx, stdoutHeaderIdx) + } + // Stderr content follows stderr header + if stderrLineIdx <= stderrHeaderIdx { + t.Errorf("stderr content (%d) should follow stderr header (%d)", stderrLineIdx, stderrHeaderIdx) + } + + // Buffered mode notice present + if !strings.Contains(out, "I: buffered mode, output deferred") { + t.Errorf("missing buffered mode notice in output: %q", out) + } +} + +func TestCommandBufferedEmptyStream(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + out, err := runCgCommand("--format", "T ", "--buffered", "--", "echo", "hello") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if !strings.Contains(out, "I: --- stdout ---") { + t.Errorf("missing stdout header in output: %q", out) + } + if strings.Contains(out, "I: --- stderr ---") { + t.Errorf("unexpected stderr header in output: %q", out) + } + if !strings.Contains(out, "O: hello") { + t.Errorf("missing stdout content in output: %q", out) + } +} + +func TestCommandBufferedNoOutput(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + out, err := runCgCommand("--format", "T ", "--buffered", "--", "true") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if strings.Contains(out, "--- stdout ---") { + t.Errorf("unexpected stdout header for silent command: %q", out) + } + if strings.Contains(out, "--- stderr ---") { + t.Errorf("unexpected stderr header for silent command: %q", out) + } + if !strings.Contains(out, "Finished with exitcode 0") { + t.Errorf("missing finish message: %q", out) + } +} + +func TestCommandBufferedExitCode(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + out, err := runCgCommand("--format", "T ", "--buffered", "--", "sh", "-c", "echo before_exit; exit 42") + if err == nil { + t.Fatal("expected error from exit 42") + } + + var exitErr *ExitError + if !errors.As(err, &exitErr) { + t.Fatalf("expected *ExitError, got %T: %v", err, err) + } + if exitErr.Code != 42 { + t.Errorf("exit code = %d, want 42", exitErr.Code) + } + + if !strings.Contains(out, "O: before_exit") { + t.Errorf("buffered output should still be flushed, got: %q", out) + } + if !strings.Contains(out, "Finished with exitcode 42") { + t.Errorf("missing finish message, got: %q", out) + } +} + +func TestCommandBufferedWithCapture(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + out, err := runCgCommand("--format", "T ", "--buffered", "--capture", "--", "sh", "-c", "echo out; echo err >&2") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + defer cleanupCaptureFiles(t, out) + + // Capture files should have raw output + stdoutPath := extractCapturePath(out, "capture.stdout=") + stderrPath := extractCapturePath(out, "capture.stderr=") + if stdoutPath == "" || stderrPath == "" { + t.Fatalf("missing capture paths in output: %q", out) + } + + stdoutData, err := os.ReadFile(stdoutPath) + if err != nil { + t.Fatalf("reading stdout capture: %v", err) + } + if got := string(stdoutData); got != "out\n" { + t.Errorf("stdout capture = %q, want %q", got, "out\n") + } + + stderrData, err := os.ReadFile(stderrPath) + if err != nil { + t.Fatalf("reading stderr capture: %v", err) + } + if got := string(stderrData); got != "err\n" { + t.Errorf("stderr capture = %q, want %q", got, "err\n") + } + + // Terminal output should have grouped sections + if !strings.Contains(out, "I: --- stdout ---") { + t.Errorf("missing stdout header in terminal output: %q", out) + } + if !strings.Contains(out, "O: out") { + t.Errorf("missing stdout content in terminal output: %q", out) + } + if !strings.Contains(out, "I: --- stderr ---") { + t.Errorf("missing stderr header in terminal output: %q", out) + } + if !strings.Contains(out, "E: err") { + t.Errorf("missing stderr content in terminal output: %q", out) + } +} + +func TestCommandBufferedSignalFlush(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + // Use a temp file for the child to signal readiness, since buffered mode + // defers output and we cannot poll stdout. + readyFile := filepath.Join(t.TempDir(), "ready") + + script := fmt.Sprintf( + `trap 'echo got_sigterm; exit 0' TERM; touch %s; sleep 10`, + readyFile, + ) + + var outBuf bytes.Buffer + cmd := NewCommand() + cmd.SetOut(&outBuf) + cmd.SetErr(&outBuf) + cmd.SetArgs([]string{"--format", "T ", "--buffered", "--", "sh", "-c", script}) + + done := make(chan error, 1) + go func() { + done <- cmd.Execute() + }() + + // Poll for the ready file + deadline := time.After(5 * time.Second) + for { + select { + case <-deadline: + t.Fatal("timed out waiting for child to become ready") + case err := <-done: + t.Fatalf("command finished before signal: %v, output: %q", err, outBuf.String()) + default: + } + if _, err := os.Stat(readyFile); err == nil { + break + } + time.Sleep(10 * time.Millisecond) + } + + // Small delay to let the child settle into sleep + time.Sleep(50 * time.Millisecond) + + // Send SIGTERM to our own process; the signal forwarding goroutine in + // the runner will relay it to the child process group. + syscall.Kill(syscall.Getpid(), syscall.SIGTERM) + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for command to finish after signal") + } + + out := outBuf.String() + if !strings.Contains(out, "O: got_sigterm") { + t.Errorf("child did not flush signal response, output: %q", out) + } + if !strings.Contains(out, "I: --- stdout ---") { + t.Errorf("missing stdout header after signal, output: %q", out) + } +} diff --git a/pkg/cg/cg.go b/pkg/cg/cg.go index bdd68ac..4fa151d 100644 --- a/pkg/cg/cg.go +++ b/pkg/cg/cg.go @@ -8,8 +8,9 @@ import ( const DefaultFormat = "15:04:05 " type Options struct { - Format string - Capture bool + Format string + Capture bool + Buffered bool } // NewCommand creates the cg cobra command. @@ -29,6 +30,7 @@ func NewCommand() *cobra.Command { c.Flags().StringVar(&opts.Format, "format", DefaultFormat, "time prefix format (Go time.Format layout)") c.Flags().BoolVar(&opts.Capture, "capture", false, "capture child output to temporary files") + c.Flags().BoolVar(&opts.Buffered, "buffered", false, "defer child output until command finishes, grouped by stream") return c } diff --git a/pkg/cg/runner.go b/pkg/cg/runner.go index ce8a06f..26753d6 100644 --- a/pkg/cg/runner.go +++ b/pkg/cg/runner.go @@ -21,6 +21,11 @@ func (opts *Options) run(cmd *cobra.Command, args []string) error { } w := NewAnnotatedWriter(cmd.OutOrStdout(), prefix) + var buf *LineBuffer + if opts.Buffered { + buf = NewLineBuffer(prefix) + } + // writeInfo writes a lifecycle message to the annotated writer and // optionally buffers it for later replay to the capture lifecycle file. var preCaptureMsgs []string @@ -47,6 +52,12 @@ func (opts *Options) run(cmd *cobra.Command, args []string) error { return fmt.Errorf("writing prefix info: %w", err) } + if opts.Buffered { + if err := writeInfo("buffered mode, output deferred"); err != nil { + return fmt.Errorf("writing buffered info: %w", err) + } + } + if err := writeInfo(fmt.Sprintf("Started %s", escapeArgs(args))); err != nil { return fmt.Errorf("writing start info: %w", err) } @@ -122,7 +133,17 @@ func (opts *Options) run(cmd *cobra.Command, args []string) error { var wg sync.WaitGroup wg.Add(2) - if cap != nil { + switch { + case cap != nil && buf != nil: + go func() { + defer wg.Done() + _ = buf.WriteLines(io.TeeReader(stdout, cap.Stdout), IndicatorOut) + }() + go func() { + defer wg.Done() + _ = buf.WriteLines(io.TeeReader(stderr, cap.Stderr), IndicatorErr) + }() + case cap != nil: go func() { defer wg.Done() _, _ = io.Copy(cap.Stdout, stdout) @@ -131,7 +152,16 @@ func (opts *Options) run(cmd *cobra.Command, args []string) error { defer wg.Done() _, _ = io.Copy(cap.Stderr, stderr) }() - } else { + case buf != nil: + go func() { + defer wg.Done() + _ = buf.WriteLines(stdout, IndicatorOut) + }() + go func() { + defer wg.Done() + _ = buf.WriteLines(stderr, IndicatorErr) + }() + default: go func() { defer wg.Done() _ = w.WriteLines(stdout, IndicatorOut) @@ -146,6 +176,13 @@ func (opts *Options) run(cmd *cobra.Command, args []string) error { waitErr := child.Wait() code := ExitCodeFromError(waitErr) + + if buf != nil { + if err := buf.Flush(w); err != nil { + return fmt.Errorf("flushing buffered output: %w", err) + } + } + if ws := exitStatus(child); ws != nil && ws.Signaled() { _ = writeInfo(fmt.Sprintf("Finished with signal %d", ws.Signal())) } else { diff --git a/pkg/cg/writer.go b/pkg/cg/writer.go index 7f69d49..29fc88b 100644 --- a/pkg/cg/writer.go +++ b/pkg/cg/writer.go @@ -60,6 +60,27 @@ func (w *AnnotatedWriter) WritePartialLine(ind Indicator, line string) error { return err } +// WriteLineWithPrefix writes a single annotated line using the given prefix +// instead of calling the prefix function. Used for replaying buffered lines +// with their original receive-time prefix. +func (w *AnnotatedWriter) WriteLineWithPrefix(prefix string, ind Indicator, line string) error { + w.mu.Lock() + defer w.mu.Unlock() + + _, err := fmt.Fprintf(w.dest, "%s%c: %s\n", prefix, ind, line) + return err +} + +// WritePartialLineWithPrefix writes a single annotated line without a trailing +// newline, using the given prefix instead of calling the prefix function. +func (w *AnnotatedWriter) WritePartialLineWithPrefix(prefix string, ind Indicator, line string) error { + w.mu.Lock() + defer w.mu.Unlock() + + _, err := fmt.Fprintf(w.dest, "%s%c: %s", prefix, ind, line) + return err +} + // WriteLines reads linewise from r, and writes each as an annotated line. // // If the final line does not end with a newline, it is written without a diff --git a/pkg/cg/writer_test.go b/pkg/cg/writer_test.go index 9f16e7f..f81a26f 100644 --- a/pkg/cg/writer_test.go +++ b/pkg/cg/writer_test.go @@ -114,6 +114,108 @@ var writeLinesTests = []writeLinesTest{ }, } +type writeLineWithPrefixTest struct { + name string + prefix string + indicator Indicator + line string + want string +} + +var writeLineWithPrefixTests = []writeLineWithPrefixTest{ + { + name: "stdout line with custom prefix", + prefix: "12:00:00 ", + indicator: IndicatorOut, + line: "hello world", + want: "12:00:00 O: hello world\n", + }, + { + name: "stderr line with custom prefix", + prefix: "12:00:01 ", + indicator: IndicatorErr, + line: "an error", + want: "12:00:01 E: an error\n", + }, + { + name: "info line with custom prefix", + prefix: "12:00:02 ", + indicator: IndicatorInfo, + line: "lifecycle", + want: "12:00:02 I: lifecycle\n", + }, + { + name: "empty line with custom prefix", + prefix: "T ", + indicator: IndicatorOut, + line: "", + want: "T O: \n", + }, +} + +func TestWriteLineWithPrefix(t *testing.T) { + t.Parallel() + + for _, tt := range writeLineWithPrefixTests { + t.Run(tt.name, func(t *testing.T) { + var buf bytes.Buffer + w := NewAnnotatedWriter(&buf, func() string { return "UNUSED " }) + + if err := w.WriteLineWithPrefix(tt.prefix, tt.indicator, tt.line); err != nil { + t.Fatalf("WriteLineWithPrefix() error = %v", err) + } + + if got := buf.String(); got != tt.want { + t.Errorf("WriteLineWithPrefix() = %q, want %q", got, tt.want) + } + }) + } +} + +type writePartialLineWithPrefixTest struct { + name string + prefix string + indicator Indicator + line string + want string +} + +var writePartialLineWithPrefixTests = []writePartialLineWithPrefixTest{ + { + name: "stdout partial with custom prefix", + prefix: "12:00:00 ", + indicator: IndicatorOut, + line: "partial", + want: "12:00:00 O: partial", + }, + { + name: "stderr partial with custom prefix", + prefix: "12:00:01 ", + indicator: IndicatorErr, + line: "err partial", + want: "12:00:01 E: err partial", + }, +} + +func TestWritePartialLineWithPrefix(t *testing.T) { + t.Parallel() + + for _, tt := range writePartialLineWithPrefixTests { + t.Run(tt.name, func(t *testing.T) { + var buf bytes.Buffer + w := NewAnnotatedWriter(&buf, func() string { return "UNUSED " }) + + if err := w.WritePartialLineWithPrefix(tt.prefix, tt.indicator, tt.line); err != nil { + t.Fatalf("WritePartialLineWithPrefix() error = %v", err) + } + + if got := buf.String(); got != tt.want { + t.Errorf("WritePartialLineWithPrefix() = %q, want %q", got, tt.want) + } + }) + } +} + func TestWriteLines(t *testing.T) { t.Parallel() From 10b9db15fb9aae790df9d34928380a2d5a56095b Mon Sep 17 00:00:00 2001 From: Ripta Pasay Date: Tue, 24 Feb 2026 22:46:18 -0800 Subject: [PATCH 06/10] pkg/cg: add structured log line parsing Extract human-readable messages from JSON log lines, replacing raw JSON with the message field value while preserving cg's annotation format; non-JSON lines pass through unchanged. --- Makefile | 3 + pkg/cg/buffer.go | 39 ++- pkg/cg/cg.go | 44 +++ pkg/cg/fields.go | 57 ++++ pkg/cg/logparse.go | 93 +++++++ pkg/cg/logparse_test.go | 586 +++++++++++++++++++++++++++++++++++++++ pkg/cg/runner.go | 19 ++ pkg/cg/timestamp.go | 138 +++++++++ pkg/cg/timestamp_test.go | 211 ++++++++++++++ pkg/cg/writer.go | 36 ++- 10 files changed, 1214 insertions(+), 12 deletions(-) create mode 100644 pkg/cg/fields.go create mode 100644 pkg/cg/logparse.go create mode 100644 pkg/cg/logparse_test.go create mode 100644 pkg/cg/timestamp.go create mode 100644 pkg/cg/timestamp_test.go diff --git a/Makefile b/Makefile index 5fd399e..bcb90f8 100644 --- a/Makefile +++ b/Makefile @@ -22,4 +22,7 @@ test: samples: echo ABCDEFGHIJKLMNOPQRSTUVWXYZ abcdefghijklmnopqrstuvwxyz 0123456789 | go run ./cmd/uni map -a > samples/uni.txt +lint: + go vet ./... + .PHONY: generate-proto imports install install-protoc hyper fast-test test samples diff --git a/pkg/cg/buffer.go b/pkg/cg/buffer.go index 099a9b0..0358ad0 100644 --- a/pkg/cg/buffer.go +++ b/pkg/cg/buffer.go @@ -18,9 +18,15 @@ type bufferedLine struct { type LineBuffer struct { mu sync.Mutex prefix PrefixFunc + proc LineProcessor streams map[Indicator][]bufferedLine } +// SetProcessor sets the line processor for this buffer. +func (b *LineBuffer) SetProcessor(proc LineProcessor) { + b.proc = proc +} + // NewLineBuffer creates a LineBuffer that calls prefix to obtain the prefix // string for each line as it arrives. func NewLineBuffer(prefix PrefixFunc) *LineBuffer { @@ -40,10 +46,15 @@ func (b *LineBuffer) WriteLines(r io.Reader, ind Indicator) error { if err != nil { if err == io.EOF { if len(line) > 0 { + prefix, display := b.processLine(string(line)) + if prefix == "" { + prefix = b.prefix() + } + b.mu.Lock() b.streams[ind] = append(b.streams[ind], bufferedLine{ - prefix: b.prefix(), - line: string(line), + prefix: prefix, + line: display, partial: true, }) b.mu.Unlock() @@ -53,16 +64,34 @@ func (b *LineBuffer) WriteLines(r io.Reader, ind Indicator) error { return err } - p := b.prefix() + raw := string(line[:len(line)-1]) + prefix, display := b.processLine(raw) + if prefix == "" { + prefix = b.prefix() + } + b.mu.Lock() b.streams[ind] = append(b.streams[ind], bufferedLine{ - prefix: p, - line: string(line[:len(line)-1]), + prefix: prefix, + line: display, }) b.mu.Unlock() } } +func (b *LineBuffer) processLine(line string) (prefix string, display string) { + if b.proc == nil { + return "", line + } + + result := b.proc(line) + if result == nil { + return "", line + } + + return result.Prefix, result.Line +} + // Flush writes all buffered lines to w, grouped by stream. Stdout lines are // written first, then stderr. Each non-empty stream is preceded by a section // header. Lines are replayed with their original receive-time prefix. diff --git a/pkg/cg/cg.go b/pkg/cg/cg.go index 4fa151d..67670fc 100644 --- a/pkg/cg/cg.go +++ b/pkg/cg/cg.go @@ -1,6 +1,8 @@ package cg import ( + "fmt" + "github.com/spf13/cobra" ) @@ -11,6 +13,12 @@ type Options struct { Format string Capture bool Buffered bool + + LogParse string + LogMsgKey string + LogTSKey string + LogTSFmt string + LogFields string } // NewCommand creates the cg cobra command. @@ -31,6 +39,42 @@ func NewCommand() *cobra.Command { c.Flags().StringVar(&opts.Format, "format", DefaultFormat, "time prefix format (Go time.Format layout)") c.Flags().BoolVar(&opts.Capture, "capture", false, "capture child output to temporary files") c.Flags().BoolVar(&opts.Buffered, "buffered", false, "defer child output until command finishes, grouped by stream") + c.Flags().StringVar(&opts.LogParse, "log-parse", "", "log line parser (\"json\")") + c.Flags().StringVar(&opts.LogMsgKey, "log-message-key", "message", "JSON key for the log message") + c.Flags().StringVar(&opts.LogTSKey, "log-timestamp-key", "timestamp", "JSON key for the timestamp (empty to disable)") + c.Flags().StringVar(&opts.LogTSFmt, "log-timestamp-format", "", "timestamp format: rfc3339, unix-s, unix-ms (empty for auto-detect)") + c.Flags().StringVar(&opts.LogFields, "log-fields", "", "comma-separated JSON keys to append, or \"*\" for all") return c } + +// logDependentFlags are flags that require --log-parse to be set. +var logDependentFlags = []string{ + "log-message-key", + "log-timestamp-key", + "log-timestamp-format", + "log-fields", +} + +func (opts *Options) validateFlags(cmd *cobra.Command) error { + if opts.LogParse == "" { + for _, name := range logDependentFlags { + if cmd.Flags().Changed(name) { + return fmt.Errorf("--%s requires --log-parse", name) + } + } + return nil + } + + if opts.LogParse != "json" { + return fmt.Errorf("unsupported --log-parse value: %q (supported: json)", opts.LogParse) + } + + switch opts.LogTSFmt { + case "", "rfc3339", "unix-s", "unix-ms": + default: + return fmt.Errorf("unsupported --log-timestamp-format value: %q (supported: rfc3339, unix-s, unix-ms)", opts.LogTSFmt) + } + + return nil +} diff --git a/pkg/cg/fields.go b/pkg/cg/fields.go new file mode 100644 index 0000000..55bce0d --- /dev/null +++ b/pkg/cg/fields.go @@ -0,0 +1,57 @@ +package cg + +import ( + "fmt" + "sort" + "strings" +) + +// formatFields produces a " key=value key=value" suffix from the given JSON +// object. If fields is ["*"], all keys are included in alphabetical order, +// excluding any keys in excludeKeys. Values containing spaces are quoted. +func formatFields(obj map[string]any, fields []string, excludeKeys ...string) string { + if len(fields) == 0 { + return "" + } + + exclude := make(map[string]bool, len(excludeKeys)) + for _, k := range excludeKeys { + if k != "" { + exclude[k] = true + } + } + + var keys []string + if len(fields) == 1 && fields[0] == "*" { + for k := range obj { + if !exclude[k] { + keys = append(keys, k) + } + } + sort.Strings(keys) + } else { + for _, k := range fields { + if _, ok := obj[k]; ok && !exclude[k] { + keys = append(keys, k) + } + } + } + + if len(keys) == 0 { + return "" + } + + sb := strings.Builder{} + for _, k := range keys { + v := stringify(obj[k]) + sb.WriteByte(' ') + sb.WriteString(k) + sb.WriteByte('=') + if strings.ContainsRune(v, ' ') { + fmt.Fprintf(&sb, "%q", v) + } else { + sb.WriteString(v) + } + } + return sb.String() +} diff --git a/pkg/cg/logparse.go b/pkg/cg/logparse.go new file mode 100644 index 0000000..edb5a73 --- /dev/null +++ b/pkg/cg/logparse.go @@ -0,0 +1,93 @@ +package cg + +import ( + "encoding/json" + "fmt" + "strings" +) + +// ProcessedLine is the result of processing a raw output line for display. +type ProcessedLine struct { + // Prefix is an optional string to display before the message, e.g., timestamp + Prefix string + // Line is the main content to display + Line string +} + +// LineProcessor transforms raw output lines for display. Returns nil to pass +// the line through unchanged. +type LineProcessor func(line string) *ProcessedLine + +// JSONProcessorOptions configures the JSON log line processor. +type JSONProcessorOptions struct { + MessageKey string + TimestampKey string + TimestampFmt string + Fields []string + Format string +} + +// NewJSONProcessor returns a LineProcessor that extracts the message from JSON +// log lines. Non-JSON lines and lines missing the message key pass through +// unchanged. +func NewJSONProcessor(opts JSONProcessorOptions) LineProcessor { + var tsParser *TimestampParser + if opts.TimestampKey != "" { + tsParser = NewTimestampParser(opts.TimestampFmt, opts.Format) + } + + return func(line string) *ProcessedLine { + var obj map[string]any + if err := json.Unmarshal([]byte(line), &obj); err != nil { + return nil + } + + msgVal, ok := obj[opts.MessageKey] + if !ok { + return nil + } + + msg := stringify(msgVal) + + var suffix string + if len(opts.Fields) > 0 { + suffix = formatFields(obj, opts.Fields, opts.MessageKey, opts.TimestampKey) + } + + var prefix string + if tsParser != nil { + if tsVal, ok := obj[opts.TimestampKey]; ok { + prefix = tsParser.Parse(tsVal) + } + } + + return &ProcessedLine{ + Prefix: prefix, + Line: msg + suffix, + } + } +} + +// stringify converts a JSON value to its string representation. +func stringify(v any) string { + switch val := v.(type) { + case string: + return val + case float64, bool: + return fmt.Sprint(val) + case nil: + return "null" + default: + b, _ := json.Marshal(val) + return string(b) + } +} + +// parseFieldsFlag splits a comma-separated fields string into a slice. +// Returns nil for empty input. +func parseFieldsFlag(s string) []string { + if s == "" { + return nil + } + return strings.Split(s, ",") +} diff --git a/pkg/cg/logparse_test.go b/pkg/cg/logparse_test.go new file mode 100644 index 0000000..4cbc500 --- /dev/null +++ b/pkg/cg/logparse_test.go @@ -0,0 +1,586 @@ +package cg + +import ( + "bytes" + "errors" + "strings" + "testing" +) + +type stringifyTest struct { + name string + val any + want string +} + +var stringifyTests = []stringifyTest{ + { + name: "string value", + val: "hello", + want: "hello", + }, + { + name: "float64 integer", + val: float64(42), + want: "42", + }, + { + name: "float64 fraction", + val: float64(3.14), + want: "3.14", + }, + { + name: "bool true", + val: true, + want: "true", + }, + { + name: "bool false", + val: false, + want: "false", + }, + { + name: "nil", + val: nil, + want: "null", + }, + { + name: "slice", + val: []any{"a", "b"}, + want: `["a","b"]`, + }, + { + name: "map", + val: map[string]any{"k": "v"}, + want: `{"k":"v"}`, + }, +} + +func TestStringify(t *testing.T) { + t.Parallel() + + for _, tt := range stringifyTests { + t.Run(tt.name, func(t *testing.T) { + got := stringify(tt.val) + if got != tt.want { + t.Errorf("stringify(%v) = %q, want %q", tt.val, got, tt.want) + } + }) + } +} + +type parseFieldsFlagTest struct { + name string + input string + want []string +} + +var parseFieldsFlagTests = []parseFieldsFlagTest{ + { + name: "empty string", + input: "", + want: nil, + }, + { + name: "single field", + input: "level", + want: []string{"level"}, + }, + { + name: "multiple fields", + input: "level,host,pid", + want: []string{"level", "host", "pid"}, + }, + { + name: "wildcard", + input: "*", + want: []string{"*"}, + }, +} + +func TestParseFieldsFlag(t *testing.T) { + t.Parallel() + + for _, tt := range parseFieldsFlagTests { + t.Run(tt.name, func(t *testing.T) { + got := parseFieldsFlag(tt.input) + if tt.want == nil { + if got != nil { + t.Errorf("parseFieldsFlag(%q) = %v, want nil", tt.input, got) + } + return + } + if len(got) != len(tt.want) { + t.Fatalf("parseFieldsFlag(%q) = %v, want %v", tt.input, got, tt.want) + } + for i := range got { + if got[i] != tt.want[i] { + t.Errorf("parseFieldsFlag(%q)[%d] = %q, want %q", tt.input, i, got[i], tt.want[i]) + } + } + }) + } +} + +type jsonProcessorTest struct { + name string + opts JSONProcessorOptions + line string + // want is the expected result, nil means pass through + want *ProcessedLine +} + +var jsonProcessorTests = []jsonProcessorTest{ + { + name: "valid JSON with message", + opts: JSONProcessorOptions{MessageKey: "message"}, + line: `{"message":"hello world","level":"info"}`, + want: &ProcessedLine{Line: "hello world"}, + }, + { + name: "missing message key", + opts: JSONProcessorOptions{MessageKey: "message"}, + line: `{"msg":"hello world","level":"info"}`, + want: nil, + }, + { + name: "non-JSON line", + opts: JSONProcessorOptions{MessageKey: "message"}, + line: "this is not json", + want: nil, + }, + { + name: "non-string message", + opts: JSONProcessorOptions{MessageKey: "message"}, + line: `{"message":42,"level":"info"}`, + want: &ProcessedLine{Line: "42"}, + }, + { + name: "boolean message", + opts: JSONProcessorOptions{MessageKey: "message"}, + line: `{"message":true}`, + want: &ProcessedLine{Line: "true"}, + }, + { + name: "null message", + opts: JSONProcessorOptions{MessageKey: "message"}, + line: `{"message":null}`, + want: &ProcessedLine{Line: "null"}, + }, + { + name: "custom message key", + opts: JSONProcessorOptions{MessageKey: "msg"}, + line: `{"msg":"custom key"}`, + want: &ProcessedLine{Line: "custom key"}, + }, + { + name: "empty JSON object", + opts: JSONProcessorOptions{MessageKey: "message"}, + line: `{}`, + want: nil, + }, + { + name: "with fields", + opts: JSONProcessorOptions{ + MessageKey: "message", + Fields: []string{"level"}, + }, + line: `{"message":"hello","level":"info"}`, + want: &ProcessedLine{Line: "hello level=info"}, + }, + { + name: "with wildcard fields", + opts: JSONProcessorOptions{ + MessageKey: "message", + Fields: []string{"*"}, + }, + line: `{"message":"hello","level":"info","host":"srv1"}`, + want: &ProcessedLine{Line: "hello host=srv1 level=info"}, + }, + { + name: "with timestamp", + opts: JSONProcessorOptions{ + MessageKey: "message", + TimestampKey: "timestamp", + Format: "15:04:05 ", + }, + line: `{"message":"hello","timestamp":"2024-01-15T10:30:00Z"}`, + want: &ProcessedLine{Prefix: "10:30:00 ", Line: "hello"}, + }, + { + name: "with timestamp and fields", + opts: JSONProcessorOptions{ + MessageKey: "message", + TimestampKey: "ts", + Format: "15:04:05 ", + Fields: []string{"level"}, + }, + line: `{"message":"hello","ts":"2024-01-15T10:30:00Z","level":"warn"}`, + want: &ProcessedLine{Prefix: "10:30:00 ", Line: "hello level=warn"}, + }, +} + +func TestNewJSONProcessor(t *testing.T) { + t.Parallel() + + for _, tt := range jsonProcessorTests { + t.Run(tt.name, func(t *testing.T) { + proc := NewJSONProcessor(tt.opts) + got := proc(tt.line) + + if tt.want == nil { + if got != nil { + t.Errorf("processor returned %+v, want nil", got) + } + return + } + if got == nil { + t.Fatalf("processor returned nil, want %+v", tt.want) + } + if got.Line != tt.want.Line { + t.Errorf("Line = %q, want %q", got.Line, tt.want.Line) + } + if got.Prefix != tt.want.Prefix { + t.Errorf("Prefix = %q, want %q", got.Prefix, tt.want.Prefix) + } + }) + } +} + +type formatFieldsTest struct { + name string + obj map[string]any + fields []string + excludeKeys []string + want string +} + +var formatFieldsTests = []formatFieldsTest{ + { + name: "single field", + obj: map[string]any{"level": "info", "message": "hi"}, + fields: []string{"level"}, + want: " level=info", + }, + { + name: "multiple fields in order", + obj: map[string]any{"level": "info", "host": "srv1", "message": "hi"}, + fields: []string{"host", "level"}, + want: " host=srv1 level=info", + }, + { + name: "missing field ignored", + obj: map[string]any{"level": "info", "message": "hi"}, + fields: []string{"level", "missing"}, + want: " level=info", + }, + { + name: "wildcard all fields", + obj: map[string]any{"level": "info", "host": "srv1"}, + fields: []string{"*"}, + want: " host=srv1 level=info", + }, + { + name: "wildcard excludes keys", + obj: map[string]any{"level": "info", "message": "hi", "timestamp": "now"}, + fields: []string{"*"}, + excludeKeys: []string{"message", "timestamp"}, + want: " level=info", + }, + { + name: "value with spaces is quoted", + obj: map[string]any{"msg": "hello world"}, + fields: []string{"msg"}, + want: ` msg="hello world"`, + }, + { + name: "numeric value", + obj: map[string]any{"pid": float64(1234)}, + fields: []string{"pid"}, + want: " pid=1234", + }, + { + name: "boolean value", + obj: map[string]any{"debug": true}, + fields: []string{"debug"}, + want: " debug=true", + }, + { + name: "empty fields", + obj: map[string]any{"level": "info"}, + fields: []string{}, + want: "", + }, + { + name: "excluded field not shown in explicit list", + obj: map[string]any{"level": "info", "message": "hi"}, + fields: []string{"level", "message"}, + excludeKeys: []string{"message"}, + want: " level=info", + }, +} + +func TestFormatFields(t *testing.T) { + t.Parallel() + + for _, tt := range formatFieldsTests { + t.Run(tt.name, func(t *testing.T) { + got := formatFields(tt.obj, tt.fields, tt.excludeKeys...) + if got != tt.want { + t.Errorf("formatFields() = %q, want %q", got, tt.want) + } + }) + } +} + +type writeLinesProcessorTest struct { + name string + proc LineProcessor + indicator Indicator + input string + want string +} + +var writeLinesProcessorTests = []writeLinesProcessorTest{ + { + name: "JSON lines processed", + proc: NewJSONProcessor(JSONProcessorOptions{MessageKey: "message"}), + indicator: IndicatorOut, + input: "{\"message\":\"hello\"}\n{\"message\":\"world\"}\n", + want: "T O: hello\nT O: world\n", + }, + { + name: "non-JSON passes through", + proc: NewJSONProcessor(JSONProcessorOptions{MessageKey: "message"}), + indicator: IndicatorOut, + input: "plain text\n", + want: "T O: plain text\n", + }, + { + name: "mixed JSON and non-JSON", + proc: NewJSONProcessor(JSONProcessorOptions{MessageKey: "message"}), + indicator: IndicatorOut, + input: "{\"message\":\"parsed\"}\nnot json\n", + want: "T O: parsed\nT O: not json\n", + }, +} + +func TestWriteLinesWithProcessor(t *testing.T) { + t.Parallel() + + for _, tt := range writeLinesProcessorTests { + t.Run(tt.name, func(t *testing.T) { + var buf bytes.Buffer + w := NewAnnotatedWriter(&buf, func() string { return "T " }) + w.SetProcessor(tt.proc) + + err := w.WriteLines(strings.NewReader(tt.input), tt.indicator) + if err != nil { + t.Fatalf("WriteLines() error = %v", err) + } + + if got := buf.String(); got != tt.want { + t.Errorf("WriteLines() = %q, want %q", got, tt.want) + } + }) + } +} + +type bufferProcessorTest struct { + name string + proc LineProcessor + indicator Indicator + input string + wantLines []bufferedLine +} + +var bufferProcessorTests = []bufferProcessorTest{ + { + name: "JSON lines processed in buffer", + proc: NewJSONProcessor(JSONProcessorOptions{MessageKey: "message"}), + indicator: IndicatorOut, + input: "{\"message\":\"hello\"}\n{\"message\":\"world\"}\n", + wantLines: []bufferedLine{ + {prefix: "P ", line: "hello"}, + {prefix: "P ", line: "world"}, + }, + }, + { + name: "non-JSON passes through in buffer", + proc: NewJSONProcessor(JSONProcessorOptions{MessageKey: "message"}), + indicator: IndicatorOut, + input: "plain text\n", + wantLines: []bufferedLine{ + {prefix: "P ", line: "plain text"}, + }, + }, +} + +func TestLineBufferWriteLinesWithProcessor(t *testing.T) { + t.Parallel() + + for _, tt := range bufferProcessorTests { + t.Run(tt.name, func(t *testing.T) { + buf := NewLineBuffer(func() string { return "P " }) + buf.SetProcessor(tt.proc) + + err := buf.WriteLines(strings.NewReader(tt.input), tt.indicator) + if err != nil { + t.Fatalf("WriteLines() error = %v", err) + } + + got := buf.streams[tt.indicator] + if len(got) != len(tt.wantLines) { + t.Fatalf("stored %d lines, want %d", len(got), len(tt.wantLines)) + } + + for i, want := range tt.wantLines { + if got[i].prefix != want.prefix { + t.Errorf("line[%d].prefix = %q, want %q", i, got[i].prefix, want.prefix) + } + if got[i].line != want.line { + t.Errorf("line[%d].line = %q, want %q", i, got[i].line, want.line) + } + } + }) + } +} + +func TestCommandLogParseJSON(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + out, err := runCgCommand( + "--format", "T ", + "--log-parse", "json", + "--", + "sh", "-c", `echo '{"message":"hello","level":"info"}'`, + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if !strings.Contains(out, "O: hello\n") { + t.Errorf("output missing parsed message, got: %q", out) + } + // The output line should be just the message, not the full JSON + for _, line := range strings.Split(out, "\n") { + if strings.Contains(line, "O: ") && strings.Contains(line, `"level"`) { + t.Errorf("output line should not contain raw JSON: %q", line) + } + } +} + +func TestCommandLogParseNonJSON(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + out, err := runCgCommand( + "--format", "T ", + "--log-parse", "json", + "--", + "echo", "plain text", + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if !strings.Contains(out, "O: plain text") { + t.Errorf("non-JSON should pass through, got: %q", out) + } +} + +func TestCommandLogFieldsWithoutParse(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + _, err := runCgCommand( + "--format", "T ", + "--log-fields", "level", + "--", + "echo", "hi", + ) + if err == nil { + t.Fatal("expected error when --log-fields used without --log-parse") + } + + var exitErr *ExitError + if !errors.As(err, &exitErr) { + t.Fatalf("expected *ExitError, got %T: %v", err, err) + } + if exitErr.Code != 2 { + t.Errorf("exit code = %d, want 2", exitErr.Code) + } +} + +func TestCommandLogParseInvalid(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + _, err := runCgCommand( + "--format", "T ", + "--log-parse", "xml", + "--", + "echo", "hi", + ) + if err == nil { + t.Fatal("expected error for unsupported --log-parse value") + } + + var exitErr *ExitError + if !errors.As(err, &exitErr) { + t.Fatalf("expected *ExitError, got %T: %v", err, err) + } + if exitErr.Code != 2 { + t.Errorf("exit code = %d, want 2", exitErr.Code) + } +} + +func TestCommandLogParseWithFields(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + out, err := runCgCommand( + "--format", "T ", + "--log-parse", "json", + "--log-fields", "level", + "--", + "sh", "-c", `echo '{"message":"hello","level":"info"}'`, + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if !strings.Contains(out, "O: hello level=info") { + t.Errorf("output missing parsed message with fields, got: %q", out) + } +} + +func TestCommandLogParseBuffered(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + out, err := runCgCommand( + "--format", "T ", + "--log-parse", "json", + "--buffered", + "--", + "sh", "-c", `echo '{"message":"buffered msg","level":"info"}'`, + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if !strings.Contains(out, "O: buffered msg") { + t.Errorf("output missing parsed message in buffered mode, got: %q", out) + } + if !strings.Contains(out, "I: --- stdout ---") { + t.Errorf("missing stdout header in buffered mode, got: %q", out) + } +} diff --git a/pkg/cg/runner.go b/pkg/cg/runner.go index 26753d6..9331779 100644 --- a/pkg/cg/runner.go +++ b/pkg/cg/runner.go @@ -16,14 +16,33 @@ import ( ) func (opts *Options) run(cmd *cobra.Command, args []string) error { + if err := opts.validateFlags(cmd); err != nil { + fmt.Fprintln(cmd.ErrOrStderr(), err) + return &ExitError{Code: 2} + } + prefix := func() string { return time.Now().Format(opts.Format) } w := NewAnnotatedWriter(cmd.OutOrStdout(), prefix) + if opts.LogParse != "" { + proc := NewJSONProcessor(JSONProcessorOptions{ + MessageKey: opts.LogMsgKey, + TimestampKey: opts.LogTSKey, + TimestampFmt: opts.LogTSFmt, + Fields: parseFieldsFlag(opts.LogFields), + Format: opts.Format, + }) + w.SetProcessor(proc) + } + var buf *LineBuffer if opts.Buffered { buf = NewLineBuffer(prefix) + if w.proc != nil { + buf.SetProcessor(w.proc) + } } // writeInfo writes a lifecycle message to the annotated writer and diff --git a/pkg/cg/timestamp.go b/pkg/cg/timestamp.go new file mode 100644 index 0000000..398cb01 --- /dev/null +++ b/pkg/cg/timestamp.go @@ -0,0 +1,138 @@ +package cg + +import ( + "math" + "strconv" + "sync" + "time" +) + +// TimestampFormat identifies a timestamp encoding. +type TimestampFormat int + +const ( + tsFormatRFC3339 TimestampFormat = iota + 1 + tsFormatUnixS + tsFormatUnixMS +) + +// TimestampParser auto-detects and parses timestamps from JSON values. It +// locks on the first successfully detected format, avoiding repeated probing. +type TimestampParser struct { + mu sync.Mutex + locked bool + format TimestampFormat + layout string +} + +// NewTimestampParser creates a parser. If explicitFmt is non-empty, it locks +// to that format immediately. layout is the Go time.Format layout used for +// rendering the prefix string. +func NewTimestampParser(explicitFmt string, layout string) *TimestampParser { + tp := &TimestampParser{layout: layout} + switch explicitFmt { + case "rfc3339": + tp.locked = true + tp.format = tsFormatRFC3339 + case "unix-s": + tp.locked = true + tp.format = tsFormatUnixS + case "unix-ms": + tp.locked = true + tp.format = tsFormatUnixMS + } + return tp +} + +// Parse attempts to extract a time from val and returns a formatted prefix +// string. Returns empty string on failure. +func (tp *TimestampParser) Parse(val any) string { + tp.mu.Lock() + defer tp.mu.Unlock() + + if tp.locked { + return tp.parseWith(tp.format, val) + } + + for _, f := range []TimestampFormat{tsFormatRFC3339, tsFormatUnixS, tsFormatUnixMS} { + if result := tp.parseWith(f, val); result != "" { + tp.locked = true + tp.format = f + return result + } + } + return "" +} + +func (tp *TimestampParser) parseWith(format TimestampFormat, val any) string { + var t time.Time + var ok bool + + switch format { + case tsFormatRFC3339: + t, ok = parseRFC3339(val) + case tsFormatUnixS: + t, ok = parseUnixSeconds(val) + case tsFormatUnixMS: + t, ok = parseUnixMillis(val) + } + + if !ok { + return "" + } + if !inRange(t) { + return "" + } + return t.Format(tp.layout) +} + +func parseRFC3339(val any) (time.Time, bool) { + s, ok := val.(string) + if !ok { + return time.Time{}, false + } + t, err := time.Parse(time.RFC3339Nano, s) + if err != nil { + return time.Time{}, false + } + return t, true +} + +func parseUnixSeconds(val any) (time.Time, bool) { + f, ok := toFloat64(val) + if !ok { + return time.Time{}, false + } + sec, frac := math.Modf(f) + return time.Unix(int64(sec), int64(frac*1e9)), true +} + +func parseUnixMillis(val any) (time.Time, bool) { + f, ok := toFloat64(val) + if !ok { + return time.Time{}, false + } + ms := int64(f) + return time.Unix(ms/1000, (ms%1000)*1e6), true +} + +func toFloat64(val any) (float64, bool) { + switch v := val.(type) { + case float64: + return v, true + case string: + f, err := strconv.ParseFloat(v, 64) + if err != nil { + return 0, false + } + return f, true + default: + return 0, false + } +} + +func inRange(t time.Time) bool { + year := t.Year() + return year >= 1970 && year <= 2100 +} + diff --git a/pkg/cg/timestamp_test.go b/pkg/cg/timestamp_test.go new file mode 100644 index 0000000..0e718d6 --- /dev/null +++ b/pkg/cg/timestamp_test.go @@ -0,0 +1,211 @@ +package cg + +import ( + "testing" + "time" +) + +// unixExpected computes the expected formatted string for a unix timestamp +// in the local timezone, matching the behavior of time.Unix().Format(). +func unixExpected(sec int64, layout string) string { + return time.Unix(sec, 0).Format(layout) +} + +type timestampParseTest struct { + name string + parser *TimestampParser + val any + want string +} + +var timestampParseTests = []timestampParseTest{ + { + name: "RFC 3339 string", + parser: NewTimestampParser("", "15:04:05 "), + val: "2024-01-15T10:30:00Z", + want: "10:30:00 ", + }, + { + name: "RFC 3339 with nanoseconds", + parser: NewTimestampParser("", "15:04:05.000 "), + val: "2024-01-15T10:30:00.123456789Z", + want: "10:30:00.123 ", + }, + { + name: "explicit rfc3339 format", + parser: NewTimestampParser("rfc3339", "15:04:05 "), + val: "2024-01-15T10:30:00Z", + want: "10:30:00 ", + }, + { + name: "explicit rfc3339 rejects number", + parser: NewTimestampParser("rfc3339", "15:04:05 "), + val: float64(1705312200), + want: "", + }, + { + name: "explicit unix-s rejects RFC 3339 string", + parser: NewTimestampParser("unix-s", "15:04:05 "), + val: "2024-01-15T10:30:00Z", + want: "", + }, + { + name: "non-parseable value", + parser: NewTimestampParser("", "15:04:05 "), + val: "not a timestamp", + want: "", + }, + { + name: "nil value", + parser: NewTimestampParser("", "15:04:05 "), + val: nil, + want: "", + }, + { + name: "boolean value", + parser: NewTimestampParser("", "15:04:05 "), + val: true, + want: "", + }, + { + name: "year before 1970 rejected", + parser: NewTimestampParser("rfc3339", "15:04:05 "), + val: "1969-12-31T23:59:59Z", + want: "", + }, + { + name: "year after 2100 rejected", + parser: NewTimestampParser("rfc3339", "15:04:05 "), + val: "2101-01-01T00:00:00Z", + want: "", + }, + { + name: "year 1970 accepted", + parser: NewTimestampParser("rfc3339", "2006 "), + val: "1970-01-01T00:00:00Z", + want: "1970 ", + }, + { + name: "year 2100 accepted", + parser: NewTimestampParser("rfc3339", "2006 "), + val: "2100-12-31T23:59:59Z", + want: "2100 ", + }, +} + +func TestTimestampParse(t *testing.T) { + t.Parallel() + + for _, tt := range timestampParseTests { + t.Run(tt.name, func(t *testing.T) { + got := tt.parser.Parse(tt.val) + if got != tt.want { + t.Errorf("Parse(%v) = %q, want %q", tt.val, got, tt.want) + } + }) + } +} + +type timestampParseUnixTest struct { + name string + parser *TimestampParser + val any + sec int64 + layout string +} + +var timestampParseUnixTests = []timestampParseUnixTest{ + { + name: "Unix seconds float", + parser: NewTimestampParser("unix-s", "15:04:05 "), + val: float64(1705312200), + sec: 1705312200, + layout: "15:04:05 ", + }, + { + name: "Unix seconds string", + parser: NewTimestampParser("unix-s", "15:04:05 "), + val: "1705312200", + sec: 1705312200, + layout: "15:04:05 ", + }, + { + name: "Unix milliseconds float", + parser: NewTimestampParser("unix-ms", "15:04:05 "), + val: float64(1705312200000), + sec: 1705312200, + layout: "15:04:05 ", + }, + { + name: "Unix milliseconds string", + parser: NewTimestampParser("unix-ms", "15:04:05 "), + val: "1705312200000", + sec: 1705312200, + layout: "15:04:05 ", + }, +} + +func TestTimestampParseUnix(t *testing.T) { + t.Parallel() + + for _, tt := range timestampParseUnixTests { + t.Run(tt.name, func(t *testing.T) { + got := tt.parser.Parse(tt.val) + want := unixExpected(tt.sec, tt.layout) + if got != want { + t.Errorf("Parse(%v) = %q, want %q", tt.val, got, want) + } + }) + } +} + +func TestTimestampParserLockOnFirstSuccess(t *testing.T) { + t.Parallel() + + tp := NewTimestampParser("", "15:04:05 ") + + // First call with RFC 3339 should lock to that format + result := tp.Parse("2024-01-15T10:30:00Z") + if result != "10:30:00 " { + t.Fatalf("first Parse() = %q, want %q", result, "10:30:00 ") + } + if !tp.locked { + t.Fatal("parser should be locked after first success") + } + if tp.format != tsFormatRFC3339 { + t.Fatalf("locked format = %d, want %d (rfc3339)", tp.format, tsFormatRFC3339) + } + + // Subsequent call with a unix timestamp should fail because format is locked + result = tp.Parse(float64(1705312200)) + if result != "" { + t.Errorf("Parse(unix) after RFC3339 lock = %q, want empty", result) + } + + // RFC 3339 still works + result = tp.Parse("2024-01-15T11:00:00Z") + if result != "11:00:00 " { + t.Errorf("Parse(rfc3339) after lock = %q, want %q", result, "11:00:00 ") + } +} + +func TestTimestampParserAutoDetectUnixFirst(t *testing.T) { + t.Parallel() + + tp := NewTimestampParser("", "15:04:05 ") + + // Unix seconds should auto-detect and lock + result := tp.Parse(float64(1705312200)) + if result == "" { + t.Fatal("Parse(unix-s) should succeed on auto-detect") + } + if tp.format != tsFormatUnixS { + t.Fatalf("locked format = %d, want %d (unix-s)", tp.format, tsFormatUnixS) + } + + // RFC 3339 should now fail + result = tp.Parse("2024-01-15T10:30:00Z") + if result != "" { + t.Errorf("Parse(rfc3339) after unix lock = %q, want empty", result) + } +} diff --git a/pkg/cg/writer.go b/pkg/cg/writer.go index 29fc88b..e4d1d3a 100644 --- a/pkg/cg/writer.go +++ b/pkg/cg/writer.go @@ -28,6 +28,12 @@ type AnnotatedWriter struct { mu sync.Mutex dest io.Writer prefix PrefixFunc + proc LineProcessor +} + +// SetProcessor sets the line processor for this writer. +func (w *AnnotatedWriter) SetProcessor(proc LineProcessor) { + w.proc = proc } // NewAnnotatedWriter creates an AnnotatedWriter that writes to dest, calling @@ -92,22 +98,38 @@ func (w *AnnotatedWriter) WriteLines(r io.Reader, ind Indicator) error { if err != nil { if err == io.EOF { if len(line) > 0 { - return w.WritePartialLine(ind, string(line)) + prefix, display := w.processLine(string(line)) + if prefix != "" { + return w.WritePartialLineWithPrefix(prefix, ind, display) + } + return w.WritePartialLine(ind, display) } return nil } return err } - if line[len(line)-1] == '\n' { - if werr := w.WriteLine(ind, string(line[:len(line)-1])); werr != nil { + raw := string(line[:len(line)-1]) + prefix, display := w.processLine(raw) + if prefix != "" { + if werr := w.WriteLineWithPrefix(prefix, ind, display); werr != nil { + return werr + } + } else { + if werr := w.WriteLine(ind, display); werr != nil { return werr } - continue } + } +} - if werr := w.WritePartialLine(ind, string(line)); werr != nil { - return werr - } +func (w *AnnotatedWriter) processLine(line string) (prefix string, display string) { + if w.proc == nil { + return "", line + } + result := w.proc(line) + if result == nil { + return "", line } + return result.Prefix, result.Line } From e7a10e3b639c77f08f58125a32800a47f069a7d3 Mon Sep 17 00:00:00 2001 From: Ripta Pasay Date: Tue, 24 Feb 2026 23:29:32 -0800 Subject: [PATCH 07/10] pkg/cg: add logfmt structured log extraction --- pkg/cg/cg.go | 8 +- pkg/cg/logparse_logfmt.go | 128 +++++++++++++ pkg/cg/logparse_logfmt_test.go | 334 +++++++++++++++++++++++++++++++++ pkg/cg/runner.go | 16 +- 4 files changed, 479 insertions(+), 7 deletions(-) create mode 100644 pkg/cg/logparse_logfmt.go create mode 100644 pkg/cg/logparse_logfmt_test.go diff --git a/pkg/cg/cg.go b/pkg/cg/cg.go index 67670fc..aac49e0 100644 --- a/pkg/cg/cg.go +++ b/pkg/cg/cg.go @@ -39,7 +39,7 @@ func NewCommand() *cobra.Command { c.Flags().StringVar(&opts.Format, "format", DefaultFormat, "time prefix format (Go time.Format layout)") c.Flags().BoolVar(&opts.Capture, "capture", false, "capture child output to temporary files") c.Flags().BoolVar(&opts.Buffered, "buffered", false, "defer child output until command finishes, grouped by stream") - c.Flags().StringVar(&opts.LogParse, "log-parse", "", "log line parser (\"json\")") + c.Flags().StringVar(&opts.LogParse, "log-parse", "", "log line parser (\"json\", \"logfmt\")") c.Flags().StringVar(&opts.LogMsgKey, "log-message-key", "message", "JSON key for the log message") c.Flags().StringVar(&opts.LogTSKey, "log-timestamp-key", "timestamp", "JSON key for the timestamp (empty to disable)") c.Flags().StringVar(&opts.LogTSFmt, "log-timestamp-format", "", "timestamp format: rfc3339, unix-s, unix-ms (empty for auto-detect)") @@ -66,8 +66,10 @@ func (opts *Options) validateFlags(cmd *cobra.Command) error { return nil } - if opts.LogParse != "json" { - return fmt.Errorf("unsupported --log-parse value: %q (supported: json)", opts.LogParse) + switch opts.LogParse { + case "json", "logfmt": + default: + return fmt.Errorf("unsupported --log-parse value: %q (supported: json, logfmt)", opts.LogParse) } switch opts.LogTSFmt { diff --git a/pkg/cg/logparse_logfmt.go b/pkg/cg/logparse_logfmt.go new file mode 100644 index 0000000..db57676 --- /dev/null +++ b/pkg/cg/logparse_logfmt.go @@ -0,0 +1,128 @@ +package cg + +// parseLogfmt parses a logfmt-encoded line into key-value pairs. Returns nil +// if no pairs are found. +func parseLogfmt(line string) map[string]any { + m := make(map[string]any) + i := 0 + n := len(line) + + for i < n { + // Skip whitespace between pairs + for i < n && line[i] == ' ' { + i++ + } + if i >= n { + break + } + + // Parse key + keyStart := i + for i < n && line[i] != '=' && line[i] != ' ' { + i++ + } + if i == keyStart { + break + } + key := line[keyStart:i] + + if i >= n || line[i] == ' ' { + // Bare key (no '='), treat as boolean true + m[key] = true + continue + } + + // Skip '=' + i++ + + if i >= n || line[i] == ' ' { + // key= with no value, treat as empty string + m[key] = "" + continue + } + + if line[i] == '"' { + // Quoted value + i++ // skip opening quote + var val []byte + for i < n && line[i] != '"' { + if line[i] == '\\' && i+1 < n { + switch line[i+1] { + case '"', '\\': + val = append(val, line[i+1]) + i += 2 + continue + } + } + val = append(val, line[i]) + i++ + } + if i < n { + i++ // skip closing quote + } + m[key] = string(val) + } else { + // Unquoted value + valStart := i + for i < n && line[i] != ' ' { + i++ + } + m[key] = line[valStart:i] + } + } + + if len(m) == 0 { + return nil + } + return m +} + +// LogfmtProcessorOptions configures the logfmt log line processor. +type LogfmtProcessorOptions struct { + MessageKey string + TimestampKey string + TimestampFmt string + Fields []string + Format string +} + +// NewLogfmtProcessor returns a LineProcessor that extracts the message from +// logfmt log lines. Non-logfmt lines and lines missing the message key pass +// through unchanged. +func NewLogfmtProcessor(opts LogfmtProcessorOptions) LineProcessor { + var tsParser *TimestampParser + if opts.TimestampKey != "" { + tsParser = NewTimestampParser(opts.TimestampFmt, opts.Format) + } + + return func(line string) *ProcessedLine { + obj := parseLogfmt(line) + if obj == nil { + return nil + } + + msgVal, ok := obj[opts.MessageKey] + if !ok { + return nil + } + + msg := stringify(msgVal) + + var suffix string + if len(opts.Fields) > 0 { + suffix = formatFields(obj, opts.Fields, opts.MessageKey, opts.TimestampKey) + } + + var prefix string + if tsParser != nil { + if tsVal, ok := obj[opts.TimestampKey]; ok { + prefix = tsParser.Parse(tsVal) + } + } + + return &ProcessedLine{ + Prefix: prefix, + Line: msg + suffix, + } + } +} diff --git a/pkg/cg/logparse_logfmt_test.go b/pkg/cg/logparse_logfmt_test.go new file mode 100644 index 0000000..2407013 --- /dev/null +++ b/pkg/cg/logparse_logfmt_test.go @@ -0,0 +1,334 @@ +package cg + +import ( + "errors" + "strings" + "testing" +) + +type parseLogfmtTest struct { + name string + line string + want map[string]any +} + +var parseLogfmtTests = []parseLogfmtTest{ + { + name: "simple pairs", + line: `level=info message=hello`, + want: map[string]any{"level": "info", "message": "hello"}, + }, + { + name: "quoted value", + line: `message="hello world" level=info`, + want: map[string]any{"message": "hello world", "level": "info"}, + }, + { + name: "escaped quote in value", + line: `message="say \"hi\"" level=info`, + want: map[string]any{"message": `say "hi"`, "level": "info"}, + }, + { + name: "escaped backslash in value", + line: `path="C:\\Users\\foo" level=info`, + want: map[string]any{"path": `C:\Users\foo`, "level": "info"}, + }, + { + name: "empty value", + line: `key= other=val`, + want: map[string]any{"key": "", "other": "val"}, + }, + { + name: "bare key", + line: `debug level=info`, + want: map[string]any{"debug": true, "level": "info"}, + }, + { + name: "empty line", + line: "", + want: nil, + }, + { + name: "whitespace only", + line: " ", + want: nil, + }, + { + name: "mixed quoting", + line: `level=info message="hello world" host=srv1`, + want: map[string]any{"level": "info", "message": "hello world", "host": "srv1"}, + }, + { + name: "unicode value", + line: `message="こんにちは" level=info`, + want: map[string]any{"message": "こんにちは", "level": "info"}, + }, + { + name: "trailing whitespace", + line: `level=info message=hello `, + want: map[string]any{"level": "info", "message": "hello"}, + }, + { + name: "leading whitespace", + line: ` level=info message=hello`, + want: map[string]any{"level": "info", "message": "hello"}, + }, + { + name: "multiple spaces between pairs", + line: `level=info message=hello host=srv1`, + want: map[string]any{"level": "info", "message": "hello", "host": "srv1"}, + }, + { + name: "empty quoted value", + line: `message="" level=info`, + want: map[string]any{"message": "", "level": "info"}, + }, + { + name: "bare key at end", + line: `level=info debug`, + want: map[string]any{"level": "info", "debug": true}, + }, + { + name: "multiple bare keys", + line: `debug verbose`, + want: map[string]any{"debug": true, "verbose": true}, + }, +} + +func TestParseLogfmt(t *testing.T) { + t.Parallel() + + for _, tt := range parseLogfmtTests { + t.Run(tt.name, func(t *testing.T) { + got := parseLogfmt(tt.line) + if tt.want == nil { + if got != nil { + t.Errorf("parseLogfmt(%q) = %v, want nil", tt.line, got) + } + return + } + if got == nil { + t.Fatalf("parseLogfmt(%q) = nil, want %v", tt.line, tt.want) + } + if len(got) != len(tt.want) { + t.Fatalf("parseLogfmt(%q) has %d keys, want %d: got=%v", tt.line, len(got), len(tt.want), got) + } + for k, wantV := range tt.want { + gotV, ok := got[k] + if !ok { + t.Errorf("parseLogfmt(%q) missing key %q", tt.line, k) + continue + } + if gotV != wantV { + t.Errorf("parseLogfmt(%q)[%q] = %v (%T), want %v (%T)", tt.line, k, gotV, gotV, wantV, wantV) + } + } + }) + } +} + +type logfmtProcessorTest struct { + name string + opts LogfmtProcessorOptions + line string + want *ProcessedLine +} + +var logfmtProcessorTests = []logfmtProcessorTest{ + { + name: "message extraction", + opts: LogfmtProcessorOptions{MessageKey: "message"}, + line: `message=hello level=info`, + want: &ProcessedLine{Line: "hello"}, + }, + { + name: "quoted message extraction", + opts: LogfmtProcessorOptions{MessageKey: "message"}, + line: `message="hello world" level=info`, + want: &ProcessedLine{Line: "hello world"}, + }, + { + name: "missing message key", + opts: LogfmtProcessorOptions{MessageKey: "message"}, + line: `msg=hello level=info`, + want: nil, + }, + { + name: "non-logfmt passthrough", + opts: LogfmtProcessorOptions{MessageKey: "message"}, + line: "this is just plain text without any key=value pairs... or is it?", + want: nil, + }, + { + name: "custom message key", + opts: LogfmtProcessorOptions{MessageKey: "msg"}, + line: `msg="custom key"`, + want: &ProcessedLine{Line: "custom key"}, + }, + { + name: "field selection", + opts: LogfmtProcessorOptions{ + MessageKey: "message", + Fields: []string{"level"}, + }, + line: `message=hello level=info host=srv1`, + want: &ProcessedLine{Line: "hello level=info"}, + }, + { + name: "wildcard fields", + opts: LogfmtProcessorOptions{ + MessageKey: "message", + Fields: []string{"*"}, + }, + line: `message=hello level=info host=srv1`, + want: &ProcessedLine{Line: "hello host=srv1 level=info"}, + }, + { + name: "timestamp extraction", + opts: LogfmtProcessorOptions{ + MessageKey: "message", + TimestampKey: "timestamp", + Format: "15:04:05 ", + }, + line: `message=hello timestamp=2024-01-15T10:30:00Z`, + want: &ProcessedLine{Prefix: "10:30:00 ", Line: "hello"}, + }, + { + name: "timestamp with fields", + opts: LogfmtProcessorOptions{ + MessageKey: "message", + TimestampKey: "ts", + Format: "15:04:05 ", + Fields: []string{"level"}, + }, + line: `message=hello ts=2024-01-15T10:30:00Z level=warn`, + want: &ProcessedLine{Prefix: "10:30:00 ", Line: "hello level=warn"}, + }, + { + name: "empty line passthrough", + opts: LogfmtProcessorOptions{MessageKey: "message"}, + line: "", + want: nil, + }, +} + +func TestNewLogfmtProcessor(t *testing.T) { + t.Parallel() + + for _, tt := range logfmtProcessorTests { + t.Run(tt.name, func(t *testing.T) { + proc := NewLogfmtProcessor(tt.opts) + got := proc(tt.line) + + if tt.want == nil { + if got != nil { + t.Errorf("processor returned %+v, want nil", got) + } + return + } + if got == nil { + t.Fatalf("processor returned nil, want %+v", tt.want) + } + if got.Line != tt.want.Line { + t.Errorf("Line = %q, want %q", got.Line, tt.want.Line) + } + if got.Prefix != tt.want.Prefix { + t.Errorf("Prefix = %q, want %q", got.Prefix, tt.want.Prefix) + } + }) + } +} + +func TestCommandLogParseLogfmt(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + out, err := runCgCommand( + "--format", "T ", + "--log-parse", "logfmt", + "--", + "sh", "-c", `echo 'message=hello level=info'`, + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if !strings.Contains(out, "O: hello\n") { + t.Errorf("output missing parsed message, got: %q", out) + } + for _, line := range strings.Split(out, "\n") { + if strings.Contains(line, "O: ") && strings.Contains(line, "level=info") && !strings.Contains(line, "hello") { + t.Errorf("output line should not contain raw logfmt: %q", line) + } + } +} + +func TestCommandLogParseLogfmtWithFields(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + out, err := runCgCommand( + "--format", "T ", + "--log-parse", "logfmt", + "--log-fields", "level", + "--", + "sh", "-c", `echo 'message=hello level=info'`, + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if !strings.Contains(out, "O: hello level=info") { + t.Errorf("output missing parsed message with fields, got: %q", out) + } +} + +func TestCommandLogParseLogfmtBuffered(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + out, err := runCgCommand( + "--format", "T ", + "--log-parse", "logfmt", + "--buffered", + "--", + "sh", "-c", `echo 'message="buffered msg" level=info'`, + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if !strings.Contains(out, "O: buffered msg") { + t.Errorf("output missing parsed message in buffered mode, got: %q", out) + } + if !strings.Contains(out, "I: --- stdout ---") { + t.Errorf("missing stdout header in buffered mode, got: %q", out) + } +} + +func TestCommandLogParseLogfmtInvalidValue(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test") + } + + _, err := runCgCommand( + "--format", "T ", + "--log-parse", "yaml", + "--", + "echo", "hi", + ) + if err == nil { + t.Fatal("expected error for unsupported --log-parse value") + } + + var exitErr *ExitError + if !errors.As(err, &exitErr) { + t.Fatalf("expected *ExitError, got %T: %v", err, err) + } + if exitErr.Code != 2 { + t.Errorf("exit code = %d, want 2", exitErr.Code) + } +} diff --git a/pkg/cg/runner.go b/pkg/cg/runner.go index 9331779..56e9611 100644 --- a/pkg/cg/runner.go +++ b/pkg/cg/runner.go @@ -26,15 +26,23 @@ func (opts *Options) run(cmd *cobra.Command, args []string) error { } w := NewAnnotatedWriter(cmd.OutOrStdout(), prefix) - if opts.LogParse != "" { - proc := NewJSONProcessor(JSONProcessorOptions{ + switch opts.LogParse { + case "json": + w.SetProcessor(NewJSONProcessor(JSONProcessorOptions{ MessageKey: opts.LogMsgKey, TimestampKey: opts.LogTSKey, TimestampFmt: opts.LogTSFmt, Fields: parseFieldsFlag(opts.LogFields), Format: opts.Format, - }) - w.SetProcessor(proc) + })) + case "logfmt": + w.SetProcessor(NewLogfmtProcessor(LogfmtProcessorOptions{ + MessageKey: opts.LogMsgKey, + TimestampKey: opts.LogTSKey, + TimestampFmt: opts.LogTSFmt, + Fields: parseFieldsFlag(opts.LogFields), + Format: opts.Format, + })) } var buf *LineBuffer From 306fe07c8286f74daa4c34a3a42039181c89c054 Mon Sep 17 00:00:00 2001 From: Ripta Pasay Date: Wed, 25 Feb 2026 00:36:43 -0800 Subject: [PATCH 08/10] pkg/cg: add testscript-based integration tests Exercise the compiled binary end-to-end using txtar scripts. --- go.mod | 1 + go.sum | 20 ++------- pkg/cg/testdata/basic_stdout.txtar | 3 ++ pkg/cg/testdata/buffered.txtar | 7 ++++ pkg/cg/testdata/capture.txtar | 6 +++ pkg/cg/testdata/command_not_found.txtar | 2 + pkg/cg/testdata/custom_format.txtar | 2 + pkg/cg/testdata/exit_code.txtar | 2 + pkg/cg/testdata/log_parse_json.txtar | 9 ++++ pkg/cg/testdata/log_parse_logfmt.txtar | 9 ++++ pkg/cg/testdata/stderr.txtar | 3 ++ pkg/cg/testscript_test.go | 56 +++++++++++++++++++++++++ 12 files changed, 103 insertions(+), 17 deletions(-) create mode 100644 pkg/cg/testdata/basic_stdout.txtar create mode 100644 pkg/cg/testdata/buffered.txtar create mode 100644 pkg/cg/testdata/capture.txtar create mode 100644 pkg/cg/testdata/command_not_found.txtar create mode 100644 pkg/cg/testdata/custom_format.txtar create mode 100644 pkg/cg/testdata/exit_code.txtar create mode 100644 pkg/cg/testdata/log_parse_json.txtar create mode 100644 pkg/cg/testdata/log_parse_logfmt.txtar create mode 100644 pkg/cg/testdata/stderr.txtar create mode 100644 pkg/cg/testscript_test.go diff --git a/go.mod b/go.mod index 4d1cb6c..950876b 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/ripta/hypercmd v0.5.0 github.com/ripta/reals v0.0.0-20251220032726-c99f163d5c5c github.com/ripta/unihan v0.0.0-20250404091138-c307c698a880 + github.com/rogpeppe/go-internal v1.14.1 github.com/spf13/cobra v1.10.2 github.com/spf13/pflag v1.0.10 github.com/stretchr/testify v1.11.1 diff --git a/go.sum b/go.sum index 478dfac..5eac90e 100644 --- a/go.sum +++ b/go.sum @@ -47,8 +47,6 @@ github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHP github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= -github.com/mattn/go-runewidth v0.0.16 h1:E5ScNMtiwvlvB5paMFdw9p4kSQzbXFikJ5SQO6TULQc= -github.com/mattn/go-runewidth v0.0.16/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mattn/go-runewidth v0.0.20 h1:WcT52H91ZUAwy8+HUkdM3THM6gXqXuLJi9O3rjcQQaQ= github.com/mattn/go-runewidth v0.0.20/go.mod h1:XBkDxAl56ILZc9knddidhrOlY5R/pDhgLpndooCuJAs= github.com/mattn/go-tty v0.0.7 h1:KJ486B6qI8+wBO7kQxYgmmEFDaFEE96JMBQ7h400N8Q= @@ -69,15 +67,14 @@ github.com/r3labs/diff/v3 v3.0.2 h1:yVuxAY1V6MeM4+HNur92xkS39kB/N+cFi2hMkY06BbA= github.com/r3labs/diff/v3 v3.0.2/go.mod h1:Cy542hv0BAEmhDYWtGxXRQ4kqRsVIcEjG9gChUlTmkw= github.com/ripta/hypercmd v0.5.0 h1:8wEZndeP/umK8xLgZD1aYOIsdWsxymweJSETnbF1Awo= github.com/ripta/hypercmd v0.5.0/go.mod h1:nffU7nnFN8yU/PIHbN35UCE5q0FSnDJ6ev45SFEIZ48= -github.com/ripta/reals v0.0.0-20251129121815-4fa2f223ded2 h1:QWeZ/uw8S951/qJQzg+wBAOpFhUx7yVJxyPRZdjJmuI= -github.com/ripta/reals v0.0.0-20251129121815-4fa2f223ded2/go.mod h1:WErCt40puDDQdpVq8Hg1DzjB0svufA8WboSYG4BI2+E= github.com/ripta/reals v0.0.0-20251220032726-c99f163d5c5c h1:4bBR+jNoWIs1roinlXrVDUtmSvqjtNbrJ3cuQtFci5g= github.com/ripta/reals v0.0.0-20251220032726-c99f163d5c5c/go.mod h1:WErCt40puDDQdpVq8Hg1DzjB0svufA8WboSYG4BI2+E= github.com/ripta/unihan v0.0.0-20250404091138-c307c698a880 h1:ZzDUYlZP/LHJmkh+PtgRZHEKa+eNVefq6YR8BnUCQ2I= github.com/ripta/unihan v0.0.0-20250404091138-c307c698a880/go.mod h1:ZLBfCas48lym/27GOsyFjRo7OGejoGHzOTdUdoRtDqU= -github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= +github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU= github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiTUUS4= @@ -102,17 +99,12 @@ go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts= golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos= -golang.org/x/exp v0.0.0-20250911091902-df9299821621 h1:2id6c1/gto0kaHYyrixvknJ8tUK/Qs5IsmBtrc+FtgU= -golang.org/x/exp v0.0.0-20250911091902-df9299821621/go.mod h1:TwQYMMnGpvZyc+JpB/UAuTNIsVJifOlSkrZkhcvpVUk= golang.org/x/exp v0.0.0-20260218203240-3dfff04db8fa h1:Zt3DZoOFFYkKhDT3v7Lm9FDMEV06GpzjG2jrqW+QTE0= golang.org/x/exp v0.0.0-20260218203240-3dfff04db8fa/go.mod h1:K79w1Vqn7PoiZn+TkNpx3BUWUQksGO3JcVX6qIjytmA= -golang.org/x/mod v0.32.0 h1:9F4d3PHLljb6x//jOyokMv3eX+YDeepZSEo3mFJy93c= -golang.org/x/mod v0.32.0/go.mod h1:SgipZ/3h2Ci89DlEtEXWUk/HteuRin+HHhN+WbNhguU= golang.org/x/mod v0.33.0 h1:tHFzIWbBifEmbwtGz65eaWyGiGZatSrT9prnU8DbVL8= golang.org/x/mod v0.33.0/go.mod h1:swjeQEj+6r7fODbD2cqrnje9PnziFuw4bmLbBZFrQ5w= -golang.org/x/net v0.49.0 h1:eeHFmOGUTtaaPSGNmjBKpbng9MulQsJURQUAfUwY++o= -golang.org/x/net v0.49.0/go.mod h1:/ysNB2EvaqvesRkuLAyjI1ycPZlQHM3q01F02UY/MV8= golang.org/x/net v0.50.0 h1:ucWh9eiCGyDR3vtzso0WMQinm2Dnt8cFMuQa9K33J60= +golang.org/x/net v0.50.0/go.mod h1:UgoSli3F/pBgdJBHCTc+tp3gmrU4XswgGRgtnwWTfyM= golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20200909081042-eff7692f9009/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -126,16 +118,10 @@ golang.org/x/text v0.34.0 h1:oL/Qq0Kdaqxa1KbNeMKwQq0reLCCaFtqu2eNuSeNHbk= golang.org/x/text v0.34.0/go.mod h1:homfLqTYRFyVYemLBFl5GgL/DWEiH5wcsQ5gSh1yziA= golang.org/x/time v0.14.0 h1:MRx4UaLrDotUKUdCIqzPC48t1Y9hANFKIRpNx+Te8PI= golang.org/x/time v0.14.0/go.mod h1:eL/Oa2bBBK0TkX57Fyni+NgnyQQN4LitPmob2Hjnqw4= -golang.org/x/tools v0.41.0 h1:a9b8iMweWG+S0OBnlU36rzLp20z1Rp10w+IY2czHTQc= -golang.org/x/tools v0.41.0/go.mod h1:XSY6eDqxVNiYgezAVqqCeihT4j1U2CCsqvH3WhQpnlg= golang.org/x/tools v0.42.0 h1:uNgphsn75Tdz5Ji2q36v/nsFSfR/9BRFvqhGBaJGd5k= golang.org/x/tools v0.42.0/go.mod h1:Ma6lCIwGZvHK6XtgbswSoWroEkhugApmsXyrUmBhfr0= -google.golang.org/genproto/googleapis/api v0.0.0-20250922171735-9219d122eba9 h1:jm6v6kMRpTYKxBRrDkYAitNJegUeO1Mf3Kt80obv0gg= -google.golang.org/genproto/googleapis/api v0.0.0-20250922171735-9219d122eba9/go.mod h1:LmwNphe5Afor5V3R5BppOULHOnt2mCIf+NxMd4XiygE= google.golang.org/genproto/googleapis/api v0.0.0-20260217215200-42d3e9bedb6d h1:EocjzKLywydp5uZ5tJ79iP6Q0UjDnyiHkGRWxuPBP8s= google.golang.org/genproto/googleapis/api v0.0.0-20260217215200-42d3e9bedb6d/go.mod h1:48U2I+QQUYhsFrg2SY6r+nJzeOtjey7j//WBESw+qyQ= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250922171735-9219d122eba9 h1:V1jCN2HBa8sySkR5vLcCSqJSTMv093Rw9EJefhQGP7M= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250922171735-9219d122eba9/go.mod h1:HSkG/KdJWusxU1F6CNrwNDjBMgisKxGnc5dAZfT0mjQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20260217215200-42d3e9bedb6d h1:t/LOSXPJ9R0B6fnZNyALBRfZBH0Uy0gT+uR+SJ6syqQ= google.golang.org/genproto/googleapis/rpc v0.0.0-20260217215200-42d3e9bedb6d/go.mod h1:4Hqkh8ycfw05ld/3BWL7rJOSfebL2Q+DVDeRgYgxUU8= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= diff --git a/pkg/cg/testdata/basic_stdout.txtar b/pkg/cg/testdata/basic_stdout.txtar new file mode 100644 index 0000000..edf92f9 --- /dev/null +++ b/pkg/cg/testdata/basic_stdout.txtar @@ -0,0 +1,3 @@ +exec cg --format 'T ' -- echo hello +stdout 'T O: hello' +stdout 'T I: Finished with exitcode 0' diff --git a/pkg/cg/testdata/buffered.txtar b/pkg/cg/testdata/buffered.txtar new file mode 100644 index 0000000..b20eec0 --- /dev/null +++ b/pkg/cg/testdata/buffered.txtar @@ -0,0 +1,7 @@ +exec cg --format 'T ' --buffered -- sh -c 'echo out; echo err >&2' +stdout 'T I: buffered mode, output deferred' +stdout 'T I: --- stdout ---' +stdout 'T O: out' +stdout 'T I: --- stderr ---' +stdout 'T E: err' +stdout 'T I: Finished with exitcode 0' diff --git a/pkg/cg/testdata/capture.txtar b/pkg/cg/testdata/capture.txtar new file mode 100644 index 0000000..b6adf41 --- /dev/null +++ b/pkg/cg/testdata/capture.txtar @@ -0,0 +1,6 @@ +exec cg --format 'T ' --capture -- echo hello +stdout 'T I: capture\.stdout=' +stdout 'T I: capture\.stderr=' +stdout 'T I: capture\.lifecycle=' +! stdout 'T O: hello' +stdout 'T I: Finished with exitcode 0' diff --git a/pkg/cg/testdata/command_not_found.txtar b/pkg/cg/testdata/command_not_found.txtar new file mode 100644 index 0000000..34e6d49 --- /dev/null +++ b/pkg/cg/testdata/command_not_found.txtar @@ -0,0 +1,2 @@ +! exec cg --format 'T ' -- __nonexistent_command_for_cg_test__ +stdout 'T I: Finished with exitcode 127' diff --git a/pkg/cg/testdata/custom_format.txtar b/pkg/cg/testdata/custom_format.txtar new file mode 100644 index 0000000..850b084 --- /dev/null +++ b/pkg/cg/testdata/custom_format.txtar @@ -0,0 +1,2 @@ +exec cg --format '2006-01-02 ' -- echo test +stdout '[0-9]{4}-[0-9]{2}-[0-9]{2} O: test' diff --git a/pkg/cg/testdata/exit_code.txtar b/pkg/cg/testdata/exit_code.txtar new file mode 100644 index 0000000..58c48a6 --- /dev/null +++ b/pkg/cg/testdata/exit_code.txtar @@ -0,0 +1,2 @@ +! exec cg --format 'T ' -- sh -c 'exit 42' +stdout 'T I: Finished with exitcode 42' diff --git a/pkg/cg/testdata/log_parse_json.txtar b/pkg/cg/testdata/log_parse_json.txtar new file mode 100644 index 0000000..2306e2b --- /dev/null +++ b/pkg/cg/testdata/log_parse_json.txtar @@ -0,0 +1,9 @@ +# Message extraction from JSON log lines +exec cg --log-parse json --format 'T ' -- emit-json-log +stdout 'T O: server started' +stdout 'T O: listening on port' +! stdout '"message"' + +# Field extraction appends key=value pairs +exec cg --log-parse json --format 'T ' --log-fields port -- emit-json-log +stdout 'O: server started port=8080' diff --git a/pkg/cg/testdata/log_parse_logfmt.txtar b/pkg/cg/testdata/log_parse_logfmt.txtar new file mode 100644 index 0000000..dbc635a --- /dev/null +++ b/pkg/cg/testdata/log_parse_logfmt.txtar @@ -0,0 +1,9 @@ +# Message extraction from logfmt log lines +exec cg --log-parse logfmt --format 'T ' -- emit-logfmt-log +stdout 'T O: server started' +stdout 'T O: listening on port' +! stdout 'message=' + +# Field extraction appends key=value pairs +exec cg --log-parse logfmt --format 'T ' --log-fields port -- emit-logfmt-log +stdout 'O: server started port=8080' diff --git a/pkg/cg/testdata/stderr.txtar b/pkg/cg/testdata/stderr.txtar new file mode 100644 index 0000000..9e71bd0 --- /dev/null +++ b/pkg/cg/testdata/stderr.txtar @@ -0,0 +1,3 @@ +exec cg --format 'T ' -- sh -c 'echo out; echo err >&2' +stdout 'T O: out' +stdout 'T E: err' diff --git a/pkg/cg/testscript_test.go b/pkg/cg/testscript_test.go new file mode 100644 index 0000000..fb9164a --- /dev/null +++ b/pkg/cg/testscript_test.go @@ -0,0 +1,56 @@ +package cg + +import ( + "errors" + "fmt" + "os" + "testing" + + "github.com/rogpeppe/go-internal/testscript" +) + +func TestMain(m *testing.M) { + os.Exit(testscript.RunMain(m, map[string]func() int{ + "cg": cgMain, + "emit-json-log": emitJSONLog, + "emit-logfmt-log": emitLogfmtLog, + })) +} + +func cgMain() int { + cmd := NewCommand() + err := cmd.Execute() + if err == nil { + return 0 + } + + var exitErr *ExitError + if errors.As(err, &exitErr) { + return exitErr.Code + } + + fmt.Fprintf(os.Stderr, "Error: %+v\n", err) + return 1 +} + +func emitJSONLog() int { + fmt.Println(`{"timestamp":"2025-01-15T10:30:00Z","level":"info","message":"server started","port":8080}`) + fmt.Println(`{"timestamp":"2025-01-15T10:30:01Z","level":"info","message":"listening on port","port":8080}`) + return 0 +} + +func emitLogfmtLog() int { + fmt.Println(`timestamp=2025-01-15T10:30:00Z level=info message="server started" port=8080`) + fmt.Println(`timestamp=2025-01-15T10:30:01Z level=info message="listening on port" port=8080`) + return 0 +} + +func TestCgScript(t *testing.T) { + if testing.Short() { + t.Skip("skipping testscript tests") + } + + testscript.Run(t, testscript.Params{ + Dir: "testdata", + }) +} From 2f28aa7c8b74b497818b330c33cc4022282d5114 Mon Sep 17 00:00:00 2001 From: Ripta Pasay Date: Wed, 25 Feb 2026 00:48:42 -0800 Subject: [PATCH 09/10] README: add note about `cg --capture` and `--buffered` --- README.md | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index b8f53bc..b98b786 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,7 @@ Run a command and annotate each line of its stdout and stderr with a timestamp and stream indicator (`O` for stdout, `E` for stderr, `I` for cg's own lifecycle messages). -Acts like `annotate-output` script. +Acts like the `annotate-output` script; `cg` is short for command guard. ``` go install github.com/ripta/rt/cmd/cg@latest @@ -91,7 +91,8 @@ The child's exit code is propagated: 42 ``` -Use `--format` to change the timestamp prefix. It takes the golang `time.Format` layout: +Use `--format` to change the timestamp prefix. It takes the golang +`time.Format` layout: ``` ❯ cg --format '2006-01-02T15:04:05 ' -- echo hello @@ -104,6 +105,10 @@ Use `--format` to change the timestamp prefix. It takes the golang `time.Format` Signals SIGINT and SIGTERM are forwarded to the child process. +`cg` also supports `--capture` to capture the child's stdout and stderr into +separate files, and `--buffered` to buffer the child's output and print it all +at once when the child finishes, instead of streaming it in real time. + `enc` ---- From 40706c91e6fdc2115e1617b03283a6311470d80f Mon Sep 17 00:00:00 2001 From: Ripta Pasay Date: Wed, 25 Feb 2026 00:57:08 -0800 Subject: [PATCH 10/10] goreleaser: not really defaults no more --- .goreleaser.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/.goreleaser.yaml b/.goreleaser.yaml index 9df5d31..33cf2c5 100644 --- a/.goreleaser.yaml +++ b/.goreleaser.yaml @@ -1,4 +1,3 @@ -# This is an example .goreleaser.yml file with some sensible defaults. # Make sure to check the documentation at https://goreleaser.com # # yaml-language-server: $schema=https://goreleaser.com/static/schema.json