Skip to content

Commit

Permalink
Distiguishing "finished exec" and "built specs" in TaskStatus (#542)
Browse files Browse the repository at this point in the history
This fixes the regression reported in
sourcegraph/sourcegraph#21230 and introduced
by yours truly in
d6c876c.

With the introduction of the Coordinator and the explicit steps of
checking-the-cache-for and building-of ChangesetSpecs outside the
executor, the `TaskStatus.ChangesetSpecs` field wasn't set at the time
when `FinishedAt` was set.

The `batchProgressPrinter` assumed, though, that this was the case and
that if `FinishedAt` was set that `len(taskStatus.ChangesetSpecs) == 0`
means "No changes".

This change here fixes the problem by distinguishing between the two
states: finished execution of steps and finished building changeset
specs.

The problem is that it's still a slight regression in behaviour:
previously the diff stats would be printed in the status bar and in the
verbose mode *as tasks were finishing*.

Now that we build all changeset specs at once, after all of them have
been executed, we can't update the status bar to include diff stats and
the verbose messages will be logged all at once.

I still think that the current code (with the Coordinator) is better
than what we had before and the hard problem here is fixed (no wrong
information being displayed), but longer term I think there's a solution
possible in which we decouple the task execution and its status much
more and make it possible to build better UIs for the status of
execution.

That I think should be approached separately though.
  • Loading branch information
mrnugget committed May 21, 2021
1 parent d6c876c commit 348c49a
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 76 deletions.
106 changes: 57 additions & 49 deletions cmd/src/batch_progress_printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ func newBatchProgressPrinter(out *output.Output, verbose bool, numParallelism in

numParallelism: numParallelism,

completedTasks: map[string]bool{},
executedTasks: map[string]bool{},
runningTasks: map[string]*executor.TaskStatus{},
completedTasks: map[string]bool{},

repoStatusBar: map[string]int{},
statusBarRepo: map[int]string{},
Expand All @@ -45,8 +46,14 @@ type batchProgressPrinter struct {
maxRepoName int
numParallelism int

// runningTasks are the tasks that are currently executing.
runningTasks map[string]*executor.TaskStatus
// executedTasks are the tasks that finished execution but where the
// changesetSpec hasn't been built.
executedTasks map[string]bool
// completedTasks are the tasks that finished execution and finished
// building changesetSpecs.
completedTasks map[string]bool
runningTasks map[string]*executor.TaskStatus

repoStatusBar map[string]int
statusBarRepo map[int]string
Expand Down Expand Up @@ -109,7 +116,8 @@ func (p *batchProgressPrinter) PrintStatuses(statuses []*executor.TaskStatus) {
p.numStatusBars = p.initProgressBar(statuses)
}

newlyCompleted := []*executor.TaskStatus{}
newlyFinishedExecution := []*executor.TaskStatus{}
newlyFinishedBuilding := []*executor.TaskStatus{}
currentlyRunning := []*executor.TaskStatus{}
errored := 0

Expand All @@ -118,14 +126,14 @@ func (p *batchProgressPrinter) PrintStatuses(statuses []*executor.TaskStatus) {
p.maxRepoName = len(ts.DisplayName())
}

if ts.IsCompleted() {
if ts.FinishedExecution() {
if ts.Err != nil {
errored += 1
}

if !p.completedTasks[ts.DisplayName()] {
p.completedTasks[ts.DisplayName()] = true
newlyCompleted = append(newlyCompleted, ts)
if !p.executedTasks[ts.DisplayName()] {
p.executedTasks[ts.DisplayName()] = true
newlyFinishedExecution = append(newlyFinishedExecution, ts)
}

if _, ok := p.runningTasks[ts.DisplayName()]; ok {
Expand All @@ -137,13 +145,20 @@ func (p *batchProgressPrinter) PrintStatuses(statuses []*executor.TaskStatus) {
}
}

if ts.FinishedBuildingSpecs() {
if !p.completedTasks[ts.DisplayName()] {
p.completedTasks[ts.DisplayName()] = true
newlyFinishedBuilding = append(newlyFinishedBuilding, ts)
}
}

if ts.IsRunning() {
currentlyRunning = append(currentlyRunning, ts)
}

}

p.updateProgressBar(len(p.completedTasks), errored, len(statuses))
p.updateProgressBar(len(p.executedTasks), errored, len(statuses))

newlyStarted := map[string]*executor.TaskStatus{}
statusBarIndex := 0
Expand Down Expand Up @@ -176,14 +191,35 @@ func (p *batchProgressPrinter) PrintStatuses(statuses []*executor.TaskStatus) {
p.repoStatusBar[ts.DisplayName()] = statusBarIndex
}

for _, ts := range newlyCompleted {
fileDiffs, hasDiffs, err := ts.FileDiffs()
if err != nil {
p.progress.Verbosef("%-*s failed to display status: %s", p.maxRepoName, ts.DisplayName(), err)
continue
for _, ts := range newlyFinishedExecution {
if idx, ok := p.repoStatusBar[ts.DisplayName()]; ok {
// Log that this task completed, but only if there is no
// currently executing one in this bar, to avoid flicker.
if _, ok := p.statusBarRepo[idx]; !ok {
statusText, err := taskStatusBarText(ts)
if err != nil {
p.progress.Verbosef("%-*s failed to display status: %s", p.maxRepoName, ts.DisplayName(), err)
continue
}

if ts.Err != nil {
p.progress.StatusBarFailf(idx, statusText)
} else {
p.progress.StatusBarCompletef(idx, statusText)
}
}
delete(p.repoStatusBar, ts.DisplayName())
}
}

if p.verbose {
for _, ts := range newlyFinishedBuilding {
fileDiffs, hasDiffs, err := ts.FileDiffs()
if err != nil {
p.progress.Verbosef("%-*s failed to display status: %s", p.maxRepoName, ts.DisplayName(), err)
continue
}

if p.verbose {
p.progress.WriteLine(output.Linef("", output.StylePending, "%s", ts.DisplayName()))

if !hasDiffs {
Expand All @@ -206,25 +242,6 @@ func (p *batchProgressPrinter) PrintStatuses(statuses []*executor.TaskStatus) {
p.progress.Verbosef(" Execution took %s", ts.ExecutionTime())
p.progress.Verbose("")
}

if idx, ok := p.repoStatusBar[ts.DisplayName()]; ok {
// Log that this task completed, but only if there is no
// currently executing one in this bar, to avoid flicker.
if _, ok := p.statusBarRepo[idx]; !ok {
statusText, err := taskStatusBarText(ts)
if err != nil {
p.progress.Verbosef("%-*s failed to display status: %s", p.maxRepoName, ts.DisplayName(), err)
continue
}

if ts.Err != nil {
p.progress.StatusBarFailf(idx, statusText)
} else {
p.progress.StatusBarCompletef(idx, statusText)
}
}
delete(p.repoStatusBar, ts.DisplayName())
}
}

for statusBar, repo := range p.statusBarRepo {
Expand Down Expand Up @@ -255,24 +272,15 @@ type statusTexter interface {
func taskStatusBarText(ts *executor.TaskStatus) (string, error) {
var statusText string

if ts.IsCompleted() {
diffs, hasDiffs, err := ts.FileDiffs()
if err != nil {
return "", err
}

if hasDiffs {
statusText = diffStatDescription(diffs) + " " + diffStatDiagram(sumDiffStats(diffs))
} else {
if ts.Err != nil {
if texter, ok := ts.Err.(statusTexter); ok {
statusText = texter.StatusText()
} else {
statusText = ts.Err.Error()
}
if ts.FinishedExecution() {
if ts.Err != nil {
if texter, ok := ts.Err.(statusTexter); ok {
statusText = texter.StatusText()
} else {
statusText = "No changes"
statusText = ts.Err.Error()
}
} else {
statusText = "Done!"
}
} else if ts.IsRunning() {
if ts.CurrentlyExecuting != "" {
Expand Down
42 changes: 33 additions & 9 deletions cmd/src/batch_progress_printer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,44 @@ func TestBatchProgressPrinterIntegration(t *testing.T) {
t.Fatalf("wrong output:\n%s", cmp.Diff(want, have))
}

// Now mark the last task as completed
// Now mark the last task as finished-execution
statuses[len(statuses)-1] = &executor.TaskStatus{
RepoName: "github.com/sourcegraph/automation-testing",
StartedAt: now.Add(time.Duration(-5) * time.Second),
FinishedAt: now.Add(time.Duration(5) * time.Second),
CurrentlyExecuting: "",
Err: nil,
}

printer.PrintStatuses(statuses)
have = buf.Lines()
want = []string{
"⠋ Executing... (1/3, 0 errored) ███████████████▍",
"│ ",
"├── github.com/sourcegraph/sourcegraph echo Hello World > README.md 0s",
"├── github.com/sourcegraph/src-cli Downloading archive 0s",
"└── github.com/sourcegraph/automati... Done! 0s",
"",
}
if !cmp.Equal(want, have) {
t.Fatalf("wrong output:\n%s", cmp.Diff(want, have))
}

// Print again to make sure we get the same result
printer.PrintStatuses(statuses)
have = buf.Lines()
if !cmp.Equal(want, have) {
t.Fatalf("wrong output:\n%s", cmp.Diff(want, have))
}

// Mark the last task as finished-building-specs
statuses[len(statuses)-1] = &executor.TaskStatus{
RepoName: "github.com/sourcegraph/automation-testing",
StartedAt: now.Add(time.Duration(-5) * time.Second),
FinishedAt: now.Add(time.Duration(5) * time.Second),
CurrentlyExecuting: "",
Err: nil,
ChangesetSpecsDone: true,
ChangesetSpecs: []*batches.ChangesetSpec{
{
BaseRepository: "graphql-id",
Expand Down Expand Up @@ -132,19 +163,12 @@ func TestBatchProgressPrinterIntegration(t *testing.T) {
"│ ",
"├── github.com/sourcegraph/sourcegraph echo Hello World > README.md 0s",
"├── github.com/sourcegraph/src-cli Downloading archive 0s",
"└── github.com/sourcegraph/automati... 3 files changed ++++ 0s",
"└── github.com/sourcegraph/automati... Done! 0s",
"",
}
if !cmp.Equal(want, have) {
t.Fatalf("wrong output:\n%s", cmp.Diff(want, have))
}

// Print again to make sure we get the same result
printer.PrintStatuses(statuses)
have = buf.Lines()
if !cmp.Equal(want, have) {
t.Fatalf("wrong output:\n%s", cmp.Diff(want, have))
}
}

type ttyBuf struct {
Expand Down
29 changes: 16 additions & 13 deletions internal/batches/executor/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,15 @@ func (c *Coordinator) checkCacheForTask(ctx context.Context, task *Task) (specs
return specs, true, nil
}

func (c *Coordinator) cacheAndBuildSpec(ctx context.Context, taskResult taskResult, status taskStatusHandler) ([]*batches.ChangesetSpec, error) {
func (c *Coordinator) cacheAndBuildSpec(ctx context.Context, taskResult taskResult, status taskStatusHandler) (specs []*batches.ChangesetSpec, err error) {
defer func() {
// Set these two fields in any case
status.Update(taskResult.task, func(status *TaskStatus) {
status.ChangesetSpecsDone = true
status.ChangesetSpecs = specs
})
}()

// Add to the cache, even if no diff was produced.
cacheKey := taskResult.task.cacheKey()
if err := c.cache.Set(ctx, cacheKey, taskResult.result); err != nil {
Expand All @@ -153,18 +161,11 @@ func (c *Coordinator) cacheAndBuildSpec(ctx context.Context, taskResult taskResu
}

// Build the changeset specs.
specs, err := createChangesetSpecs(taskResult.task, taskResult.result, c.opts.AutoAuthorDetails)
specs, err = createChangesetSpecs(taskResult.task, taskResult.result, c.opts.AutoAuthorDetails)
if err != nil {
return specs, err
}

// Update the status of Task
status.Update(taskResult.task, func(status *TaskStatus) {
status.ChangesetSpecs = specs
// TODO(mrnugget): Ideally we'd get rid of this dependency on the task
// status handler here.
})

return specs, nil
}

Expand Down Expand Up @@ -205,10 +206,6 @@ func (c *Coordinator) Execute(ctx context.Context, tasks []*Task, spec *batches.
// Run executor
c.exec.Start(ctx, tasks, status)
results, err := c.exec.Wait(ctx)
if printer != nil {
status.CopyStatuses(printer)
done <- struct{}{}
}
if err != nil {
if c.opts.SkipErrors {
errs = multierror.Append(errs, err)
Expand All @@ -227,6 +224,12 @@ func (c *Coordinator) Execute(ctx context.Context, tasks []*Task, spec *batches.
specs = append(specs, taskSpecs...)
}

// Now that we've built the specs too we can mark the progress as done
if printer != nil {
status.CopyStatuses(printer)
done <- struct{}{}
}

// Add external changeset specs.
for _, ic := range spec.ImportChangesets {
repo, err := c.opts.ResolveRepoName(ctx, ic.Repository)
Expand Down
1 change: 0 additions & 1 deletion internal/batches/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ func (x *executor) do(ctx context.Context, task *Task, status taskStatusHandler)
// Ensure that the status is updated when we're done.
defer func() {
status.Update(task, func(status *TaskStatus) {
status.FinishedAt = time.Now()
status.CurrentlyExecuting = ""
status.Err = err
})
Expand Down
17 changes: 13 additions & 4 deletions internal/batches/executor/task_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,14 @@ type TaskStatus struct {
CurrentlyExecuting string

// ChangesetSpecs are the specs produced by executing the Task in a
// repository. With the introduction of `transformChanges` to the batch
// spec, one Task can produce multiple ChangesetSpecs.
// repository. One Task can produce multiple ChangesetSpecs (see
// createChangesetSpec).
// Only check this field once ChangesetSpecsDone is set.
ChangesetSpecs []*batches.ChangesetSpec
// ChangesetSpecsDone is set after the Coordinator attempted to build the
// ChangesetSpecs of a task.
ChangesetSpecsDone bool

// Err is set if executing the Task lead to an error.
Err error

Expand All @@ -89,10 +94,14 @@ func (ts *TaskStatus) IsRunning() bool {
return !ts.StartedAt.IsZero() && ts.FinishedAt.IsZero()
}

func (ts *TaskStatus) IsCompleted() bool {
func (ts *TaskStatus) FinishedExecution() bool {
return !ts.StartedAt.IsZero() && !ts.FinishedAt.IsZero()
}

func (ts *TaskStatus) FinishedBuildingSpecs() bool {
return ts.ChangesetSpecsDone
}

func (ts *TaskStatus) ExecutionTime() time.Duration {
return ts.FinishedAt.Sub(ts.StartedAt).Truncate(time.Millisecond)
}
Expand All @@ -102,7 +111,7 @@ func (ts *TaskStatus) ExecutionTime() time.Duration {
// If no file diffs were produced, the task resulted in an error, or the task
// hasn't finished execution yet, the second return value is false.
func (ts *TaskStatus) FileDiffs() ([]*diff.FileDiff, bool, error) {
if !ts.IsCompleted() || len(ts.ChangesetSpecs) == 0 || ts.Err != nil {
if !ts.FinishedBuildingSpecs() || len(ts.ChangesetSpecs) == 0 || ts.Err != nil {
return nil, false, nil
}

Expand Down

0 comments on commit 348c49a

Please sign in to comment.