Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor log streaming #462

Merged
merged 3 commits into from Dec 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion cli/internal/context/context.go
Expand Up @@ -23,7 +23,7 @@ import (

const (
ROOT_NODE_NAME = "___ROOT___"
GLOBAL_CACHE_KEY = "hello"
GLOBAL_CACHE_KEY = "snozzberries"
)

// A BuildResultStatus represents the status of a target when we log a build result.
Expand Down
121 changes: 45 additions & 76 deletions cli/internal/run/run.go
Expand Up @@ -20,6 +20,7 @@ import (
"turbo/internal/core"
"turbo/internal/fs"
"turbo/internal/globby"
"turbo/internal/logstreamer"
"turbo/internal/scm"
"turbo/internal/ui"
"turbo/internal/util"
Expand Down Expand Up @@ -443,9 +444,6 @@ func (c *RunCommand) Run(args []string) int {
targetLogger.Debug("log file", "path", filepath.Join(runOptions.cwd, logFileName))

// Cache ---------------------------------------------
// We create the real task outputs now so we can potentially use them to
// to store artifacts from remote cache to local fs cache

var hit bool
if runOptions.forceExecution {
hit = false
Expand All @@ -456,35 +454,20 @@ func (c *RunCommand) Run(args []string) int {
} else if hit {
if runOptions.stream && fs.FileExists(filepath.Join(runOptions.cwd, logFileName)) {
logReplayWaitGroup.Add(1)
targetUi.Output(fmt.Sprintf("cache hit, replaying output %s", ui.Dim(hash)))
go replayLogs(targetLogger, targetUi, runOptions, logFileName, hash, &logReplayWaitGroup, false)
go replayLogs(targetLogger, c.Ui, runOptions, logFileName, hash, &logReplayWaitGroup, false)
}
targetLogger.Debug("done", "status", "complete", "duration", time.Since(cmdTime))
tracer(TargetCached, nil)

return nil
}
}
// Setup log file
if err := fs.EnsureDir(filepath.Join(runOptions.cwd, pack.Dir, ".turbo", fmt.Sprintf("turbo-%v.log", task))); err != nil {
tracer(TargetBuildFailed, err)
c.logError(targetLogger, actualPrefix, err)
if runOptions.bail {
os.Exit(1)
}
}
output, err := os.Create(filepath.Join(runOptions.cwd, pack.Dir, ".turbo", fmt.Sprintf("turbo-%v.log", task)))
if err != nil {
fmt.Println("here")
tracer(TargetBuildFailed, err)
c.logError(targetLogger, actualPrefix, err)
if runOptions.bail {
os.Exit(1)
}
}
defer output.Close()

if runOptions.stream {
targetUi.Output(fmt.Sprintf("cache miss, executing %s", ui.Dim(hash)))
}

// Setup command execution
argsactual := append([]string{"run"}, task)
argsactual = append(argsactual, runOptions.passThroughArgs...)
// @TODO: @jaredpalmer fix this hack to get the package manager's name
Expand All @@ -493,64 +476,50 @@ func (c *RunCommand) Run(args []string) int {
envs := fmt.Sprintf("TURBO_HASH=%v", hash)
cmd.Env = append(os.Environ(), envs)

// Get a pipe to read from stdout and stderr
stdout, err := cmd.StdoutPipe()
defer stdout.Close()
if err != nil {
tracer(TargetBuildFailed, err)
c.logError(targetLogger, actualPrefix, err)
if runOptions.bail {
os.Exit(1)
}
}
stderr, err := cmd.StderrPipe()
defer stderr.Close()
if err != nil {
tracer(TargetBuildFailed, err)
c.logError(targetLogger, actualPrefix, err)
if runOptions.bail {
os.Exit(1)
}
}

writer := bufio.NewWriter(output)

// Merge the streams together
merged := io.MultiReader(stderr, stdout)

// Create a scanner which scans r in a line-by-line fashion
scanner := bufio.NewScanner(merged)

// Execute command
// Failed to spawn?
if err := cmd.Start(); err != nil {
tracer(TargetBuildFailed, err)
writer.Flush()
if runOptions.bail {
targetLogger.Error("Could not spawn command: %w", err)
targetUi.Error(fmt.Sprintf("Could not spawn command: %v", err))
os.Exit(1)
}
targetUi.Warn("could not spawn command, but continuing...")
}
// Read line by line and process it
if runOptions.stream || runOptions.cache {
for scanner.Scan() {
line := scanner.Text()
if runOptions.stream {
targetUi.Output(string(scanner.Bytes()))
// Setup stdout/stderr
// If we are not caching anything, then we don't need to write logs to disk
// be careful about this conditional given the default of cache = true
var combinedWriter io.Writer
if !runOptions.cache || !(pipeline.Cache == nil || *pipeline.Cache) {
combinedWriter = os.Stdout
} else {
// Setup log file
if err := fs.EnsureDir(logFileName); err != nil {
tracer(TargetBuildFailed, err)
c.logError(targetLogger, actualPrefix, err)
if runOptions.bail {
os.Exit(1)
}
if runOptions.cache {
writer.WriteString(fmt.Sprintf("%v\n", line))
}
output, err := os.Create(logFileName)
if err != nil {
tracer(TargetBuildFailed, err)
c.logError(targetLogger, actualPrefix, err)
if runOptions.bail {
os.Exit(1)
}
}
defer output.Close()
bufWriter := bufio.NewWriter(output)
bufWriter.WriteString(fmt.Sprintf("%scache hit, replaying output %s\n", actualPrefix, ui.Dim(hash)))
defer bufWriter.Flush()
combinedWriter = io.MultiWriter(os.Stdout, bufWriter)
logger := log.New(combinedWriter, "", 0)
// Setup a streamer that we'll pipe cmd.Stdout to
logStreamerOut := logstreamer.NewLogstreamer(logger, actualPrefix, false)
// Setup a streamer that we'll pipe cmd.Stderr to.
logStreamerErr := logstreamer.NewLogstreamer(logger, actualPrefix, false)
cmd.Stderr = logStreamerErr
cmd.Stdout = logStreamerOut
// Flush/Reset any error we recorded
logStreamerErr.FlushRecord()
logStreamerOut.FlushRecord()
}

// Run the command
if err := cmd.Wait(); err != nil {
if err := cmd.Run(); err != nil {
tracer(TargetBuildFailed, err)
targetLogger.Error("Error: command finished with error: %w", err)
writer.Flush()
if runOptions.bail {
if runOptions.stream {
targetUi.Error(fmt.Sprintf("Error: command finished with error: %s", err))
Expand Down Expand Up @@ -579,8 +548,7 @@ func (c *RunCommand) Run(args []string) int {
return nil
}

writer.Flush()

// Cache command outputs
if runOptions.cache && (pipeline.Cache == nil || *pipeline.Cache) {
targetLogger.Debug("caching output", "outputs", outputs)
ignore := []string{}
Expand All @@ -590,6 +558,7 @@ func (c *RunCommand) Run(args []string) int {
}
}

// Clean up tracing
tracer(TargetBuilt, nil)
targetLogger.Debug("done", "status", "complete", "duration", time.Since(cmdTime))
return nil
Expand Down Expand Up @@ -923,7 +892,7 @@ func replayLogs(logger hclog.Logger, prefixUi cli.Ui, runOptions *RunOptions, lo
defer f.Close()
scan := bufio.NewScanner(f)
for scan.Scan() {
prefixUi.Output(ui.Dim(string(scan.Bytes()))) //Writing to Stdout
prefixUi.Output(ui.StripAnsi(string(scan.Bytes()))) //Writing to Stdout
}
logger.Debug("finish replaying logs")
}
11 changes: 11 additions & 0 deletions cli/internal/ui/ui.go
Expand Up @@ -5,13 +5,15 @@ import (
"io"
"math"
"os"
"regexp"
"strings"

"github.com/fatih/color"
"github.com/mattn/go-isatty"
)

const ESC = 27
const ansiEscapeStr = "[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))"

var IsTTY = isatty.IsTerminal(os.Stdout.Fd()) || isatty.IsCygwinTerminal(os.Stdout.Fd())
var IsCI = os.Getenv("CI") == "true" || os.Getenv("BUILD_NUMBER") == "true" || os.Getenv("TEAMCITY_VERSION") != ""
Expand All @@ -27,6 +29,15 @@ func ClearLines(writer io.Writer, count int) {
_, _ = fmt.Fprint(writer, strings.Repeat(clear, count))
}

var ansiRegex = regexp.MustCompile(ansiEscapeStr)

func StripAnsi(str string) string {
if !IsTTY {
return ansiRegex.ReplaceAllString(str, "")
}
return str
}

// Dim prints out dimmed text
func Dim(str string) string {
return gray.Sprint(str)
Expand Down