From f9ba72108b931280c7cf582736811ede2b8aae34 Mon Sep 17 00:00:00 2001 From: Thorsten Ball Date: Mon, 1 Feb 2021 14:59:13 +0100 Subject: [PATCH 1/2] Keep repo archives cached for later executed tasks This is a follow-up to fix the issue discovered by @eseliger here: https://github.com/sourcegraph/src-cli/pull/442#discussion_r566840914 Short version: the previous implementation would only avoid deleting an archive if there were *currently active tasks* holding references to it. If tasks that need the same archive would execute sequentially, though, the archive would be downloaded, deleted, downloaded again. This here is a fix for the issue by first marking all repository archives for later use and only once all marks have been turned into references and those references have been closed is the archive deleted. --- cmd/src/campaigns_common.go | 7 ++- internal/campaigns/repo_fetcher.go | 77 ++++++++++++++++++------------ 2 files changed, 53 insertions(+), 31 deletions(-) diff --git a/cmd/src/campaigns_common.go b/cmd/src/campaigns_common.go index 2064946ffc..af2a87c745 100644 --- a/cmd/src/campaigns_common.go +++ b/cmd/src/campaigns_common.go @@ -235,10 +235,15 @@ func campaignsExecute(ctx context.Context, out *output.Output, svc *campaigns.Se pending.VerboseLine(output.Linef("🚧", output.StyleSuccess, "Workspace creator: %T", workspaceCreator)) campaignsCompletePending(pending, "Prepared workspaces") + fetcher := svc.NewRepoFetcher(flags.cacheDir, flags.cleanArchives) + for _, task := range tasks { + fetcher.MarkForLaterUse(task.Repository) + } + opts := campaigns.ExecutorOpts{ Cache: svc.NewExecutionCache(flags.cacheDir), Creator: workspaceCreator, - RepoFetcher: svc.NewRepoFetcher(flags.cacheDir, flags.cleanArchives), + RepoFetcher: fetcher, ClearCache: flags.clearCache, KeepLogs: flags.keepLogs, Timeout: flags.timeout, diff --git a/internal/campaigns/repo_fetcher.go b/internal/campaigns/repo_fetcher.go index 240f7b9ed8..1661533781 100644 --- a/internal/campaigns/repo_fetcher.go +++ b/internal/campaigns/repo_fetcher.go @@ -18,6 +18,11 @@ import ( // RepoFetcher abstracts the process of retrieving an archive for the given // repository. type RepoFetcher interface { + // MarkForLaterUse needs to be called before Fetch so that RepoFetcher can + // register how many tasks will want to use the same archive and not delete + // it prematurely. + MarkForLaterUse(*graphql.Repository) + // Fetch must retrieve the given repository and return it as a RepoZip. // This will generally imply that the file should be written to a temporary // location on the filesystem. @@ -37,29 +42,7 @@ type repoFetcher struct { var _ RepoFetcher = &repoFetcher{} -// RepoZip implementations represent a downloaded repository archive. -type RepoZip interface { - // Close must finalise the downloaded archive. If one or more temporary - // files were created, they should be deleted here. - Close() error - - // Path must return the path to the archive on the filesystem. - Path() string -} - -// repoZip is the concrete implementation of the RepoZip interface used outside -// of tests. -type repoZip struct { - path string - fetcher *repoFetcher - - mu sync.Mutex - references int -} - -var _ RepoZip = &repoZip{} - -func (rf *repoFetcher) zipFor(path string) *repoZip { +func (rf *repoFetcher) zipFor(repo *graphql.Repository) *repoZip { rf.zipsMu.Lock() defer rf.zipsMu.Unlock() @@ -67,6 +50,7 @@ func (rf *repoFetcher) zipFor(path string) *repoZip { rf.zips = make(map[string]*repoZip) } + path := filepath.Join(rf.dir, repo.Slug()+".zip") zip, ok := rf.zips[path] if !ok { zip = &repoZip{path: path, fetcher: rf} @@ -75,16 +59,23 @@ func (rf *repoFetcher) zipFor(path string) *repoZip { return zip } -func (rf *repoFetcher) Fetch(ctx context.Context, repo *graphql.Repository) (RepoZip, error) { - path := filepath.Join(rf.dir, repo.Slug()+".zip") +func (rf *repoFetcher) MarkForLaterUse(repo *graphql.Repository) { + zip := rf.zipFor(repo) + zip.mu.Lock() + defer zip.mu.Unlock() + + zip.marks += 1 +} - zip := rf.zipFor(path) +func (rf *repoFetcher) Fetch(ctx context.Context, repo *graphql.Repository) (RepoZip, error) { + zip := rf.zipFor(repo) zip.mu.Lock() defer zip.mu.Unlock() // Someone already fetched it if zip.references > 0 { zip.references += 1 + zip.marks -= 1 return zip, nil } @@ -98,31 +89,57 @@ func (rf *repoFetcher) Fetch(ctx context.Context, repo *graphql.Repository) (Rep // giving us a temporary place on the filesystem to keep the archive. // Since it's never mounted into the containers being run, we can keep // these directories 0700 without issue. - if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil { + if err := os.MkdirAll(filepath.Dir(zip.path), 0700); err != nil { return nil, err } - err = fetchRepositoryArchive(ctx, rf.client, repo, path) + err = fetchRepositoryArchive(ctx, rf.client, repo, zip.path) if err != nil { // If the context got cancelled, or we ran out of disk space, or ... // while we were downloading the file, we remove the partially // downloaded file. - os.Remove(path) + os.Remove(zip.path) return nil, errors.Wrap(err, "fetching ZIP archive") } } zip.references += 1 + zip.marks -= 1 return zip, nil } +// RepoZip implementations represent a downloaded repository archive. +type RepoZip interface { + // Close must finalise the downloaded archive. If one or more temporary + // files were created, they should be deleted here. + Close() error + + // Path must return the path to the archive on the filesystem. + Path() string +} + +var _ RepoZip = &repoZip{} + +// repoZip is the concrete implementation of the RepoZip interface used outside +// of tests. +type repoZip struct { + path string + fetcher *repoFetcher + + mu sync.Mutex + // references is the number of *active* tasks that currently use the archive. + references int + // marks is the number of tasks that *will* make use of the archive. + marks int +} + func (rz *repoZip) Close() error { rz.mu.Lock() defer rz.mu.Unlock() rz.references -= 1 - if rz.references == 0 && rz.fetcher.deleteZips { + if rz.references == 0 && rz.marks == 0 && rz.fetcher.deleteZips { return os.Remove(rz.path) } From 4298b1a8b98083a9a0f040c67e2c74e459d5b4af Mon Sep 17 00:00:00 2001 From: Thorsten Ball Date: Mon, 1 Feb 2021 15:07:51 +0100 Subject: [PATCH 2/2] Fix RepoFetcher tests --- internal/campaigns/repo_fetcher_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internal/campaigns/repo_fetcher_test.go b/internal/campaigns/repo_fetcher_test.go index 0fbda28614..63dd4f69f2 100644 --- a/internal/campaigns/repo_fetcher_test.go +++ b/internal/campaigns/repo_fetcher_test.go @@ -58,6 +58,11 @@ func TestRepoFetcher_Fetch(t *testing.T) { deleteZips: false, } + // We're going to call Fetch three times + for i := 0; i < 3; i++ { + rf.MarkForLaterUse(repo) + } + rz, err := rf.Fetch(context.Background(), repo) if err != nil { t.Errorf("unexpected error: %s", err)