Skip to content

Commit

Permalink
batches: Cache filesystem mount (#795)
Browse files Browse the repository at this point in the history
  • Loading branch information
Piszmog committed Jun 29, 2022
1 parent a612679 commit d43a1fc
Show file tree
Hide file tree
Showing 11 changed files with 268 additions and 64 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -13,6 +13,8 @@ All notable changes to `src-cli` are documented in this file.

### Added

- Batch specs that mount paths now cache results. [sourcegraph/sourcegraph#37216](https://github.com/sourcegraph/sourcegraph/issues/37216)

### Changed

### Fixed
Expand Down
14 changes: 7 additions & 7 deletions cmd/src/batch_common.go
Expand Up @@ -366,13 +366,13 @@ func executeBatchSpec(ctx context.Context, ui ui.ExecUI, opts executeBatchSpecOp
archiveRegistry,
log.NewDiskManager(opts.flags.tempDir, opts.flags.keepLogs),
executor.NewCoordinatorOpts{
Creator: workspaceCreator,
Cache: executor.NewDiskCache(opts.flags.cacheDir),
Parallelism: parallelism,
Timeout: opts.flags.timeout,
TempDir: opts.flags.tempDir,
GlobalEnv: os.Environ(),
AllowPathMounts: true,
Creator: workspaceCreator,
Cache: executor.NewDiskCache(opts.flags.cacheDir),
Parallelism: parallelism,
Timeout: opts.flags.timeout,
TempDir: opts.flags.tempDir,
GlobalEnv: os.Environ(),
IsRemote: false,
},
)

Expand Down
2 changes: 1 addition & 1 deletion cmd/src/batch_exec.go
Expand Up @@ -224,7 +224,7 @@ func executeBatchSpecInWorkspaces(ctx context.Context, flags *executorModeFlags)
// Don't allow to read from env.
GlobalEnv: []string{},
// Temporarily prevent sending a batch spec with a mount for server-side processing.
AllowPathMounts: false,
IsRemote: true,
},
)

