From d56406a9184e58d9ef3126002fa7353931ef14cc Mon Sep 17 00:00:00 2001 From: Thorsten Ball Date: Tue, 23 Jun 2020 14:24:34 +0200 Subject: [PATCH] Move action execution related code to campaigns package --- cmd/src/actions_exec.go | 185 ++------------- cmd/src/actions_exec_backend.go | 117 --------- cmd/src/actions_scope_query.go | 5 +- cmd/src/patch_sets_create_from_patches.go | 9 +- go.sum | 1 + internal/campaigns/action.go | 154 ++++++++++++ .../campaigns/execution_cache.go | 44 ++-- internal/campaigns/executor.go | 222 ++++++++++++++++++ .../campaigns/logger.go | 50 ++-- .../campaigns/run_action.go | 118 +--------- 10 files changed, 470 insertions(+), 435 deletions(-) delete mode 100644 cmd/src/actions_exec_backend.go create mode 100644 internal/campaigns/action.go rename cmd/src/actions_cache.go => internal/campaigns/execution_cache.go (53%) create mode 100644 internal/campaigns/executor.go rename cmd/src/actions_exec_logger.go => internal/campaigns/logger.go (85%) rename cmd/src/actions_exec_backend_runner.go => internal/campaigns/run_action.go (69%) diff --git a/cmd/src/actions_exec.go b/cmd/src/actions_exec.go index 52feafd8d5..335bc347fa 100644 --- a/cmd/src/actions_exec.go +++ b/cmd/src/actions_exec.go @@ -2,7 +2,6 @@ package main import ( "bufio" - "bytes" "context" "encoding/json" "flag" @@ -19,43 +18,11 @@ import ( "time" "github.com/fatih/color" - "github.com/hashicorp/go-multierror" "github.com/mattn/go-isatty" "github.com/pkg/errors" - "github.com/sourcegraph/src-cli/schema" - "github.com/xeipuuv/gojsonschema" + "github.com/sourcegraph/src-cli/internal/campaigns" ) -type Action struct { - ScopeQuery string `json:"scopeQuery,omitempty"` - Steps []*ActionStep `json:"steps"` -} - -type ActionStep struct { - Type string `json:"type"` // "command" - Image string `json:"image,omitempty"` // Docker image - CacheDirs []string `json:"cacheDirs,omitempty"` - Args []string `json:"args,omitempty"` - - // ImageContentDigest is an internal field that should not be set by users. - ImageContentDigest string -} - -type PatchInput struct { - Repository string `json:"repository"` - BaseRevision string `json:"baseRevision"` - BaseRef string `json:"baseRef"` - Patch string `json:"patch"` -} - -func userCacheDir() (string, error) { - userCacheDir, err := os.UserCacheDir() - if err != nil { - return "", err - } - return filepath.Join(userCacheDir, "sourcegraph-src"), nil -} - const defaultTimeout = 60 * time.Minute func init() { @@ -132,7 +99,7 @@ Format of the action JSON files: fmt.Println(usage) } - cacheDir, _ := userCacheDir() + cacheDir, _ := campaigns.UserCacheDir() if cacheDir != "" { cacheDir = filepath.Join(cacheDir, "action-exec") } @@ -211,12 +178,12 @@ Format of the action JSON files: } } - err = validateActionDefinition(actionFile) + err = campaigns.ValidateActionDefinition(actionFile) if err != nil { return err } - var action Action + var action campaigns.Action if err := jsonxUnmarshal(string(actionFile), &action); err != nil { return errors.Wrap(err, "invalid JSON action file") } @@ -238,19 +205,21 @@ Format of the action JSON files: os.Exit(2) }() - logger := newActionLogger(*verbose, *keepLogsFlag) + logger := campaigns.NewActionLogger(*verbose, *keepLogsFlag) // Fetch Docker images etc. - err = prepareAction(ctx, action, logger) + err = campaigns.PrepareAction(ctx, action, logger) if err != nil { return errors.Wrap(err, "Failed to prepare action") } - opts := actionExecutorOptions{ - timeout: *timeoutFlag, - keepLogs: *keepLogsFlag, - clearCache: *clearCacheFlag, - cache: actionExecutionDiskCache{dir: *cacheDirFlag}, + opts := campaigns.ExecutorOpts{ + Endpoint: cfg.Endpoint, + AccessToken: cfg.AccessToken, + Timeout: *timeoutFlag, + KeepLogs: *keepLogsFlag, + ClearCache: *clearCacheFlag, + Cache: campaigns.ExecutionDiskCache{Dir: *cacheDirFlag}, } // Query repos over which to run action @@ -264,15 +233,15 @@ Format of the action JSON files: totalSteps := len(repos) * len(action.Steps) logger.Start(totalSteps) - executor := newActionExecutor(action, *parallelismFlag, logger, opts) + executor := campaigns.NewExecutor(action, *parallelismFlag, logger, opts) for _, repo := range repos { - executor.enqueueRepo(repo) + executor.EnqueueRepo(repo) } - go executor.start(ctx) - err = executor.wait() + go executor.Start(ctx) + err = executor.Wait() - patches := executor.allPatches() + patches := executor.AllPatches() if len(patches) == 0 { // We call os.Exit because we don't want to return the error // and have it printed. @@ -344,115 +313,7 @@ Format of the action JSON files: }) } -func formatValidationErrs(es []error) string { - points := make([]string, len(es)) - for i, err := range es { - points[i] = fmt.Sprintf("- %s", err) - } - - return fmt.Sprintf( - "Validating action definition failed:\n%s\n", - strings.Join(points, "\n")) -} - -func validateActionDefinition(def []byte) error { - sl := gojsonschema.NewSchemaLoader() - sc, err := sl.Compile(gojsonschema.NewStringLoader(schema.ActionSchemaJSON)) - if err != nil { - return errors.Wrapf(err, "failed to compile actions schema") - } - - normalized, err := jsonxToJSON(string(def)) - if err != nil { - return err - } - - res, err := sc.Validate(gojsonschema.NewBytesLoader(normalized)) - if err != nil { - return errors.Wrap(err, "failed to validate config against schema") - } - - errs := &multierror.Error{ErrorFormat: formatValidationErrs} - for _, err := range res.Errors() { - e := err.String() - // Remove `(root): ` from error formatting since these errors are - // presented to users. - e = strings.TrimPrefix(e, "(root): ") - errs = multierror.Append(errs, errors.New(e)) - } - - return errs.ErrorOrNil() -} - -func prepareAction(ctx context.Context, action Action, logger *actionLogger) error { - // Build any Docker images. - for _, step := range action.Steps { - if step.Type == "docker" { - // Set digests for Docker images so we don't cache action runs in 2 different images with - // the same tag. - var err error - step.ImageContentDigest, err = getDockerImageContentDigest(ctx, step.Image, logger) - if err != nil { - return errors.Wrap(err, "Failed to get Docker image content digest") - } - } - } - - return nil -} - -// getDockerImageContentDigest gets the content digest for the image. Note that this -// is different from the "distribution digest" (which is what you can use to specify -// an image to `docker run`, as in `my/image@sha256:xxx`). We need to use the -// content digest because the distribution digest is only computed for images that -// have been pulled from or pushed to a registry. See -// https://windsock.io/explaining-docker-image-ids/ under "A Final Twist" for a good -// explanation. -func getDockerImageContentDigest(ctx context.Context, image string, logger *actionLogger) (string, error) { - // TODO!(sqs): is image id the right thing to use here? it is NOT the - // digest. but the digest is not calculated for all images (unless they are - // pulled/pushed from/to a registry), see - // https://github.com/moby/moby/issues/32016. - out, err := exec.CommandContext(ctx, "docker", "image", "inspect", "--format", "{{.Id}}", "--", image).CombinedOutput() - if err != nil { - if !strings.Contains(string(out), "No such image") { - return "", fmt.Errorf("error inspecting docker image %q: %s", image, bytes.TrimSpace(out)) - } - logger.Infof("Pulling Docker image %q...\n", image) - pullCmd := exec.CommandContext(ctx, "docker", "image", "pull", image) - prefix := fmt.Sprintf("docker image pull %s", image) - pullCmd.Stdout = logger.InfoPipe(prefix) - pullCmd.Stderr = logger.ErrorPipe(prefix) - - err = pullCmd.Start() - if err != nil { - return "", fmt.Errorf("error pulling docker image %q: %s", image, err) - } - err = pullCmd.Wait() - if err != nil { - return "", fmt.Errorf("error pulling docker image %q: %s", image, err) - } - } - out, err = exec.CommandContext(ctx, "docker", "image", "inspect", "--format", "{{.Id}}", "--", image).CombinedOutput() - // This time, the image MUST be present, so the issue must be something else. - if err != nil { - return "", fmt.Errorf("error inspecting docker image %q: %s", image, bytes.TrimSpace(out)) - } - id := string(bytes.TrimSpace(out)) - if id == "" { - return "", fmt.Errorf("unexpected empty docker image content ID for %q", image) - } - return id, nil -} - -type ActionRepo struct { - ID string - Name string - Rev string - BaseRef string -} - -func actionRepos(ctx context.Context, scopeQuery string, includeUnsupported bool, logger *actionLogger) ([]ActionRepo, error) { +func actionRepos(ctx context.Context, scopeQuery string, includeUnsupported bool, logger *campaigns.ActionLogger) ([]campaigns.ActionRepo, error) { hasCount, err := regexp.MatchString(`count:\d+`, scopeQuery) if err != nil { return nil, err @@ -563,7 +424,7 @@ fragment repositoryFields on Repository { skipped := []string{} unsupported := []string{} - reposByID := map[string]ActionRepo{} + reposByID := map[string]campaigns.ActionRepo{} for _, searchResult := range result.Data.Search.Results.Results { var repo Repository @@ -595,7 +456,7 @@ fragment repositoryFields on Repository { } if _, ok := reposByID[repo.ID]; !ok { - reposByID[repo.ID] = ActionRepo{ + reposByID[repo.ID] = campaigns.ActionRepo{ ID: repo.ID, Name: repo.Name, Rev: repo.DefaultBranch.Target.OID, @@ -604,7 +465,7 @@ fragment repositoryFields on Repository { } } - repos := make([]ActionRepo, 0, len(reposByID)) + repos := make([]campaigns.ActionRepo, 0, len(reposByID)) for _, repo := range reposByID { repos = append(repos, repo) } @@ -640,6 +501,8 @@ fragment repositoryFields on Repository { return repos, nil } +var yellow = color.New(color.FgYellow) + func isGitAvailable() bool { cmd := exec.Command("git", "version") if err := cmd.Run(); err != nil { diff --git a/cmd/src/actions_exec_backend.go b/cmd/src/actions_exec_backend.go deleted file mode 100644 index 4509cd3bd7..0000000000 --- a/cmd/src/actions_exec_backend.go +++ /dev/null @@ -1,117 +0,0 @@ -package main - -import ( - "context" - "sync" - "time" - - "github.com/neelance/parallel" -) - -type actionExecutorOptions struct { - keepLogs bool - timeout time.Duration - - clearCache bool - cache actionExecutionCache -} - -type actionExecutor struct { - action Action - opt actionExecutorOptions - - reposMu sync.Mutex - repos map[ActionRepo]ActionRepoStatus - - par *parallel.Run - doneEnqueuing chan struct{} - - logger *actionLogger -} - -func newActionExecutor(action Action, parallelism int, logger *actionLogger, opt actionExecutorOptions) *actionExecutor { - if opt.cache == nil { - opt.cache = actionExecutionNoOpCache{} - } - - return &actionExecutor{ - action: action, - opt: opt, - repos: map[ActionRepo]ActionRepoStatus{}, - par: parallel.NewRun(parallelism), - logger: logger, - - doneEnqueuing: make(chan struct{}), - } -} - -func (x *actionExecutor) enqueueRepo(repo ActionRepo) { - x.updateRepoStatus(repo, ActionRepoStatus{EnqueuedAt: time.Now()}) -} - -func (x *actionExecutor) updateRepoStatus(repo ActionRepo, status ActionRepoStatus) { - x.reposMu.Lock() - defer x.reposMu.Unlock() - - // Perform delta update. - prev := x.repos[repo] - if status.LogFile == "" { - status.LogFile = prev.LogFile - } - if status.EnqueuedAt.IsZero() { - status.EnqueuedAt = prev.EnqueuedAt - } - if status.StartedAt.IsZero() { - status.StartedAt = prev.StartedAt - } - if status.FinishedAt.IsZero() { - status.FinishedAt = prev.FinishedAt - } - if status.Patch == (PatchInput{}) { - status.Patch = prev.Patch - } - if status.Err == nil { - status.Err = prev.Err - } - - x.repos[repo] = status -} - -func (x *actionExecutor) allPatches() []PatchInput { - patches := make([]PatchInput, 0, len(x.repos)) - x.reposMu.Lock() - defer x.reposMu.Unlock() - for _, status := range x.repos { - if patch := status.Patch; patch != (PatchInput{}) && status.Err == nil { - patches = append(patches, status.Patch) - } - } - return patches -} - -func (x *actionExecutor) start(ctx context.Context) { - x.reposMu.Lock() - allRepos := make([]ActionRepo, 0, len(x.repos)) - for repo := range x.repos { - allRepos = append(allRepos, repo) - } - x.reposMu.Unlock() - - for _, repo := range allRepos { - x.par.Acquire() - go func(repo ActionRepo) { - defer x.par.Release() - err := x.do(ctx, repo) - if err != nil { - x.par.Error(err) - } - }(repo) - } - - close(x.doneEnqueuing) -} - -func (x *actionExecutor) wait() error { - <-x.doneEnqueuing - return x.par.Wait() -} diff --git a/cmd/src/actions_scope_query.go b/cmd/src/actions_scope_query.go index 9bc24422d0..475d26463c 100644 --- a/cmd/src/actions_scope_query.go +++ b/cmd/src/actions_scope_query.go @@ -9,6 +9,7 @@ import ( "os" "github.com/pkg/errors" + "github.com/sourcegraph/src-cli/internal/campaigns" ) func init() { @@ -52,7 +53,7 @@ Examples: return err } - var action Action + var action campaigns.Action if err := jsonxUnmarshal(string(actionFile), &action); err != nil { return errors.Wrap(err, "invalid JSON action file") } @@ -67,7 +68,7 @@ Examples: } } - logger := newActionLogger(*verbose, false) + logger := campaigns.NewActionLogger(*verbose, false) repos, err := actionRepos(ctx, action.ScopeQuery, *includeUnsupportedFlag, logger) if err != nil { return err diff --git a/cmd/src/patch_sets_create_from_patches.go b/cmd/src/patch_sets_create_from_patches.go index b522b6dd90..86113f6da3 100644 --- a/cmd/src/patch_sets_create_from_patches.go +++ b/cmd/src/patch_sets_create_from_patches.go @@ -10,6 +10,7 @@ import ( "github.com/mattn/go-isatty" "github.com/pkg/errors" + "github.com/sourcegraph/src-cli/internal/campaigns" ) func init() { @@ -62,7 +63,7 @@ Examples: log.Println("# Waiting for JSON patches input on stdin...") } - var patches []PatchInput + var patches []campaigns.PatchInput if err := json.NewDecoder(os.Stdin).Decode(&patches); err != nil { return errors.Wrap(err, "invalid JSON patches input") } @@ -88,7 +89,7 @@ mutation CreatePatchSetFromPatches($patches: [PatchInput!]!) { func createPatchSetFromPatches( apiFlags *apiFlags, - patches []PatchInput, + patches []campaigns.PatchInput, tmpl *template.Template, numChangesets int, ) error { @@ -112,9 +113,9 @@ func createPatchSetFromPatches( // (e.g. "refs/heads/master") in `BaseRevision`, so we need to copy the // value over. if !supportsBaseRef { - patchesWithoutBaseRef := make([]PatchInput, len(patches)) + patchesWithoutBaseRef := make([]campaigns.PatchInput, len(patches)) for i, p := range patches { - patchesWithoutBaseRef[i] = PatchInput{ + patchesWithoutBaseRef[i] = campaigns.PatchInput{ Repository: p.Repository, BaseRevision: p.BaseRef, BaseRef: "IGNORE-THIS", diff --git a/go.sum b/go.sum index 4a6d6cf90e..143c6b604b 100644 --- a/go.sum +++ b/go.sum @@ -37,6 +37,7 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/segmentio/textio v1.2.0 h1:Ug4IkV3kh72juJbG8azoSBlgebIbUUxVNrfFcKHfTSQ= github.com/segmentio/textio v1.2.0/go.mod h1:+Rb7v0YVODP+tK5F7FD9TCkV7gOYx9IgLHWiqtvY8ag= +github.com/shurcooL/go v0.0.0-20200502201357-93f07166e636 h1:aSISeOcal5irEhJd1M+IrApc0PdcN7e7Aj4yuEnOrfQ= github.com/sourcegraph/jsonx v0.0.0-20190114210550-ba8cb36a8614 h1:MrlKMpoGse4bCneDoK/c+ZbPGqOP5Hme5ulatc8smbQ= github.com/sourcegraph/jsonx v0.0.0-20190114210550-ba8cb36a8614/go.mod h1:7jkSQ2sdxwXMaIDxKJotTt+hwKnT9b/wbJFU7/ObUEY= github.com/ssor/bom v0.0.0-20170718123548-6386211fdfcf h1:pvbZ0lM0XWPBqUKqFU8cmavspvIl9nulOYwdy6IFRRo= diff --git a/internal/campaigns/action.go b/internal/campaigns/action.go new file mode 100644 index 0000000000..c7304fb53e --- /dev/null +++ b/internal/campaigns/action.go @@ -0,0 +1,154 @@ +package campaigns + +import ( + "bytes" + "context" + "fmt" + "os/exec" + "strings" + + "github.com/hashicorp/go-multierror" + "github.com/pkg/errors" + "github.com/sourcegraph/jsonx" + "github.com/sourcegraph/src-cli/schema" + "github.com/xeipuuv/gojsonschema" +) + +type Action struct { + ScopeQuery string `json:"scopeQuery,omitempty"` + Steps []*ActionStep `json:"steps"` +} + +type ActionStep struct { + Type string `json:"type"` // "command" + Image string `json:"image,omitempty"` // Docker image + CacheDirs []string `json:"cacheDirs,omitempty"` + Args []string `json:"args,omitempty"` + + // ImageContentDigest is an internal field that should not be set by users. + ImageContentDigest string +} + +type PatchInput struct { + Repository string `json:"repository"` + BaseRevision string `json:"baseRevision"` + BaseRef string `json:"baseRef"` + Patch string `json:"patch"` +} + +type ActionRepo struct { + ID string + Name string + Rev string + BaseRef string +} + +func ValidateActionDefinition(def []byte) error { + sl := gojsonschema.NewSchemaLoader() + sc, err := sl.Compile(gojsonschema.NewStringLoader(schema.ActionSchemaJSON)) + if err != nil { + return errors.Wrapf(err, "failed to compile actions schema") + } + + normalized, err := jsonxToJSON(string(def)) + if err != nil { + return err + } + + res, err := sc.Validate(gojsonschema.NewBytesLoader(normalized)) + if err != nil { + return errors.Wrap(err, "failed to validate config against schema") + } + + errs := &multierror.Error{ErrorFormat: formatValidationErrs} + for _, err := range res.Errors() { + e := err.String() + // Remove `(root): ` from error formatting since these errors are + // presented to users. + e = strings.TrimPrefix(e, "(root): ") + errs = multierror.Append(errs, errors.New(e)) + } + + return errs.ErrorOrNil() +} + +func formatValidationErrs(es []error) string { + points := make([]string, len(es)) + for i, err := range es { + points[i] = fmt.Sprintf("- %s", err) + } + + return fmt.Sprintf( + "Validating action definition failed:\n%s\n", + strings.Join(points, "\n")) +} + +func PrepareAction(ctx context.Context, action Action, logger *ActionLogger) error { + // Build any Docker images. + for _, step := range action.Steps { + if step.Type == "docker" { + // Set digests for Docker images so we don't cache action runs in 2 different images with + // the same tag. + var err error + step.ImageContentDigest, err = getDockerImageContentDigest(ctx, step.Image, logger) + if err != nil { + return errors.Wrap(err, "Failed to get Docker image content digest") + } + } + } + + return nil +} + +// getDockerImageContentDigest gets the content digest for the image. Note that this +// is different from the "distribution digest" (which is what you can use to specify +// an image to `docker run`, as in `my/image@sha256:xxx`). We need to use the +// content digest because the distribution digest is only computed for images that +// have been pulled from or pushed to a registry. See +// https://windsock.io/explaining-docker-image-ids/ under "A Final Twist" for a good +// explanation. +func getDockerImageContentDigest(ctx context.Context, image string, logger *ActionLogger) (string, error) { + // TODO!(sqs): is image id the right thing to use here? it is NOT the + // digest. but the digest is not calculated for all images (unless they are + // pulled/pushed from/to a registry), see + // https://github.com/moby/moby/issues/32016. + out, err := exec.CommandContext(ctx, "docker", "image", "inspect", "--format", "{{.Id}}", "--", image).CombinedOutput() + if err != nil { + if !strings.Contains(string(out), "No such image") { + return "", fmt.Errorf("error inspecting docker image %q: %s", image, bytes.TrimSpace(out)) + } + logger.Infof("Pulling Docker image %q...\n", image) + pullCmd := exec.CommandContext(ctx, "docker", "image", "pull", image) + prefix := fmt.Sprintf("docker image pull %s", image) + pullCmd.Stdout = logger.InfoPipe(prefix) + pullCmd.Stderr = logger.ErrorPipe(prefix) + + err = pullCmd.Start() + if err != nil { + return "", fmt.Errorf("error pulling docker image %q: %s", image, err) + } + err = pullCmd.Wait() + if err != nil { + return "", fmt.Errorf("error pulling docker image %q: %s", image, err) + } + } + out, err = exec.CommandContext(ctx, "docker", "image", "inspect", "--format", "{{.Id}}", "--", image).CombinedOutput() + // This time, the image MUST be present, so the issue must be something else. + if err != nil { + return "", fmt.Errorf("error inspecting docker image %q: %s", image, bytes.TrimSpace(out)) + } + id := string(bytes.TrimSpace(out)) + if id == "" { + return "", fmt.Errorf("unexpected empty docker image content ID for %q", image) + } + return id, nil +} + +// jsonxToJSON converts jsonx to plain JSON. +func jsonxToJSON(text string) ([]byte, error) { + data, errs := jsonx.Parse(text, jsonx.ParseOptions{Comments: true, TrailingCommas: true}) + if len(errs) > 0 { + return data, fmt.Errorf("failed to parse JSON: %v", errs) + } + return data, nil +} diff --git a/cmd/src/actions_cache.go b/internal/campaigns/execution_cache.go similarity index 53% rename from cmd/src/actions_cache.go rename to internal/campaigns/execution_cache.go index 9666954127..4261b4a944 100644 --- a/cmd/src/actions_cache.go +++ b/internal/campaigns/execution_cache.go @@ -1,4 +1,4 @@ -package main +package campaigns import ( "context" @@ -12,22 +12,30 @@ import ( "github.com/pkg/errors" ) -type actionExecutionCacheKey struct { +func UserCacheDir() (string, error) { + userCacheDir, err := os.UserCacheDir() + if err != nil { + return "", err + } + return filepath.Join(userCacheDir, "sourcegraph-src"), nil +} + +type ExecutionCacheKey struct { Repo ActionRepo Runs []*ActionStep } -type actionExecutionCache interface { - get(ctx context.Context, key actionExecutionCacheKey) (result PatchInput, ok bool, err error) - set(ctx context.Context, key actionExecutionCacheKey, result PatchInput) error - clear(ctx context.Context, key actionExecutionCacheKey) error +type ExecutionCache interface { + Get(ctx context.Context, key ExecutionCacheKey) (result PatchInput, ok bool, err error) + Set(ctx context.Context, key ExecutionCacheKey, result PatchInput) error + Clear(ctx context.Context, key ExecutionCacheKey) error } -type actionExecutionDiskCache struct { - dir string +type ExecutionDiskCache struct { + Dir string } -func (c actionExecutionDiskCache) cacheFilePath(key actionExecutionCacheKey) (string, error) { +func (c ExecutionDiskCache) cacheFilePath(key ExecutionCacheKey) (string, error) { keyJSON, err := json.Marshal(key) if err != nil { return "", errors.Wrap(err, "Failed to marshal JSON when generating action cache key") @@ -36,10 +44,10 @@ func (c actionExecutionDiskCache) cacheFilePath(key actionExecutionCacheKey) (st b := sha256.Sum256(keyJSON) keyString := base64.RawURLEncoding.EncodeToString(b[:16]) - return filepath.Join(c.dir, keyString+".json"), nil + return filepath.Join(c.Dir, keyString+".json"), nil } -func (c actionExecutionDiskCache) get(ctx context.Context, key actionExecutionCacheKey) (PatchInput, bool, error) { +func (c ExecutionDiskCache) Get(ctx context.Context, key ExecutionCacheKey) (PatchInput, bool, error) { path, err := c.cacheFilePath(key) if err != nil { return PatchInput{}, false, err @@ -65,7 +73,7 @@ func (c actionExecutionDiskCache) get(ctx context.Context, key actionExecutionCa return result, true, nil } -func (c actionExecutionDiskCache) set(ctx context.Context, key actionExecutionCacheKey, result PatchInput) error { +func (c ExecutionDiskCache) Set(ctx context.Context, key ExecutionCacheKey, result PatchInput) error { data, err := json.Marshal(result) if err != nil { return err @@ -83,7 +91,7 @@ func (c actionExecutionDiskCache) set(ctx context.Context, key actionExecutionCa return ioutil.WriteFile(path, data, 0600) } -func (c actionExecutionDiskCache) clear(ctx context.Context, key actionExecutionCacheKey) error { +func (c ExecutionDiskCache) Clear(ctx context.Context, key ExecutionCacheKey) error { path, err := c.cacheFilePath(key) if err != nil { return err @@ -96,18 +104,18 @@ func (c actionExecutionDiskCache) clear(ctx context.Context, key actionExecution return os.Remove(path) } -// actionExecutionNoOpCache is an implementation of actionExecutionCache that does not store or +// ExecutionNoOpCache is an implementation of actionExecutionCache that does not store or // retrieve cache entries. -type actionExecutionNoOpCache struct{} +type ExecutionNoOpCache struct{} -func (actionExecutionNoOpCache) get(ctx context.Context, key actionExecutionCacheKey) (result PatchInput, ok bool, err error) { +func (ExecutionNoOpCache) Get(ctx context.Context, key ExecutionCacheKey) (result PatchInput, ok bool, err error) { return PatchInput{}, false, nil } -func (actionExecutionNoOpCache) set(ctx context.Context, key actionExecutionCacheKey, result PatchInput) error { +func (ExecutionNoOpCache) Set(ctx context.Context, key ExecutionCacheKey, result PatchInput) error { return nil } -func (actionExecutionNoOpCache) clear(ctx context.Context, key actionExecutionCacheKey) error { +func (ExecutionNoOpCache) Clear(ctx context.Context, key ExecutionCacheKey) error { return nil } diff --git a/internal/campaigns/executor.go b/internal/campaigns/executor.go new file mode 100644 index 0000000000..1792b69bb8 --- /dev/null +++ b/internal/campaigns/executor.go @@ -0,0 +1,222 @@ +package campaigns + +import ( + "context" + "fmt" + "os/exec" + "strings" + "sync" + "time" + + "github.com/neelance/parallel" + "github.com/pkg/errors" +) + +type ActionRepoStatus struct { + Cached bool + + LogFile string + EnqueuedAt time.Time + StartedAt time.Time + FinishedAt time.Time + + Patch PatchInput + Err error +} + +type ExecutorOpts struct { + Endpoint string + AccessToken string + + KeepLogs bool + Timeout time.Duration + + ClearCache bool + Cache ExecutionCache +} + +type Executor struct { + action Action + opt ExecutorOpts + + reposMu sync.Mutex + repos map[ActionRepo]ActionRepoStatus + + par *parallel.Run + doneEnqueuing chan struct{} + + logger *ActionLogger +} + +func NewExecutor(action Action, parallelism int, logger *ActionLogger, opt ExecutorOpts) *Executor { + if opt.Cache == nil { + opt.Cache = ExecutionNoOpCache{} + } + + return &Executor{ + action: action, + opt: opt, + repos: map[ActionRepo]ActionRepoStatus{}, + par: parallel.NewRun(parallelism), + logger: logger, + + doneEnqueuing: make(chan struct{}), + } +} + +func (x *Executor) EnqueueRepo(repo ActionRepo) { + x.updateRepoStatus(repo, ActionRepoStatus{EnqueuedAt: time.Now()}) +} + +func (x *Executor) updateRepoStatus(repo ActionRepo, status ActionRepoStatus) { + x.reposMu.Lock() + defer x.reposMu.Unlock() + + // Perform delta update. + prev := x.repos[repo] + if status.LogFile == "" { + status.LogFile = prev.LogFile + } + if status.EnqueuedAt.IsZero() { + status.EnqueuedAt = prev.EnqueuedAt + } + if status.StartedAt.IsZero() { + status.StartedAt = prev.StartedAt + } + if status.FinishedAt.IsZero() { + status.FinishedAt = prev.FinishedAt + } + if status.Patch == (PatchInput{}) { + status.Patch = prev.Patch + } + if status.Err == nil { + status.Err = prev.Err + } + + x.repos[repo] = status +} + +func (x *Executor) AllPatches() []PatchInput { + patches := make([]PatchInput, 0, len(x.repos)) + x.reposMu.Lock() + defer x.reposMu.Unlock() + for _, status := range x.repos { + if patch := status.Patch; patch != (PatchInput{}) && status.Err == nil { + patches = append(patches, status.Patch) + } + } + return patches +} + +func (x *Executor) Start(ctx context.Context) { + x.reposMu.Lock() + allRepos := make([]ActionRepo, 0, len(x.repos)) + for repo := range x.repos { + allRepos = append(allRepos, repo) + } + x.reposMu.Unlock() + + for _, repo := range allRepos { + x.par.Acquire() + go func(repo ActionRepo) { + defer x.par.Release() + err := x.do(ctx, repo) + if err != nil { + x.par.Error(err) + } + }(repo) + } + + close(x.doneEnqueuing) +} + +func (x *Executor) Wait() error { + <-x.doneEnqueuing + return x.par.Wait() +} + +func (x *Executor) do(ctx context.Context, repo ActionRepo) (err error) { + // Check if cached. + cacheKey := ExecutionCacheKey{Repo: repo, Runs: x.action.Steps} + if x.opt.ClearCache { + if err := x.opt.Cache.Clear(ctx, cacheKey); err != nil { + return errors.Wrapf(err, "clearing cache for %s", repo.Name) + } + } else { + if result, ok, err := x.opt.Cache.Get(ctx, cacheKey); err != nil { + return errors.Wrapf(err, "checking cache for %s", repo.Name) + } else if ok { + status := ActionRepoStatus{Cached: true, Patch: result} + x.updateRepoStatus(repo, status) + x.logger.RepoCacheHit(repo, len(x.action.Steps), status.Patch != PatchInput{}) + return nil + } + } + + prefix := "action-" + strings.Replace(strings.Replace(repo.Name, "/", "-", -1), "github.com-", "", -1) + + logFileName, err := x.logger.AddRepo(repo) + if err != nil { + return errors.Wrapf(err, "failed to setup logging for repo %s", repo.Name) + } + + x.updateRepoStatus(repo, ActionRepoStatus{ + LogFile: logFileName, + StartedAt: time.Now(), + }) + + runCtx, cancel := context.WithTimeout(ctx, x.opt.Timeout) + defer cancel() + + patch, err := runAction(runCtx, x.opt.Endpoint, x.opt.AccessToken, prefix, repo.Name, repo.Rev, x.action.Steps, x.logger) + status := ActionRepoStatus{ + FinishedAt: time.Now(), + } + if len(patch) > 0 { + status.Patch = PatchInput{ + Repository: repo.ID, + BaseRevision: repo.Rev, + BaseRef: repo.BaseRef, + Patch: string(patch), + } + } + if err != nil { + if reachedTimeout(runCtx, err) { + err = &errTimeoutReached{timeout: x.opt.Timeout} + } + status.Err = err + } + + x.updateRepoStatus(repo, status) + lerr := x.logger.RepoFinished(repo.Name, len(patch) > 0, err) + if lerr != nil { + return lerr + } + + // Add to cache if successful. + if err == nil { + // We don't use runCtx here because we want to write to the cache even + // if we've now reached the timeout + if err := x.opt.Cache.Set(ctx, cacheKey, status.Patch); err != nil { + return errors.Wrapf(err, "caching result for %s", repo.Name) + } + } + + return err +} + +type errTimeoutReached struct{ timeout time.Duration } + +func (e *errTimeoutReached) Error() string { + return fmt.Sprintf("Timeout reached. Execution took longer than %s.", e.timeout) +} + +func reachedTimeout(cmdCtx context.Context, err error) bool { + if ee, ok := errors.Cause(err).(*exec.ExitError); ok { + if ee.String() == "signal: killed" && cmdCtx.Err() == context.DeadlineExceeded { + return true + } + } + + return errors.Is(err, context.DeadlineExceeded) +} diff --git a/cmd/src/actions_exec_logger.go b/internal/campaigns/logger.go similarity index 85% rename from cmd/src/actions_exec_logger.go rename to internal/campaigns/logger.go index 2925ee7c54..659b499dc6 100644 --- a/cmd/src/actions_exec_logger.go +++ b/internal/campaigns/logger.go @@ -1,4 +1,4 @@ -package main +package campaigns import ( "bytes" @@ -27,7 +27,7 @@ var ( grey = color.New(color.FgHiBlack) ) -type actionLogger struct { +type ActionLogger struct { verbose bool keepLogs bool @@ -41,7 +41,7 @@ type actionLogger struct { logWriters map[string]io.Writer } -func newActionLogger(verbose, keepLogs bool) *actionLogger { +func NewActionLogger(verbose, keepLogs bool) *ActionLogger { useColor := isatty.IsTerminal(os.Stderr.Fd()) || isatty.IsCygwinTerminal(os.Stderr.Fd()) if useColor { color.NoColor = false @@ -49,7 +49,7 @@ func newActionLogger(verbose, keepLogs bool) *actionLogger { progress := new(progress) - return &actionLogger{ + return &ActionLogger{ verbose: verbose, keepLogs: keepLogs, highlight: color.New(color.Bold, color.FgGreen).SprintFunc(), @@ -63,23 +63,23 @@ func newActionLogger(verbose, keepLogs bool) *actionLogger { } } -func (a *actionLogger) Start(totalSteps int) { +func (a *ActionLogger) Start(totalSteps int) { a.progress.SetTotalSteps(int64(totalSteps)) } -func (a *actionLogger) Infof(format string, args ...interface{}) { +func (a *ActionLogger) Infof(format string, args ...interface{}) { if a.verbose { a.log("", grey, format, args...) } } -func (a *actionLogger) Warnf(format string, args ...interface{}) { +func (a *ActionLogger) Warnf(format string, args ...interface{}) { if a.verbose { a.log("", yellow, "WARNING: "+format, args...) } } -func (a *actionLogger) ActionFailed(err error, patches []PatchInput) { +func (a *ActionLogger) ActionFailed(err error, patches []PatchInput) { a.out.Close() fmt.Fprintln(os.Stderr) if perr, ok := err.(parallel.Errors); ok { @@ -103,14 +103,14 @@ func (a *actionLogger) ActionFailed(err error, patches []PatchInput) { } } -func (a *actionLogger) ActionSuccess(patches []PatchInput) { +func (a *ActionLogger) ActionSuccess(patches []PatchInput) { a.out.Close() fmt.Fprintln(os.Stderr) format := "✔ Action produced %d patches." hiGreen.Fprintf(os.Stderr, format, len(patches)) } -func (a *actionLogger) RepoCacheHit(repo ActionRepo, stepCount int, patchProduced bool) { +func (a *ActionLogger) RepoCacheHit(repo ActionRepo, stepCount int, patchProduced bool) { a.progress.IncStepsComplete(int64(stepCount)) if patchProduced { a.progress.IncPatchCount() @@ -120,7 +120,7 @@ func (a *actionLogger) RepoCacheHit(repo ActionRepo, stepCount int, patchProduce a.log(repo.Name, grey, "Cached result found: no diff produced for this repository.\n") } -func (a *actionLogger) AddRepo(repo ActionRepo) (string, error) { +func (a *ActionLogger) AddRepo(repo ActionRepo) (string, error) { prefix := "action-" + strings.Replace(strings.Replace(repo.Name, "/", "-", -1), "github.com-", "", -1) logFile, err := ioutil.TempFile(tempDirPrefix, prefix+"-log") @@ -138,26 +138,26 @@ func (a *actionLogger) AddRepo(repo ActionRepo) (string, error) { return logFile.Name(), nil } -func (a *actionLogger) RepoWriter(repoName string) (io.Writer, bool) { +func (a *ActionLogger) RepoWriter(repoName string) (io.Writer, bool) { a.mu.Lock() defer a.mu.Unlock() w, ok := a.logWriters[repoName] return w, ok } -func (a *actionLogger) InfoPipe(prefix string) io.Writer { +func (a *ActionLogger) InfoPipe(prefix string) io.Writer { stdoutPrefix := fmt.Sprintf("%s -> [STDOUT]: ", yellow.Sprint(prefix)) stderr := textio.NewPrefixWriter(os.Stderr, stdoutPrefix) return io.Writer(stderr) } -func (a *actionLogger) ErrorPipe(prefix string) io.Writer { +func (a *ActionLogger) ErrorPipe(prefix string) io.Writer { stderrPrefix := fmt.Sprintf("%s -> [STDERR]: ", yellow.Sprint(prefix)) stderr := textio.NewPrefixWriter(os.Stderr, stderrPrefix) return io.Writer(stderr) } -func (a *actionLogger) RepoStdoutStderr(repoName string) (io.Writer, io.Writer, bool) { +func (a *ActionLogger) RepoStdoutStderr(repoName string) (io.Writer, io.Writer, bool) { a.mu.Lock() defer a.mu.Unlock() w, ok := a.logWriters[repoName] @@ -171,7 +171,7 @@ func (a *actionLogger) RepoStdoutStderr(repoName string) (io.Writer, io.Writer, return io.MultiWriter(stdout, w), io.MultiWriter(stderr, w), ok } -func (a *actionLogger) RepoFinished(repoName string, patchProduced bool, actionErr error) error { +func (a *ActionLogger) RepoFinished(repoName string, patchProduced bool, actionErr error) error { a.mu.Lock() f, ok := a.logFiles[repoName] if !ok { @@ -208,42 +208,42 @@ func (a *actionLogger) RepoFinished(repoName string, patchProduced bool, actionE return nil } -func (a *actionLogger) RepoStarted(repoName, rev string, steps []*ActionStep) { +func (a *ActionLogger) RepoStarted(repoName, rev string, steps []*ActionStep) { a.write(repoName, yellow, "Starting action @ %s (%d steps)\n", rev, len(steps)) } -func (a *actionLogger) CommandStepStarted(repoName string, step int, args []string) { +func (a *ActionLogger) CommandStepStarted(repoName string, step int, args []string) { a.write(repoName, yellow, "%s command %v\n", boldBlack.Sprintf("[Step %d]", step), args) } -func (a *actionLogger) CommandStepErrored(repoName string, step int, err error) { +func (a *ActionLogger) CommandStepErrored(repoName string, step int, err error) { a.progress.IncStepsComplete(1) a.progress.IncStepsFailed() a.write(repoName, boldRed, "%s %s.\n", boldBlack.Sprintf("[Step %d]", step), err) } -func (a *actionLogger) CommandStepDone(repoName string, step int) { +func (a *ActionLogger) CommandStepDone(repoName string, step int) { a.progress.IncStepsComplete(1) a.write(repoName, yellow, "%s Done.\n", boldBlack.Sprintf("[Step %d]", step)) } -func (a *actionLogger) DockerStepStarted(repoName string, step int, image string) { +func (a *ActionLogger) DockerStepStarted(repoName string, step int, image string) { a.write(repoName, yellow, "%s docker run %s\n", boldBlack.Sprintf("[Step %d]", step), image) } -func (a *actionLogger) DockerStepErrored(repoName string, step int, err error, elapsed time.Duration) { +func (a *ActionLogger) DockerStepErrored(repoName string, step int, err error, elapsed time.Duration) { a.progress.IncStepsComplete(1) a.progress.IncStepsFailed() a.write(repoName, boldRed, "%s %s. (%s)\n", boldBlack.Sprintf("[Step %d]", step), err, elapsed) } -func (a *actionLogger) DockerStepDone(repoName string, step int, elapsed time.Duration) { +func (a *ActionLogger) DockerStepDone(repoName string, step int, elapsed time.Duration) { a.progress.IncStepsComplete(1) a.write(repoName, yellow, "%s Done. (%s)\n", boldBlack.Sprintf("[Step %d]", step), elapsed) } // write writes to the RepoWriter associated with the given repoName and logs the message using the log method. -func (a *actionLogger) write(repoName string, c *color.Color, format string, args ...interface{}) { +func (a *ActionLogger) write(repoName string, c *color.Color, format string, args ...interface{}) { if w, ok := a.RepoWriter(repoName); ok { fmt.Fprintf(w, format, args...) } @@ -251,7 +251,7 @@ func (a *actionLogger) write(repoName string, c *color.Color, format string, arg } // log logs only to stderr, it does not log to our repoWriters. -func (a *actionLogger) log(repoName string, c *color.Color, format string, args ...interface{}) { +func (a *ActionLogger) log(repoName string, c *color.Color, format string, args ...interface{}) { if len(repoName) > 0 { format = fmt.Sprintf("%s -> %s", c.Sprint(repoName), format) } diff --git a/cmd/src/actions_exec_backend_runner.go b/internal/campaigns/run_action.go similarity index 69% rename from cmd/src/actions_exec_backend_runner.go rename to internal/campaigns/run_action.go index 38dc036c4b..27ba461c50 100644 --- a/cmd/src/actions_exec_backend_runner.go +++ b/internal/campaigns/run_action.go @@ -1,4 +1,4 @@ -package main +package campaigns import ( "archive/zip" @@ -21,108 +21,10 @@ import ( "golang.org/x/net/context/ctxhttp" ) -type ActionRepoStatus struct { - Cached bool - - LogFile string - EnqueuedAt time.Time - StartedAt time.Time - FinishedAt time.Time - - Patch PatchInput - Err error -} - -func (x *actionExecutor) do(ctx context.Context, repo ActionRepo) (err error) { - // Check if cached. - cacheKey := actionExecutionCacheKey{Repo: repo, Runs: x.action.Steps} - if x.opt.clearCache { - if err := x.opt.cache.clear(ctx, cacheKey); err != nil { - return errors.Wrapf(err, "clearing cache for %s", repo.Name) - } - } else { - if result, ok, err := x.opt.cache.get(ctx, cacheKey); err != nil { - return errors.Wrapf(err, "checking cache for %s", repo.Name) - } else if ok { - status := ActionRepoStatus{Cached: true, Patch: result} - x.updateRepoStatus(repo, status) - x.logger.RepoCacheHit(repo, len(x.action.Steps), status.Patch != PatchInput{}) - return nil - } - } - - prefix := "action-" + strings.Replace(strings.Replace(repo.Name, "/", "-", -1), "github.com-", "", -1) - - logFileName, err := x.logger.AddRepo(repo) - if err != nil { - return errors.Wrapf(err, "failed to setup logging for repo %s", repo.Name) - } - - x.updateRepoStatus(repo, ActionRepoStatus{ - LogFile: logFileName, - StartedAt: time.Now(), - }) - - runCtx, cancel := context.WithTimeout(ctx, x.opt.timeout) - defer cancel() - - patch, err := runAction(runCtx, prefix, repo.Name, repo.Rev, x.action.Steps, x.logger) - status := ActionRepoStatus{ - FinishedAt: time.Now(), - } - if len(patch) > 0 { - status.Patch = PatchInput{ - Repository: repo.ID, - BaseRevision: repo.Rev, - BaseRef: repo.BaseRef, - Patch: string(patch), - } - } - if err != nil { - if reachedTimeout(runCtx, err) { - err = &errTimeoutReached{timeout: x.opt.timeout} - } - status.Err = err - } - - x.updateRepoStatus(repo, status) - lerr := x.logger.RepoFinished(repo.Name, len(patch) > 0, err) - if lerr != nil { - return lerr - } - - // Add to cache if successful. - if err == nil { - // We don't use runCtx here because we want to write to the cache even - // if we've now reached the timeout - if err := x.opt.cache.set(ctx, cacheKey, status.Patch); err != nil { - return errors.Wrapf(err, "caching result for %s", repo.Name) - } - } - - return err -} - -type errTimeoutReached struct{ timeout time.Duration } - -func (e *errTimeoutReached) Error() string { - return fmt.Sprintf("Timeout reached. Execution took longer than %s.", e.timeout) -} - -func reachedTimeout(cmdCtx context.Context, err error) bool { - if ee, ok := errors.Cause(err).(*exec.ExitError); ok { - if ee.String() == "signal: killed" && cmdCtx.Err() == context.DeadlineExceeded { - return true - } - } - - return errors.Is(err, context.DeadlineExceeded) -} - -func runAction(ctx context.Context, prefix, repoName, rev string, steps []*ActionStep, logger *actionLogger) ([]byte, error) { +func runAction(ctx context.Context, endpoint, accessToken, prefix, repoName, rev string, steps []*ActionStep, logger *ActionLogger) ([]byte, error) { logger.RepoStarted(repoName, rev, steps) - zipFile, err := fetchRepositoryArchive(ctx, repoName, rev) + zipFile, err := fetchRepositoryArchive(ctx, endpoint, accessToken, repoName, rev) if err != nil { return nil, errors.Wrap(err, "Fetching ZIP archive failed") } @@ -203,7 +105,7 @@ func runAction(ctx context.Context, prefix, repoName, rev string, steps []*Actio // persistentCacheDir returns a host directory that persists across runs of this // action for this repository. It is useful for (e.g.) yarn and npm caches. persistentCacheDir := func(containerDir string) (string, error) { - baseCacheDir, err := userCacheDir() + baseCacheDir, err := UserCacheDir() if err != nil { return "", err } @@ -277,8 +179,8 @@ func unzipToTempDir(ctx context.Context, zipFile, prefix string) (string, error) return volumeDir, unzip(zipFile, volumeDir) } -func fetchRepositoryArchive(ctx context.Context, repoName, rev string) (*os.File, error) { - zipURL, err := repositoryZipArchiveURL(repoName, rev, "") +func fetchRepositoryArchive(ctx context.Context, endpoint, accessToken, repoName, rev string) (*os.File, error) { + zipURL, err := repositoryZipArchiveURL(endpoint, repoName, rev, "") if err != nil { return nil, err } @@ -288,8 +190,8 @@ func fetchRepositoryArchive(ctx context.Context, repoName, rev string) (*os.File return nil, err } req.Header.Set("Accept", "application/zip") - if cfg.AccessToken != "" { - req.Header.Set("Authorization", "token "+cfg.AccessToken) + if accessToken != "" { + req.Header.Set("Authorization", "token "+accessToken) } resp, err := ctxhttp.Do(ctx, nil, req) if err != nil { @@ -313,8 +215,8 @@ func fetchRepositoryArchive(ctx context.Context, repoName, rev string) (*os.File return f, nil } -func repositoryZipArchiveURL(repoName, rev, token string) (*url.URL, error) { - u, err := url.Parse(cfg.Endpoint) +func repositoryZipArchiveURL(endpoint, repoName, rev, token string) (*url.URL, error) { + u, err := url.Parse(endpoint) if err != nil { return nil, err }