From f7196efe4d64ca53ed79f67af121770b3a91fadd Mon Sep 17 00:00:00 2001 From: sawka Date: Mon, 10 Nov 2025 16:29:05 -0800 Subject: [PATCH 1/3] capture go build / go mod tidy output to outputcapture --- pkg/util/utilfn/streamtolines.go | 29 ++++++++ tsunami/build/build.go | 38 +++++++++-- tsunami/util/streamtolines.go | 114 +++++++++++++++++++++++++++++++ 3 files changed, 174 insertions(+), 7 deletions(-) create mode 100644 tsunami/util/streamtolines.go diff --git a/pkg/util/utilfn/streamtolines.go b/pkg/util/utilfn/streamtolines.go index 39f2adcc1e..7cfabf4cf2 100644 --- a/pkg/util/utilfn/streamtolines.go +++ b/pkg/util/utilfn/streamtolines.go @@ -83,3 +83,32 @@ func StreamToLinesChan(input io.Reader) chan LineOutput { }() return ch } + +// LineWriter is an io.Writer that processes data line-by-line via a callback. +// Lines do not include the trailing newline. Lines longer than maxLineLength are dropped. +type LineWriter struct { + lineBuf lineBuf + lineFn func([]byte) +} + +// NewLineWriter creates a new LineWriter with the given callback function. +func NewLineWriter(lineFn func([]byte)) *LineWriter { + return &LineWriter{ + lineFn: lineFn, + } +} + +// Write implements io.Writer, processing the data and calling the callback for each complete line. +func (lw *LineWriter) Write(p []byte) (n int, err error) { + streamToLines_processBuf(&lw.lineBuf, p, lw.lineFn) + return len(p), nil +} + +// Flush outputs any remaining buffered data as a final line. +// Should be called when the input stream is complete (e.g., at EOF). +func (lw *LineWriter) Flush() { + if len(lw.lineBuf.buf) > 0 && !lw.lineBuf.inLongLine { + lw.lineFn(lw.lineBuf.buf) + lw.lineBuf.buf = nil + } +} diff --git a/tsunami/build/build.go b/tsunami/build/build.go index 04ed04fdcc..839a069d24 100644 --- a/tsunami/build/build.go +++ b/tsunami/build/build.go @@ -30,14 +30,35 @@ const MinSupportedGoMinorVersion = 22 const TsunamiUIImportPath = "github.com/wavetermdev/waveterm/tsunami/ui" type OutputCapture struct { - lock sync.Mutex - lines []string + lock sync.Mutex + lines []string + lineWriter *util.LineWriter } func MakeOutputCapture() *OutputCapture { - return &OutputCapture{ + oc := &OutputCapture{ lines: make([]string, 0), } + oc.lineWriter = util.NewLineWriter(func(line []byte) { + oc.lock.Lock() + defer oc.lock.Unlock() + oc.lines = append(oc.lines, string(line)) + }) + return oc +} + +func (oc *OutputCapture) Write(p []byte) (n int, err error) { + if oc == nil { + return os.Stdout.Write(p) + } + return oc.lineWriter.Write(p) +} + +func (oc *OutputCapture) Flush() { + if oc == nil || oc.lineWriter == nil { + return + } + oc.lineWriter.Flush() } func (oc *OutputCapture) Printf(format string, args ...interface{}) { @@ -321,8 +342,8 @@ func createGoMod(tempDir, appName, goVersion string, opts BuildOpts, verbose boo if verbose { oc.Printf("Running go mod tidy") - tidyCmd.Stdout = os.Stdout - tidyCmd.Stderr = os.Stderr + tidyCmd.Stdout = oc + tidyCmd.Stderr = oc } if err := tidyCmd.Run(); err != nil { @@ -653,8 +674,11 @@ func runGoBuild(tempDir string, opts BuildOpts) error { if opts.Verbose { oc.Printf("Running: %s", strings.Join(buildCmd.Args, " ")) - buildCmd.Stdout = os.Stdout - buildCmd.Stderr = os.Stderr + } + + if oc != nil || opts.Verbose { + buildCmd.Stdout = oc + buildCmd.Stderr = oc } if err := buildCmd.Run(); err != nil { diff --git a/tsunami/util/streamtolines.go b/tsunami/util/streamtolines.go new file mode 100644 index 0000000000..e70cac3572 --- /dev/null +++ b/tsunami/util/streamtolines.go @@ -0,0 +1,114 @@ +// Copyright 2025, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +package util + +import ( + "bytes" + "context" + "io" + "time" +) + +type LineOutput struct { + Line string + Error error +} + +type lineBuf struct { + buf []byte + inLongLine bool +} + +const maxLineLength = 128 * 1024 + +func ReadLineWithTimeout(ch chan LineOutput, timeout time.Duration) (string, error) { + select { + case output := <-ch: + if output.Error != nil { + return "", output.Error + } + return output.Line, nil + case <-time.After(timeout): + return "", context.DeadlineExceeded + } +} + +func streamToLines_processBuf(lineBuf *lineBuf, readBuf []byte, lineFn func([]byte)) { + for len(readBuf) > 0 { + nlIdx := bytes.IndexByte(readBuf, '\n') + if nlIdx == -1 { + if lineBuf.inLongLine || len(lineBuf.buf)+len(readBuf) > maxLineLength { + lineBuf.buf = nil + lineBuf.inLongLine = true + return + } + lineBuf.buf = append(lineBuf.buf, readBuf...) + return + } + if !lineBuf.inLongLine && len(lineBuf.buf)+nlIdx <= maxLineLength { + line := append(lineBuf.buf, readBuf[:nlIdx]...) + lineFn(line) + } + lineBuf.buf = nil + lineBuf.inLongLine = false + readBuf = readBuf[nlIdx+1:] + } +} + +func StreamToLines(input io.Reader, lineFn func([]byte)) error { + var lineBuf lineBuf + readBuf := make([]byte, 64*1024) + for { + n, err := input.Read(readBuf) + streamToLines_processBuf(&lineBuf, readBuf[:n], lineFn) + if err != nil { + return err + } + } +} + +// starts a goroutine to drive the channel +// line output does not include the trailing newline +func StreamToLinesChan(input io.Reader) chan LineOutput { + ch := make(chan LineOutput) + go func() { + defer close(ch) + err := StreamToLines(input, func(line []byte) { + ch <- LineOutput{Line: string(line)} + }) + if err != nil && err != io.EOF { + ch <- LineOutput{Error: err} + } + }() + return ch +} + +// LineWriter is an io.Writer that processes data line-by-line via a callback. +// Lines do not include the trailing newline. Lines longer than maxLineLength are dropped. +type LineWriter struct { + lineBuf lineBuf + lineFn func([]byte) +} + +// NewLineWriter creates a new LineWriter with the given callback function. +func NewLineWriter(lineFn func([]byte)) *LineWriter { + return &LineWriter{ + lineFn: lineFn, + } +} + +// Write implements io.Writer, processing the data and calling the callback for each complete line. +func (lw *LineWriter) Write(p []byte) (n int, err error) { + streamToLines_processBuf(&lw.lineBuf, p, lw.lineFn) + return len(p), nil +} + +// Flush outputs any remaining buffered data as a final line. +// Should be called when the input stream is complete (e.g., at EOF). +func (lw *LineWriter) Flush() { + if len(lw.lineBuf.buf) > 0 && !lw.lineBuf.inLongLine { + lw.lineFn(lw.lineBuf.buf) + lw.lineBuf.buf = nil + } +} From 65c661be90fb284d454921ad3c014ff9af0e9c8d Mon Sep 17 00:00:00 2001 From: sawka Date: Mon, 10 Nov 2025 16:34:05 -0800 Subject: [PATCH 2/3] fix golangci linters --- .golangci.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index c33efd501d..e8bd47262b 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -3,8 +3,7 @@ version: 2 linters: disable: - unused - - unusedfunc - - unusedparams + issues: exclude-rules: - linters: From 16e0c42ca0edb1f721bce77dfdde0041810f0aae Mon Sep 17 00:00:00 2001 From: sawka Date: Mon, 10 Nov 2025 17:05:00 -0800 Subject: [PATCH 3/3] fix sync and call flush --- tsunami/build/build.go | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/tsunami/build/build.go b/tsunami/build/build.go index 839a069d24..ffbfe801a9 100644 --- a/tsunami/build/build.go +++ b/tsunami/build/build.go @@ -40,8 +40,7 @@ func MakeOutputCapture() *OutputCapture { lines: make([]string, 0), } oc.lineWriter = util.NewLineWriter(func(line []byte) { - oc.lock.Lock() - defer oc.lock.Unlock() + // synchronized via the Write/Flush functions oc.lines = append(oc.lines, string(line)) }) return oc @@ -51,6 +50,8 @@ func (oc *OutputCapture) Write(p []byte) (n int, err error) { if oc == nil { return os.Stdout.Write(p) } + oc.lock.Lock() + defer oc.lock.Unlock() return oc.lineWriter.Write(p) } @@ -58,11 +59,13 @@ func (oc *OutputCapture) Flush() { if oc == nil || oc.lineWriter == nil { return } + oc.lock.Lock() + defer oc.lock.Unlock() oc.lineWriter.Flush() } func (oc *OutputCapture) Printf(format string, args ...interface{}) { - if oc == nil { + if oc == nil || oc.lineWriter == nil { log.Printf(format, args...) return } @@ -340,7 +343,7 @@ func createGoMod(tempDir, appName, goVersion string, opts BuildOpts, verbose boo tidyCmd := exec.Command("go", "mod", "tidy") tidyCmd.Dir = tempDir - if verbose { + if oc != nil || verbose { oc.Printf("Running go mod tidy") tidyCmd.Stdout = oc tidyCmd.Stderr = oc @@ -349,6 +352,7 @@ func createGoMod(tempDir, appName, goVersion string, opts BuildOpts, verbose boo if err := tidyCmd.Run(); err != nil { return fmt.Errorf("failed to run go mod tidy: %w", err) } + oc.Flush() if verbose { oc.Printf("Successfully ran go mod tidy") @@ -672,11 +676,8 @@ func runGoBuild(tempDir string, opts BuildOpts) error { buildCmd := exec.Command("go", args...) buildCmd.Dir = tempDir - if opts.Verbose { - oc.Printf("Running: %s", strings.Join(buildCmd.Args, " ")) - } - if oc != nil || opts.Verbose { + oc.Printf("Running: %s", strings.Join(buildCmd.Args, " ")) buildCmd.Stdout = oc buildCmd.Stderr = oc } @@ -684,6 +685,7 @@ func runGoBuild(tempDir string, opts BuildOpts) error { if err := buildCmd.Run(); err != nil { return fmt.Errorf("failed to build application: %w", err) } + oc.Flush() if opts.Verbose { if opts.OutputFile != "" {