From 132d48596ce9e6f2a4987ea6c1de0b5e874ec629 Mon Sep 17 00:00:00 2001 From: Thorsten Ball Date: Tue, 11 May 2021 17:25:45 +0200 Subject: [PATCH 1/7] Only checkout repo archive if steps will be executed Since we use a "reference counting" mechanism when checking out archives we shouldn't bump the ref count if we never execute steps. Because only after executing the steps do we decrease the ref count and thus "free" the archive to be cleaned up. The problem is that if a `Task` has a cache hit then the ref will never be decreased and the archive never cleaned up. --- internal/batches/executor/executor.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/batches/executor/executor.go b/internal/batches/executor/executor.go index 18fe1367c4..d40be14c03 100644 --- a/internal/batches/executor/executor.go +++ b/internal/batches/executor/executor.go @@ -228,7 +228,6 @@ func New(opts Opts, client api.Client, features batches.FeatureFlags) *executor } func (x *executor) AddTask(task *Task) { - task.Archive = x.fetcher.Checkout(task.Repository, task.ArchivePathToFetch()) x.tasks = append(x.tasks, task) x.statusesMu.Lock() @@ -377,6 +376,9 @@ func (x *executor) do(ctx context.Context, task *Task) (err error) { log.Close() }() + // Now checkout the archive + task.Archive = x.fetcher.Checkout(task.Repository, task.ArchivePathToFetch()) + // Set up our timeout. runCtx, cancel := context.WithTimeout(ctx, x.timeout) defer cancel() From 3e8475dd7951e3430d35afd76e873b047415517c Mon Sep 17 00:00:00 2001 From: Thorsten Ball Date: Wed, 12 May 2021 12:23:27 +0200 Subject: [PATCH 2/7] WIP is working --- cmd/src/batch_common.go | 19 +++++++++-- internal/batches/executor/executor.go | 45 +++++++++++++++++++++++++-- internal/batches/repo_fetcher.go | 1 + internal/batches/service/service.go | 42 +++++++++++++++++++------ 4 files changed, 93 insertions(+), 14 deletions(-) diff --git a/cmd/src/batch_common.go b/cmd/src/batch_common.go index e9dae4e498..bc60fccfd6 100644 --- a/cmd/src/batch_common.go +++ b/cmd/src/batch_common.go @@ -295,7 +295,9 @@ func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) error { } batchCompletePending(pending, fmt.Sprintf("Found %d workspaces with steps to execute", len(tasks))) - execOpts := executor.Opts{ + // EXECUTION OF TASKS + + svc.InitExecutor(ctx, executor.Opts{ CacheDir: opts.flags.cacheDir, ClearCache: opts.flags.clearCache, CleanArchives: opts.flags.cleanArchives, @@ -304,10 +306,21 @@ func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) error { Timeout: opts.flags.timeout, KeepLogs: opts.flags.keepLogs, TempDir: opts.flags.tempDir, + }) + + pending = batchCreatePending(opts.out, "Checking cache for changeset specs") + uncachedTasks, cachedSpecs, err := svc.CheckCache(ctx, tasks) + if err != nil { + return err + } + if len(uncachedTasks) > 0 { + batchCompletePending(pending, fmt.Sprintf("Found %d cached changeset specs. %d tasks need to be executed", len(cachedSpecs), len(uncachedTasks))) + } else { + batchCompletePending(pending, fmt.Sprintf("Found %d cached changeset specs. No tasks need to be executed", len(cachedSpecs))) } p := newBatchProgressPrinter(opts.out, *verbose, opts.flags.parallelism) - specs, logFiles, err := svc.RunExecutor(ctx, execOpts, tasks, batchSpec, p.PrintStatuses, opts.flags.skipErrors) + freshSpecs, logFiles, err := svc.RunExecutor(ctx, uncachedTasks, batchSpec, p.PrintStatuses, opts.flags.skipErrors) if err != nil && !opts.flags.skipErrors { return err } @@ -328,6 +341,8 @@ func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) error { }() } + specs := append(cachedSpecs, freshSpecs...) + err = svc.ValidateChangesetSpecs(repos, specs) if err != nil { return err diff --git a/internal/batches/executor/executor.go b/internal/batches/executor/executor.go index d40be14c03..e1f0f5f332 100644 --- a/internal/batches/executor/executor.go +++ b/internal/batches/executor/executor.go @@ -50,6 +50,8 @@ type Executor interface { Start(ctx context.Context) Wait(ctx context.Context) ([]*batches.ChangesetSpec, error) + CheckCache(ctx context.Context, task *Task) (specs []*batches.ChangesetSpec, found bool, err error) + // LockedTaskStatuses calls the given function with the current state of // the task statuses. Before calling the function, the statuses are locked // to provide a consistent view of all statuses, but that also means the @@ -289,6 +291,43 @@ func (x *executor) Wait(ctx context.Context) ([]*batches.ChangesetSpec, error) { return x.specs, nil } +func (x *executor) CheckCache(ctx context.Context, task *Task) (specs []*batches.ChangesetSpec, found bool, err error) { + // Check if the task is cached. + cacheKey := task.cacheKey() + if x.clearCache { + if err = x.cache.Clear(ctx, cacheKey); err != nil { + return specs, false, errors.Wrapf(err, "clearing cache for %q", task.Repository.Name) + } + + return specs, false, nil + } + + var result executionResult + result, found, err = x.cache.Get(ctx, cacheKey) + if err != nil { + return specs, false, errors.Wrapf(err, "checking cache for %q", task.Repository.Name) + } + + if !found { + return specs, false, nil + } + + // If the cached result resulted in an empty diff, we don't need to + // add it to the list of specs that are displayed to the user and + // send to the server. Instead, we can just report that the task is + // complete and move on. + if result.Diff == "" { + return specs, true, nil + } + + specs, err = createChangesetSpecs(task, result, x.features) + if err != nil { + return specs, false, err + } + + return specs, true, nil +} + func (x *executor) do(ctx context.Context, task *Task) (err error) { // Ensure that the status is updated when we're done. defer func() { @@ -349,7 +388,7 @@ func (x *executor) do(ctx context.Context, task *Task) (err error) { }) // Add the spec to the executor's list of completed specs. - if err := x.addCompletedSpecs(task.Repository, specs); err != nil { + if err := x.addCompletedSpecs(specs); err != nil { return err } @@ -429,7 +468,7 @@ func (x *executor) do(ctx context.Context, task *Task) (err error) { status.ChangesetSpecs = specs }) - if err := x.addCompletedSpecs(task.Repository, specs); err != nil { + if err := x.addCompletedSpecs(specs); err != nil { return err } @@ -446,7 +485,7 @@ func (x *executor) updateTaskStatus(task *Task, update func(status *TaskStatus)) } } -func (x *executor) addCompletedSpecs(repository *graphql.Repository, specs []*batches.ChangesetSpec) error { +func (x *executor) addCompletedSpecs(specs []*batches.ChangesetSpec) error { x.specsMu.Lock() defer x.specsMu.Unlock() diff --git a/internal/batches/repo_fetcher.go b/internal/batches/repo_fetcher.go index d7f5732ce1..8915607e7f 100644 --- a/internal/batches/repo_fetcher.go +++ b/internal/batches/repo_fetcher.go @@ -184,6 +184,7 @@ func (rz *repoZip) Close() error { defer rz.mu.Unlock() rz.uses -= 1 + if rz.uses == 0 && rz.checkouts == 0 && rz.deleteOnClose { for _, addFile := range rz.additionalFiles { if addFile.fetched { diff --git a/internal/batches/service/service.go b/internal/batches/service/service.go index 8fdac712df..3fd320ef91 100644 --- a/internal/batches/service/service.go +++ b/internal/batches/service/service.go @@ -28,6 +28,9 @@ type Service struct { client api.Client features batches.FeatureFlags imageCache *docker.ImageCache + + // TODO(mrnugget): I don't like this state here, ugh. + exec executor.Executor } type Opts struct { @@ -192,16 +195,37 @@ func (svc *Service) BuildTasks(ctx context.Context, repos []*graphql.Repository, return builder.BuildAll(ctx, repos) } -func (svc *Service) RunExecutor(ctx context.Context, opts executor.Opts, tasks []*executor.Task, spec *batches.BatchSpec, progress func([]*executor.TaskStatus), skipErrors bool) ([]*batches.ChangesetSpec, []string, error) { - x := executor.New(opts, svc.client, svc.features) +func (svc *Service) InitExecutor(ctx context.Context, opts executor.Opts) { + svc.exec = executor.New(opts, svc.client, svc.features) +} + +func (svc *Service) CheckCache(ctx context.Context, tasks []*executor.Task) (uncached []*executor.Task, specs []*batches.ChangesetSpec, err error) { + for _, t := range tasks { + cachedSpecs, found, err := svc.exec.CheckCache(ctx, t) + if err != nil { + return nil, nil, err + } + + if !found { + uncached = append(uncached, t) + continue + } + + specs = append(specs, cachedSpecs...) + } + + return uncached, specs, nil +} + +func (svc *Service) RunExecutor(ctx context.Context, tasks []*executor.Task, spec *batches.BatchSpec, progress func([]*executor.TaskStatus), skipErrors bool) ([]*batches.ChangesetSpec, []string, error) { for _, t := range tasks { - x.AddTask(t) + svc.exec.AddTask(t) } done := make(chan struct{}) if progress != nil { go func() { - x.LockedTaskStatuses(progress) + svc.exec.LockedTaskStatuses(progress) ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() @@ -209,7 +233,7 @@ func (svc *Service) RunExecutor(ctx context.Context, opts executor.Opts, tasks [ for { select { case <-ticker.C: - x.LockedTaskStatuses(progress) + svc.exec.LockedTaskStatuses(progress) case <-done: return @@ -220,10 +244,10 @@ func (svc *Service) RunExecutor(ctx context.Context, opts executor.Opts, tasks [ var errs *multierror.Error - x.Start(ctx) - specs, err := x.Wait(ctx) + svc.exec.Start(ctx) + specs, err := svc.exec.Wait(ctx) if progress != nil { - x.LockedTaskStatuses(progress) + svc.exec.LockedTaskStatuses(progress) done <- struct{}{} } if err != nil { @@ -272,7 +296,7 @@ func (svc *Service) RunExecutor(ctx context.Context, opts executor.Opts, tasks [ } } - return specs, x.LogFiles(), errs.ErrorOrNil() + return specs, svc.exec.LogFiles(), errs.ErrorOrNil() } func (svc *Service) ValidateChangesetSpecs(repos []*graphql.Repository, specs []*batches.ChangesetSpec) error { From a8748372810c2e900e9614a1e6bb218b16b5f9ba Mon Sep 17 00:00:00 2001 From: Thorsten Ball Date: Wed, 12 May 2021 14:01:02 +0200 Subject: [PATCH 3/7] More tiny refactoring --- cmd/src/batch_common.go | 3 +- internal/batches/executor/executor.go | 156 +++++++++----------------- internal/batches/service/service.go | 10 +- 3 files changed, 60 insertions(+), 109 deletions(-) diff --git a/cmd/src/batch_common.go b/cmd/src/batch_common.go index bc60fccfd6..9bb000966e 100644 --- a/cmd/src/batch_common.go +++ b/cmd/src/batch_common.go @@ -299,7 +299,6 @@ func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) error { svc.InitExecutor(ctx, executor.Opts{ CacheDir: opts.flags.cacheDir, - ClearCache: opts.flags.clearCache, CleanArchives: opts.flags.cleanArchives, Creator: workspaceCreator, Parallelism: opts.flags.parallelism, @@ -309,7 +308,7 @@ func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) error { }) pending = batchCreatePending(opts.out, "Checking cache for changeset specs") - uncachedTasks, cachedSpecs, err := svc.CheckCache(ctx, tasks) + uncachedTasks, cachedSpecs, err := svc.CheckCache(ctx, tasks, opts.flags.clearCache) if err != nil { return err } diff --git a/internal/batches/executor/executor.go b/internal/batches/executor/executor.go index e1f0f5f332..fe16f1d684 100644 --- a/internal/batches/executor/executor.go +++ b/internal/batches/executor/executor.go @@ -18,6 +18,43 @@ import ( "github.com/sourcegraph/src-cli/internal/batches/workspace" ) +func CheckCache(ctx context.Context, cache ExecutionCache, clearCache bool, features batches.FeatureFlags, task *Task) (specs []*batches.ChangesetSpec, found bool, err error) { + // Check if the task is cached. + cacheKey := task.cacheKey() + if clearCache { + if err = cache.Clear(ctx, cacheKey); err != nil { + return specs, false, errors.Wrapf(err, "clearing cache for %q", task.Repository.Name) + } + + return specs, false, nil + } + + var result executionResult + result, found, err = cache.Get(ctx, cacheKey) + if err != nil { + return specs, false, errors.Wrapf(err, "checking cache for %q", task.Repository.Name) + } + + if !found { + return specs, false, nil + } + + // If the cached result resulted in an empty diff, we don't need to + // add it to the list of specs that are displayed to the user and + // send to the server. Instead, we can just report that the task is + // complete and move on. + if result.Diff == "" { + return specs, true, nil + } + + specs, err = createChangesetSpecs(task, result, features) + if err != nil { + return specs, false, err + } + + return specs, true, nil +} + type TaskExecutionErr struct { Err error Logfile string @@ -50,8 +87,6 @@ type Executor interface { Start(ctx context.Context) Wait(ctx context.Context) ([]*batches.ChangesetSpec, error) - CheckCache(ctx context.Context, task *Task) (specs []*batches.ChangesetSpec, found bool, err error) - // LockedTaskStatuses calls the given function with the current state of // the task statuses. Before calling the function, the statuses are locked // to provide a consistent view of all statuses, but that also means the @@ -167,11 +202,13 @@ func (ts *TaskStatus) FileDiffs() ([]*diff.FileDiff, bool, error) { } type Opts struct { - CacheDir string - ClearCache bool - CleanArchives bool + CacheDir string + + // TODO: shoudl be builder + Cache ExecutionCache + Creator workspace.Creator Parallelism int Timeout time.Duration @@ -181,8 +218,7 @@ type Opts struct { } type executor struct { - cache ExecutionCache - clearCache bool + cache ExecutionCache features batches.FeatureFlags @@ -208,8 +244,7 @@ type executor struct { // TODO(mrnugget): Why are client and features not part of Opts? func New(opts Opts, client api.Client, features batches.FeatureFlags) *executor { return &executor{ - cache: NewCache(opts.CacheDir), - clearCache: opts.ClearCache, + cache: opts.Cache, logger: log.NewManager(opts.TempDir, opts.KeepLogs), creator: opts.Creator, @@ -291,43 +326,6 @@ func (x *executor) Wait(ctx context.Context) ([]*batches.ChangesetSpec, error) { return x.specs, nil } -func (x *executor) CheckCache(ctx context.Context, task *Task) (specs []*batches.ChangesetSpec, found bool, err error) { - // Check if the task is cached. - cacheKey := task.cacheKey() - if x.clearCache { - if err = x.cache.Clear(ctx, cacheKey); err != nil { - return specs, false, errors.Wrapf(err, "clearing cache for %q", task.Repository.Name) - } - - return specs, false, nil - } - - var result executionResult - result, found, err = x.cache.Get(ctx, cacheKey) - if err != nil { - return specs, false, errors.Wrapf(err, "checking cache for %q", task.Repository.Name) - } - - if !found { - return specs, false, nil - } - - // If the cached result resulted in an empty diff, we don't need to - // add it to the list of specs that are displayed to the user and - // send to the server. Instead, we can just report that the task is - // complete and move on. - if result.Diff == "" { - return specs, true, nil - } - - specs, err = createChangesetSpecs(task, result, x.features) - if err != nil { - return specs, false, err - } - - return specs, true, nil -} - func (x *executor) do(ctx context.Context, task *Task) (err error) { // Ensure that the status is updated when we're done. defer func() { @@ -343,59 +341,6 @@ func (x *executor) do(ctx context.Context, task *Task) (err error) { status.StartedAt = time.Now() }) - // Check if the task is cached. - cacheKey := task.cacheKey() - if x.clearCache { - if err = x.cache.Clear(ctx, cacheKey); err != nil { - err = errors.Wrapf(err, "clearing cache for %q", task.Repository.Name) - return - } - } else { - var ( - result executionResult - found bool - ) - - result, found, err = x.cache.Get(ctx, cacheKey) - if err != nil { - err = errors.Wrapf(err, "checking cache for %q", task.Repository.Name) - return - } - if found { - // If the cached result resulted in an empty diff, we don't need to - // add it to the list of specs that are displayed to the user and - // send to the server. Instead, we can just report that the task is - // complete and move on. - if result.Diff == "" { - x.updateTaskStatus(task, func(status *TaskStatus) { - status.Cached = true - status.FinishedAt = time.Now() - - }) - return - } - - var specs []*batches.ChangesetSpec - specs, err = createChangesetSpecs(task, result, x.features) - if err != nil { - return err - } - - x.updateTaskStatus(task, func(status *TaskStatus) { - status.ChangesetSpecs = specs - status.Cached = true - status.FinishedAt = time.Now() - }) - - // Add the spec to the executor's list of completed specs. - if err := x.addCompletedSpecs(specs); err != nil { - return err - } - - return - } - } - // It isn't, so let's get ready to run the task. First, let's set up our // logging. log, err := x.logger.AddTask(task.Repository.SlugForPath(task.Path)) @@ -446,11 +391,8 @@ func (x *executor) do(ctx context.Context, task *Task) (err error) { return } - // Build the changeset specs. - specs, err := createChangesetSpecs(task, result, x.features) - if err != nil { - return err - } + // Check if the task is cached. + cacheKey := task.cacheKey() // Add to the cache. We don't use runCtx here because we want to write to // the cache even if we've now reached the timeout. @@ -464,6 +406,12 @@ func (x *executor) do(ctx context.Context, task *Task) (err error) { return } + // Build the changeset specs. + specs, err := createChangesetSpecs(task, result, x.features) + if err != nil { + return err + } + x.updateTaskStatus(task, func(status *TaskStatus) { status.ChangesetSpecs = specs }) diff --git a/internal/batches/service/service.go b/internal/batches/service/service.go index 3fd320ef91..02ab100d91 100644 --- a/internal/batches/service/service.go +++ b/internal/batches/service/service.go @@ -30,7 +30,8 @@ type Service struct { imageCache *docker.ImageCache // TODO(mrnugget): I don't like this state here, ugh. - exec executor.Executor + exec executor.Executor + cache executor.ExecutionCache } type Opts struct { @@ -196,12 +197,15 @@ func (svc *Service) BuildTasks(ctx context.Context, repos []*graphql.Repository, } func (svc *Service) InitExecutor(ctx context.Context, opts executor.Opts) { + svc.cache = executor.NewCache(opts.CacheDir) + opts.Cache = svc.cache + svc.exec = executor.New(opts, svc.client, svc.features) } -func (svc *Service) CheckCache(ctx context.Context, tasks []*executor.Task) (uncached []*executor.Task, specs []*batches.ChangesetSpec, err error) { +func (svc *Service) CheckCache(ctx context.Context, tasks []*executor.Task, clearCache bool) (uncached []*executor.Task, specs []*batches.ChangesetSpec, err error) { for _, t := range tasks { - cachedSpecs, found, err := svc.exec.CheckCache(ctx, t) + cachedSpecs, found, err := executor.CheckCache(ctx, svc.cache, false, svc.features, t) if err != nil { return nil, nil, err } From 55c43ca3e6f8392f063f5d93af64e8c848c18795 Mon Sep 17 00:00:00 2001 From: Thorsten Ball Date: Wed, 12 May 2021 14:20:36 +0200 Subject: [PATCH 4/7] Working state --- cmd/src/batch_common.go | 1 + internal/batches/executor/executor.go | 25 +++++++++++----------- internal/batches/executor/executor_test.go | 6 ++++-- internal/batches/service/service.go | 14 +++++++++--- 4 files changed, 28 insertions(+), 18 deletions(-) diff --git a/cmd/src/batch_common.go b/cmd/src/batch_common.go index 9bb000966e..4896c60048 100644 --- a/cmd/src/batch_common.go +++ b/cmd/src/batch_common.go @@ -297,6 +297,7 @@ func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) error { // EXECUTION OF TASKS + svc.InitCache(opts.flags.cacheDir) svc.InitExecutor(ctx, executor.Opts{ CacheDir: opts.flags.cacheDir, CleanArchives: opts.flags.cleanArchives, diff --git a/internal/batches/executor/executor.go b/internal/batches/executor/executor.go index fe16f1d684..07aafbcda8 100644 --- a/internal/batches/executor/executor.go +++ b/internal/batches/executor/executor.go @@ -202,14 +202,15 @@ func (ts *TaskStatus) FileDiffs() ([]*diff.FileDiff, bool, error) { } type Opts struct { + Cache ExecutionCache + Client api.Client + Features batches.FeatureFlags + Creator workspace.Creator + CleanArchives bool CacheDir string - // TODO: shoudl be builder - Cache ExecutionCache - - Creator workspace.Creator Parallelism int Timeout time.Duration @@ -241,18 +242,16 @@ type executor struct { specsMu sync.Mutex } -// TODO(mrnugget): Why are client and features not part of Opts? -func New(opts Opts, client api.Client, features batches.FeatureFlags) *executor { +func New(opts Opts) *executor { return &executor{ - cache: opts.Cache, - - logger: log.NewManager(opts.TempDir, opts.KeepLogs), - creator: opts.Creator, + cache: opts.Cache, + client: opts.Client, + features: opts.Features, + creator: opts.Creator, - fetcher: batches.NewRepoFetcher(client, opts.CacheDir, opts.CleanArchives), + logger: log.NewManager(opts.TempDir, opts.KeepLogs), - client: client, - features: features, + fetcher: batches.NewRepoFetcher(opts.Client, opts.CacheDir, opts.CleanArchives), tempDir: opts.TempDir, timeout: opts.Timeout, diff --git a/internal/batches/executor/executor_test.go b/internal/batches/executor/executor_test.go index 56b060a4a3..83059a2bf0 100644 --- a/internal/batches/executor/executor_test.go +++ b/internal/batches/executor/executor_test.go @@ -436,7 +436,10 @@ output4=integration-test-batch-change`, cache := newInMemoryExecutionCache() creator := workspace.NewCreator(context.Background(), "bind", testTempDir, testTempDir, []batches.Step{}) opts := Opts{ + Cache: cache, Creator: creator, + Client: client, + Features: featuresAllEnabled(), TempDir: testTempDir, Parallelism: runtime.GOMAXPROCS(0), Timeout: tc.executorTimeout, @@ -452,8 +455,7 @@ output4=integration-test-batch-change`, // executor. We'll run this multiple times to cover both the cache // and non-cache code paths. execute := func(t *testing.T) { - executor := New(opts, client, featuresAllEnabled()) - executor.cache = cache + executor := New(opts) executor.fetcher = repoFetcher for i := range tc.steps { diff --git a/internal/batches/service/service.go b/internal/batches/service/service.go index 02ab100d91..6f6314a75c 100644 --- a/internal/batches/service/service.go +++ b/internal/batches/service/service.go @@ -196,16 +196,24 @@ func (svc *Service) BuildTasks(ctx context.Context, repos []*graphql.Repository, return builder.BuildAll(ctx, repos) } +// TODO(mrnugget): This is not good. +func (svc *Service) InitCache(cacheDir string) { + svc.cache = executor.NewCache(cacheDir) +} + +// TODO(mrnugget): This is not good. Ideally the executor wouldn't have to know +// anything about the cache. func (svc *Service) InitExecutor(ctx context.Context, opts executor.Opts) { - svc.cache = executor.NewCache(opts.CacheDir) opts.Cache = svc.cache + opts.Client = svc.client + opts.Features = svc.features - svc.exec = executor.New(opts, svc.client, svc.features) + svc.exec = executor.New(opts) } func (svc *Service) CheckCache(ctx context.Context, tasks []*executor.Task, clearCache bool) (uncached []*executor.Task, specs []*batches.ChangesetSpec, err error) { for _, t := range tasks { - cachedSpecs, found, err := executor.CheckCache(ctx, svc.cache, false, svc.features, t) + cachedSpecs, found, err := executor.CheckCache(ctx, svc.cache, clearCache, svc.features, t) if err != nil { return nil, nil, err } From 94abafaef87a1317cf948b390578210255fd16e2 Mon Sep 17 00:00:00 2001 From: Thorsten Ball Date: Wed, 12 May 2021 14:30:04 +0200 Subject: [PATCH 5/7] Move stuff around --- internal/batches/executor/check_cache.go | 45 +++++++ internal/batches/executor/executor.go | 145 ----------------------- internal/batches/executor/task.go | 40 +++++++ internal/batches/executor/task_status.go | 82 +++++++++++++ 4 files changed, 167 insertions(+), 145 deletions(-) create mode 100644 internal/batches/executor/check_cache.go create mode 100644 internal/batches/executor/task.go create mode 100644 internal/batches/executor/task_status.go diff --git a/internal/batches/executor/check_cache.go b/internal/batches/executor/check_cache.go new file mode 100644 index 0000000000..25601c01a0 --- /dev/null +++ b/internal/batches/executor/check_cache.go @@ -0,0 +1,45 @@ +package executor + +import ( + "context" + + "github.com/pkg/errors" + "github.com/sourcegraph/src-cli/internal/batches" +) + +func CheckCache(ctx context.Context, cache ExecutionCache, clearCache bool, features batches.FeatureFlags, task *Task) (specs []*batches.ChangesetSpec, found bool, err error) { + // Check if the task is cached. + cacheKey := task.cacheKey() + if clearCache { + if err = cache.Clear(ctx, cacheKey); err != nil { + return specs, false, errors.Wrapf(err, "clearing cache for %q", task.Repository.Name) + } + + return specs, false, nil + } + + var result executionResult + result, found, err = cache.Get(ctx, cacheKey) + if err != nil { + return specs, false, errors.Wrapf(err, "checking cache for %q", task.Repository.Name) + } + + if !found { + return specs, false, nil + } + + // If the cached result resulted in an empty diff, we don't need to + // add it to the list of specs that are displayed to the user and + // send to the server. Instead, we can just report that the task is + // complete and move on. + if result.Diff == "" { + return specs, true, nil + } + + specs, err = createChangesetSpecs(task, result, features) + if err != nil { + return specs, false, err + } + + return specs, true, nil +} diff --git a/internal/batches/executor/executor.go b/internal/batches/executor/executor.go index 07aafbcda8..b64b445da7 100644 --- a/internal/batches/executor/executor.go +++ b/internal/batches/executor/executor.go @@ -13,48 +13,10 @@ import ( "github.com/sourcegraph/go-diff/diff" "github.com/sourcegraph/src-cli/internal/api" "github.com/sourcegraph/src-cli/internal/batches" - "github.com/sourcegraph/src-cli/internal/batches/graphql" "github.com/sourcegraph/src-cli/internal/batches/log" "github.com/sourcegraph/src-cli/internal/batches/workspace" ) -func CheckCache(ctx context.Context, cache ExecutionCache, clearCache bool, features batches.FeatureFlags, task *Task) (specs []*batches.ChangesetSpec, found bool, err error) { - // Check if the task is cached. - cacheKey := task.cacheKey() - if clearCache { - if err = cache.Clear(ctx, cacheKey); err != nil { - return specs, false, errors.Wrapf(err, "clearing cache for %q", task.Repository.Name) - } - - return specs, false, nil - } - - var result executionResult - result, found, err = cache.Get(ctx, cacheKey) - if err != nil { - return specs, false, errors.Wrapf(err, "checking cache for %q", task.Repository.Name) - } - - if !found { - return specs, false, nil - } - - // If the cached result resulted in an empty diff, we don't need to - // add it to the list of specs that are displayed to the user and - // send to the server. Instead, we can just report that the task is - // complete and move on. - if result.Diff == "" { - return specs, true, nil - } - - specs, err = createChangesetSpecs(task, result, features) - if err != nil { - return specs, false, err - } - - return specs, true, nil -} - type TaskExecutionErr struct { Err error Logfile string @@ -94,113 +56,6 @@ type Executor interface { LockedTaskStatuses(func([]*TaskStatus)) } -type Task struct { - Repository *graphql.Repository - - // Path is the folder relative to the repository's root in which the steps - // should be executed. - Path string - // OnlyFetchWorkspace determines whether the repository archive contains - // the complete repository or just the files in Path (and additional files, - // see RepoFetcher). - // If Path is "" then this setting has no effect. - OnlyFetchWorkspace bool - - Steps []batches.Step - - // TODO(mrnugget): this should just be a single BatchSpec field instead, if - // we can make it work with caching - BatchChangeAttributes *BatchChangeAttributes `json:"-"` - Template *batches.ChangesetTemplate `json:"-"` - TransformChanges *batches.TransformChanges `json:"-"` - - Archive batches.RepoZip `json:"-"` -} - -func (t *Task) ArchivePathToFetch() string { - if t.OnlyFetchWorkspace { - return t.Path - } - return "" -} - -func (t *Task) cacheKey() ExecutionCacheKey { - return ExecutionCacheKey{t} -} - -type TaskStatus struct { - RepoName string - Path string - - Cached bool - - LogFile string - EnqueuedAt time.Time - StartedAt time.Time - FinishedAt time.Time - - // TODO: add current step and progress fields. - CurrentlyExecuting string - - // ChangesetSpecs are the specs produced by executing the Task in a - // repository. With the introduction of `transformChanges` to the batch - // spec, one Task can produce multiple ChangesetSpecs. - ChangesetSpecs []*batches.ChangesetSpec - // Err is set if executing the Task lead to an error. - Err error - - fileDiffs []*diff.FileDiff - fileDiffsErr error - fileDiffsOnce sync.Once -} - -func (ts *TaskStatus) DisplayName() string { - if ts.Path != "" { - return ts.RepoName + ":" + ts.Path - } - return ts.RepoName -} - -func (ts *TaskStatus) IsRunning() bool { - return !ts.StartedAt.IsZero() && ts.FinishedAt.IsZero() -} - -func (ts *TaskStatus) IsCompleted() bool { - return !ts.StartedAt.IsZero() && !ts.FinishedAt.IsZero() -} - -func (ts *TaskStatus) ExecutionTime() time.Duration { - return ts.FinishedAt.Sub(ts.StartedAt).Truncate(time.Millisecond) -} - -// FileDiffs returns the file diffs produced by the Task in the given -// repository. -// If no file diffs were produced, the task resulted in an error, or the task -// hasn't finished execution yet, the second return value is false. -func (ts *TaskStatus) FileDiffs() ([]*diff.FileDiff, bool, error) { - if !ts.IsCompleted() || len(ts.ChangesetSpecs) == 0 || ts.Err != nil { - return nil, false, nil - } - - ts.fileDiffsOnce.Do(func() { - var all []*diff.FileDiff - - for _, spec := range ts.ChangesetSpecs { - fd, err := diff.ParseMultiFileDiff([]byte(spec.Commits[0].Diff)) - if err != nil { - ts.fileDiffsErr = err - return - } - - all = append(all, fd...) - } - - ts.fileDiffs = all - }) - - return ts.fileDiffs, len(ts.fileDiffs) != 0, ts.fileDiffsErr -} - type Opts struct { Cache ExecutionCache Client api.Client diff --git a/internal/batches/executor/task.go b/internal/batches/executor/task.go new file mode 100644 index 0000000000..677270d162 --- /dev/null +++ b/internal/batches/executor/task.go @@ -0,0 +1,40 @@ +package executor + +import ( + "github.com/sourcegraph/src-cli/internal/batches" + "github.com/sourcegraph/src-cli/internal/batches/graphql" +) + +type Task struct { + Repository *graphql.Repository + + // Path is the folder relative to the repository's root in which the steps + // should be executed. + Path string + // OnlyFetchWorkspace determines whether the repository archive contains + // the complete repository or just the files in Path (and additional files, + // see RepoFetcher). + // If Path is "" then this setting has no effect. + OnlyFetchWorkspace bool + + Steps []batches.Step + + // TODO(mrnugget): this should just be a single BatchSpec field instead, if + // we can make it work with caching + BatchChangeAttributes *BatchChangeAttributes `json:"-"` + Template *batches.ChangesetTemplate `json:"-"` + TransformChanges *batches.TransformChanges `json:"-"` + + Archive batches.RepoZip `json:"-"` +} + +func (t *Task) ArchivePathToFetch() string { + if t.OnlyFetchWorkspace { + return t.Path + } + return "" +} + +func (t *Task) cacheKey() ExecutionCacheKey { + return ExecutionCacheKey{t} +} diff --git a/internal/batches/executor/task_status.go b/internal/batches/executor/task_status.go new file mode 100644 index 0000000000..f2247ea765 --- /dev/null +++ b/internal/batches/executor/task_status.go @@ -0,0 +1,82 @@ +package executor + +import ( + "sync" + "time" + + "github.com/sourcegraph/go-diff/diff" + "github.com/sourcegraph/src-cli/internal/batches" +) + +type TaskStatus struct { + RepoName string + Path string + + Cached bool + + LogFile string + EnqueuedAt time.Time + StartedAt time.Time + FinishedAt time.Time + + // TODO: add current step and progress fields. + CurrentlyExecuting string + + // ChangesetSpecs are the specs produced by executing the Task in a + // repository. With the introduction of `transformChanges` to the batch + // spec, one Task can produce multiple ChangesetSpecs. + ChangesetSpecs []*batches.ChangesetSpec + // Err is set if executing the Task lead to an error. + Err error + + fileDiffs []*diff.FileDiff + fileDiffsErr error + fileDiffsOnce sync.Once +} + +func (ts *TaskStatus) DisplayName() string { + if ts.Path != "" { + return ts.RepoName + ":" + ts.Path + } + return ts.RepoName +} + +func (ts *TaskStatus) IsRunning() bool { + return !ts.StartedAt.IsZero() && ts.FinishedAt.IsZero() +} + +func (ts *TaskStatus) IsCompleted() bool { + return !ts.StartedAt.IsZero() && !ts.FinishedAt.IsZero() +} + +func (ts *TaskStatus) ExecutionTime() time.Duration { + return ts.FinishedAt.Sub(ts.StartedAt).Truncate(time.Millisecond) +} + +// FileDiffs returns the file diffs produced by the Task in the given +// repository. +// If no file diffs were produced, the task resulted in an error, or the task +// hasn't finished execution yet, the second return value is false. +func (ts *TaskStatus) FileDiffs() ([]*diff.FileDiff, bool, error) { + if !ts.IsCompleted() || len(ts.ChangesetSpecs) == 0 || ts.Err != nil { + return nil, false, nil + } + + ts.fileDiffsOnce.Do(func() { + var all []*diff.FileDiff + + for _, spec := range ts.ChangesetSpecs { + fd, err := diff.ParseMultiFileDiff([]byte(spec.Commits[0].Diff)) + if err != nil { + ts.fileDiffsErr = err + return + } + + all = append(all, fd...) + } + + ts.fileDiffs = all + }) + + return ts.fileDiffs, len(ts.fileDiffs) != 0, ts.fileDiffsErr +} From 8cd9c5083b82ff6999ee86ad5fb1caf56734546e Mon Sep 17 00:00:00 2001 From: Thorsten Ball Date: Wed, 12 May 2021 15:40:56 +0200 Subject: [PATCH 6/7] More refactoring --- cmd/src/batch_common.go | 2 +- internal/batches/executor/changeset_specs.go | 215 +++++++++++++++++++ internal/batches/executor/executor.go | 211 +----------------- internal/batches/executor/executor_test.go | 2 +- internal/batches/service/service.go | 2 +- 5 files changed, 220 insertions(+), 212 deletions(-) create mode 100644 internal/batches/executor/changeset_specs.go diff --git a/cmd/src/batch_common.go b/cmd/src/batch_common.go index 4896c60048..9a733a91c3 100644 --- a/cmd/src/batch_common.go +++ b/cmd/src/batch_common.go @@ -298,7 +298,7 @@ func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) error { // EXECUTION OF TASKS svc.InitCache(opts.flags.cacheDir) - svc.InitExecutor(ctx, executor.Opts{ + svc.InitExecutor(ctx, executor.NewExecutorOpts{ CacheDir: opts.flags.cacheDir, CleanArchives: opts.flags.cleanArchives, Creator: workspaceCreator, diff --git a/internal/batches/executor/changeset_specs.go b/internal/batches/executor/changeset_specs.go new file mode 100644 index 0000000000..0c697c45f6 --- /dev/null +++ b/internal/batches/executor/changeset_specs.go @@ -0,0 +1,215 @@ +package executor + +import ( + "fmt" + "strings" + + "github.com/pkg/errors" + "github.com/sourcegraph/go-diff/diff" + "github.com/sourcegraph/src-cli/internal/batches" +) + +func createChangesetSpecs(task *Task, result executionResult, features batches.FeatureFlags) ([]*batches.ChangesetSpec, error) { + repo := task.Repository.Name + + tmplCtx := &ChangesetTemplateContext{ + BatchChangeAttributes: *task.BatchChangeAttributes, + Steps: StepsContext{ + Changes: result.ChangedFiles, + Path: result.Path, + }, + Outputs: result.Outputs, + Repository: *task.Repository, + } + + var authorName string + var authorEmail string + + if task.Template.Commit.Author == nil { + if features.IncludeAutoAuthorDetails { + // user did not provide author info, so use defaults + authorName = "Sourcegraph" + authorEmail = "batch-changes@sourcegraph.com" + } + } else { + var err error + authorName, err = renderChangesetTemplateField("authorName", task.Template.Commit.Author.Name, tmplCtx) + if err != nil { + return nil, err + } + authorEmail, err = renderChangesetTemplateField("authorEmail", task.Template.Commit.Author.Email, tmplCtx) + if err != nil { + return nil, err + } + } + + title, err := renderChangesetTemplateField("title", task.Template.Title, tmplCtx) + if err != nil { + return nil, err + } + + body, err := renderChangesetTemplateField("body", task.Template.Body, tmplCtx) + if err != nil { + return nil, err + } + + message, err := renderChangesetTemplateField("message", task.Template.Commit.Message, tmplCtx) + if err != nil { + return nil, err + } + + // TODO: As a next step, we should extend the ChangesetTemplateContext to also include + // TransformChanges.Group and then change validateGroups and groupFileDiffs to, for each group, + // render the branch name *before* grouping the diffs. + defaultBranch, err := renderChangesetTemplateField("branch", task.Template.Branch, tmplCtx) + if err != nil { + return nil, err + } + + newSpec := func(branch, diff string) *batches.ChangesetSpec { + return &batches.ChangesetSpec{ + BaseRepository: task.Repository.ID, + CreatedChangeset: &batches.CreatedChangeset{ + BaseRef: task.Repository.BaseRef(), + BaseRev: task.Repository.Rev(), + HeadRepository: task.Repository.ID, + HeadRef: "refs/heads/" + branch, + Title: title, + Body: body, + Commits: []batches.GitCommitDescription{ + { + Message: message, + AuthorName: authorName, + AuthorEmail: authorEmail, + Diff: diff, + }, + }, + Published: task.Template.Published.ValueWithSuffix(repo, branch), + }, + } + } + + var specs []*batches.ChangesetSpec + + groups := groupsForRepository(task.Repository.Name, task.TransformChanges) + if len(groups) != 0 { + err := validateGroups(task.Repository.Name, task.Template.Branch, groups) + if err != nil { + return specs, err + } + + // TODO: Regarding 'defaultBranch', see comment above + diffsByBranch, err := groupFileDiffs(result.Diff, defaultBranch, groups) + if err != nil { + return specs, errors.Wrap(err, "grouping diffs failed") + } + + for branch, diff := range diffsByBranch { + specs = append(specs, newSpec(branch, diff)) + } + } else { + specs = append(specs, newSpec(defaultBranch, result.Diff)) + } + + return specs, nil +} + +func groupsForRepository(repo string, transform *batches.TransformChanges) []batches.Group { + var groups []batches.Group + + if transform == nil { + return groups + } + + for _, g := range transform.Group { + if g.Repository != "" { + if g.Repository == repo { + groups = append(groups, g) + } + } else { + groups = append(groups, g) + } + } + + return groups +} + +func validateGroups(repo, defaultBranch string, groups []batches.Group) error { + uniqueBranches := make(map[string]struct{}, len(groups)) + + for _, g := range groups { + if _, ok := uniqueBranches[g.Branch]; ok { + return fmt.Errorf("transformChanges would lead to multiple changesets in repository %s to have the same branch %q", repo, g.Branch) + } else { + uniqueBranches[g.Branch] = struct{}{} + } + + if g.Branch == defaultBranch { + return fmt.Errorf("transformChanges group branch for repository %s is the same as branch %q in changesetTemplate", repo, defaultBranch) + } + } + + return nil +} + +func groupFileDiffs(completeDiff, defaultBranch string, groups []batches.Group) (map[string]string, error) { + fileDiffs, err := diff.ParseMultiFileDiff([]byte(completeDiff)) + if err != nil { + return nil, err + } + + // Housekeeping: we setup these two datastructures so we can + // - access the group.Branch by the directory for which they should be used + // - check against the given directories, in order. + branchesByDirectory := make(map[string]string, len(groups)) + dirs := make([]string, len(branchesByDirectory)) + for _, g := range groups { + branchesByDirectory[g.Directory] = g.Branch + dirs = append(dirs, g.Directory) + } + + byBranch := make(map[string][]*diff.FileDiff, len(groups)) + byBranch[defaultBranch] = []*diff.FileDiff{} + + // For each file diff... + for _, f := range fileDiffs { + name := f.NewName + if name == "/dev/null" { + name = f.OrigName + } + + // .. we check whether it matches one of the given directories in the + // group transformations, with the last match winning: + var matchingDir string + for _, d := range dirs { + if strings.Contains(name, d) { + matchingDir = d + } + } + + // If the diff didn't match a rule, it goes into the default branch and + // the default changeset. + if matchingDir == "" { + byBranch[defaultBranch] = append(byBranch[defaultBranch], f) + continue + } + + // If it *did* match a directory, we look up which branch we should use: + branch, ok := branchesByDirectory[matchingDir] + if !ok { + panic("this should not happen: " + matchingDir) + } + + byBranch[branch] = append(byBranch[branch], f) + } + + finalDiffsByBranch := make(map[string]string, len(byBranch)) + for branch, diffs := range byBranch { + printed, err := diff.PrintMultiFileDiff(diffs) + if err != nil { + return nil, errors.Wrap(err, "printing multi file diff failed") + } + finalDiffsByBranch[branch] = string(printed) + } + return finalDiffsByBranch, nil +} diff --git a/internal/batches/executor/executor.go b/internal/batches/executor/executor.go index b64b445da7..e8287a32b9 100644 --- a/internal/batches/executor/executor.go +++ b/internal/batches/executor/executor.go @@ -4,13 +4,11 @@ import ( "context" "fmt" "os/exec" - "strings" "sync" "time" "github.com/neelance/parallel" "github.com/pkg/errors" - "github.com/sourcegraph/go-diff/diff" "github.com/sourcegraph/src-cli/internal/api" "github.com/sourcegraph/src-cli/internal/batches" "github.com/sourcegraph/src-cli/internal/batches/log" @@ -56,7 +54,7 @@ type Executor interface { LockedTaskStatuses(func([]*TaskStatus)) } -type Opts struct { +type NewExecutorOpts struct { Cache ExecutionCache Client api.Client Features batches.FeatureFlags @@ -97,7 +95,7 @@ type executor struct { specsMu sync.Mutex } -func New(opts Opts) *executor { +func New(opts NewExecutorOpts) *executor { return &executor{ cache: opts.Cache, client: opts.Client, @@ -322,208 +320,3 @@ func reachedTimeout(cmdCtx context.Context, err error) bool { return errors.Is(errors.Cause(err), context.DeadlineExceeded) } - -func createChangesetSpecs(task *Task, result executionResult, features batches.FeatureFlags) ([]*batches.ChangesetSpec, error) { - repo := task.Repository.Name - - tmplCtx := &ChangesetTemplateContext{ - BatchChangeAttributes: *task.BatchChangeAttributes, - Steps: StepsContext{ - Changes: result.ChangedFiles, - Path: result.Path, - }, - Outputs: result.Outputs, - Repository: *task.Repository, - } - - var authorName string - var authorEmail string - - if task.Template.Commit.Author == nil { - if features.IncludeAutoAuthorDetails { - // user did not provide author info, so use defaults - authorName = "Sourcegraph" - authorEmail = "batch-changes@sourcegraph.com" - } - } else { - var err error - authorName, err = renderChangesetTemplateField("authorName", task.Template.Commit.Author.Name, tmplCtx) - if err != nil { - return nil, err - } - authorEmail, err = renderChangesetTemplateField("authorEmail", task.Template.Commit.Author.Email, tmplCtx) - if err != nil { - return nil, err - } - } - - title, err := renderChangesetTemplateField("title", task.Template.Title, tmplCtx) - if err != nil { - return nil, err - } - - body, err := renderChangesetTemplateField("body", task.Template.Body, tmplCtx) - if err != nil { - return nil, err - } - - message, err := renderChangesetTemplateField("message", task.Template.Commit.Message, tmplCtx) - if err != nil { - return nil, err - } - - // TODO: As a next step, we should extend the ChangesetTemplateContext to also include - // TransformChanges.Group and then change validateGroups and groupFileDiffs to, for each group, - // render the branch name *before* grouping the diffs. - defaultBranch, err := renderChangesetTemplateField("branch", task.Template.Branch, tmplCtx) - if err != nil { - return nil, err - } - - newSpec := func(branch, diff string) *batches.ChangesetSpec { - return &batches.ChangesetSpec{ - BaseRepository: task.Repository.ID, - CreatedChangeset: &batches.CreatedChangeset{ - BaseRef: task.Repository.BaseRef(), - BaseRev: task.Repository.Rev(), - HeadRepository: task.Repository.ID, - HeadRef: "refs/heads/" + branch, - Title: title, - Body: body, - Commits: []batches.GitCommitDescription{ - { - Message: message, - AuthorName: authorName, - AuthorEmail: authorEmail, - Diff: diff, - }, - }, - Published: task.Template.Published.ValueWithSuffix(repo, branch), - }, - } - } - - var specs []*batches.ChangesetSpec - - groups := groupsForRepository(task.Repository.Name, task.TransformChanges) - if len(groups) != 0 { - err := validateGroups(task.Repository.Name, task.Template.Branch, groups) - if err != nil { - return specs, err - } - - // TODO: Regarding 'defaultBranch', see comment above - diffsByBranch, err := groupFileDiffs(result.Diff, defaultBranch, groups) - if err != nil { - return specs, errors.Wrap(err, "grouping diffs failed") - } - - for branch, diff := range diffsByBranch { - specs = append(specs, newSpec(branch, diff)) - } - } else { - specs = append(specs, newSpec(defaultBranch, result.Diff)) - } - - return specs, nil -} - -func groupsForRepository(repo string, transform *batches.TransformChanges) []batches.Group { - var groups []batches.Group - - if transform == nil { - return groups - } - - for _, g := range transform.Group { - if g.Repository != "" { - if g.Repository == repo { - groups = append(groups, g) - } - } else { - groups = append(groups, g) - } - } - - return groups -} - -func validateGroups(repo, defaultBranch string, groups []batches.Group) error { - uniqueBranches := make(map[string]struct{}, len(groups)) - - for _, g := range groups { - if _, ok := uniqueBranches[g.Branch]; ok { - return fmt.Errorf("transformChanges would lead to multiple changesets in repository %s to have the same branch %q", repo, g.Branch) - } else { - uniqueBranches[g.Branch] = struct{}{} - } - - if g.Branch == defaultBranch { - return fmt.Errorf("transformChanges group branch for repository %s is the same as branch %q in changesetTemplate", repo, defaultBranch) - } - } - - return nil -} - -func groupFileDiffs(completeDiff, defaultBranch string, groups []batches.Group) (map[string]string, error) { - fileDiffs, err := diff.ParseMultiFileDiff([]byte(completeDiff)) - if err != nil { - return nil, err - } - - // Housekeeping: we setup these two datastructures so we can - // - access the group.Branch by the directory for which they should be used - // - check against the given directories, in order. - branchesByDirectory := make(map[string]string, len(groups)) - dirs := make([]string, len(branchesByDirectory)) - for _, g := range groups { - branchesByDirectory[g.Directory] = g.Branch - dirs = append(dirs, g.Directory) - } - - byBranch := make(map[string][]*diff.FileDiff, len(groups)) - byBranch[defaultBranch] = []*diff.FileDiff{} - - // For each file diff... - for _, f := range fileDiffs { - name := f.NewName - if name == "/dev/null" { - name = f.OrigName - } - - // .. we check whether it matches one of the given directories in the - // group transformations, with the last match winning: - var matchingDir string - for _, d := range dirs { - if strings.Contains(name, d) { - matchingDir = d - } - } - - // If the diff didn't match a rule, it goes into the default branch and - // the default changeset. - if matchingDir == "" { - byBranch[defaultBranch] = append(byBranch[defaultBranch], f) - continue - } - - // If it *did* match a directory, we look up which branch we should use: - branch, ok := branchesByDirectory[matchingDir] - if !ok { - panic("this should not happen: " + matchingDir) - } - - byBranch[branch] = append(byBranch[branch], f) - } - - finalDiffsByBranch := make(map[string]string, len(byBranch)) - for branch, diffs := range byBranch { - printed, err := diff.PrintMultiFileDiff(diffs) - if err != nil { - return nil, errors.Wrap(err, "printing multi file diff failed") - } - finalDiffsByBranch[branch] = string(printed) - } - return finalDiffsByBranch, nil -} diff --git a/internal/batches/executor/executor_test.go b/internal/batches/executor/executor_test.go index 83059a2bf0..6547d88a59 100644 --- a/internal/batches/executor/executor_test.go +++ b/internal/batches/executor/executor_test.go @@ -435,7 +435,7 @@ output4=integration-test-batch-change`, cache := newInMemoryExecutionCache() creator := workspace.NewCreator(context.Background(), "bind", testTempDir, testTempDir, []batches.Step{}) - opts := Opts{ + opts := NewExecutorOpts{ Cache: cache, Creator: creator, Client: client, diff --git a/internal/batches/service/service.go b/internal/batches/service/service.go index 6f6314a75c..6a3048a165 100644 --- a/internal/batches/service/service.go +++ b/internal/batches/service/service.go @@ -203,7 +203,7 @@ func (svc *Service) InitCache(cacheDir string) { // TODO(mrnugget): This is not good. Ideally the executor wouldn't have to know // anything about the cache. -func (svc *Service) InitExecutor(ctx context.Context, opts executor.Opts) { +func (svc *Service) InitExecutor(ctx context.Context, opts executor.NewExecutorOpts) { opts.Cache = svc.cache opts.Client = svc.client opts.Features = svc.features From 788cdbb0f2c5cdc84c2d87985694713ca5535a09 Mon Sep 17 00:00:00 2001 From: Thorsten Ball Date: Fri, 14 May 2021 10:28:32 +0200 Subject: [PATCH 7/7] Improve caching messages after feedback --- cmd/src/batch_common.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/cmd/src/batch_common.go b/cmd/src/batch_common.go index 9a733a91c3..cada32efb2 100644 --- a/cmd/src/batch_common.go +++ b/cmd/src/batch_common.go @@ -313,10 +313,19 @@ func executeBatchSpec(ctx context.Context, opts executeBatchSpecOpts) error { if err != nil { return err } - if len(uncachedTasks) > 0 { - batchCompletePending(pending, fmt.Sprintf("Found %d cached changeset specs. %d tasks need to be executed", len(cachedSpecs), len(uncachedTasks))) + var specsFoundMessage string + if len(cachedSpecs) == 1 { + specsFoundMessage = "Found 1 cached changeset spec" } else { - batchCompletePending(pending, fmt.Sprintf("Found %d cached changeset specs. No tasks need to be executed", len(cachedSpecs))) + specsFoundMessage = fmt.Sprintf("Found %d cached changeset specs", len(cachedSpecs)) + } + switch len(uncachedTasks) { + case 0: + batchCompletePending(pending, fmt.Sprintf("%s; no tasks need to be executed", specsFoundMessage)) + case 1: + batchCompletePending(pending, fmt.Sprintf("%s; %d task needs to be executed", specsFoundMessage, len(uncachedTasks))) + default: + batchCompletePending(pending, fmt.Sprintf("%s; %d tasks need to be executed", specsFoundMessage, len(uncachedTasks))) } p := newBatchProgressPrinter(opts.out, *verbose, opts.flags.parallelism)