diff --git a/.golangci.yml b/.golangci.yml index c33efd501d..e8bd47262b 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -3,8 +3,7 @@ version: 2 linters: disable: - unused - - unusedfunc - - unusedparams + issues: exclude-rules: - linters: diff --git a/pkg/util/utilfn/streamtolines.go b/pkg/util/utilfn/streamtolines.go index 39f2adcc1e..7cfabf4cf2 100644 --- a/pkg/util/utilfn/streamtolines.go +++ b/pkg/util/utilfn/streamtolines.go @@ -83,3 +83,32 @@ func StreamToLinesChan(input io.Reader) chan LineOutput { }() return ch } + +// LineWriter is an io.Writer that processes data line-by-line via a callback. +// Lines do not include the trailing newline. Lines longer than maxLineLength are dropped. +type LineWriter struct { + lineBuf lineBuf + lineFn func([]byte) +} + +// NewLineWriter creates a new LineWriter with the given callback function. +func NewLineWriter(lineFn func([]byte)) *LineWriter { + return &LineWriter{ + lineFn: lineFn, + } +} + +// Write implements io.Writer, processing the data and calling the callback for each complete line. +func (lw *LineWriter) Write(p []byte) (n int, err error) { + streamToLines_processBuf(&lw.lineBuf, p, lw.lineFn) + return len(p), nil +} + +// Flush outputs any remaining buffered data as a final line. +// Should be called when the input stream is complete (e.g., at EOF). +func (lw *LineWriter) Flush() { + if len(lw.lineBuf.buf) > 0 && !lw.lineBuf.inLongLine { + lw.lineFn(lw.lineBuf.buf) + lw.lineBuf.buf = nil + } +} diff --git a/tsunami/build/build.go b/tsunami/build/build.go index 04ed04fdcc..ffbfe801a9 100644 --- a/tsunami/build/build.go +++ b/tsunami/build/build.go @@ -30,18 +30,42 @@ const MinSupportedGoMinorVersion = 22 const TsunamiUIImportPath = "github.com/wavetermdev/waveterm/tsunami/ui" type OutputCapture struct { - lock sync.Mutex - lines []string + lock sync.Mutex + lines []string + lineWriter *util.LineWriter } func MakeOutputCapture() *OutputCapture { - return &OutputCapture{ + oc := &OutputCapture{ lines: make([]string, 0), } + oc.lineWriter = util.NewLineWriter(func(line []byte) { + // synchronized via the Write/Flush functions + oc.lines = append(oc.lines, string(line)) + }) + return oc } -func (oc *OutputCapture) Printf(format string, args ...interface{}) { +func (oc *OutputCapture) Write(p []byte) (n int, err error) { if oc == nil { + return os.Stdout.Write(p) + } + oc.lock.Lock() + defer oc.lock.Unlock() + return oc.lineWriter.Write(p) +} + +func (oc *OutputCapture) Flush() { + if oc == nil || oc.lineWriter == nil { + return + } + oc.lock.Lock() + defer oc.lock.Unlock() + oc.lineWriter.Flush() +} + +func (oc *OutputCapture) Printf(format string, args ...interface{}) { + if oc == nil || oc.lineWriter == nil { log.Printf(format, args...) return } @@ -319,15 +343,16 @@ func createGoMod(tempDir, appName, goVersion string, opts BuildOpts, verbose boo tidyCmd := exec.Command("go", "mod", "tidy") tidyCmd.Dir = tempDir - if verbose { + if oc != nil || verbose { oc.Printf("Running go mod tidy") - tidyCmd.Stdout = os.Stdout - tidyCmd.Stderr = os.Stderr + tidyCmd.Stdout = oc + tidyCmd.Stderr = oc } if err := tidyCmd.Run(); err != nil { return fmt.Errorf("failed to run go mod tidy: %w", err) } + oc.Flush() if verbose { oc.Printf("Successfully ran go mod tidy") @@ -651,15 +676,16 @@ func runGoBuild(tempDir string, opts BuildOpts) error { buildCmd := exec.Command("go", args...) buildCmd.Dir = tempDir - if opts.Verbose { + if oc != nil || opts.Verbose { oc.Printf("Running: %s", strings.Join(buildCmd.Args, " ")) - buildCmd.Stdout = os.Stdout - buildCmd.Stderr = os.Stderr + buildCmd.Stdout = oc + buildCmd.Stderr = oc } if err := buildCmd.Run(); err != nil { return fmt.Errorf("failed to build application: %w", err) } + oc.Flush() if opts.Verbose { if opts.OutputFile != "" { diff --git a/tsunami/util/streamtolines.go b/tsunami/util/streamtolines.go new file mode 100644 index 0000000000..e70cac3572 --- /dev/null +++ b/tsunami/util/streamtolines.go @@ -0,0 +1,114 @@ +// Copyright 2025, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +package util + +import ( + "bytes" + "context" + "io" + "time" +) + +type LineOutput struct { + Line string + Error error +} + +type lineBuf struct { + buf []byte + inLongLine bool +} + +const maxLineLength = 128 * 1024 + +func ReadLineWithTimeout(ch chan LineOutput, timeout time.Duration) (string, error) { + select { + case output := <-ch: + if output.Error != nil { + return "", output.Error + } + return output.Line, nil + case <-time.After(timeout): + return "", context.DeadlineExceeded + } +} + +func streamToLines_processBuf(lineBuf *lineBuf, readBuf []byte, lineFn func([]byte)) { + for len(readBuf) > 0 { + nlIdx := bytes.IndexByte(readBuf, '\n') + if nlIdx == -1 { + if lineBuf.inLongLine || len(lineBuf.buf)+len(readBuf) > maxLineLength { + lineBuf.buf = nil + lineBuf.inLongLine = true + return + } + lineBuf.buf = append(lineBuf.buf, readBuf...) + return + } + if !lineBuf.inLongLine && len(lineBuf.buf)+nlIdx <= maxLineLength { + line := append(lineBuf.buf, readBuf[:nlIdx]...) + lineFn(line) + } + lineBuf.buf = nil + lineBuf.inLongLine = false + readBuf = readBuf[nlIdx+1:] + } +} + +func StreamToLines(input io.Reader, lineFn func([]byte)) error { + var lineBuf lineBuf + readBuf := make([]byte, 64*1024) + for { + n, err := input.Read(readBuf) + streamToLines_processBuf(&lineBuf, readBuf[:n], lineFn) + if err != nil { + return err + } + } +} + +// starts a goroutine to drive the channel +// line output does not include the trailing newline +func StreamToLinesChan(input io.Reader) chan LineOutput { + ch := make(chan LineOutput) + go func() { + defer close(ch) + err := StreamToLines(input, func(line []byte) { + ch <- LineOutput{Line: string(line)} + }) + if err != nil && err != io.EOF { + ch <- LineOutput{Error: err} + } + }() + return ch +} + +// LineWriter is an io.Writer that processes data line-by-line via a callback. +// Lines do not include the trailing newline. Lines longer than maxLineLength are dropped. +type LineWriter struct { + lineBuf lineBuf + lineFn func([]byte) +} + +// NewLineWriter creates a new LineWriter with the given callback function. +func NewLineWriter(lineFn func([]byte)) *LineWriter { + return &LineWriter{ + lineFn: lineFn, + } +} + +// Write implements io.Writer, processing the data and calling the callback for each complete line. +func (lw *LineWriter) Write(p []byte) (n int, err error) { + streamToLines_processBuf(&lw.lineBuf, p, lw.lineFn) + return len(p), nil +} + +// Flush outputs any remaining buffered data as a final line. +// Should be called when the input stream is complete (e.g., at EOF). +func (lw *LineWriter) Flush() { + if len(lw.lineBuf.buf) > 0 && !lw.lineBuf.inLongLine { + lw.lineFn(lw.lineBuf.buf) + lw.lineBuf.buf = nil + } +}