Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cmd/src/campaigns_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,15 @@ func campaignsExecute(ctx context.Context, out *output.Output, svc *campaigns.Se
pending.VerboseLine(output.Linef("🚧", output.StyleSuccess, "Workspace creator: %T", workspaceCreator))
campaignsCompletePending(pending, "Prepared workspaces")

fetcher := svc.NewRepoFetcher(flags.cacheDir, flags.cleanArchives)
for _, task := range tasks {
fetcher.MarkForLaterUse(task.Repository)
}

opts := campaigns.ExecutorOpts{
Cache: svc.NewExecutionCache(flags.cacheDir),
Creator: workspaceCreator,
RepoFetcher: svc.NewRepoFetcher(flags.cacheDir, flags.cleanArchives),
RepoFetcher: fetcher,
ClearCache: flags.clearCache,
KeepLogs: flags.keepLogs,
Timeout: flags.timeout,
Expand Down
77 changes: 47 additions & 30 deletions internal/campaigns/repo_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ import (
// RepoFetcher abstracts the process of retrieving an archive for the given
// repository.
type RepoFetcher interface {
// MarkForLaterUse needs to be called before Fetch so that RepoFetcher can
// register how many tasks will want to use the same archive and not delete
// it prematurely.
MarkForLaterUse(*graphql.Repository)

// Fetch must retrieve the given repository and return it as a RepoZip.
// This will generally imply that the file should be written to a temporary
// location on the filesystem.
Expand All @@ -37,36 +42,15 @@ type repoFetcher struct {

var _ RepoFetcher = &repoFetcher{}

// RepoZip implementations represent a downloaded repository archive.
type RepoZip interface {
// Close must finalise the downloaded archive. If one or more temporary
// files were created, they should be deleted here.
Close() error

// Path must return the path to the archive on the filesystem.
Path() string
}

// repoZip is the concrete implementation of the RepoZip interface used outside
// of tests.
type repoZip struct {
path string
fetcher *repoFetcher

mu sync.Mutex
references int
}

var _ RepoZip = &repoZip{}

func (rf *repoFetcher) zipFor(path string) *repoZip {
func (rf *repoFetcher) zipFor(repo *graphql.Repository) *repoZip {
rf.zipsMu.Lock()
defer rf.zipsMu.Unlock()

if rf.zips == nil {
rf.zips = make(map[string]*repoZip)
}

path := filepath.Join(rf.dir, repo.Slug()+".zip")
zip, ok := rf.zips[path]
if !ok {
zip = &repoZip{path: path, fetcher: rf}
Expand All @@ -75,16 +59,23 @@ func (rf *repoFetcher) zipFor(path string) *repoZip {
return zip
}

func (rf *repoFetcher) Fetch(ctx context.Context, repo *graphql.Repository) (RepoZip, error) {
path := filepath.Join(rf.dir, repo.Slug()+".zip")
func (rf *repoFetcher) MarkForLaterUse(repo *graphql.Repository) {
zip := rf.zipFor(repo)
zip.mu.Lock()
defer zip.mu.Unlock()

zip.marks += 1
}

zip := rf.zipFor(path)
func (rf *repoFetcher) Fetch(ctx context.Context, repo *graphql.Repository) (RepoZip, error) {
zip := rf.zipFor(repo)
zip.mu.Lock()
defer zip.mu.Unlock()

// Someone already fetched it
if zip.references > 0 {
zip.references += 1
zip.marks -= 1
return zip, nil
}

Expand All @@ -98,31 +89,57 @@ func (rf *repoFetcher) Fetch(ctx context.Context, repo *graphql.Repository) (Rep
// giving us a temporary place on the filesystem to keep the archive.
// Since it's never mounted into the containers being run, we can keep
// these directories 0700 without issue.
if err := os.MkdirAll(filepath.Dir(path), 0700); err != nil {
if err := os.MkdirAll(filepath.Dir(zip.path), 0700); err != nil {
return nil, err
}

err = fetchRepositoryArchive(ctx, rf.client, repo, path)
err = fetchRepositoryArchive(ctx, rf.client, repo, zip.path)
if err != nil {
// If the context got cancelled, or we ran out of disk space, or ...
// while we were downloading the file, we remove the partially
// downloaded file.
os.Remove(path)
os.Remove(zip.path)

return nil, errors.Wrap(err, "fetching ZIP archive")
}
}

zip.references += 1
zip.marks -= 1
return zip, nil
}

// RepoZip implementations represent a downloaded repository archive.
type RepoZip interface {
// Close must finalise the downloaded archive. If one or more temporary
// files were created, they should be deleted here.
Close() error

// Path must return the path to the archive on the filesystem.
Path() string
}

var _ RepoZip = &repoZip{}

// repoZip is the concrete implementation of the RepoZip interface used outside
// of tests.
type repoZip struct {
path string
fetcher *repoFetcher

mu sync.Mutex
// references is the number of *active* tasks that currently use the archive.
references int
// marks is the number of tasks that *will* make use of the archive.
marks int
}

func (rz *repoZip) Close() error {
rz.mu.Lock()
defer rz.mu.Unlock()

rz.references -= 1
if rz.references == 0 && rz.fetcher.deleteZips {
if rz.references == 0 && rz.marks == 0 && rz.fetcher.deleteZips {
return os.Remove(rz.path)
}

Expand Down
5 changes: 5 additions & 0 deletions internal/campaigns/repo_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ func TestRepoFetcher_Fetch(t *testing.T) {
deleteZips: false,
}

// We're going to call Fetch three times
for i := 0; i < 3; i++ {
rf.MarkForLaterUse(repo)
}

rz, err := rf.Fetch(context.Background(), repo)
if err != nil {
t.Errorf("unexpected error: %s", err)
Expand Down