Expand Down
6 changes: 3 additions & 3 deletions go.mod
Expand Up @@ -20,7 +20,7 @@ require (
github.com/sourcegraph/go-diff v0.6.1
github.com/sourcegraph/jsonx v0.0.0-20200629203448-1a936bd500cf
github.com/sourcegraph/scip v0.1.0
github.com/sourcegraph/sourcegraph/lib v0.0.0-20220614233452-81dc06f6e96e
github.com/sourcegraph/sourcegraph/lib v0.0.0-20220624142708-f2b0579b57ff
github.com/stretchr/testify v1.7.2
golang.org/x/net v0.0.0-20220526153639-5463443f8c37
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
Expand Down Expand Up @@ -83,7 +83,7 @@ require (
github.com/rivo/uniseg v0.2.0 // indirect
github.com/rogpeppe/go-internal v1.8.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/sourcegraph/log v0.0.0-20220613150728-bb50c87ba841 // indirect
github.com/sourcegraph/log v0.0.0-20220621231153-3bee7082c87e // indirect
github.com/spf13/cobra v1.4.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/ssor/bom v0.0.0-20170718123548-6386211fdfcf // indirect
Expand All @@ -98,7 +98,7 @@ require (
go.uber.org/zap v1.21.0 // indirect
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/sys v0.0.0-20220614162138-6c1b26c55098 // indirect
golang.org/x/sys v0.0.0-20220622161953-175b2fd9d664 // indirect
golang.org/x/term v0.0.0-20220411215600-e5f449aeb171 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.1.11 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Expand Up @@ -347,12 +347,12 @@ github.com/sourcegraph/go-diff v0.6.1 h1:hmA1LzxW0n1c3Q4YbrFgg4P99GSnebYa3x8gr0H
github.com/sourcegraph/go-diff v0.6.1/go.mod h1:iBszgVvyxdc8SFZ7gm69go2KDdt3ag071iBaWPF6cjs=
github.com/sourcegraph/jsonx v0.0.0-20200629203448-1a936bd500cf h1:oAdWFqhStsWiiMP/vkkHiMXqFXzl1XfUNOdxKJbd6bI=
github.com/sourcegraph/jsonx v0.0.0-20200629203448-1a936bd500cf/go.mod h1:ppFaPm6kpcHnZGqQTFhUIAQRIEhdQDWP1PCv4/ON354=
github.com/sourcegraph/log v0.0.0-20220613150728-bb50c87ba841 h1:kiYxuyQ1zSNA4YPtVVJQAFw5ZRNaFRkFqgKyEJQTzhg=
github.com/sourcegraph/log v0.0.0-20220613150728-bb50c87ba841/go.mod h1:A+9F6IicYvBbl2aT0R81lMraKcXjVfdfw352yPe2yJI=
github.com/sourcegraph/log v0.0.0-20220621231153-3bee7082c87e h1:7MnFFZ85BBwLNDkrQJB503/znGuSoLirDLFWRcLqHlM=
github.com/sourcegraph/log v0.0.0-20220621231153-3bee7082c87e/go.mod h1:A+9F6IicYvBbl2aT0R81lMraKcXjVfdfw352yPe2yJI=
github.com/sourcegraph/scip v0.1.0 h1:kTs0CJaLQvcRZjg+HpGrcJPNX2Tx31+d6szWio3ZOkQ=
github.com/sourcegraph/scip v0.1.0/go.mod h1:/AZ8RvsnRfeCZy232PJuVZqcl9f82fJnYwdWZeU2JCo=
github.com/sourcegraph/sourcegraph/lib v0.0.0-20220614233452-81dc06f6e96e h1:O+XG50CIAdzjuELX2gHhkc9Br1AaAXaOkZWg1YSRFpQ=
github.com/sourcegraph/sourcegraph/lib v0.0.0-20220614233452-81dc06f6e96e/go.mod h1:Orrt+5wdseAvxsVxgdswYPzJgVkTk0rLlmFHZW61epo=
github.com/sourcegraph/sourcegraph/lib v0.0.0-20220624142708-f2b0579b57ff h1:lcDmWfcg43tON2jMtpxtEq61wMF6rIgvbx7KfTKA6TU=
github.com/sourcegraph/sourcegraph/lib v0.0.0-20220624142708-f2b0579b57ff/go.mod h1:ppU4qV7WAmfjEoQhM7P1hCDWmv1Q6lIy5mT9o3Pj5t8=
github.com/sourcegraph/yaml v1.0.1-0.20200714132230-56936252f152 h1:z/MpntplPaW6QW95pzcAR/72Z5TWDyDnSo0EOcyij9o=
github.com/sourcegraph/yaml v1.0.1-0.20200714132230-56936252f152/go.mod h1:GIjDIg/heH5DOkXY3YJ/wNhfHsQHoXGjl8G8amsYQ1I=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
Expand Down Expand Up @@ -503,8 +503,8 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220209214540-3681064d5158/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220614162138-6c1b26c55098 h1:PgOr27OhUx2IRqGJ2RxAWI4dJQ7bi9cSrB82uzFzfUA=
golang.org/x/sys v0.0.0-20220614162138-6c1b26c55098/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220622161953-175b2fd9d664 h1:wEZYwx+kK+KlZ0hpvP2Ls1Xr4+RWnlzGFwPP0aiDjIU=
golang.org/x/sys v0.0.0-20220622161953-175b2fd9d664/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20220411215600-e5f449aeb171 h1:EH1Deb8WZJ0xc0WK//leUHXcX9aLE5SymusoTmMZye8=
golang.org/x/term v0.0.0-20220411215600-e5f449aeb171/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand Down
51 changes: 17 additions & 34 deletions internal/batches/executor/coordinator.go
Expand Up @@ -49,10 +49,10 @@ type NewCoordinatorOpts struct {
// Used by batcheslib.BuildChangesetSpecs
Features batches.FeatureFlags

Parallelism int
Timeout time.Duration
TempDir string
AllowPathMounts bool
Parallelism int
Timeout time.Duration
TempDir string
IsRemote bool
}

func NewCoordinator(opts NewCoordinatorOpts, logger log.LogManager) *Coordinator {
Expand All @@ -62,19 +62,13 @@ func NewCoordinator(opts NewCoordinatorOpts, logger log.LogManager) *Coordinator
Creator: opts.Creator,
Logger: logger,

Parallelism: opts.Parallelism,
Timeout: opts.Timeout,
TempDir: opts.TempDir,
AllowPathMounts: opts.AllowPathMounts,
GlobalEnv: opts.GlobalEnv,
Parallelism: opts.Parallelism,
Timeout: opts.Timeout,
TempDir: opts.TempDir,
IsRemote: opts.IsRemote,
GlobalEnv: opts.GlobalEnv,
WriteStepCacheResult: func(ctx context.Context, stepResult execution.AfterStepResult, task *Task) error {
// Temporarily skip writing to the cache if a mount is present
for _, step := range task.Steps {
if len(step.Mount) > 0 {
return nil
}
}
cacheKey := task.cacheKey(opts.GlobalEnv)
cacheKey := task.cacheKey(opts.GlobalEnv, opts.IsRemote)
return writeToCache(ctx, opts.Cache, stepResult, task, cacheKey)
},
})
Expand Down Expand Up @@ -121,7 +115,7 @@ func (c *Coordinator) CheckStepResultsCache(ctx context.Context, tasks []*Task,

func (c *Coordinator) ClearCache(ctx context.Context, tasks []*Task) error {
for _, task := range tasks {
cacheKey := task.cacheKey(c.opts.GlobalEnv)
cacheKey := task.cacheKey(c.opts.GlobalEnv, c.opts.IsRemote)
if err := c.cache.Clear(ctx, cacheKey); err != nil {
return errors.Wrapf(err, "clearing cache for %q", task.Repository.Name)
}
Expand All @@ -137,7 +131,7 @@ func (c *Coordinator) ClearCache(ctx context.Context, tasks []*Task) error {

func (c *Coordinator) checkCacheForTask(ctx context.Context, batchSpec *batcheslib.BatchSpec, task *Task) (specs []*batcheslib.ChangesetSpec, found bool, err error) {
// Check if the task is cached.
cacheKey := task.cacheKey(c.opts.GlobalEnv)
cacheKey := task.cacheKey(c.opts.GlobalEnv, c.opts.IsRemote)

var result execution.Result
result, found, err = c.cache.Get(ctx, cacheKey)
Expand Down Expand Up @@ -201,7 +195,7 @@ func (c Coordinator) buildChangesetSpecs(task *Task, batchSpec *batcheslib.Batch
func (c *Coordinator) loadCachedStepResults(ctx context.Context, task *Task, globalEnv []string) error {
// We start at the back so that we can find the _last_ cached step,
// then restart execution on the following step.
taskKey := task.cacheKey(globalEnv)
taskKey := task.cacheKey(globalEnv, c.opts.IsRemote)
for i := len(task.Steps) - 1; i > -1; i-- {
key := cacheKeyForStep(taskKey, i)

Expand Down Expand Up @@ -230,9 +224,9 @@ func writeToCache(ctx context.Context, cache cache.Cache, stepResult execution.A
return nil
}

func (c *Coordinator) writeExecutionCacheResult(ctx context.Context, taskResult taskResult, ui TaskExecutionUI) error {
func (c *Coordinator) writeExecutionCacheResult(ctx context.Context, taskResult taskResult) error {
// Add to the cache, even if no diff was produced.
cacheKey := taskResult.task.cacheKey(c.opts.GlobalEnv)
cacheKey := taskResult.task.cacheKey(c.opts.GlobalEnv, c.opts.IsRemote)
if err := c.cache.Set(ctx, cacheKey, taskResult.result); err != nil {
return errors.Wrapf(err, "caching result for %q", taskResult.task.Repository.Name)
}
Expand All @@ -241,18 +235,7 @@ func (c *Coordinator) writeExecutionCacheResult(ctx context.Context, taskResult
}

func (c *Coordinator) writeCacheAndBuildSpecs(ctx context.Context, batchSpec *batcheslib.BatchSpec, taskResult taskResult, ui TaskExecutionUI) ([]*batcheslib.ChangesetSpec, error) {
// Temporarily prevent writing to the cache when running a spec with a mount. Caching does not at the moment "know"
// when a file that is being mounted has changed. This causes the execution not to re-run if a mounted file changes.
hasMount := false
for _, step := range batchSpec.Steps {
if len(step.Mount) > 0 {
hasMount = true
break
}
}
if !hasMount {
c.writeExecutionCacheResult(ctx, taskResult, ui)
}
c.writeExecutionCacheResult(ctx, taskResult)

// If the steps didn't result in any diff, we don't need to create a
// changeset spec that's displayed to the user and send to the server.
Expand All @@ -276,7 +259,7 @@ func (c *Coordinator) Execute(ctx context.Context, tasks []*Task, ui TaskExecuti

// Write results to cache.
for _, taskResult := range results {
if cacheErr := c.writeExecutionCacheResult(ctx, taskResult, ui); cacheErr != nil {
if cacheErr := c.writeExecutionCacheResult(ctx, taskResult); cacheErr != nil {
return cacheErr
}
}
Expand Down
4 changes: 2 additions & 2 deletions internal/batches/executor/coordinator_test.go
Expand Up @@ -226,7 +226,7 @@ func TestCoordinator_Execute(t *testing.T) {
},
},
{
name: "skip cache for step mount",
name: "cache for step mount",

tasks: []*Task{srcCLITask, sourcegraphTask},

Expand All @@ -250,7 +250,7 @@ func TestCoordinator_Execute(t *testing.T) {
},
opts: NewCoordinatorOpts{Features: featuresAllEnabled()},

wantCacheEntries: 0,
wantCacheEntries: 2,
wantSpecs: []*batcheslib.ChangesetSpec{
buildSpecFor(testRepo1, func(spec *batcheslib.ChangesetSpec) {
spec.Commits[0].Diff = `dummydiff1`
Expand Down
16 changes: 8 additions & 8 deletions internal/batches/executor/executor.go
Expand Up @@ -63,7 +63,7 @@ type newExecutorOpts struct {
Parallelism int
Timeout time.Duration
TempDir string
AllowPathMounts bool
IsRemote bool
GlobalEnv []string
WriteStepCacheResult func(ctx context.Context, stepResult execution.AfterStepResult, task *Task) error
}
Expand Down Expand Up @@ -175,13 +175,13 @@ func (x *executor) do(ctx context.Context, task *Task, ui TaskExecutionUI) (err

// Actually execute the steps.
opts := &executionOpts{
task: task,
logger: l,
wc: x.opts.Creator,
ensureImage: x.opts.EnsureImage,
tempDir: x.opts.TempDir,
allowPathMounts: x.opts.AllowPathMounts,
globalEnv: x.opts.GlobalEnv,
task: task,
logger: l,
wc: x.opts.Creator,
ensureImage: x.opts.EnsureImage,
tempDir: x.opts.TempDir,
isRemote: x.opts.IsRemote,
globalEnv: x.opts.GlobalEnv,

ui: ui.StepsExecutionUI(task),
writeStepCacheResult: x.opts.WriteStepCacheResult,
Expand Down
4 changes: 2 additions & 2 deletions internal/batches/executor/run_steps.go
Expand Up @@ -38,7 +38,7 @@ type executionOpts struct {

ui StepsExecutionUI

allowPathMounts bool
isRemote bool

globalEnv []string

Expand Down Expand Up @@ -309,7 +309,7 @@ func executeSingleStep(
}

// Temporarily guard against mounting local paths during server-side processing.
if opts.allowPathMounts {
if !opts.isRemote {
// Mount any paths on the local system to the docker container. The paths have already been validated during parsing
for _, mount := range step.Mount {
args = append(args, "--mount", fmt.Sprintf("type=bind,source=%s,target=%s,ro", mount.Path, mount.Mountpoint))
Expand Down
70 changes: 69 additions & 1 deletion internal/batches/executor/task.go
@@ -1,10 +1,14 @@
package executor

import (
"os"
"path/filepath"

batcheslib "github.com/sourcegraph/sourcegraph/lib/batches"
"github.com/sourcegraph/sourcegraph/lib/batches/execution"
"github.com/sourcegraph/sourcegraph/lib/batches/execution/cache"
"github.com/sourcegraph/sourcegraph/lib/batches/template"
"github.com/sourcegraph/sourcegraph/lib/errors"

"github.com/sourcegraph/src-cli/internal/batches/graphql"
"github.com/sourcegraph/src-cli/internal/batches/repozip"
Expand Down Expand Up @@ -41,7 +45,12 @@ func (t *Task) ArchivePathToFetch() string {
return ""
}

func (t *Task) cacheKey(globalEnv []string) *cache.ExecutionKeyWithGlobalEnv {
func (t *Task) cacheKey(globalEnv []string, isRemote bool) *cache.ExecutionKeyWithGlobalEnv {
var metadataRetriever cache.MetadataRetriever
// If the task is being run locally, set the metadata retriever to use the filesystem-based implementation.
if !isRemote {
metadataRetriever = fileMetadataRetriever{}
}
return &cache.ExecutionKeyWithGlobalEnv{
GlobalEnv: globalEnv,
ExecutionKey: &cache.ExecutionKey{
Expand All @@ -56,10 +65,69 @@ func (t *Task) cacheKey(globalEnv []string) *cache.ExecutionKeyWithGlobalEnv {
OnlyFetchWorkspace: t.OnlyFetchWorkspace,
Steps: t.Steps,
BatchChangeAttributes: t.BatchChangeAttributes,
MetadataRetriever: metadataRetriever,
},
}
}

// fileMetadataRetriever gathers mount metadata by inspecting the local
// filesystem. It carries no state; cacheKey uses it as the
// cache.MetadataRetriever when a task is not executed remotely.
type fileMetadataRetriever struct{}

// Get returns the filesystem metadata for every path mounted by the given
// steps, in step order and then mount order. A mount that points at a
// directory contributes one entry per file found beneath it.
//
// The first error encountered while inspecting a mount aborts the walk and is
// returned unchanged together with a nil slice.
func (f fileMetadataRetriever) Get(steps []batcheslib.Step) ([]cache.MountMetadata, error) {
	var collected []cache.MountMetadata
	for i := range steps {
		for _, m := range steps[i].Mount {
			// A single mount may expand to many entries when it is a directory.
			entries, err := getMountMetadata(m.Path)
			if err != nil {
				return nil, err
			}
			collected = append(collected, entries...)
		}
	}
	return collected, nil
}

// getMountMetadata describes the file or directory at path.
//
// For a regular path it returns a single entry holding the path, its size and
// its modification time converted to UTC. For a directory it delegates to
// getDirectoryMountMetadata, which in turn calls back into this function for
// every directory entry.
//
// A missing path is reported with a dedicated "does not exist" error; any
// other stat failure is returned as-is.
func getMountMetadata(path string) ([]cache.MountMetadata, error) {
	stat, err := os.Stat(path)
	switch {
	case errors.Is(err, os.ErrNotExist):
		return nil, errors.Newf("path %s does not exist", path)
	case err != nil:
		return nil, err
	}

	if stat.IsDir() {
		return getDirectoryMountMetadata(path)
	}
	return []cache.MountMetadata{
		{Path: path, Size: stat.Size(), Modified: stat.ModTime().UTC()},
	}, nil
}

// getDirectoryMountMetadata returns the metadata of everything beneath the
// directory at path.
//
// Each directory entry is joined onto path and handed back to
// getMountMetadata, which stats it again to decide whether it is a file or a
// nested directory. The two functions are therefore mutually recursive. The
// first error from reading the directory or from any entry stops the walk.
func getDirectoryMountMetadata(path string) ([]cache.MountMetadata, error) {
	entries, err := os.ReadDir(path)
	if err != nil {
		return nil, err
	}

	var collected []cache.MountMetadata
	for i := range entries {
		child, err := getMountMetadata(filepath.Join(path, entries[i].Name()))
		if err != nil {
			return nil, err
		}
		collected = append(collected, child...)
	}
	return collected, nil
}

func cacheKeyForStep(key *cache.ExecutionKeyWithGlobalEnv, stepIndex int) *cache.StepsCacheKeyWithGlobalEnv {
return &cache.StepsCacheKeyWithGlobalEnv{
StepsCacheKey: &cache.StepsCacheKey{
Expand Down

0 comments on commit d43a1fc

Please sign in to comment.