diff --git a/.github/workflows/daily_tests.yml b/.github/workflows/daily_tests.yml index 17bfd27c0c..a414e697ba 100644 --- a/.github/workflows/daily_tests.yml +++ b/.github/workflows/daily_tests.yml @@ -48,12 +48,6 @@ jobs: # shell: bash # if: matrix.os == 'windows-latest' - - uses: actions/cache@v1 - with: - path: ~/go/pkg/mod - key: go-${{ hashFiles('**/go.sum') }} - restore-keys: go- - - name: Compile tests binaries run: | # unit tests binaries diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index e755957762..7a49449571 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -39,12 +39,6 @@ jobs: - name: Checkout code uses: actions/checkout@v2 - - uses: actions/cache@v1 - with: - path: ~/go/pkg/mod - key: go-${{ hashFiles('**/go.sum') }} - restore-keys: go- - - name: Compile tests binaries run: | # unit tests binaries diff --git a/README.md b/README.md index deb15951f4..03e333016f 100644 --- a/README.md +++ b/README.md @@ -60,7 +60,7 @@ werf is not a complete CI/CD solution, but a tool for creating pipelines that ca - Developing applications locally with werf [#1940](https://github.com/werf/werf/issues/1940). - ~Content-based tagging~ [#1184](https://github.com/werf/werf/issues/1184). - ~Support for the most Docker registry implementations~ [#2199](https://github.com/werf/werf/issues/2199). -- Parallel image builds [#2200](https://github.com/werf/werf/issues/2200). +- ~Parallel image builds~ [#2200](https://github.com/werf/werf/issues/2200). - Proven approaches and recipes for the most popular CI systems [#1617](https://github.com/werf/werf/issues/1617). - ~Distributed builds with the shared Docker registry~ [#1614](https://github.com/werf/werf/issues/1614). - Support for Helm 3 [#1606](https://github.com/werf/werf/issues/1606). @@ -73,7 +73,7 @@ werf is not a complete CI/CD solution, but a tool for creating pipelines that ca - Effortlessly build as many images as you like in one project. - Build images using Dockerfiles or Stapel builder instructions. - Build images concurrently on a single host (using file locks). -- Build images simultaneously (coming soon) [#2200](https://github.com/werf/werf/issues/2200). +- Build images simultaneously. - Build images distributedly. - Advanced building process with Stapel: - Incremental rebuilds based on git history. diff --git a/README_ru.md b/README_ru.md index 4d67ef5b26..4d2e73b702 100644 --- a/README_ru.md +++ b/README_ru.md @@ -63,7 +63,7 @@ werf — не CI/CD-система, а инструмент для постро - Локальная разработка приложений с werf [#1940](https://github.com/werf/werf/issues/1940). - ~Тегирование, основанное на контенте~ [#1184](https://github.com/werf/werf/issues/1184). - ~Поддержка большинства имлементаций Docker registry~ [#2199](https://github.com/werf/werf/issues/2199). -- Параллельная сборка образов [#2200](https://github.com/werf/werf/issues/2200). +- ~Параллельная сборка образов~ [#2200](https://github.com/werf/werf/issues/2200). - Лучшие практики и рецепты для наиболее популярных CI-систем [#1617](https://github.com/werf/werf/issues/1617). - ~Распределенная сборка с общим Docker registry~ [#1614](https://github.com/werf/werf/issues/1614). - Поддержка Helm 3 [#1606](https://github.com/werf/werf/issues/1606). @@ -76,8 +76,8 @@ werf — не CI/CD-система, а инструмент для постро - Удобная сборка произвольного числа образов в одном проекте. - Сборка образов как из Dockerfile, так и из инструкций сборщика Stapel. - Параллельные сборки на одном хосте (с использованием файловых блокировок). 
-- Распределенная сборка (скоро) [#1614](https://github.com/werf/werf/issues/1614). -- Параллельная сборка образов (скоро) [#2200](https://github.com/werf/werf/issues/2200). +- Распределенная сборка. +- Параллельная сборка описанных в werf.yaml образов. - Расширенная сборка со сборщиком Stapel: - Инкрементальная пересборка на основе истории изменений Git. - Сборка образов с Shell-инструкциями и Ansible-заданиями. diff --git a/cmd/werf/build_and_publish/main.go b/cmd/werf/build_and_publish/main.go index b87802f8e5..c552fc0a0d 100644 --- a/cmd/werf/build_and_publish/main.go +++ b/cmd/werf/build_and_publish/main.go @@ -23,9 +23,6 @@ import ( var cmdData struct { PullUsername string PullPassword string - - IntrospectBeforeError bool - IntrospectAfterError bool } var commonCmdData common.CmdData @@ -94,6 +91,8 @@ If one or more IMAGE_NAME parameters specified, werf will build images stages an common.SetupLogOptions(&commonCmdData, cmd) common.SetupLogProjectDir(&commonCmdData, cmd) + common.SetupIntrospectAfterError(&commonCmdData, cmd) + common.SetupIntrospectBeforeError(&commonCmdData, cmd) common.SetupIntrospectStage(&commonCmdData, cmd) common.SetupSynchronization(&commonCmdData, cmd) @@ -110,9 +109,7 @@ If one or more IMAGE_NAME parameters specified, werf will build images stages an common.SetupGitUnshallow(&commonCmdData, cmd) common.SetupAllowGitShallowClone(&commonCmdData, cmd) - - cmd.Flags().BoolVarP(&cmdData.IntrospectAfterError, "introspect-error", "", false, "Introspect failed stage in the state, right after running failed assembly instruction") - cmd.Flags().BoolVarP(&cmdData.IntrospectBeforeError, "introspect-before-error", "", false, "Introspect failed stage in the clean state, before running all assembly instructions of the stage") + common.SetupParallelOptions(&commonCmdData, cmd) return cmd } @@ -211,7 +208,7 @@ func runBuildAndPublish(imagesToProcess []string) error { } }() - introspectOptions, err := common.GetIntrospectOptions(&commonCmdData, werfConfig) + buildStagesOptions, err := common.GetBuildStagesOptions(&commonCmdData, werfConfig) if err != nil { return err } @@ -221,14 +218,8 @@ func runBuildAndPublish(imagesToProcess []string) error { return err } - opts := build.BuildAndPublishOptions{ - BuildStagesOptions: build.BuildStagesOptions{ - ImageBuildOptions: container_runtime.BuildOptions{ - IntrospectAfterError: cmdData.IntrospectAfterError, - IntrospectBeforeError: cmdData.IntrospectBeforeError, - }, - IntrospectOptions: introspectOptions, - }, + buildAndPublishOptions := build.BuildAndPublishOptions{ + BuildStagesOptions: buildStagesOptions, PublishImagesOptions: build.PublishImagesOptions{ ImagesToPublish: imagesToProcess, TagOptions: tagOpts, @@ -237,13 +228,18 @@ func runBuildAndPublish(imagesToProcess []string) error { }, } + conveyorOptions, err := common.GetConveyorOptionsWithParallel(&commonCmdData, buildAndPublishOptions.BuildStagesOptions) + if err != nil { + return err + } + logboek.LogOptionalLn() - conveyorWithRetry := build.NewConveyorWithRetryWrapper(werfConfig, imagesToProcess, projectDir, projectTmpDir, ssh_agent.SSHAuthSock, containerRuntime, stagesManager, imagesRepo, storageLockManager, common.GetConveyorOptions(&commonCmdData)) + conveyorWithRetry := build.NewConveyorWithRetryWrapper(werfConfig, imagesToProcess, projectDir, projectTmpDir, ssh_agent.SSHAuthSock, containerRuntime, stagesManager, imagesRepo, storageLockManager, conveyorOptions) defer conveyorWithRetry.Terminate() if err := conveyorWithRetry.WithRetryBlock(ctx, func(c 
*build.Conveyor) error { - return c.BuildAndPublish(ctx, opts) + return c.BuildAndPublish(ctx, buildAndPublishOptions) }); err != nil { return err } diff --git a/cmd/werf/ci_env/ci_env.go b/cmd/werf/ci_env/ci_env.go index a959d7a20d..9c21d28bbd 100644 --- a/cmd/werf/ci_env/ci_env.go +++ b/cmd/werf/ci_env/ci_env.go @@ -15,6 +15,7 @@ import ( "github.com/spf13/cobra" "github.com/werf/logboek" + "github.com/werf/logboek/pkg/level" "github.com/werf/werf/cmd/werf/common" "github.com/werf/werf/pkg/docker" @@ -75,7 +76,7 @@ Currently supported only GitLab (gitlab) and GitHub (github) CI systems`, } func runCIEnv(cmd *cobra.Command, args []string) error { - logboek.Streams().Mute() + logboek.SetAcceptedLevel(level.Error) if err := werf.Init(*commonCmdData.TmpDir, *commonCmdData.HomeDir); err != nil { return fmt.Errorf("initialization error: %s", err) diff --git a/cmd/werf/common/common.go b/cmd/werf/common/common.go index 167283aa9a..84a3603e20 100644 --- a/cmd/werf/common/common.go +++ b/cmd/werf/common/common.go @@ -80,6 +80,8 @@ type CmdData struct { GitHistorySynchronization *bool GitUnshallow *bool AllowGitShallowClone *bool + Parallel *bool + ParallelTasksLimit *int64 DockerConfig *string InsecureRegistry *bool @@ -95,7 +97,9 @@ type CmdData struct { WithoutKube *bool - StagesToIntrospect *[]string + IntrospectBeforeError *bool + IntrospectAfterError *bool + StagesToIntrospect *[]string LogDebug *bool LogPretty *bool @@ -731,6 +735,21 @@ func SetupAllowGitShallowClone(cmdData *CmdData, cmd *cobra.Command) { cmd.Flags().BoolVarP(cmdData.AllowGitShallowClone, "allow-git-shallow-clone", "", GetBoolEnvironmentDefaultFalse("WERF_ALLOW_GIT_SHALLOW_CLONE"), "Sign the intention of using shallow clone despite restrictions (default $WERF_ALLOW_GIT_SHALLOW_CLONE)") } +func SetupParallelOptions(cmdData *CmdData, cmd *cobra.Command) { + SetupParallel(cmdData, cmd) + SetupParallelTasksLimit(cmdData, cmd) +} + +func SetupParallel(cmdData *CmdData, cmd *cobra.Command) { + cmdData.Parallel = new(bool) + cmd.Flags().BoolVarP(cmdData.Parallel, "parallel", "p", GetBoolEnvironmentDefaultFalse("WERF_PARALLEL"), "Run in parallel (default $WERF_PARALLEL)") +} + +func SetupParallelTasksLimit(cmdData *CmdData, cmd *cobra.Command) { + cmdData.ParallelTasksLimit = new(int64) + cmd.Flags().Int64VarP(cmdData.ParallelTasksLimit, "parallel-tasks-limit", "", -1, "Parallel tasks limit (default $WERF_PARALLEL_TASKS_LIMIT or without limit)") +} + func SetupGitUnshallow(cmdData *CmdData, cmd *cobra.Command) { cmdData.GitUnshallow = new(bool) cmd.Flags().BoolVarP(cmdData.GitUnshallow, "git-unshallow", "", GetBoolEnvironmentDefaultFalse("WERF_GIT_UNSHALLOW"), "Convert project git clone to full one (default $WERF_GIT_UNSHALLOW)") @@ -746,6 +765,16 @@ func SetupLogProjectDir(cmdData *CmdData, cmd *cobra.Command) { cmd.Flags().BoolVarP(cmdData.LogProjectDir, "log-project-dir", "", GetBoolEnvironmentDefaultFalse("WERF_LOG_PROJECT_DIR"), `Print current project directory path (default $WERF_LOG_PROJECT_DIR)`) } +func SetupIntrospectAfterError(cmdData *CmdData, cmd *cobra.Command) { + cmdData.IntrospectAfterError = new(bool) + cmd.Flags().BoolVarP(cmdData.IntrospectAfterError, "introspect-error", "", false, "Introspect failed stage in the state, right after running failed assembly instruction") +} + +func SetupIntrospectBeforeError(cmdData *CmdData, cmd *cobra.Command) { + cmdData.IntrospectBeforeError = new(bool) + cmd.Flags().BoolVarP(cmdData.IntrospectBeforeError, "introspect-before-error", "", false, "Introspect failed stage in the 
clean state, before running all assembly instructions of the stage") +} + func SetupIntrospectStage(cmdData *CmdData, cmd *cobra.Command) { cmdData.StagesToIntrospect = new([]string) cmd.Flags().StringArrayVarP(cmdData.StagesToIntrospect, "introspect-stage", "", []string{}, `Introspect a specific stage. The option can be used multiple times to introspect several stages. @@ -856,6 +885,21 @@ func getIntEnvVar(varName string) (*int64, error) { return nil, nil } +func GetParallelTasksLimit(cmdData *CmdData) (int64, error) { + v, err := getInt64EnvVar("WERF_PARALLEL_TASKS_LIMIT") + if err != nil { + return 0, err + } + if v == nil { + v = cmdData.ParallelTasksLimit + } + if *v <= 0 { + return -1, nil + } else { + return *v, nil + } +} + func GetGitTagStrategyLimit(cmdData *CmdData) (int64, error) { v, err := getInt64EnvVar("WERF_GIT_TAG_STRATEGY_LIMIT") if err != nil { diff --git a/cmd/werf/common/conveyor_options.go b/cmd/werf/common/conveyor_options.go index 81856a126f..88dda08d31 100644 --- a/cmd/werf/common/conveyor_options.go +++ b/cmd/werf/common/conveyor_options.go @@ -1,8 +1,12 @@ package common import ( + "fmt" + "github.com/werf/werf/pkg/build" "github.com/werf/werf/pkg/build/stage" + "github.com/werf/werf/pkg/config" + "github.com/werf/werf/pkg/container_runtime" ) func GetConveyorOptions(commonCmdData *CmdData) build.ConveyorOptions { @@ -16,3 +20,34 @@ func GetConveyorOptions(commonCmdData *CmdData) build.ConveyorOptions { AllowGitShallowClone: *commonCmdData.AllowGitShallowClone, } } + +func GetConveyorOptionsWithParallel(commonCmdData *CmdData, buildStagesOptions build.BuildStagesOptions) (build.ConveyorOptions, error) { + conveyorOptions := GetConveyorOptions(commonCmdData) + conveyorOptions.Parallel = !(buildStagesOptions.ImageBuildOptions.IntrospectAfterError || buildStagesOptions.ImageBuildOptions.IntrospectBeforeError || len(buildStagesOptions.Targets) != 0) && *commonCmdData.Parallel + + parallelTasksLimit, err := GetParallelTasksLimit(commonCmdData) + if err != nil { + return conveyorOptions, fmt.Errorf("getting parallel tasks limit failed: %s", err) + } + + conveyorOptions.ParallelTasksLimit = parallelTasksLimit + + return conveyorOptions, nil +} + +func GetBuildStagesOptions(commonCmdData *CmdData, werfConfig *config.WerfConfig) (build.BuildStagesOptions, error) { + introspectOptions, err := GetIntrospectOptions(commonCmdData, werfConfig) + if err != nil { + return build.BuildStagesOptions{}, err + } + + options := build.BuildStagesOptions{ + ImageBuildOptions: container_runtime.BuildOptions{ + IntrospectAfterError: *commonCmdData.IntrospectAfterError, + IntrospectBeforeError: *commonCmdData.IntrospectBeforeError, + }, + IntrospectOptions: introspectOptions, + } + + return options, nil +} diff --git a/cmd/werf/converge/converge.go b/cmd/werf/converge/converge.go index 9f065f468a..ec6608c202 100644 --- a/cmd/werf/converge/converge.go +++ b/cmd/werf/converge/converge.go @@ -115,6 +115,7 @@ werf converge --stages-storage registry.mydomain.com/web/back/stages --images-re common.SetupGitUnshallow(&commonCmdData, cmd) common.SetupAllowGitShallowClone(&commonCmdData, cmd) + common.SetupParallelOptions(&commonCmdData, cmd) cmd.Flags().IntVarP(&cmdData.Timeout, "timeout", "t", 0, "Resources tracking timeout in seconds") @@ -233,7 +234,7 @@ func runConverge() error { return fmt.Errorf("cannot init kubedog: %s", err) } - opts := build.BuildAndPublishOptions{ + buildAndPublishOptions := build.BuildAndPublishOptions{ BuildStagesOptions: build.BuildStagesOptions{ ImageBuildOptions: 
container_runtime.BuildOptions{}, }, @@ -277,11 +278,16 @@ func runConverge() error { } imagesRepository = imagesRepo.String() - conveyorWithRetry := build.NewConveyorWithRetryWrapper(werfConfig, nil, projectDir, projectTmpDir, ssh_agent.SSHAuthSock, containerRuntime, stagesManager, imagesRepo, storageLockManager, common.GetConveyorOptions(&commonCmdData)) + conveyorOptions, err := common.GetConveyorOptionsWithParallel(&commonCmdData, buildAndPublishOptions.BuildStagesOptions) + if err != nil { + return err + } + + conveyorWithRetry := build.NewConveyorWithRetryWrapper(werfConfig, nil, projectDir, projectTmpDir, ssh_agent.SSHAuthSock, containerRuntime, stagesManager, imagesRepo, storageLockManager, conveyorOptions) defer conveyorWithRetry.Terminate() if err := conveyorWithRetry.WithRetryBlock(ctx, func(c *build.Conveyor) error { - if err := c.BuildAndPublish(ctx, opts); err != nil { + if err := c.BuildAndPublish(ctx, buildAndPublishOptions); err != nil { return err } diff --git a/cmd/werf/helm/get_autogenerated_values/main.go b/cmd/werf/helm/get_autogenerated_values/main.go index 6229b6423b..f97ceb1ef0 100644 --- a/cmd/werf/helm/get_autogenerated_values/main.go +++ b/cmd/werf/helm/get_autogenerated_values/main.go @@ -6,6 +6,7 @@ import ( "github.com/spf13/cobra" "github.com/werf/logboek" + "github.com/werf/logboek/pkg/level" "github.com/werf/werf/cmd/werf/common" helm_common "github.com/werf/werf/cmd/werf/helm/common" @@ -65,8 +66,7 @@ These values includes project name, docker images ids and other`), } func runGetServiceValues() error { - logboek.Streams().Mute() - defer logboek.Streams().Unmute() + logboek.SetAcceptedLevel(level.Error) ctx := common.BackgroundContext() @@ -139,7 +139,7 @@ func runGetServiceValues() error { defer func() { err := ssh_agent.Terminate() if err != nil { - logboek.Warn().LogF("WARNING: ssh agent termination failed: %s\n", err) + logboek.Error().LogF("WARNING: ssh agent termination failed: %s\n", err) } }() diff --git a/cmd/werf/helm/secret/common/common.go b/cmd/werf/helm/secret/common/common.go index 642765acbd..95338f44c6 100644 --- a/cmd/werf/helm/secret/common/common.go +++ b/cmd/werf/helm/secret/common/common.go @@ -8,6 +8,7 @@ import ( "golang.org/x/crypto/ssh/terminal" + "github.com/werf/logboek" "github.com/werf/logboek/pkg/style" "github.com/werf/werf/pkg/util" @@ -45,7 +46,7 @@ func InputFromInteractiveStdin() ([]byte, error) { isStdoutTerminal := terminal.IsTerminal(int(os.Stdout.Fd())) if isStdoutTerminal { - fmt.Printf(style.Highlight().Colorize("Enter secret: ")) + fmt.Printf(logboek.Colorize(style.Highlight(), "Enter secret: ")) } data, err = terminal.ReadPassword(int(os.Stdin.Fd())) diff --git a/cmd/werf/helm/secret/common/edit.go b/cmd/werf/helm/secret/common/edit.go index aa55b93b4d..49c7143f9c 100644 --- a/cmd/werf/helm/secret/common/edit.go +++ b/cmd/werf/helm/secret/common/edit.go @@ -148,7 +148,7 @@ func readEditedFile(m secret.Manager, filePath string, values bool) ([]byte, []b func askForConfirmation() (bool, error) { r := os.Stdin - fmt.Println(style.Highlight().Colorize("Do you want to continue editing the file (Y/n)?")) + fmt.Println(logboek.Colorize(style.Highlight(), "Do you want to continue editing the file (Y/n)?")) isTerminal := terminal.IsTerminal(int(r.Fd())) if isTerminal { diff --git a/cmd/werf/stage/image/main.go b/cmd/werf/stage/image/main.go index ded11a3015..9bf088bfb8 100644 --- a/cmd/werf/stage/image/main.go +++ b/cmd/werf/stage/image/main.go @@ -3,20 +3,19 @@ package run import ( "fmt" - 
"github.com/werf/werf/pkg/image" - - "github.com/werf/werf/pkg/stages_manager" - "github.com/spf13/cobra" "github.com/werf/logboek" + "github.com/werf/logboek/pkg/level" "github.com/werf/werf/cmd/werf/common" "github.com/werf/werf/pkg/build" "github.com/werf/werf/pkg/container_runtime" "github.com/werf/werf/pkg/docker" + "github.com/werf/werf/pkg/image" "github.com/werf/werf/pkg/logging" "github.com/werf/werf/pkg/ssh_agent" + "github.com/werf/werf/pkg/stages_manager" "github.com/werf/werf/pkg/tmp_manager" "github.com/werf/werf/pkg/true_git" "github.com/werf/werf/pkg/werf" @@ -34,7 +33,7 @@ func NewCmd() *cobra.Command { common.DisableOptionsInUseLineAnno: "1", }, RunE: func(cmd *cobra.Command, args []string) error { - logboek.Streams().Mute() + logboek.SetAcceptedLevel(level.Error) var imageName string if len(args) > 1 { diff --git a/cmd/werf/stages/build/cmd_factory/main.go b/cmd/werf/stages/build/cmd_factory/main.go index 1f6aaf8a07..5e475764dd 100644 --- a/cmd/werf/stages/build/cmd_factory/main.go +++ b/cmd/werf/stages/build/cmd_factory/main.go @@ -25,9 +25,6 @@ import ( type CmdData struct { PullUsername string PullPassword string - - IntrospectBeforeError bool - IntrospectAfterError bool } func NewCmdWithData(cmdData *CmdData, commonCmdData *common.CmdData) *cobra.Command { @@ -84,6 +81,8 @@ If one or more IMAGE_NAME parameters specified, werf will build only these image common.SetupInsecureRegistry(commonCmdData, cmd) common.SetupSkipTlsVerifyRegistry(commonCmdData, cmd) + common.SetupIntrospectAfterError(commonCmdData, cmd) + common.SetupIntrospectBeforeError(commonCmdData, cmd) common.SetupIntrospectStage(commonCmdData, cmd) common.SetupLogOptions(commonCmdData, cmd) @@ -100,9 +99,7 @@ If one or more IMAGE_NAME parameters specified, werf will build only these image common.SetupGitUnshallow(commonCmdData, cmd) common.SetupAllowGitShallowClone(commonCmdData, cmd) - - cmd.Flags().BoolVarP(&cmdData.IntrospectAfterError, "introspect-error", "", false, "Introspect failed stage in the state, right after running failed assembly instruction") - cmd.Flags().BoolVarP(&cmdData.IntrospectBeforeError, "introspect-before-error", "", false, "Introspect failed stage in the clean state, before running all assembly instructions of the stage") + common.SetupParallelOptions(commonCmdData, cmd) return cmd } @@ -191,26 +188,23 @@ func runStagesBuild(cmdData *CmdData, commonCmdData *common.CmdData, imagesToPro } }() - introspectOptions, err := common.GetIntrospectOptions(commonCmdData, werfConfig) + buildStagesOptions, err := common.GetBuildStagesOptions(commonCmdData, werfConfig) if err != nil { return err } - opts := build.BuildStagesOptions{ - ImageBuildOptions: container_runtime.BuildOptions{ - IntrospectAfterError: cmdData.IntrospectAfterError, - IntrospectBeforeError: cmdData.IntrospectBeforeError, - }, - IntrospectOptions: introspectOptions, + conveyorOptions, err := common.GetConveyorOptionsWithParallel(commonCmdData, buildStagesOptions) + if err != nil { + return err } logboek.LogOptionalLn() - conveyorWithRetry := build.NewConveyorWithRetryWrapper(werfConfig, imagesToProcess, projectDir, projectTmpDir, ssh_agent.SSHAuthSock, containerRuntime, stagesManager, nil, storageLockManager, common.GetConveyorOptions(commonCmdData)) + conveyorWithRetry := build.NewConveyorWithRetryWrapper(werfConfig, imagesToProcess, projectDir, projectTmpDir, ssh_agent.SSHAuthSock, containerRuntime, stagesManager, nil, storageLockManager, conveyorOptions) defer conveyorWithRetry.Terminate() if err := 
conveyorWithRetry.WithRetryBlock(ctx, func(c *build.Conveyor) error { - return c.BuildStages(ctx, opts) + return c.BuildStages(ctx, buildStagesOptions) }); err != nil { return err } diff --git a/docs/_includes/cli/werf_build.md b/docs/_includes/cli/werf_build.md index 6fa5428cf8..d9bc308602 100644 --- a/docs/_includes/cli/werf_build.md +++ b/docs/_includes/cli/werf_build.md @@ -115,6 +115,10 @@ werf build [IMAGE_NAME...] [options] * interactive terminal width or 140 --log-verbose=false: Enable verbose output (default $WERF_LOG_VERBOSE). + -p, --parallel=false: + Run in parallel (default $WERF_PARALLEL) + --parallel-tasks-limit=-1: + Parallel tasks limit (default $WERF_PARALLEL_TASKS_LIMIT or without limit) --repo-docker-hub-password='': Common Docker Hub password for any stages storage or images repo specified for the command (default $WERF_REPO_DOCKER_HUB_PASSWORD) diff --git a/docs/_includes/cli/werf_build_and_publish.md b/docs/_includes/cli/werf_build_and_publish.md index 7905fb23e2..1658e2d392 100644 --- a/docs/_includes/cli/werf_build_and_publish.md +++ b/docs/_includes/cli/werf_build_and_publish.md @@ -147,6 +147,10 @@ werf build-and-publish [IMAGE_NAME...] [options] * interactive terminal width or 140 --log-verbose=false: Enable verbose output (default $WERF_LOG_VERBOSE). + -p, --parallel=false: + Run in parallel (default $WERF_PARALLEL) + --parallel-tasks-limit=-1: + Parallel tasks limit (default $WERF_PARALLEL_TASKS_LIMIT or without limit) --publish-report-format='json': Publish report format (only json available for now, $WERF_PUBLISH_REPORT_FORMAT by default) diff --git a/docs/_includes/cli/werf_converge.md b/docs/_includes/cli/werf_converge.md index 6f24eaa897..7ba4b8d041 100644 --- a/docs/_includes/cli/werf_converge.md +++ b/docs/_includes/cli/werf_converge.md @@ -149,6 +149,10 @@ werf converge --stages-storage registry.mydomain.com/web/back/stages --images-re --namespace='': Use specified Kubernetes namespace (default [[ project ]]-[[ env ]] template or deploy.namespace custom template from werf.yaml or $WERF_NAMESPACE) + -p, --parallel=false: + Run in parallel (default $WERF_PARALLEL) + --parallel-tasks-limit=-1: + Parallel tasks limit (default $WERF_PARALLEL_TASKS_LIMIT or without limit) --release='': Use specified Helm release name (default [[ project ]]-[[ env ]] template or deploy.helmRelease custom template from werf.yaml or $WERF_RELEASE) diff --git a/docs/_includes/cli/werf_stages_build.md b/docs/_includes/cli/werf_stages_build.md index 032cc2e825..df782f13c7 100644 --- a/docs/_includes/cli/werf_stages_build.md +++ b/docs/_includes/cli/werf_stages_build.md @@ -115,6 +115,10 @@ werf stages build [IMAGE_NAME...] [options] * interactive terminal width or 140 --log-verbose=false: Enable verbose output (default $WERF_LOG_VERBOSE). + -p, --parallel=false: + Run in parallel (default $WERF_PARALLEL) + --parallel-tasks-limit=-1: + Parallel tasks limit (default $WERF_PARALLEL_TASKS_LIMIT or without limit) --repo-docker-hub-password='': Common Docker Hub password for any stages storage or images repo specified for the command (default $WERF_REPO_DOCKER_HUB_PASSWORD) diff --git a/docs/_includes/readme/features.md b/docs/_includes/readme/features.md index f23d0b2e09..b9cf721af5 100644 --- a/docs/_includes/readme/features.md +++ b/docs/_includes/readme/features.md @@ -14,7 +14,7 @@ - Developing applications locally with werf [#1940](https://github.com/werf/werf/issues/1940). - ~Content-based tagging~ [#1184](https://github.com/werf/werf/issues/1184). 
- ~Support for the most Docker registry implementations~ [#2199](https://github.com/werf/werf/issues/2199). -- Parallel image builds [#2200](https://github.com/werf/werf/issues/2200). +- ~Parallel image builds~ [#2200](https://github.com/werf/werf/issues/2200). - Proven approaches and recipes for the most popular CI systems [#1617](https://github.com/werf/werf/issues/1617). - ~Distributed builds with the shared Docker registry~ [#1614](https://github.com/werf/werf/issues/1614). - Support for Helm 3 [#1606](https://github.com/werf/werf/issues/1606). @@ -27,7 +27,7 @@ - Effortlessly build as many images as you like in one project. - Build images using Dockerfiles or Stapel builder instructions. - Build images concurrently on a single host (using file locks). -- Build images simultaneously (coming soon) [#2200](https://github.com/werf/werf/issues/2200). +- Build images simultaneously. - Build images distributedly. - Advanced building process with Stapel: - Incremental rebuilds based on git history. diff --git a/docs/_includes/readme_ru/features.md b/docs/_includes/readme_ru/features.md index 3f09c5c383..0f8971f4c7 100644 --- a/docs/_includes/readme_ru/features.md +++ b/docs/_includes/readme_ru/features.md @@ -13,7 +13,7 @@ - Локальная разработка приложений с werf [#1940](https://github.com/werf/werf/issues/1940). - ~Тегирование, основанное на контенте~ [#1184](https://github.com/werf/werf/issues/1184). - ~Поддержка большинства имлементаций Docker registry~ [#2199](https://github.com/werf/werf/issues/2199). -- Параллельная сборка образов [#2200](https://github.com/werf/werf/issues/2200). +- ~Параллельная сборка образов~ [#2200](https://github.com/werf/werf/issues/2200). - Лучшие практики и рецепты для наиболее популярных CI-систем [#1617](https://github.com/werf/werf/issues/1617). - ~Распределенная сборка с общим Docker registry~ [#1614](https://github.com/werf/werf/issues/1614). - Поддержка Helm 3 [#1606](https://github.com/werf/werf/issues/1606). @@ -26,8 +26,8 @@ - Удобная сборка произвольного числа образов в одном проекте. - Сборка образов как из Dockerfile, так и из инструкций сборщика Stapel. - Параллельные сборки на одном хосте (с использованием файловых блокировок). -- Распределенная сборка (скоро) [#1614](https://github.com/werf/werf/issues/1614). -- Параллельная сборка образов (скоро) [#2200](https://github.com/werf/werf/issues/2200). +- Распределенная сборка. +- Параллельная сборка описанных в werf.yaml образов. - Расширенная сборка со сборщиком Stapel: - Инкрементальная пересборка на основе истории изменений Git. - Сборка образов с Shell-инструкциями и Ansible-заданиями. diff --git a/docs/pages/index.md b/docs/pages/index.md index c99e4e3a34..4f7522f9e2 100644 --- a/docs/pages/index.md +++ b/docs/pages/index.md @@ -246,7 +246,7 @@ layout: default
 •&nbsp;
 #1184
-Content-based tagging scheme.
+<strike>Content-based tagging scheme.</strike>
 •&nbsp;
@@ -258,7 +258,7 @@ layout: default
 •&nbsp;
 #1614
-Distributed builds with common Docker registry.
+<strike>Distributed builds with common Docker registry.</strike>
 •&nbsp;
@@ -270,7 +270,7 @@ layout: default
 •&nbsp;
 #2200
-Concurrent building of images.
+<strike>Concurrent building of images.</strike>
 •&nbsp;
diff --git a/docs/pages_ru/index.md b/docs/pages_ru/index.md
index 8639717258..34afcf7b4b 100644
--- a/docs/pages_ru/index.md
+++ b/docs/pages_ru/index.md
@@ -248,19 +248,19 @@ layout: default
 •&nbsp;
 #1184
-Тегирование, основанное на контенте.
+<strike>Тегирование, основанное на контенте.</strike>
 •&nbsp;
 #2199
-Поддержка большинства сервисов,<br>предоставляющих Docker registry.
+<strike>Поддержка большинства сервисов,<br>предоставляющих Docker registry.</strike>
 •&nbsp;
 #1614
-Распределенная сборка с общим Docker registry.
+<strike>Распределенная сборка с общим Docker registry.</strike>
 •&nbsp;
@@ -272,7 +272,7 @@ layout: default
 •&nbsp;
 #2200
-Параллельная сборка образов.
+<strike>Параллельная сборка образов.</strike>
  • diff --git a/go.mod b/go.mod index 7eef124fc2..cacc0396d8 100644 --- a/go.mod +++ b/go.mod @@ -59,9 +59,9 @@ require ( github.com/theupdateframework/notary v0.6.1 // indirect github.com/tonistiigi/fsutil v0.0.0-20200724193237-c3ed55f3b481 // indirect github.com/tonistiigi/go-rosetta v0.0.0-20200727161949-f79598599c5d // indirect - github.com/werf/kubedog v0.4.1-0.20200810223601-b04eea8908f3 + github.com/werf/kubedog v0.4.1-0.20200818145659-3aa022a95c07 github.com/werf/lockgate v0.0.0-20200729113342-ec2c142f71ea - github.com/werf/logboek v0.4.2 + github.com/werf/logboek v0.4.3 github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect github.com/xeipuuv/gojsonschema v1.2.0 // indirect golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 diff --git a/go.sum b/go.sum index 18f8796454..4e98f449e0 100644 --- a/go.sum +++ b/go.sum @@ -1480,12 +1480,14 @@ github.com/werf/helm/v3 v3.0.0-20200810125217-27be3ebf58f2 h1:I38q4Jp1OsGBPVXGqV github.com/werf/helm/v3 v3.0.0-20200810125217-27be3ebf58f2/go.mod h1:ZaXz/vzktgwjyGGFbUWtIQkscfE7WYoRGP2szqAFHR0= github.com/werf/kubedog v0.4.1-0.20200810223601-b04eea8908f3 h1:Al6Rim/nDLK4qLhQMJYIzYh4eGM+LeALw6d/9RvBjPw= github.com/werf/kubedog v0.4.1-0.20200810223601-b04eea8908f3/go.mod h1:1kabCQ7X01iZCS0PEPcw1jidgecjCLT9WE8LuZ5gSi0= +github.com/werf/kubedog v0.4.1-0.20200818145659-3aa022a95c07 h1:pKU78lxdl5Ck4zkWzwH74FOnAC3uf/5u5rbS8SRcXwk= +github.com/werf/kubedog v0.4.1-0.20200818145659-3aa022a95c07/go.mod h1:b/5MyrNDmxXtr68uucMmZCZmWS3h7cfAcgqtQbHsw5o= github.com/werf/lockgate v0.0.0-20200729113342-ec2c142f71ea h1:R5tJUhL5a3YfHTrHWyuAdJW3h//fmONrpHJjjAZ79e4= github.com/werf/lockgate v0.0.0-20200729113342-ec2c142f71ea/go.mod h1:/CeY6KDiBSCU9PUmjt7zGhqpzp8FAPg/wNVfLZHQGWI= -github.com/werf/logboek v0.4.0 h1:aF3wGEtGtoxU3BVHUcQcWkfCjQSWfZ49NSvgxHYzhjE= github.com/werf/logboek v0.4.0/go.mod h1:ojB1CXOMReLbo3qvjxSdYHOERz6kEv3/Qc8t/qeUjRw= -github.com/werf/logboek v0.4.2 h1:2tdN8HWvHgQJge35Wqjxu8OO6N6Nsa2rmRe/N1G6rb0= github.com/werf/logboek v0.4.2/go.mod h1:79ek+WZSj2+UDdQcebk9lbOKj6S7PaJEljVihgQdJq8= +github.com/werf/logboek v0.4.3 h1:aXAVLlGT/ZiD8Srj+B71Tw20PtDyuYZSEJZjODNraeA= +github.com/werf/logboek v0.4.3/go.mod h1:79ek+WZSj2+UDdQcebk9lbOKj6S7PaJEljVihgQdJq8= github.com/xanzy/go-gitlab v0.31.0/go.mod h1:sPLojNBn68fMUWSxIJtdVVIP8uSBYqesTfDUseX11Ug= github.com/xanzy/go-gitlab v0.32.0/go.mod h1:sPLojNBn68fMUWSxIJtdVVIP8uSBYqesTfDUseX11Ug= github.com/xanzy/ssh-agent v0.2.1 h1:TCbipTQL2JiiCprBWx9frJ2eJlCYT00NmctrHxVAr70= diff --git a/integration/managed_images/_fixtures/default/werf.yaml b/integration/managed_images/_fixtures/default/werf.yaml index d6f8508392..4320f55099 100644 --- a/integration/managed_images/_fixtures/default/werf.yaml +++ b/integration/managed_images/_fixtures/default/werf.yaml @@ -1,8 +1,17 @@ project: none configVersion: 1 --- -image: [a, b, c] +image: a from: alpine +fromCacheVersion: a +--- +image: b +from: alpine +fromCacheVersion: b +--- +image: c +from: alpine +fromCacheVersion: c --- artifact: d from: alpine diff --git a/pkg/build/build_phase.go b/pkg/build/build_phase.go index 5bfa6c2b1e..628bf1d69f 100644 --- a/pkg/build/build_phase.go +++ b/pkg/build/build_phase.go @@ -139,7 +139,13 @@ func (phase *BuildPhase) onImageStage(ctx context.Context, img *Image, stg stage } if phase.ShouldBeBuiltMode { - return phase.calculateStage(ctx, img, stg, true) + err := phase.calculateStage(ctx, img, stg, true) + if err != nil { + return err + } + + defer phase.Conveyor.GetStageSignatureMutex(stg.GetSignature()).Unlock() 
+ return nil } else { if stg.Name() != "from" && stg.Name() != "dockerfile" { if phase.StagesIterator.PrevNonEmptyStage == nil { @@ -156,6 +162,7 @@ func (phase *BuildPhase) onImageStage(ctx context.Context, img *Image, stg stage if err := phase.calculateStage(ctx, img, stg, false); err != nil { return err } + defer phase.Conveyor.GetStageSignatureMutex(stg.GetSignature()).Unlock() // Stage is cached in the stages storage if stg.GetImage().GetStageDescription() != nil { @@ -230,6 +237,14 @@ func (phase *BuildPhase) calculateStage(ctx context.Context, img *Image, stg sta } stg.SetSignature(stageSig) + logboek.Context(ctx).Info().LogProcessInline("Locking stage %s handling", stg.LogDetailedName()). + Options(func(options types.LogProcessInlineOptionsInterface) { + if !phase.Conveyor.Parallel { + options.Mute() + } + }). + Do(phase.Conveyor.GetStageSignatureMutex(stg.GetSignature()).Lock) + if stages, err := phase.Conveyor.StagesManager.GetStagesBySignature(ctx, stg.LogDetailedName(), stageSig); err != nil { return err } else { @@ -578,3 +593,8 @@ E.g.: logboek.Context(ctx).Warn().LogLn() }) } + +func (phase *BuildPhase) Clone() Phase { + u := *phase + return &u +} diff --git a/pkg/build/conveyor.go b/pkg/build/conveyor.go index 4c8acff9f8..8470f03f0a 100644 --- a/pkg/build/conveyor.go +++ b/pkg/build/conveyor.go @@ -12,6 +12,7 @@ import ( "reflect" "strconv" "strings" + "sync" "github.com/docker/cli/cli/command/image/build" "github.com/docker/docker/pkg/fileutils" @@ -52,7 +53,8 @@ type Conveyor struct { gitReposCaches map[string]*stage.GitRepoCache - imagesInOrder []*Image + images []*Image + imageSets [][]*Image stageImages map[string]*container_runtime.StageImage localGitRepo *git_repo.Local @@ -70,9 +72,15 @@ type Conveyor struct { importServers map[string]import_server.ImportServer ConveyorOptions + + mutex sync.Mutex + serviceRWMutex map[string]*sync.RWMutex + stageSignatureMutex map[string]*sync.Mutex } type ConveyorOptions struct { + Parallel bool + ParallelTasksLimit int64 LocalGitRepoVirtualMergeOptions stage.VirtualMergeOptions GitUnshallow bool AllowGitShallowClone bool @@ -93,7 +101,8 @@ func NewConveyor(werfConfig *config.WerfConfig, imageNamesToProcess []string, pr gitReposCaches: make(map[string]*stage.GitRepoCache), baseImagesRepoIdsCache: make(map[string]string), baseImagesRepoErrCache: make(map[string]error), - imagesInOrder: []*Image{}, + images: []*Image{}, + imageSets: [][]*Image{}, remoteGitRepos: make(map[string]*git_repo.Remote), tmpDir: filepath.Join(baseTmpDir, util.GenerateConsistentRandomString(10)), importServers: make(map[string]import_server.ImportServer), @@ -104,16 +113,92 @@ func NewConveyor(werfConfig *config.WerfConfig, imageNamesToProcess []string, pr StagesManager: stagesManager, ConveyorOptions: opts, + + serviceRWMutex: map[string]*sync.RWMutex{}, + stageSignatureMutex: map[string]*sync.Mutex{}, } return c, c.Init() } +func (c *Conveyor) getServiceRWMutex(service string) *sync.RWMutex { + c.mutex.Lock() + defer c.mutex.Unlock() + + rwMutex, ok := c.serviceRWMutex[service] + if !ok { + rwMutex = &sync.RWMutex{} + c.serviceRWMutex[service] = rwMutex + } + + return rwMutex +} + +func (c *Conveyor) IsBaseImagesRepoIdsCacheExist(key string) bool { + c.getServiceRWMutex("BaseImagesRepoIdsCache").RLock() + defer c.getServiceRWMutex("BaseImagesRepoIdsCache").RUnlock() + + _, exist := c.baseImagesRepoIdsCache[key] + return exist +} + +func (c *Conveyor) GetBaseImagesRepoIdsCache(key string) string { + c.getServiceRWMutex("BaseImagesRepoIdsCache").RLock() + 
defer c.getServiceRWMutex("BaseImagesRepoIdsCache").RUnlock() + + return c.baseImagesRepoIdsCache[key] +} + +func (c *Conveyor) SetBaseImagesRepoIdsCache(key, value string) { + c.getServiceRWMutex("BaseImagesRepoIdsCache").Lock() + defer c.getServiceRWMutex("BaseImagesRepoIdsCache").Unlock() + + c.baseImagesRepoIdsCache[key] = value +} + +func (c *Conveyor) IsBaseImagesRepoErrCacheExist(key string) bool { + c.getServiceRWMutex("GetBaseImagesRepoErrCache").RLock() + defer c.getServiceRWMutex("GetBaseImagesRepoErrCache").RUnlock() + + _, exist := c.baseImagesRepoErrCache[key] + return exist +} + +func (c *Conveyor) GetBaseImagesRepoErrCache(key string) error { + c.getServiceRWMutex("GetBaseImagesRepoErrCache").RLock() + defer c.getServiceRWMutex("GetBaseImagesRepoErrCache").RUnlock() + + return c.baseImagesRepoErrCache[key] +} + +func (c *Conveyor) SetBaseImagesRepoErrCache(key string, err error) { + c.getServiceRWMutex("BaseImagesRepoErrCache").Lock() + defer c.getServiceRWMutex("BaseImagesRepoErrCache").Unlock() + + c.baseImagesRepoErrCache[key] = err +} + +func (c *Conveyor) GetStageSignatureMutex(stage string) *sync.Mutex { + c.mutex.Lock() + defer c.mutex.Unlock() + + m, ok := c.stageSignatureMutex[stage] + if !ok { + m = &sync.Mutex{} + c.stageSignatureMutex[stage] = m + } + + return m +} + func (c *Conveyor) GetLocalGitRepoVirtualMergeOptions() stage.VirtualMergeOptions { return c.ConveyorOptions.LocalGitRepoVirtualMergeOptions } func (c *Conveyor) GetImportServer(ctx context.Context, imageName, stageName string) (import_server.ImportServer, error) { + c.getServiceRWMutex("ImportServer").Lock() + defer c.getServiceRWMutex("ImportServer").Unlock() + importServerName := imageName if stageName != "" { importServerName += "/" + stageName @@ -174,7 +259,7 @@ func (c *Conveyor) AppendOnTerminateFunc(f func() error) { func (c *Conveyor) Terminate(ctx context.Context) error { var terminateErrors []error - for gitRepoName, gitRepoCache := range c.gitReposCaches { + for gitRepoName, gitRepoCache := range c.GetGitRepoCaches() { if err := gitRepoCache.Terminate(); err != nil { terminateErrors = append(terminateErrors, fmt.Errorf("unable to terminate cache of git repo '%s': %s", gitRepoName, err)) } @@ -202,7 +287,17 @@ func (c *Conveyor) Terminate(ctx context.Context) error { return nil } -func (c *Conveyor) GetGitRepoCache(gitRepoName string) *stage.GitRepoCache { +func (c *Conveyor) GetGitRepoCaches() map[string]*stage.GitRepoCache { + c.getServiceRWMutex("GitRepoCaches").RLock() + defer c.getServiceRWMutex("GitRepoCaches").RUnlock() + + return c.gitReposCaches +} + +func (c *Conveyor) GetOrCreateGitRepoCache(gitRepoName string) *stage.GitRepoCache { + c.getServiceRWMutex("GitRepoCaches").Lock() + defer c.getServiceRWMutex("GitRepoCaches").Unlock() + if _, hasKey := c.gitReposCaches[gitRepoName]; !hasKey { c.gitReposCaches[gitRepoName] = &stage.GitRepoCache{ Archives: make(map[string]git_repo.Archive), @@ -210,17 +305,38 @@ func (c *Conveyor) GetGitRepoCache(gitRepoName string) *stage.GitRepoCache { Checksums: make(map[string]git_repo.Checksum), } } + return c.gitReposCaches[gitRepoName] } func (c *Conveyor) SetLocalGitRepo(repo *git_repo.Local) { + c.getServiceRWMutex("LocalGitRepo").Lock() + defer c.getServiceRWMutex("LocalGitRepo").Unlock() + c.localGitRepo = repo } func (c *Conveyor) GetLocalGitRepo() *git_repo.Local { + c.getServiceRWMutex("LocalGitRepo").RLock() + defer c.getServiceRWMutex("LocalGitRepo").RUnlock() + return c.localGitRepo } +func (c *Conveyor) SetRemoteGitRepo(key string, 
repo *git_repo.Remote) { + c.getServiceRWMutex("RemoteGitRepo").Lock() + defer c.getServiceRWMutex("RemoteGitRepo").Unlock() + + c.remoteGitRepos[key] = repo +} + +func (c *Conveyor) GetRemoteGitRepo(key string) *git_repo.Remote { + c.getServiceRWMutex("RemoteGitRepo").RLock() + defer c.getServiceRWMutex("RemoteGitRepo").RUnlock() + + return c.remoteGitRepos[key] +} + type TagOptions struct { CustomTags []string TagsByGitTag []string @@ -283,7 +399,7 @@ func (c *Conveyor) GetImageInfoGetters(configImages []*config.StapelImage, confi for _, imageName := range imagesNames { var tag string if tagStrategy == tag_strategy.StagesSignature { - for _, img := range c.imagesInOrder { + for _, img := range c.images { if img.GetName() == imageName { tag = img.GetContentSignature() break @@ -377,78 +493,62 @@ func (c *Conveyor) determineStages(ctx context.Context) error { } func (c *Conveyor) doDetermineStages(ctx context.Context) error { - imagesInterfaces := getImageConfigsInOrder(ctx, c) - for _, imageInterfaceConfig := range imagesInterfaces { - var img *Image - var imageLogName string - var style *style.Style - - switch imageConfig := imageInterfaceConfig.(type) { - case config.StapelImageInterface: - imageLogName = logging.ImageLogProcessName(imageConfig.ImageBaseConfig().Name, imageConfig.IsArtifact()) - style = ImageLogProcessStyle(imageConfig.IsArtifact()) - case *config.ImageFromDockerfile: - imageLogName = logging.ImageLogProcessName(imageConfig.Name, false) - style = ImageLogProcessStyle(false) - } + imageConfigsToProcess := getImageConfigsToProcess(ctx, c) + configSets := c.werfConfig.ImagesWithDependenciesBySets(imageConfigsToProcess) + + for _, iteration := range configSets { + var imageSet []*Image + + for _, imageInterfaceConfig := range iteration { + var img *Image + var imageLogName string + var style *style.Style + + switch imageConfig := imageInterfaceConfig.(type) { + case config.StapelImageInterface: + imageLogName = logging.ImageLogProcessName(imageConfig.ImageBaseConfig().Name, imageConfig.IsArtifact()) + style = ImageLogProcessStyle(imageConfig.IsArtifact()) + case *config.ImageFromDockerfile: + imageLogName = logging.ImageLogProcessName(imageConfig.Name, false) + style = ImageLogProcessStyle(false) + } - err := logboek.Context(ctx).Info().LogProcess(imageLogName). - Options(func(options types.LogProcessOptionsInterface) { - options.Style(style) - }). - DoError(func() error { - var err error + err := logboek.Context(ctx).Info().LogProcess(imageLogName). + Options(func(options types.LogProcessOptionsInterface) { + options.Style(style) + }). 
+ DoError(func() error { + var err error - switch imageConfig := imageInterfaceConfig.(type) { - case config.StapelImageInterface: - img, err = prepareImageBasedOnStapelImageConfig(ctx, imageConfig, c) - case *config.ImageFromDockerfile: - img, err = prepareImageBasedOnImageFromDockerfile(ctx, imageConfig, c) - } + switch imageConfig := imageInterfaceConfig.(type) { + case config.StapelImageInterface: + img, err = prepareImageBasedOnStapelImageConfig(ctx, imageConfig, c) + case *config.ImageFromDockerfile: + img, err = prepareImageBasedOnImageFromDockerfile(ctx, imageConfig, c) + } - if err != nil { - return err - } + if err != nil { + return err + } - c.imagesInOrder = append(c.imagesInOrder, img) + c.images = append(c.images, img) + imageSet = append(imageSet, img) - return nil - }) + return nil + }) - if err != nil { - return err + if err != nil { + return err + } } + + c.imageSets = append(c.imageSets, imageSet) } return nil } func (c *Conveyor) runPhases(ctx context.Context, phases []Phase, logImages bool) error { - // TODO: Parallelize builds - //images (по зависимостям), dependantImagesByStage - //dependantImagesByStage строится в InitializationPhase, спросить у stage что она ждет. - //Количество воркеров-goroutine ограничено. - //Надо распределить images по воркерам. - // - //for img := range images { - // Goroutine { - // phases = append(phases, NewBuildStage()) - // - // for phase = range phases { - // phase.OnStart() - // - // for stage = range stages { - // for img = dependantImagesByStage[stage.name] { - // wait <-imgChanMap[img] - // } - // phase.HandleStage(stage) - // } - // } - // - // close(imgChanMap[img]) - // } Goroutine - //} - if lock, err := c.StorageLockManager.LockStagesAndImages(ctx, c.projectName(), storage.LockStagesAndImagesOptions{GetOrCreateImagesOnly: true}); err != nil { return fmt.Errorf("unable to lock stages and images (to get or create stages and images only): %s", err) } else { @@ -457,13 +557,6 @@ func (c *Conveyor) runPhases(ctx context.Context, phases []Phase, logImages bool }) } - var imagesLogger types.ManagerInterface - if logImages { - imagesLogger = logboek.Context(ctx).Default() - } else { - imagesLogger = logboek.Context(ctx).Info() - } - for _, phase := range phases { logProcess := logboek.Context(ctx).Debug().LogProcess("Phase %s -- BeforeImages()", phase.Name()) logProcess.Start() @@ -474,64 +567,198 @@ func (c *Conveyor) runPhases(ctx context.Context, phases []Phase, logImages bool logProcess.End() } - for _, img := range c.imagesInOrder { - if err := imagesLogger.LogProcess(img.LogDetailedName()). - Options(func(options types.LogProcessOptionsInterface) { - options.Style(img.LogProcessStyle()) - }). + if err := c.doImages(ctx, phases, logImages); err != nil { + return err + } + + for _, phase := range phases { + if err := logboek.Context(ctx).Debug().LogProcess(fmt.Sprintf("Phase %s -- AfterImages()", phase.Name())). 
DoError(func() error { + if err := phase.AfterImages(ctx); err != nil { + return fmt.Errorf("phase %s after images handler failed: %s", phase.Name(), err) + } + + return nil + }); err != nil { + return err + } + } + + return nil +} + +func (c *Conveyor) doImages(ctx context.Context, phases []Phase, logImages bool) error { + if !c.Parallel { + for _, img := range c.images { + if err := c.doImage(ctx, img, phases, logImages); err != nil { + return err + } + } + } else { + return c.doImagesInParallel(ctx, phases, logImages) + } + + return nil +} + +type goResult struct { + buff *bytes.Buffer + err error +} + +func (c *Conveyor) doImagesInParallel(ctx context.Context, phases []Phase, logImages bool) error { + doImageSetInParallel := func(setId int) error { + var buffs, doneBuffs []*bytes.Buffer + var goCtxs []context.Context + var goResults []goResult + imageSetCounter := len(c.imageSets[setId]) + + queueLength := imageSetCounter + if c.ParallelTasksLimit > 0 { + queueLength = int(c.ParallelTasksLimit) + } + + queueCh := make(chan bool, queueLength) + errCh := make(chan goResult) + doneCh := make(chan goResult) + quitCh := make(chan bool) + + isLiveOutputOn := true + + for ind := range c.imageSets[setId] { + var goCtx context.Context + var goResult goResult + + if ind == 0 { + goCtx = ctx + } else { + buf := bytes.NewBuffer([]byte{}) + goResult.buff = buf + buffs = append(buffs, buf) + goCtx = logboek.NewContext(ctx, logboek.NewSubLogger(buf, buf)) + logboek.Context(goCtx).Streams().SetPrefixStyle(style.Highlight()) + } + + goCtxs = append(goCtxs, goCtx) + goResults = append(goResults, goResult) + } + + go func() { + for ind, img := range c.imageSets[setId] { + goImg := img + goCtx := goCtxs[ind] + goResult := goResults[ind] + + var goPhases []Phase for _, phase := range phases { - logProcess := logboek.Context(ctx).Debug().LogProcess("Phase %s -- BeforeImageStages()", phase.Name()) - logProcess.Start() - if err := phase.BeforeImageStages(ctx, img); err != nil { - logProcess.Fail() - return fmt.Errorf("phase %s before image %s stages handler failed: %s", phase.Name(), img.GetLogName(), err) + goPhases = append(goPhases, phase.Clone()) + } + + go func() { + err := c.doImage(goCtx, goImg, goPhases, logImages) + goResult.err = err + + ch := doneCh + if err != nil { + ch = errCh } - logProcess.End() - logProcess = logboek.Context(ctx).Debug().LogProcess("Phase %s -- OnImageStage()", phase.Name()) - logProcess.Start() - for _, stg := range img.GetStages() { - logboek.Context(ctx).Debug().LogF("Phase %s -- OnImageStage() %s %s\n", phase.Name(), img.GetLogName(), stg.LogDetailedName()) - if err := phase.OnImageStage(ctx, img, stg); err != nil { - logProcess.Fail() - return fmt.Errorf("phase %s on image %s stage %s handler failed: %s", phase.Name(), img.GetLogName(), stg.Name(), err) - } + select { + case ch <- goResult: + return + case <-quitCh: + return } - logProcess.End() + }() - logProcess = logboek.Context(ctx).Debug().LogProcess("Phase %s -- AfterImageStages()", phase.Name()) - logProcess.Start() - if err := phase.AfterImageStages(ctx, img); err != nil { - logProcess.Fail() - return fmt.Errorf("phase %s after image %s stages handler failed: %s", phase.Name(), img.GetLogName(), err) + queueCh <- true + } + }() + + renderBuff := func(buf *bytes.Buffer) { + logboek.Streams().DoWithoutIndent(func() { + if logboek.Context(ctx).Streams().IsPrefixWithTimeEnabled() { + logboek.Context(ctx).Streams().DisablePrefixWithTime() + defer logboek.Context(ctx).Streams().EnablePrefixWithTime() + } + + _, _ = 
logboek.Context(ctx).ProxyOutStream().Write(buf.Bytes()) + logboek.Context(ctx).LogOptionalLn() + }) + } + + outerLoop: + for { + select { + case res := <-doneCh: + <-queueCh + + if res.buff != nil { + if isLiveOutputOn { + doneBuffs = append(doneBuffs, res.buff) + } else { + renderBuff(res.buff) } - logProcess.End() + } else { + isLiveOutputOn = false + for _, buf := range doneBuffs { + renderBuff(buf) + } + } - logProcess = logboek.Context(ctx).Debug().LogProcess("Phase %s -- ImageProcessingShouldBeStopped()", phase.Name()) - logProcess.Start() - if phase.ImageProcessingShouldBeStopped(ctx, img) { - logProcess.End() - return nil + imageSetCounter-- + if imageSetCounter == 0 { + break outerLoop + } + case res := <-errCh: + close(quitCh) + + if res.buff != nil { + logboek.Context(ctx).Reset() + for _, buf := range buffs { + if buf != res.buff { + renderBuff(buf) + } + } + + renderBuff(res.buff) + } else { + if logboek.Context(ctx).Info().IsAccepted() { + for _, buf := range buffs { + renderBuff(buf) + } } - logProcess.End() } - return nil - }); err != nil { - return err + return res.err + } } + + return nil } - for _, phase := range phases { - if err := logboek.Context(ctx).Debug().LogProcess(fmt.Sprintf("Phase %s -- AfterImages()", phase.Name())). - DoError(func() error { - if err := phase.AfterImages(ctx); err != nil { - return fmt.Errorf("phase %s after images handler failed: %s", phase.Name(), err) + blockMsg := "Concurrent builds plan" + if c.ParallelTasksLimit > 0 { + blockMsg = fmt.Sprintf("%s (simultaneously no more than %d image(s))", blockMsg, c.ParallelTasksLimit) + } + + logboek.Context(ctx).LogBlock(blockMsg). + Options(func(options types.LogBlockOptionsInterface) { + options.Style(style.Highlight()) + }). + Do(func() { + for setId := range c.imageSets { + logboek.Context(ctx).LogFHighlight("Set #%d:\n", setId) + for _, img := range c.imageSets[setId] { + logboek.Context(ctx).LogLnHighlight("-", img.name) } + logboek.Context(ctx).LogOptionalLn() - return nil - }); err != nil { + } + }) + + for setId := range c.imageSets { + if err := doImageSetInParallel(setId); err != nil { return err } } @@ -539,34 +766,97 @@ func (c *Conveyor) runPhases(ctx context.Context, phases []Phase, logImages bool return nil } +func (c *Conveyor) doImage(ctx context.Context, img *Image, phases []Phase, logImages bool) error { + var imagesLogger types.ManagerInterface + if logImages { + imagesLogger = logboek.Context(ctx).Default() + } else { + imagesLogger = logboek.Context(ctx).Info() + } + + return imagesLogger.LogProcess(img.LogDetailedName()). + Options(func(options types.LogProcessOptionsInterface) { + options.Style(img.LogProcessStyle()) + }). 
+ DoError(func() error { + for _, phase := range phases { + logProcess := logboek.Context(ctx).Debug().LogProcess("Phase %s -- BeforeImageStages()", phase.Name()) + logProcess.Start() + if err := phase.BeforeImageStages(ctx, img); err != nil { + logProcess.Fail() + return fmt.Errorf("phase %s before image %s stages handler failed: %s", phase.Name(), img.GetLogName(), err) + } + logProcess.End() + + logProcess = logboek.Context(ctx).Debug().LogProcess("Phase %s -- OnImageStage()", phase.Name()) + logProcess.Start() + for _, stg := range img.GetStages() { + logboek.Context(ctx).Debug().LogF("Phase %s -- OnImageStage() %s %s\n", phase.Name(), img.GetLogName(), stg.LogDetailedName()) + if err := phase.OnImageStage(ctx, img, stg); err != nil { + logProcess.Fail() + return fmt.Errorf("phase %s on image %s stage %s handler failed: %s", phase.Name(), img.GetLogName(), stg.Name(), err) + } + } + logProcess.End() + + logProcess = logboek.Context(ctx).Debug().LogProcess("Phase %s -- AfterImageStages()", phase.Name()) + logProcess.Start() + if err := phase.AfterImageStages(ctx, img); err != nil { + logProcess.Fail() + return fmt.Errorf("phase %s after image %s stages handler failed: %s", phase.Name(), img.GetLogName(), err) + } + logProcess.End() + + logProcess = logboek.Context(ctx).Debug().LogProcess("Phase %s -- ImageProcessingShouldBeStopped()", phase.Name()) + logProcess.Start() + if phase.ImageProcessingShouldBeStopped(ctx, img) { + logProcess.End() + return nil + } + logProcess.End() + } + + return nil + }) +} + func (c *Conveyor) projectName() string { return c.werfConfig.Meta.Project } func (c *Conveyor) GetStageImage(name string) *container_runtime.StageImage { + c.getServiceRWMutex("StageImages").RLock() + defer c.getServiceRWMutex("StageImages").RUnlock() + return c.stageImages[name] } func (c *Conveyor) UnsetStageImage(name string) { + c.getServiceRWMutex("StageImages").Lock() + defer c.getServiceRWMutex("StageImages").Unlock() + delete(c.stageImages, name) } func (c *Conveyor) SetStageImage(stageImage *container_runtime.StageImage) { + c.getServiceRWMutex("StageImages").Lock() + defer c.getServiceRWMutex("StageImages").Unlock() + c.stageImages[stageImage.Name()] = stageImage } func (c *Conveyor) GetOrCreateStageImage(fromImage *container_runtime.StageImage, name string) *container_runtime.StageImage { - if img, ok := c.stageImages[name]; ok { + if img := c.GetStageImage(name); img != nil { return img } img := container_runtime.NewStageImage(fromImage, name, c.ContainerRuntime.(*container_runtime.LocalDockerServerRuntime)) - c.stageImages[name] = img + c.SetStageImage(img) return img } func (c *Conveyor) GetImage(name string) *Image { - for _, img := range c.imagesInOrder { + for _, img := range c.images { if img.GetName() == name { return img } @@ -613,8 +903,9 @@ func (c *Conveyor) GetImageTmpDir(imageName string) string { } func (c *Conveyor) GetProjectRepoCommit(ctx context.Context) (string, error) { - if c.localGitRepo != nil { - return c.localGitRepo.HeadCommit(ctx) + localGitRepo := c.GetLocalGitRepo() + if localGitRepo != nil { + return localGitRepo.HeadCommit(ctx) } else { return "", nil } @@ -676,28 +967,6 @@ func getFromFields(imageBaseConfig *config.StapelImageBase) (string, string, boo return from, fromImageName, imageBaseConfig.FromLatest } -func getImageConfigsInOrder(ctx context.Context, c *Conveyor) []config.ImageInterface { - var images []config.ImageInterface - for _, imageInterf := range getImageConfigsToProcess(ctx, c) { - var imagesInBuildOrder 
[]config.ImageInterface - - switch image := imageInterf.(type) { - case *config.StapelImage, *config.StapelImageArtifact: - imagesInBuildOrder = c.werfConfig.ImageTree(image) - case *config.ImageFromDockerfile: - imagesInBuildOrder = append(imagesInBuildOrder, image) - } - - for i := 0; i < len(imagesInBuildOrder); i++ { - if isNotInArr(images, imagesInBuildOrder[i]) { - images = append(images, imagesInBuildOrder[i]) - } - } - } - - return images -} - func getImageConfigsToProcess(ctx context.Context, c *Conveyor) []config.ImageInterface { var imageConfigsToProcess []config.ImageInterface @@ -722,16 +991,6 @@ func getImageConfigsToProcess(ctx context.Context, c *Conveyor) []config.ImageIn return imageConfigsToProcess } -func isNotInArr(arr []config.ImageInterface, obj config.ImageInterface) bool { - for _, elm := range arr { - if reflect.DeepEqual(elm, obj) { - return false - } - } - - return true -} - func initStages(ctx context.Context, image *Image, imageInterfaceConfig config.StapelImageInterface, c *Conveyor) error { var stages []stage.Interface @@ -842,8 +1101,8 @@ func generateGitMappings(ctx context.Context, imageBaseConfig *config.StapelImag } for _, remoteGitMappingConfig := range imageBaseConfig.Git.Remote { - remoteGitRepo, exist := c.remoteGitRepos[remoteGitMappingConfig.Name] - if !exist { + remoteGitRepo := c.GetRemoteGitRepo(remoteGitMappingConfig.Name) + if remoteGitRepo == nil { var err error remoteGitRepo, err = git_repo.OpenRemoteRepo(remoteGitMappingConfig.Name, remoteGitMappingConfig.Url) if err != nil { @@ -857,7 +1116,7 @@ func generateGitMappings(ctx context.Context, imageBaseConfig *config.StapelImag return nil, err } - c.remoteGitRepos[remoteGitMappingConfig.Name] = remoteGitRepo + c.SetRemoteGitRepo(remoteGitMappingConfig.Name, remoteGitRepo) } gitMappings = append(gitMappings, gitRemoteArtifactInit(remoteGitMappingConfig, remoteGitRepo, imageBaseConfig.Name, c)) @@ -981,7 +1240,7 @@ func gitRemoteArtifactInit(remoteGitMappingConfig *config.GitRemote, remoteGitRe gitMapping.GitRepoInterface = remoteGitRepo - gitMapping.GitRepoCache = c.GetGitRepoCache(remoteGitRepo.GetName()) + gitMapping.GitRepoCache = c.GetOrCreateGitRepoCache(remoteGitRepo.GetName()) return gitMapping } @@ -993,7 +1252,7 @@ func gitLocalPathInit(localGitMappingConfig *config.GitLocal, localGitRepo *git_ gitMapping.GitRepoInterface = localGitRepo - gitMapping.GitRepoCache = c.GetGitRepoCache(localGitRepo.GetName()) + gitMapping.GitRepoCache = c.GetOrCreateGitRepoCache(localGitRepo.GetName()) return gitMapping } diff --git a/pkg/build/image.go b/pkg/build/image.go index 7e521a1444..54c1b7183f 100644 --- a/pkg/build/image.go +++ b/pkg/build/image.go @@ -195,13 +195,16 @@ func (i *Image) FetchBaseImage(ctx context.Context, c *Conveyor) error { } func (i *Image) getFromBaseImageIdFromRegistry(ctx context.Context, c *Conveyor, baseImageName string) (string, error) { + c.getServiceRWMutex("baseImagesRepoIdsCache" + baseImageName).Lock() + defer c.getServiceRWMutex("baseImagesRepoIdsCache" + baseImageName).Unlock() + if i.baseImageRepoId != "" { return i.baseImageRepoId, nil - } else if cachedBaseImageRepoId, exist := c.baseImagesRepoIdsCache[baseImageName]; exist { - i.baseImageRepoId = cachedBaseImageRepoId - return cachedBaseImageRepoId, nil - } else if cachedBaseImagesRepoErr, exist := c.baseImagesRepoErrCache[baseImageName]; exist { - return "", cachedBaseImagesRepoErr + } else if c.IsBaseImagesRepoIdsCacheExist(baseImageName) { + i.baseImageRepoId = c.GetBaseImagesRepoIdsCache(baseImageName) + 
return i.baseImageRepoId, nil + } else if c.IsBaseImagesRepoErrCacheExist(baseImageName) { + return "", c.GetBaseImagesRepoErrCache(baseImageName) } var fetchedBaseRepoImage *image.Info @@ -210,7 +213,7 @@ func (i *Image) getFromBaseImageIdFromRegistry(ctx context.Context, c *Conveyor, var fetchImageIdErr error fetchedBaseRepoImage, fetchImageIdErr = docker_registry.API().GetRepoImage(ctx, baseImageName) if fetchImageIdErr != nil { - c.baseImagesRepoErrCache[baseImageName] = fetchImageIdErr + c.SetBaseImagesRepoErrCache(baseImageName, fetchImageIdErr) return fmt.Errorf("can not get base image id from registry (%s): %s", baseImageName, fetchImageIdErr) } @@ -220,7 +223,7 @@ func (i *Image) getFromBaseImageIdFromRegistry(ctx context.Context, c *Conveyor, } i.baseImageRepoId = fetchedBaseRepoImage.ID - c.baseImagesRepoIdsCache[baseImageName] = i.baseImageRepoId + c.SetBaseImagesRepoIdsCache(baseImageName, i.baseImageRepoId) return i.baseImageRepoId, nil } diff --git a/pkg/build/phase.go b/pkg/build/phase.go index a6277b96c5..de584d9f12 100644 --- a/pkg/build/phase.go +++ b/pkg/build/phase.go @@ -14,6 +14,7 @@ type Phase interface { OnImageStage(ctx context.Context, img *Image, stg stage.Interface) error AfterImageStages(ctx context.Context, img *Image) error ImageProcessingShouldBeStopped(ctx context.Context, img *Image) bool + Clone() Phase } type BasePhase struct { diff --git a/pkg/build/publish_images_phase.go b/pkg/build/publish_images_phase.go index cb95c37495..212e74e5bf 100644 --- a/pkg/build/publish_images_phase.go +++ b/pkg/build/publish_images_phase.go @@ -130,10 +130,11 @@ func (phase *PublishImagesPhase) publishImage(ctx context.Context, img *Image) e nonEmptySchemeInOrder = append(nonEmptySchemeInOrder, strategy) } - if phase.Conveyor.localGitRepo != nil { + localGitRepo := phase.Conveyor.GetLocalGitRepo() + if localGitRepo != nil { if err := logboek.Context(ctx).Info().LogProcess(fmt.Sprintf("publishing image %s git metadata", img.GetName())). 
DoError(func() error { - headCommit, err := phase.Conveyor.localGitRepo.HeadCommit(ctx) + headCommit, err := localGitRepo.HeadCommit(ctx) if err != nil { return err } @@ -374,3 +375,8 @@ func (phase *PublishImagesPhase) checkImageAlreadyExists(ctx context.Context, ex return imageContentSignature == repoImageContentSignature, repoDockerImageID, nil } + +func (phase *PublishImagesPhase) Clone() Phase { + u := *phase + return &u +} diff --git a/pkg/config/werf.go b/pkg/config/werf.go index c02979f402..961e63c449 100644 --- a/pkg/config/werf.go +++ b/pkg/config/werf.go @@ -277,28 +277,84 @@ func (c *WerfConfig) validateInfiniteLoopBetweenRelatedImages() error { return nil } -func (c *WerfConfig) ImageTree(interf ImageInterface) (tree []ImageInterface) { +func (c *WerfConfig) ImagesWithDependenciesBySets(images []ImageInterface) (sets [][]ImageInterface) { + sets = [][]ImageInterface{} + isDepChecked := map[ImageInterface]bool{} + imageDepsToHandle := c.imageDependenciesInOrder(images) + + for len(imageDepsToHandle) != 0 { + var currentDeps []ImageInterface + + outerLoop: + for image, deps := range imageDepsToHandle { + for _, dep := range deps { + _, ok := isDepChecked[dep] + if !ok { + continue outerLoop + } + } + + currentDeps = append(currentDeps, image) + } + + for _, dep := range currentDeps { + isDepChecked[dep] = true + delete(imageDepsToHandle, dep) + } + + sets = append(sets, currentDeps) + } + + return sets +} + +func (c *WerfConfig) imageDependenciesInOrder(images []ImageInterface) (imageDeps map[ImageInterface][]ImageInterface) { + imageDeps = map[ImageInterface][]ImageInterface{} + stack := images + + for len(stack) != 0 { + current := stack[0] + stack = stack[1:] + + imageDeps[current] = c.imageDependencies(current) + + outerLoop: + for _, dep := range imageDeps[current] { + for key, _ := range imageDeps { + if key == dep { + continue outerLoop + } + } + + stack = append(stack, dep) + } + } + + return imageDeps +} + +func (c *WerfConfig) imageDependencies(interf ImageInterface) (deps []ImageInterface) { switch i := interf.(type) { case StapelImageInterface: if i.ImageBaseConfig().FromImageName != "" { - tree = append(tree, c.ImageTree(c.GetImage(i.ImageBaseConfig().FromImageName))...) + deps = append(deps, c.GetImage(i.ImageBaseConfig().FromImageName)) } if i.ImageBaseConfig().FromImageArtifactName != "" { - tree = append(tree, c.ImageTree(c.GetArtifact(i.ImageBaseConfig().FromImageArtifactName))...) + deps = append(deps, c.GetArtifact(i.ImageBaseConfig().FromImageArtifactName)) } for _, imp := range i.imports() { if imp.ImageName != "" { - tree = append(tree, c.ImageTree(c.GetImage(imp.ImageName))...) + deps = append(deps, c.GetImage(imp.ImageName)) } else if imp.ArtifactName != "" { - tree = append(tree, c.ImageTree(c.GetArtifact(imp.ArtifactName))...) + deps = append(deps, c.GetArtifact(imp.ArtifactName)) } } case *ImageFromDockerfile: } - return append(tree, interf) + return deps } func (c *WerfConfig) relatedImageImages(interf ImageInterface) (images []ImageInterface) {