Skip to content

Commit

Permalink
fix(turborepo): Send spaces logs for all tasks, even ones that aren't…
Browse files Browse the repository at this point in the history
… cached (#5425)

Co-authored-by: Greg Soltis <Greg Soltis>
  • Loading branch information
gsoltis committed Jun 29, 2023
1 parent 52f8b57 commit 827f601
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 21 deletions.
1 change: 0 additions & 1 deletion cli/internal/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ func (g *CompleteGraph) GetPackageTaskVisitor(
Outputs: taskDefinition.Outputs.Inclusions,
ExcludedOutputs: taskDefinition.Outputs.Exclusions,
LogFileRelativePath: logFileRelativePath,
LogFileAbsolutePath: logFileAbsolutePath,
ResolvedTaskDefinition: taskDefinition,
ExpandedInputs: expandedInputs,
ExpandedOutputs: []turbopath.AnchoredSystemPath{},
Expand Down
32 changes: 31 additions & 1 deletion cli/internal/run/real_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,25 @@ import (
"github.com/vercel/turbo/cli/internal/util"
)

// threadsafeOutputBuffer implements io.Writer for multiple goroutines
// to write to the same underlying buffer. Child processes use separate
// goroutines to handle reading from stdout and stderr, but for now we
// send both to the same buffer.
type threadsafeOutputBuffer struct {
buf bytes.Buffer
mu sync.Mutex
}

func (tsob *threadsafeOutputBuffer) Write(p []byte) (n int, err error) {
tsob.mu.Lock()
defer tsob.mu.Unlock()
return tsob.buf.Write(p)
}

func (tsob *threadsafeOutputBuffer) Bytes() []byte {
return tsob.buf.Bytes()
}

// RealRun executes a set of tasks
func RealRun(
ctx gocontext.Context,
Expand Down Expand Up @@ -143,6 +162,13 @@ func RealRun(
errWriter = errBuf
}

var spacesLogBuffer *threadsafeOutputBuffer
if runSummary.SpacesIsEnabled() {
spacesLogBuffer = &threadsafeOutputBuffer{}
outWriter = io.MultiWriter(spacesLogBuffer, outWriter)
errWriter = io.MultiWriter(spacesLogBuffer, errWriter)
}

ui := concurrentUIFactory.Build(os.Stdin, outWriter, errWriter)

taskExecutionSummary, err := ec.exec(ctx, packageTask, ui, outWriter)
Expand All @@ -161,7 +187,11 @@ func RealRun(
// not using defer, just release the lock
taskSummaryMutex.Unlock()

runSummary.CloseTask(taskSummary)
var logBytes []byte
if spacesLogBuffer != nil {
logBytes = spacesLogBuffer.Bytes()
}
runSummary.CloseTask(taskSummary, logBytes)
}
if isGrouped {
logChan <- taskLogContext{
Expand Down
10 changes: 8 additions & 2 deletions cli/internal/runsummary/run_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ func NewRunSummary(
return rsm
}

// SpacesIsEnabled returns true if this run summary is going to send to a
// spaces backend
func (rsm *Meta) SpacesIsEnabled() bool {
return rsm.spacesClient.enabled
}

// getPath returns a path to where the runSummary is written.
// The returned path will always be relative to the dir passsed in.
// We don't do a lot of validation, so `../../` paths are allowed.
Expand Down Expand Up @@ -245,9 +251,9 @@ func (rsm *Meta) save() error {
}

// CloseTask posts the result of the Task to Spaces
func (rsm *Meta) CloseTask(task *TaskSummary) {
func (rsm *Meta) CloseTask(task *TaskSummary, logs []byte) {
if rsm.spacesClient.enabled {
rsm.spacesClient.postTask(task)
rsm.spacesClient.postTask(task, logs)
}
}

Expand Down
8 changes: 4 additions & 4 deletions cli/internal/runsummary/spaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ func (c *spacesClient) createRun(payload *spacesRunPayload) {
}()
}

func (c *spacesClient) postTask(task *TaskSummary) {
func (c *spacesClient) postTask(task *TaskSummary, logs []byte) {
c.queueRequest(&spaceRequest{
method: "POST",
makeURL: func(self *spaceRequest, run *spaceRun) error {
Expand All @@ -190,7 +190,7 @@ func (c *spacesClient) postTask(task *TaskSummary) {
self.url = fmt.Sprintf(tasksEndpoint, c.spaceID, run.ID)
return nil
},
body: newSpacesTaskPayload(task),
body: newSpacesTaskPayload(task, logs),
})
}

Expand Down Expand Up @@ -329,7 +329,7 @@ func newSpacesDonePayload(runsummary *RunSummary) *spacesRunPayload {
}
}

func newSpacesTaskPayload(taskSummary *TaskSummary) *spacesTask {
func newSpacesTaskPayload(taskSummary *TaskSummary, logs []byte) *spacesTask {
startTime := taskSummary.Execution.startAt.UnixMilli()
endTime := taskSummary.Execution.endTime().UnixMilli()

Expand All @@ -344,6 +344,6 @@ func newSpacesTaskPayload(taskSummary *TaskSummary) *spacesTask {
ExitCode: *taskSummary.Execution.exitCode,
Dependencies: taskSummary.Dependencies,
Dependents: taskSummary.Dependents,
Logs: string(taskSummary.GetLogs()),
Logs: string(logs),
}
}
7 changes: 4 additions & 3 deletions cli/internal/runsummary/spaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ func TestFailToCreateRun(t *testing.T) {
exitCode: &exitCode,
},
}
c.postTask(ts)
c.postTask(ts)
c.postTask(ts)
var logs []byte
c.postTask(ts, logs)
c.postTask(ts, logs)
c.postTask(ts, logs)
c.Close()

assert.True(t, api.sawFirst)
Expand Down
10 changes: 0 additions & 10 deletions cli/internal/runsummary/task_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ type TaskSummary struct {
Outputs []string `json:"outputs"`
ExcludedOutputs []string `json:"excludedOutputs"`
LogFileRelativePath string `json:"logFile"`
LogFileAbsolutePath turbopath.AbsoluteSystemPath `json:"-"`
Dir string `json:"directory,omitempty"`
Dependencies []string `json:"dependencies"`
Dependents []string `json:"dependents"`
Expand All @@ -79,15 +78,6 @@ type TaskSummary struct {
Execution *TaskExecutionSummary `json:"execution,omitempty"` // omit when it's not set
}

// GetLogs reads the log file and returns the data
func (ts *TaskSummary) GetLogs() []byte {
bytes, err := ts.LogFileAbsolutePath.ReadFile()
if err != nil {
return []byte{}
}
return bytes
}

// TaskEnvConfiguration contains the environment variable inputs for a task
type TaskEnvConfiguration struct {
Env []string `json:"env"`
Expand Down

0 comments on commit 827f601

Please sign in to comment.