// Review preamble (commentary placed before the first "diff --git" header; the
// patch text below is unchanged).
//
// Summary of what the hunks show:
//   - cmd/src/campaigns_common.go: campaignsExecute now creates the RepoFetcher
//     before building ExecutorOpts, calls MarkForLaterUse(task.Repository) once
//     per entry in tasks, and passes that same fetcher as opts.RepoFetcher.
//   - internal/campaigns/repo_fetcher.go: the RepoFetcher interface gains
//     MarkForLaterUse. zipFor now takes the repository and derives the archive
//     path itself (rf.dir + repo.Slug() + ".zip"). MarkForLaterUse increments
//     repoZip.marks while holding zip.mu. Fetch decrements marks on both of its
//     successful return paths, next to the references increment. Close removes
//     the archive only when references == 0, marks == 0 and deleteZips is set.
//     The RepoZip interface and repoZip struct are moved below Fetch, and
//     repoZip gains the marks field.
//   - internal/campaigns/repo_fetcher_test.go: the test calls MarkForLaterUse
//     three times before its Fetch calls.
//
// NOTE(review): the error returns of Fetch that are visible in these hunks
// (os.MkdirAll failure, fetchRepositoryArchive failure) do not decrement marks.
// If a marked task never reaches a successful Fetch, marks appears to stay
// above zero and Close would then never remove the archive — confirm whether
// that is intended.
// NOTE(review): calling Fetch without a preceding MarkForLaterUse would take
// marks below zero, so the `marks == 0` check in Close would not hold — verify
// that every caller marks first, as the interface comment requires.
diff --git a/cmd/src/campaigns_common.go b/cmd/src/campaigns_common.go index 2064946ffc..af2a87c745 100644 --- a/cmd/src/campaigns_common.go +++ b/cmd/src/campaigns_common.go @@ -235,10 +235,15 @@ func campaignsExecute(ctx context.Context, out *output.Output, svc *campaigns.Se pending.VerboseLine(output.Linef("🚧", output.StyleSuccess, "Workspace creator: %T", workspaceCreator)) campaignsCompletePending(pending, "Prepared workspaces") + fetcher := svc.NewRepoFetcher(flags.cacheDir, flags.cleanArchives) + for _, task := range tasks { + fetcher.MarkForLaterUse(task.Repository) + } + opts := campaigns.ExecutorOpts{ Cache: svc.NewExecutionCache(flags.cacheDir), Creator: workspaceCreator, - RepoFetcher: svc.NewRepoFetcher(flags.cacheDir, flags.cleanArchives), + RepoFetcher: fetcher, ClearCache: flags.clearCache, KeepLogs: flags.keepLogs, Timeout: flags.timeout, diff --git a/internal/campaigns/repo_fetcher.go b/internal/campaigns/repo_fetcher.go index 240f7b9ed8..1661533781 100644 --- a/internal/campaigns/repo_fetcher.go +++ b/internal/campaigns/repo_fetcher.go @@ -18,6 +18,11 @@ import ( // RepoFetcher abstracts the process of retrieving an archive for the given // repository. type RepoFetcher interface { + // MarkForLaterUse needs to be called before Fetch so that RepoFetcher can + // register how many tasks will want to use the same archive and not delete + // it prematurely. + MarkForLaterUse(*graphql.Repository) + // Fetch must retrieve the given repository and return it as a RepoZip. // This will generally imply that the file should be written to a temporary // location on the filesystem. @@ -37,29 +42,7 @@ type repoFetcher struct { var _ RepoFetcher = &repoFetcher{} -// RepoZip implementations represent a downloaded repository archive. -type RepoZip interface { - // Close must finalise the downloaded archive. If one or more temporary - // files were created, they should be deleted here. 
- Close() error - - // Path must return the path to the archive on the filesystem. - Path() string -} - -// repoZip is the concrete implementation of the RepoZip interface used outside -// of tests. -type repoZip struct { - path string - fetcher *repoFetcher - - mu sync.Mutex - references int -} - -var _ RepoZip = &repoZip{} - -func (rf *repoFetcher) zipFor(path string) *repoZip { +func (rf *repoFetcher) zipFor(repo *graphql.Repository) *repoZip { rf.zipsMu.Lock() defer rf.zipsMu.Unlock() @@ -67,6 +50,7 @@ func (rf *repoFetcher) zipFor(path string) *repoZip { rf.zips = make(map[string]*repoZip) } + path := filepath.Join(rf.dir, repo.Slug()+".zip") zip, ok := rf.zips[path] if !ok { zip = &repoZip{path: path, fetcher: rf} @@ -75,16 +59,23 @@ func (rf *repoFetcher) zipFor(path string) *repoZip { return zip } -func (rf *repoFetcher) Fetch(ctx context.Context, repo *graphql.Repository) (RepoZip, error) { - path := filepath.Join(rf.dir, repo.Slug()+".zip") +func (rf *repoFetcher) MarkForLaterUse(repo *graphql.Repository) { + zip := rf.zipFor(repo) + zip.mu.Lock() + defer zip.mu.Unlock() + + zip.marks += 1 +} - zip := rf.zipFor(path) +func (rf *repoFetcher) Fetch(ctx context.Context, repo *graphql.Repository) (RepoZip, error) { + zip := rf.zipFor(repo) zip.mu.Lock() defer zip.mu.Unlock() // Someone already fetched it if zip.references > 0 { zip.references += 1 + zip.marks -= 1 return zip, nil } @@ -98,31 +89,57 @@ func (rf *repoFetcher) Fetch(ctx context.Context, repo *graphql.Repository) (Rep // giving us a temporary place on the filesystem to keep the archive. // Since it's never mounted into the containers being run, we can keep // these directories 0700 without issue. 
- if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil { + if err := os.MkdirAll(filepath.Dir(zip.path), 0700); err != nil { return nil, err } - err = fetchRepositoryArchive(ctx, rf.client, repo, path) + err = fetchRepositoryArchive(ctx, rf.client, repo, zip.path) if err != nil { // If the context got cancelled, or we ran out of disk space, or ... // while we were downloading the file, we remove the partially // downloaded file. - os.Remove(path) + os.Remove(zip.path) return nil, errors.Wrap(err, "fetching ZIP archive") } } zip.references += 1 + zip.marks -= 1 return zip, nil } +// RepoZip implementations represent a downloaded repository archive. +type RepoZip interface { + // Close must finalise the downloaded archive. If one or more temporary + // files were created, they should be deleted here. + Close() error + + // Path must return the path to the archive on the filesystem. + Path() string +} + +var _ RepoZip = &repoZip{} + +// repoZip is the concrete implementation of the RepoZip interface used outside +// of tests. +type repoZip struct { + path string + fetcher *repoFetcher + + mu sync.Mutex + // references is the number of *active* tasks that currently use the archive. + references int + // marks is the number of tasks that *will* make use of the archive. 
+ marks int +} + func (rz *repoZip) Close() error { rz.mu.Lock() defer rz.mu.Unlock() rz.references -= 1 - if rz.references == 0 && rz.fetcher.deleteZips { + if rz.references == 0 && rz.marks == 0 && rz.fetcher.deleteZips { return os.Remove(rz.path) } diff --git a/internal/campaigns/repo_fetcher_test.go b/internal/campaigns/repo_fetcher_test.go index 0fbda28614..63dd4f69f2 100644 --- a/internal/campaigns/repo_fetcher_test.go +++ b/internal/campaigns/repo_fetcher_test.go @@ -58,6 +58,11 @@ func TestRepoFetcher_Fetch(t *testing.T) { deleteZips: false, } + // We're going to call Fetch three times + for i := 0; i < 3; i++ { + rf.MarkForLaterUse(repo) + } + rz, err := rf.Fetch(context.Background(), repo) if err != nil { t.Errorf("unexpected error: %s", err)