diff --git a/Godeps/_workspace/src/k8s.io/kubernetes/pkg/kubectl/rolling_updater.go b/Godeps/_workspace/src/k8s.io/kubernetes/pkg/kubectl/rolling_updater.go index e69d889a73f4..142b8e10d532 100644 --- a/Godeps/_workspace/src/k8s.io/kubernetes/pkg/kubectl/rolling_updater.go +++ b/Godeps/_workspace/src/k8s.io/kubernetes/pkg/kubectl/rolling_updater.go @@ -83,6 +83,10 @@ type RollingUpdaterConfig struct { // further, ensuring that total number of pods running at any time during // the update is atmost 130% of desired pods. MaxSurge intstr.IntOrString + // OnProgress is invoked if set during each scale cycle, to allow the caller to perform additional logic or + // abort the scale. If an error is returned the cleanup method will not be invoked. The percentage value + // is a synthetic "progress" calculation that represents the approximate percentage completion. + OnProgress func(oldRc, newRc *api.ReplicationController, percentage int) error } // RollingUpdaterCleanupPolicy is a cleanup action to take after the @@ -216,6 +220,26 @@ func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error { fmt.Fprintf(out, "Scaling up %s from %d to %d, scaling down %s from %d to 0 (keep %d pods available, don't exceed %d pods)\n", newRc.Name, newRc.Spec.Replicas, desired, oldRc.Name, oldRc.Spec.Replicas, minAvailable, desired+maxSurge) + // give a caller incremental notification and allow them to exit early + goal := desired - newRc.Spec.Replicas + if goal < 0 { + goal = -goal + } + progress := func(complete bool) error { + if config.OnProgress == nil { + return nil + } + progress := desired - newRc.Spec.Replicas + if progress < 0 { + progress = -progress + } + percentage := 100 + if !complete && goal > 0 { + percentage = (goal - progress) * 100 / goal + } + return config.OnProgress(oldRc, newRc, percentage) + } + // Scale newRc and oldRc until newRc has the desired number of replicas and // oldRc has 0 replicas. progressDeadline := time.Now().UnixNano() + config.Timeout.Nanoseconds() @@ -231,6 +255,11 @@ func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error { } newRc = scaledRc + // notify the caller if necessary + if err := progress(false); err != nil { + return err + } + // Wait between scaling operations for things to settle. time.Sleep(config.UpdatePeriod) @@ -241,6 +270,11 @@ func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error { } oldRc = scaledRc + // notify the caller if necessary + if err := progress(false); err != nil { + return err + } + // If we are making progress, continue to advance the progress deadline. // Otherwise, time out with an error. progressMade := (newRc.Spec.Replicas != newReplicas) || (oldRc.Spec.Replicas != oldReplicas) @@ -251,6 +285,11 @@ func (r *RollingUpdater) Update(config *RollingUpdaterConfig) error { } } + // notify the caller if necessary + if err := progress(true); err != nil { + return err + } + // Housekeeping and cleanup policy execution. return r.cleanup(oldRc, newRc, config) } diff --git a/contrib/completions/bash/openshift b/contrib/completions/bash/openshift index 1b22e35e0743..ba901b098d8e 100644 --- a/contrib/completions/bash/openshift +++ b/contrib/completions/bash/openshift @@ -15788,6 +15788,7 @@ _openshift_infra_deploy() flags+=("--deployment=") flags+=("--namespace=") + flags+=("--until=") flags+=("--google-json-key=") flags+=("--log-flush-frequency=") diff --git a/pkg/cmd/cli/describe/deployments.go b/pkg/cmd/cli/describe/deployments.go index e174307a2e0f..cf82f35d3ef3 100644 --- a/pkg/cmd/cli/describe/deployments.go +++ b/pkg/cmd/cli/describe/deployments.go @@ -172,53 +172,78 @@ func (d *DeploymentConfigDescriber) Describe(namespace, name string) (string, er }) } -func printStrategy(strategy deployapi.DeploymentStrategy, w *tabwriter.Writer) { - switch strategy.Type { - case deployapi.DeploymentStrategyTypeRecreate: - if strategy.RecreateParams != nil { - pre := strategy.RecreateParams.Pre - mid := strategy.RecreateParams.Mid - post := strategy.RecreateParams.Post - if pre != nil { - printHook("Pre-deployment", pre, w) - } - if mid != nil { - printHook("Mid-deployment", mid, w) - } - if post != nil { - printHook("Post-deployment", post, w) - } +func multilineStringArray(sep, indent string, args ...string) string { + for i, s := range args { + if strings.HasSuffix(s, "\n") { + s = strings.TrimSuffix(s, "\n") } - case deployapi.DeploymentStrategyTypeRolling: - if strategy.RollingParams != nil { - pre := strategy.RollingParams.Pre - post := strategy.RollingParams.Post - if pre != nil { - printHook("Pre-deployment", pre, w) - } - if post != nil { - printHook("Post-deployment", post, w) - } + if strings.Contains(s, "\n") { + s = "\n" + indent + strings.Join(strings.Split(s, "\n"), "\n"+indent) + } + args[i] = s + } + strings.TrimRight(args[len(args)-1], "\n ") + return strings.Join(args, " ") +} + +func printStrategy(strategy deployapi.DeploymentStrategy, indent string, w *tabwriter.Writer) { + if strategy.CustomParams != nil { + if len(strategy.CustomParams.Image) == 0 { + fmt.Fprintf(w, "%sImage:\t%s\n", indent, "") + } else { + fmt.Fprintf(w, "%sImage:\t%s\n", indent, strategy.CustomParams.Image) } - case deployapi.DeploymentStrategyTypeCustom: - fmt.Fprintf(w, "\t Image:\t%s\n", strategy.CustomParams.Image) if len(strategy.CustomParams.Environment) > 0 { - fmt.Fprintf(w, "\t Environment:\t%s\n", formatLabels(convertEnv(strategy.CustomParams.Environment))) + fmt.Fprintf(w, "%sEnvironment:\t%s\n", indent, formatLabels(convertEnv(strategy.CustomParams.Environment))) } if len(strategy.CustomParams.Command) > 0 { - fmt.Fprintf(w, "\t Command:\t%v\n", strings.Join(strategy.CustomParams.Command, " ")) + fmt.Fprintf(w, "%sCommand:\t%v\n", indent, multilineStringArray(" ", "\t ", strategy.CustomParams.Command...)) + } + } + + if strategy.RecreateParams != nil { + pre := strategy.RecreateParams.Pre + mid := strategy.RecreateParams.Mid + post := strategy.RecreateParams.Post + if pre != nil { + printHook("Pre-deployment", pre, indent, w) + } + if mid != nil { + printHook("Mid-deployment", mid, indent, w) + } + if post != nil { + printHook("Post-deployment", post, indent, w) + } + } + + if strategy.RollingParams != nil { + pre := strategy.RollingParams.Pre + post := strategy.RollingParams.Post + if pre != nil { + printHook("Pre-deployment", pre, indent, w) + } + if post != nil { + printHook("Post-deployment", post, indent, w) } } } -func printHook(prefix string, hook *deployapi.LifecycleHook, w io.Writer) { +func printHook(prefix string, hook *deployapi.LifecycleHook, indent string, w io.Writer) { if hook.ExecNewPod != nil { - fmt.Fprintf(w, "\t %s hook (pod type, failure policy: %s):\n", prefix, hook.FailurePolicy) - fmt.Fprintf(w, "\t Container:\t%s\n", hook.ExecNewPod.ContainerName) - fmt.Fprintf(w, "\t Command:\t%v\n", strings.Join(hook.ExecNewPod.Command, " ")) - fmt.Fprintf(w, "\t Env:\t%s\n", formatLabels(convertEnv(hook.ExecNewPod.Env))) + fmt.Fprintf(w, "%s%s hook (pod type, failure policy: %s):\n", indent, prefix, hook.FailurePolicy) + fmt.Fprintf(w, "%s Container:\t%s\n", indent, hook.ExecNewPod.ContainerName) + fmt.Fprintf(w, "%s Command:\t%v\n", indent, multilineStringArray(" ", "\t ", hook.ExecNewPod.Command...)) + if len(hook.ExecNewPod.Env) > 0 { + fmt.Fprintf(w, "%s Env:\t%s\n", indent, formatLabels(convertEnv(hook.ExecNewPod.Env))) + } + } + if len(hook.TagImages) > 0 { + fmt.Fprintf(w, "%s%s hook (tag images, failure policy: %s):\n", indent, prefix, hook.FailurePolicy) + for _, image := range hook.TagImages { + fmt.Fprintf(w, "%s Tag:\tcontainer %s to %s %s %s\n", indent, image.ContainerName, image.To.Kind, image.To.Name, image.To.Namespace) + } } } @@ -262,7 +287,7 @@ func printDeploymentConfigSpec(spec deployapi.DeploymentConfigSpec, w *tabwriter // Strategy formatString(w, "Strategy", spec.Strategy.Type) - printStrategy(spec.Strategy, w) + printStrategy(spec.Strategy, " ", w) // Pod template fmt.Fprintf(w, "Template:\n") diff --git a/pkg/cmd/infra/deployer/deployer.go b/pkg/cmd/infra/deployer/deployer.go index a1bce07b851c..ad48924b1956 100644 --- a/pkg/cmd/infra/deployer/deployer.go +++ b/pkg/cmd/infra/deployer/deployer.go @@ -2,20 +2,21 @@ package deployer import ( "fmt" + "io" + "os" "sort" "time" - "github.com/golang/glog" "github.com/spf13/cobra" kapi "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/restclient" kclient "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/kubectl" + kcmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" "github.com/openshift/origin/pkg/client" "github.com/openshift/origin/pkg/cmd/util" - "github.com/openshift/origin/pkg/cmd/util/clientcmd" deployapi "github.com/openshift/origin/pkg/deploy/api" "github.com/openshift/origin/pkg/deploy/strategy" "github.com/openshift/origin/pkg/deploy/strategy/recreate" @@ -28,12 +29,34 @@ const ( deployerLong = ` Perform a deployment -This command launches a deployment as described by a deployment configuration.` +This command launches a deployment as described by a deployment configuration. It accepts the name +of a replication controller created by a deployment and runs that deployment to completion. You can +use the --until flag to run the deployment until you reach the specified condition. + +Available conditions: + +* "start": after old deployments are scaled to zero +* "pre": after the pre hook completes (even if no hook specified) +* "mid": after the mid hook completes (even if no hook specified) +* A percentage of the deployment, based on which strategy is in use + * "0%" Recreate after the previous deployment is scaled to zero + * "N%" Recreate after the acceptance check if this is not the first deployment + * "0%" Rolling before the rolling deployment is started, equivalent to "pre" + * "N%" Rolling the percentage of pods in the target deployment that are ready + * "100%" All after the deployment is at full scale, but before the post hook runs + +Unrecognized conditions will be ignored and the deployment will run to completion. You can run this +command multiple times when --until is specified - hooks will only be executed once. +` ) type config struct { - DeploymentName string - Namespace string + Out, ErrOut io.Writer + + rcName string + Namespace string + + Until string } // NewCommandDeployer provides a CLI handler for deploy. @@ -41,50 +64,63 @@ func NewCommandDeployer(name string) *cobra.Command { cfg := &config{} cmd := &cobra.Command{ - Use: fmt.Sprintf("%s%s", name, clientcmd.ConfigSyntax), + Use: fmt.Sprintf("%s [--until=CONDITION]", name), Short: "Run the deployer", Long: deployerLong, Run: func(c *cobra.Command, args []string) { - if len(cfg.DeploymentName) == 0 { - glog.Fatal("deployment is required") - } - if len(cfg.Namespace) == 0 { - glog.Fatal("namespace is required") - } - - kcfg, err := restclient.InClusterConfig() - if err != nil { - glog.Fatal(err) - } - kc, err := kclient.New(kcfg) - if err != nil { - glog.Fatal(err) - } - oc, err := client.New(kcfg) - if err != nil { - glog.Fatal(err) - } - - deployer := NewDeployer(kc, oc) - if err = deployer.Deploy(cfg.Namespace, cfg.DeploymentName); err != nil { - glog.Fatal(err) + cfg.Out = os.Stdout + cfg.ErrOut = c.Out() + err := cfg.RunDeployer() + if strategy.IsConditionReached(err) { + fmt.Fprintf(os.Stdout, "--> %s\n", err.Error()) + return } + kcmdutil.CheckErr(err) }, } cmd.AddCommand(version.NewVersionCommand(name, false)) flag := cmd.Flags() - flag.StringVar(&cfg.DeploymentName, "deployment", util.Env("OPENSHIFT_DEPLOYMENT_NAME", ""), "The deployment name to start") + flag.StringVar(&cfg.rcName, "deployment", util.Env("OPENSHIFT_DEPLOYMENT_NAME", ""), "The deployment name to start") flag.StringVar(&cfg.Namespace, "namespace", util.Env("OPENSHIFT_DEPLOYMENT_NAMESPACE", ""), "The deployment namespace") + flag.StringVar(&cfg.Until, "until", "", "Exit the deployment when this condition is met. See help for more details") return cmd } +func (cfg *config) RunDeployer() error { + if len(cfg.rcName) == 0 { + return fmt.Errorf("--deployment or OPENSHIFT_DEPLOYMENT_NAME is required") + } + if len(cfg.Namespace) == 0 { + return fmt.Errorf("--namespace or OPENSHIFT_DEPLOYMENT_NAMESPACE is required") + } + + kcfg, err := restclient.InClusterConfig() + if err != nil { + return err + } + kc, err := kclient.New(kcfg) + if err != nil { + return err + } + oc, err := client.New(kcfg) + if err != nil { + return err + } + + deployer := NewDeployer(kc, oc, cfg.Out, cfg.ErrOut, cfg.Until) + return deployer.Deploy(cfg.Namespace, cfg.rcName) +} + // NewDeployer makes a new Deployer from a kube client. -func NewDeployer(client kclient.Interface, oclient client.Interface) *Deployer { +func NewDeployer(client kclient.Interface, oclient client.Interface, out, errOut io.Writer, until string) *Deployer { scaler, _ := kubectl.ScalerFor(kapi.Kind("ReplicationController"), client) return &Deployer{ + out: out, + errOut: errOut, + until: until, getDeployment: func(namespace, name string) (*kapi.ReplicationController, error) { return client.ReplicationControllers(namespace).Get(name) }, @@ -95,10 +131,10 @@ func NewDeployer(client kclient.Interface, oclient client.Interface) *Deployer { strategyFor: func(config *deployapi.DeploymentConfig) (strategy.DeploymentStrategy, error) { switch config.Spec.Strategy.Type { case deployapi.DeploymentStrategyTypeRecreate: - return recreate.NewRecreateDeploymentStrategy(client, oclient, kapi.Codecs.UniversalDecoder()), nil + return recreate.NewRecreateDeploymentStrategy(client, oclient, kapi.Codecs.UniversalDecoder(), out, errOut, until), nil case deployapi.DeploymentStrategyTypeRolling: - recreate := recreate.NewRecreateDeploymentStrategy(client, oclient, kapi.Codecs.UniversalDecoder()) - return rolling.NewRollingDeploymentStrategy(config.Namespace, client, oclient, kapi.Codecs.UniversalDecoder(), recreate), nil + recreate := recreate.NewRecreateDeploymentStrategy(client, oclient, kapi.Codecs.UniversalDecoder(), out, errOut, until) + return rolling.NewRollingDeploymentStrategy(config.Namespace, client, oclient, kapi.Codecs.UniversalDecoder(), recreate, out, errOut, until), nil default: return nil, fmt.Errorf("unsupported strategy type: %s", config.Spec.Strategy.Type) } @@ -115,6 +151,10 @@ func NewDeployer(client kclient.Interface, oclient client.Interface) *Deployer { // 4. Pass the last completed deployment and the new deployment to a strategy // to perform the deployment. type Deployer struct { + // out and errOut control display when deploy is invoked + out, errOut io.Writer + // until is a condition to run until + until string // strategyFor returns a DeploymentStrategy for config. strategyFor func(config *deployapi.DeploymentConfig) (strategy.DeploymentStrategy, error) // getDeployment finds the named deployment. @@ -125,22 +165,22 @@ type Deployer struct { scaler kubectl.Scaler } -// Deploy starts the deployment process for deploymentName. -func (d *Deployer) Deploy(namespace, deploymentName string) error { +// Deploy starts the deployment process for rcName. +func (d *Deployer) Deploy(namespace, rcName string) error { // Look up the new deployment. - to, err := d.getDeployment(namespace, deploymentName) + to, err := d.getDeployment(namespace, rcName) if err != nil { - return fmt.Errorf("couldn't get deployment %s/%s: %v", namespace, deploymentName, err) + return fmt.Errorf("couldn't get deployment %s: %v", rcName, err) } // Decode the config from the deployment. config, err := deployutil.DecodeDeploymentConfig(to, kapi.Codecs.UniversalDecoder()) if err != nil { - return fmt.Errorf("couldn't decode deployment config from deployment %s/%s: %v", to.Namespace, to.Name, err) + return fmt.Errorf("couldn't decode deployment config from deployment %s: %v", to.Name, err) } // Get a strategy for the deployment. - strategy, err := d.strategyFor(config) + s, err := d.strategyFor(config) if err != nil { return err } @@ -148,7 +188,7 @@ func (d *Deployer) Deploy(namespace, deploymentName string) error { // New deployments must have a desired replica count. desiredReplicas, hasDesired := deployutil.DeploymentDesiredReplicas(to) if !hasDesired { - return fmt.Errorf("deployment %s has no desired replica count", deployutil.LabelForDeployment(to)) + return fmt.Errorf("deployment %s has already run to completion", to.Name) } // Find all deployments for the config. @@ -189,17 +229,20 @@ func (d *Deployer) Deploy(namespace, deploymentName string) error { // Scale the deployment down to zero. retryWaitParams := kubectl.NewRetryParams(1*time.Second, 120*time.Second) if err := d.scaler.Scale(candidate.Namespace, candidate.Name, uint(0), &kubectl.ScalePrecondition{Size: -1, ResourceVersion: ""}, retryWaitParams, retryWaitParams); err != nil { - glog.Errorf("Couldn't scale down prior deployment %s: %v", deployutil.LabelForDeployment(&candidate), err) + fmt.Fprintf(d.errOut, "error: Couldn't scale down prior deployment %s: %v\n", deployutil.LabelForDeployment(&candidate), err) } else { - glog.Infof("Scaled down prior deployment %s", deployutil.LabelForDeployment(&candidate)) + fmt.Fprintf(d.out, "--> Scaled older deployment %s down\n", candidate.Name) } } + if d.until == "start" { + return strategy.NewConditionReachedErr("Ready to start deployment") + } + // Perform the deployment. - if from == nil { - glog.Infof("Deploying %s for the first time (replicas: %d)", deployutil.LabelForDeployment(to), desiredReplicas) - } else { - glog.Infof("Deploying from %s to %s (replicas: %d)", deployutil.LabelForDeployment(from), deployutil.LabelForDeployment(to), desiredReplicas) + if err := s.Deploy(from, to, desiredReplicas); err != nil { + return err } - return strategy.Deploy(from, to, desiredReplicas) + fmt.Fprintf(d.out, "--> Success\n") + return nil } diff --git a/pkg/cmd/infra/deployer/deployer_test.go b/pkg/cmd/infra/deployer/deployer_test.go index 3ed3c1a252f9..51fac74ebd11 100644 --- a/pkg/cmd/infra/deployer/deployer_test.go +++ b/pkg/cmd/infra/deployer/deployer_test.go @@ -1,6 +1,7 @@ package deployer import ( + "bytes" "fmt" "strconv" "testing" @@ -135,6 +136,8 @@ func TestDeployer_deployScenarios(t *testing.T) { scaler := &scalertest.FakeScaler{} deployer := &Deployer{ + out: &bytes.Buffer{}, + errOut: &bytes.Buffer{}, strategyFor: func(config *deployapi.DeploymentConfig) (strategy.DeploymentStrategy, error) { return &testStrategy{ deployFunc: func(from *kapi.ReplicationController, to *kapi.ReplicationController, desiredReplicas int) error { diff --git a/pkg/deploy/api/deep_copy_generated.go b/pkg/deploy/api/deep_copy_generated.go index 6417e48c4dd9..f36d9a559481 100644 --- a/pkg/deploy/api/deep_copy_generated.go +++ b/pkg/deploy/api/deep_copy_generated.go @@ -270,15 +270,6 @@ func DeepCopy_api_DeploymentLogOptions(in DeploymentLogOptions, out *DeploymentL func DeepCopy_api_DeploymentStrategy(in DeploymentStrategy, out *DeploymentStrategy, c *conversion.Cloner) error { out.Type = in.Type - if in.CustomParams != nil { - in, out := in.CustomParams, &out.CustomParams - *out = new(CustomDeploymentStrategyParams) - if err := DeepCopy_api_CustomDeploymentStrategyParams(*in, *out, c); err != nil { - return err - } - } else { - out.CustomParams = nil - } if in.RecreateParams != nil { in, out := in.RecreateParams, &out.RecreateParams *out = new(RecreateDeploymentStrategyParams) @@ -297,6 +288,15 @@ func DeepCopy_api_DeploymentStrategy(in DeploymentStrategy, out *DeploymentStrat } else { out.RollingParams = nil } + if in.CustomParams != nil { + in, out := in.CustomParams, &out.CustomParams + *out = new(CustomDeploymentStrategyParams) + if err := DeepCopy_api_CustomDeploymentStrategyParams(*in, *out, c); err != nil { + return err + } + } else { + out.CustomParams = nil + } if err := api.DeepCopy_api_ResourceRequirements(in.Resources, &out.Resources, c); err != nil { return err } diff --git a/pkg/deploy/api/types.go b/pkg/deploy/api/types.go index 719d967849fd..e8199bac981f 100644 --- a/pkg/deploy/api/types.go +++ b/pkg/deploy/api/types.go @@ -29,13 +29,16 @@ type DeploymentStrategy struct { // Type is the name of a deployment strategy. Type DeploymentStrategyType - // CustomParams are the input to the Custom deployment strategy. - CustomParams *CustomDeploymentStrategyParams // RecreateParams are the input to the Recreate deployment strategy. RecreateParams *RecreateDeploymentStrategyParams // RollingParams are the input to the Rolling deployment strategy. RollingParams *RollingDeploymentStrategyParams + // CustomParams are the input to the Custom deployment strategy, and may also + // be specified for the Recreate and Rolling strategies to customize the execution + // process that runs the deployment. + CustomParams *CustomDeploymentStrategyParams + // Resources contains resource requirements to execute the deployment Resources kapi.ResourceRequirements // Labels is a set of key, value pairs added to custom deployer and lifecycle pre/post hook pods. @@ -50,7 +53,7 @@ type DeploymentStrategyType string const ( // DeploymentStrategyTypeRecreate is a simple strategy suitable as a default. DeploymentStrategyTypeRecreate DeploymentStrategyType = "Recreate" - // DeploymentStrategyTypeCustom is a user defined strategy. + // DeploymentStrategyTypeCustom is a user defined strategy. It is optional to set. DeploymentStrategyTypeCustom DeploymentStrategyType = "Custom" // DeploymentStrategyTypeRolling uses the Kubernetes RollingUpdater. DeploymentStrategyTypeRolling DeploymentStrategyType = "Rolling" @@ -199,6 +202,10 @@ const ( // annotation value is the name of the deployer Pod which will act upon the ReplicationController // to implement the deployment behavior. DeploymentPodAnnotation = "openshift.io/deployer-pod.name" + // DeploymentIgnorePodAnnotation is an annotation on a deployment config that will bypass creating + // a deployment pod with the deployment. The caller is responsible for setting the deployment + // status and running the deployment process. + DeploymentIgnorePodAnnotation = "deploy.openshift.io/deployer-pod.ignore" // DeploymentPodTypeLabel is a label with which contains a type of deployment pod. DeploymentPodTypeLabel = "openshift.io/deployer-pod.type" // DeployerPodForDeploymentLabel is a label which groups pods related to a diff --git a/pkg/deploy/api/v1/conversion_generated.go b/pkg/deploy/api/v1/conversion_generated.go index 73be54eac1f0..61107edea2c5 100644 --- a/pkg/deploy/api/v1/conversion_generated.go +++ b/pkg/deploy/api/v1/conversion_generated.go @@ -759,15 +759,6 @@ func autoConvert_api_DeploymentStrategy_To_v1_DeploymentStrategy(in *deploy_api. defaulting.(func(*deploy_api.DeploymentStrategy))(in) } out.Type = DeploymentStrategyType(in.Type) - if in.CustomParams != nil { - in, out := &in.CustomParams, &out.CustomParams - *out = new(CustomDeploymentStrategyParams) - if err := Convert_api_CustomDeploymentStrategyParams_To_v1_CustomDeploymentStrategyParams(*in, *out, s); err != nil { - return err - } - } else { - out.CustomParams = nil - } if in.RecreateParams != nil { in, out := &in.RecreateParams, &out.RecreateParams *out = new(RecreateDeploymentStrategyParams) @@ -786,6 +777,15 @@ func autoConvert_api_DeploymentStrategy_To_v1_DeploymentStrategy(in *deploy_api. } else { out.RollingParams = nil } + if in.CustomParams != nil { + in, out := &in.CustomParams, &out.CustomParams + *out = new(CustomDeploymentStrategyParams) + if err := Convert_api_CustomDeploymentStrategyParams_To_v1_CustomDeploymentStrategyParams(*in, *out, s); err != nil { + return err + } + } else { + out.CustomParams = nil + } // TODO: Inefficient conversion - can we improve it? if err := s.Convert(&in.Resources, &out.Resources, 0); err != nil { return err diff --git a/pkg/deploy/api/validation/validation.go b/pkg/deploy/api/validation/validation.go index 62cd62f3f0e0..40b3464dbe1f 100644 --- a/pkg/deploy/api/validation/validation.go +++ b/pkg/deploy/api/validation/validation.go @@ -110,6 +110,10 @@ func validateDeploymentStrategy(strategy *deployapi.DeploymentStrategy, pod *kap errs = append(errs, field.Required(fldPath.Child("type"), "")) } + if strategy.CustomParams != nil { + errs = append(errs, validateCustomParams(strategy.CustomParams, fldPath.Child("customParams"))...) + } + switch strategy.Type { case deployapi.DeploymentStrategyTypeRecreate: if strategy.RecreateParams != nil { @@ -124,8 +128,12 @@ func validateDeploymentStrategy(strategy *deployapi.DeploymentStrategy, pod *kap case deployapi.DeploymentStrategyTypeCustom: if strategy.CustomParams == nil { errs = append(errs, field.Required(fldPath.Child("customParams"), "")) - } else { - errs = append(errs, validateCustomParams(strategy.CustomParams, fldPath.Child("customParams"))...) + } + if strategy.RollingParams != nil { + errs = append(errs, validateRollingParams(strategy.RollingParams, pod, fldPath.Child("rollingParams"))...) + } + if strategy.RecreateParams != nil { + errs = append(errs, validateRecreateParams(strategy.RecreateParams, pod, fldPath.Child("recreateParams"))...) } case "": errs = append(errs, field.Required(fldPath.Child("type"), "strategy type is required")) @@ -140,7 +148,7 @@ func validateDeploymentStrategy(strategy *deployapi.DeploymentStrategy, pod *kap errs = append(errs, validation.ValidateAnnotations(strategy.Annotations, fldPath.Child("annotations"))...) } - // TODO: validate resource requirements (prereq: https://github.com/kubernetes/kubernetes/pull/7059) + errs = append(errs, validation.ValidateResourceRequirements(&strategy.Resources, fldPath.Child("resources"))...) return errs } @@ -148,9 +156,7 @@ func validateDeploymentStrategy(strategy *deployapi.DeploymentStrategy, pod *kap func validateCustomParams(params *deployapi.CustomDeploymentStrategyParams, fldPath *field.Path) field.ErrorList { errs := field.ErrorList{} - if len(params.Image) == 0 { - errs = append(errs, field.Required(fldPath.Child("image"), "")) - } + errs = append(errs, validateEnv(params.Environment, fldPath.Child("environment"))...) return errs } @@ -235,7 +241,7 @@ func validateEnv(vars []kapi.EnvVar, fldPath *field.Path) field.ErrorList { for i, ev := range vars { vErrs := field.ErrorList{} - idxPath := fldPath.Child("name").Index(i) + idxPath := fldPath.Index(i).Child("name") if len(ev.Name) == 0 { vErrs = append(vErrs, field.Required(idxPath, "")) } diff --git a/pkg/deploy/api/validation/validation_test.go b/pkg/deploy/api/validation/validation_test.go index 7ebd1c542fb0..cc4adc280d8f 100644 --- a/pkg/deploy/api/validation/validation_test.go +++ b/pkg/deploy/api/validation/validation_test.go @@ -240,7 +240,7 @@ func TestValidateDeploymentConfigMissingFields(t *testing.T) { field.ErrorTypeRequired, "spec.strategy.customParams", }, - "missing spec.strategy.customParams.image": { + "invalid spec.strategy.customParams.environment": { api.DeploymentConfig{ ObjectMeta: kapi.ObjectMeta{Name: "foo", Namespace: "bar"}, Spec: api.DeploymentConfigSpec{ @@ -248,14 +248,18 @@ func TestValidateDeploymentConfigMissingFields(t *testing.T) { Triggers: manualTrigger(), Selector: test.OkSelector(), Strategy: api.DeploymentStrategy{ - Type: api.DeploymentStrategyTypeCustom, - CustomParams: &api.CustomDeploymentStrategyParams{}, + Type: api.DeploymentStrategyTypeCustom, + CustomParams: &api.CustomDeploymentStrategyParams{ + Environment: []kapi.EnvVar{ + {Name: "A=B"}, + }, + }, }, Template: test.OkPodTemplate(), }, }, - field.ErrorTypeRequired, - "spec.strategy.customParams.image", + field.ErrorTypeInvalid, + "spec.strategy.customParams.environment[0].name", }, "missing spec.strategy.recreateParams.pre.failurePolicy": { api.DeploymentConfig{ diff --git a/pkg/deploy/controller/deployment/controller.go b/pkg/deploy/controller/deployment/controller.go index c1ea342c4c16..16513d6cd052 100644 --- a/pkg/deploy/controller/deployment/controller.go +++ b/pkg/deploy/controller/deployment/controller.go @@ -103,6 +103,11 @@ func (c *DeploymentController) Handle(deployment *kapi.ReplicationController) er // Don't try and re-create the deployer pod. break } + + if _, ok := deployment.Annotations[deployapi.DeploymentIgnorePodAnnotation]; ok { + return nil + } + // Generate a deployer pod spec. podTemplate, err := c.makeDeployerPod(deployment) if err != nil { @@ -240,6 +245,8 @@ func (c *DeploymentController) makeDeployerPod(deployment *kapi.ReplicationContr }, }, ActiveDeadlineSeconds: &maxDeploymentDurationSeconds, + DNSPolicy: deployment.Spec.Template.Spec.DNSPolicy, + ImagePullSecrets: deployment.Spec.Template.Spec.ImagePullSecrets, // Setting the node selector on the deployer pod so that it is created // on the same set of nodes as the pods. NodeSelector: deployment.Spec.Template.Spec.NodeSelector, diff --git a/pkg/deploy/controller/deployment/factory.go b/pkg/deploy/controller/deployment/factory.go index 96cad479f3ca..361a7c49c587 100644 --- a/pkg/deploy/controller/deployment/factory.go +++ b/pkg/deploy/controller/deployment/factory.go @@ -12,6 +12,7 @@ import ( "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/flowcontrol" utilruntime "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/watch" controller "github.com/openshift/origin/pkg/controller" @@ -135,32 +136,42 @@ func (factory *DeploymentControllerFactory) Create() controller.RunnableControll // 1. For the Recreate and Rolling strategies, strategy, use the factory's // DeployerImage as the container image, and the factory's Environment // as the container environment. -// 2. For all Custom strategy, use the strategy's image for the container -// image, and use the combination of the factory's Environment and the -// strategy's environment as the container environment. +// 2. For all Custom strategies, or if the CustomParams field is set, use +// the strategy's image for the container image, and use the combination +// of the factory's Environment and the strategy's environment as the +// container environment. +// func (factory *DeploymentControllerFactory) makeContainer(strategy *deployapi.DeploymentStrategy) *kapi.Container { - // Set default environment values - environment := []kapi.EnvVar{} - for _, env := range factory.Environment { - environment = append(environment, env) - } + image := factory.DeployerImage + var environment []kapi.EnvVar + var command []string - // Every strategy type should be handled here. - switch strategy.Type { - case deployapi.DeploymentStrategyTypeRecreate, deployapi.DeploymentStrategyTypeRolling: - // Use the factory-configured image. - case deployapi.DeploymentStrategyTypeCustom: - // Use user-defined values from the strategy input. + set := sets.NewString() + // Use user-defined values from the strategy input. + if p := strategy.CustomParams; p != nil { + if len(p.Image) > 0 { + image = p.Image + } + if len(p.Command) > 0 { + command = p.Command + } for _, env := range strategy.CustomParams.Environment { + set.Insert(env.Name) environment = append(environment, env) } - return &kapi.Container{ - Image: strategy.CustomParams.Image, - Env: environment, + } + + // Set default environment values + for _, env := range factory.Environment { + if set.Has(env.Name) { + continue } + environment = append(environment, env) } + return &kapi.Container{ - Image: factory.DeployerImage, - Env: environment, + Image: image, + Command: command, + Env: environment, } } diff --git a/pkg/deploy/strategy/interfaces.go b/pkg/deploy/strategy/interfaces.go index 557b883c09b1..6525e3eea53a 100644 --- a/pkg/deploy/strategy/interfaces.go +++ b/pkg/deploy/strategy/interfaces.go @@ -1,6 +1,9 @@ package strategy import ( + "strconv" + "strings" + kapi "k8s.io/kubernetes/pkg/api" ) @@ -23,3 +26,44 @@ type UpdateAcceptor interface { // Accept returns nil if the controller is okay, otherwise returns an error. Accept(*kapi.ReplicationController) error } + +type errConditionReached struct { + msg string +} + +func NewConditionReachedErr(msg string) error { + return &errConditionReached{msg: msg} +} + +func (e *errConditionReached) Error() string { + return e.msg +} + +func IsConditionReached(err error) bool { + value, ok := err.(*errConditionReached) + return ok && value != nil +} + +func PercentageBetween(until string, min, max int) bool { + if !strings.HasSuffix(until, "%") { + return false + } + until = until[:len(until)-1] + i, err := strconv.Atoi(until) + if err != nil { + return false + } + return i >= min && i <= max +} + +func Percentage(until string) (int, bool) { + if !strings.HasSuffix(until, "%") { + return 0, false + } + until = until[:len(until)-1] + i, err := strconv.Atoi(until) + if err != nil { + return 0, false + } + return i, true +} diff --git a/pkg/deploy/strategy/recreate/recreate.go b/pkg/deploy/strategy/recreate/recreate.go index 7196b7b8eab9..692a70f89056 100644 --- a/pkg/deploy/strategy/recreate/recreate.go +++ b/pkg/deploy/strategy/recreate/recreate.go @@ -2,11 +2,11 @@ package recreate import ( "fmt" + "io" + "io/ioutil" "os" "time" - "github.com/golang/glog" - kapi "k8s.io/kubernetes/pkg/api" kclient "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/kubectl" @@ -26,6 +26,10 @@ import ( // A failure to disable any existing deployments will be considered a // deployment failure. type RecreateDeploymentStrategy struct { + // out and errOut control where output is sent during the strategy + out, errOut io.Writer + // until is a condition that, if reached, will cause the strategy to exit early + until string // getReplicationController knows how to get a replication controller. getReplicationController func(namespace, name string) (*kapi.ReplicationController, error) // getUpdateAcceptor returns an UpdateAcceptor to verify the first replica @@ -52,14 +56,23 @@ const AcceptorInterval = 1 * time.Second // NewRecreateDeploymentStrategy makes a RecreateDeploymentStrategy backed by // a real HookExecutor and client. -func NewRecreateDeploymentStrategy(client kclient.Interface, tagClient client.ImageStreamTagsNamespacer, decoder runtime.Decoder) *RecreateDeploymentStrategy { +func NewRecreateDeploymentStrategy(client kclient.Interface, tagClient client.ImageStreamTagsNamespacer, decoder runtime.Decoder, out, errOut io.Writer, until string) *RecreateDeploymentStrategy { + if out == nil { + out = ioutil.Discard + } + if errOut == nil { + errOut = ioutil.Discard + } scaler, _ := kubectl.ScalerFor(kapi.Kind("ReplicationController"), client) return &RecreateDeploymentStrategy{ + out: out, + errOut: errOut, + until: until, getReplicationController: func(namespace, name string) (*kapi.ReplicationController, error) { return client.ReplicationControllers(namespace).Get(name) }, getUpdateAcceptor: func(timeout time.Duration) strat.UpdateAcceptor { - return stratsupport.NewAcceptNewlyObservedReadyPods(client, timeout, AcceptorInterval) + return stratsupport.NewAcceptNewlyObservedReadyPods(out, client, timeout, AcceptorInterval) }, scaler: scaler, decoder: decoder, @@ -97,67 +110,96 @@ func (s *RecreateDeploymentStrategy) DeployWithAcceptor(from *kapi.ReplicationCo // Execute any pre-hook. if params != nil && params.Pre != nil { - if err := s.hookExecutor.Execute(params.Pre, to, deployapi.PreHookPodSuffix); err != nil { - return fmt.Errorf("Pre hook failed: %s", err) + if err := s.hookExecutor.Execute(params.Pre, to, deployapi.PreHookPodSuffix, "pre"); err != nil { + return fmt.Errorf("pre hook failed: %s", err) } - glog.Infof("Pre hook finished") + } + + if s.until == "pre" { + return strat.NewConditionReachedErr("pre hook succeeded") } // Scale down the from deployment. if from != nil { - glog.Infof("Scaling %s down to zero", deployutil.LabelForDeployment(from)) + fmt.Fprintf(s.out, "--> Scaling %s down to zero\n", from.Name) _, err := s.scaleAndWait(from, 0, retryParams, waitParams) if err != nil { - return fmt.Errorf("couldn't scale %s to 0: %v", deployutil.LabelForDeployment(from), err) + return fmt.Errorf("couldn't scale %s to 0: %v", from.Name, err) } } + if s.until == "0%" { + return strat.NewConditionReachedErr("Reached 0% (no running pods)") + } + if params != nil && params.Mid != nil { - if err := s.hookExecutor.Execute(params.Mid, to, deployapi.MidHookPodSuffix); err != nil { + if err := s.hookExecutor.Execute(params.Mid, to, deployapi.MidHookPodSuffix, "mid"); err != nil { return fmt.Errorf("mid hook failed: %s", err) } - glog.Infof("Mid hook finished") } + if s.until == "mid" { + return strat.NewConditionReachedErr("mid hook succeeded") + } + + accepted := false + // Scale up the to deployment. if desiredReplicas > 0 { - // Scale up to 1 and validate the replica, - // aborting if the replica isn't acceptable. - glog.Infof("Scaling %s to 1 before performing acceptance check", deployutil.LabelForDeployment(to)) - updatedTo, err := s.scaleAndWait(to, 1, retryParams, waitParams) - if err != nil { - return fmt.Errorf("couldn't scale %s to 1: %v", deployutil.LabelForDeployment(to), err) - } - glog.Infof("Performing acceptance check of %s", deployutil.LabelForDeployment(to)) - if err := updateAcceptor.Accept(updatedTo); err != nil { - return fmt.Errorf("update acceptor rejected %s: %v", deployutil.LabelForDeployment(to), err) + if from != nil { + // Scale up to 1 and validate the replica, + // aborting if the replica isn't acceptable. + fmt.Fprintf(s.out, "--> Scaling %s to 1 before performing acceptance check\n", to.Name) + updatedTo, err := s.scaleAndWait(to, 1, retryParams, waitParams) + if err != nil { + return fmt.Errorf("couldn't scale %s to 1: %v", to.Name, err) + } + if err := updateAcceptor.Accept(updatedTo); err != nil { + return fmt.Errorf("update acceptor rejected %s: %v", to.Name, err) + } + accepted = true + to = updatedTo + + if strat.PercentageBetween(s.until, 1, 99) { + return strat.NewConditionReachedErr(fmt.Sprintf("Reached %s", s.until)) + } } - to = updatedTo // Complete the scale up. if to.Spec.Replicas != desiredReplicas { - glog.Infof("Scaling %s to %d", deployutil.LabelForDeployment(to), desiredReplicas) + fmt.Fprintf(s.out, "--> Scaling %s to %d\n", to.Name, desiredReplicas) updatedTo, err := s.scaleAndWait(to, desiredReplicas, retryParams, waitParams) if err != nil { - return fmt.Errorf("couldn't scale %s to %d: %v", deployutil.LabelForDeployment(to), desiredReplicas, err) + return fmt.Errorf("couldn't scale %s to %d: %v", to.Name, desiredReplicas, err) } to = updatedTo } + + if !accepted { + if err := updateAcceptor.Accept(to); err != nil { + return fmt.Errorf("update acceptor rejected %s: %v", to.Name, err) + } + } + } + + if (from == nil && strat.PercentageBetween(s.until, 1, 100)) || (from != nil && s.until == "100%") { + return strat.NewConditionReachedErr(fmt.Sprintf("Reached %s", s.until)) } // Execute any post-hook. if params != nil && params.Post != nil { - if err := s.hookExecutor.Execute(params.Post, to, deployapi.PostHookPodSuffix); err != nil { + if err := s.hookExecutor.Execute(params.Post, to, deployapi.PostHookPodSuffix, "post"); err != nil { return fmt.Errorf("post hook failed: %s", err) } - glog.Infof("Post hook finished") } - glog.Infof("Deployment %s successfully made active", to.Name) return nil } func (s *RecreateDeploymentStrategy) scaleAndWait(deployment *kapi.ReplicationController, replicas int, retry *kubectl.RetryParams, wait *kubectl.RetryParams) (*kapi.ReplicationController, error) { + if replicas == deployment.Spec.Replicas && replicas == deployment.Status.Replicas { + return deployment, nil + } if err := s.scaler.Scale(deployment.Namespace, deployment.Name, uint(replicas), &kubectl.ScalePrecondition{Size: -1, ResourceVersion: ""}, retry, wait); err != nil { return nil, err } @@ -170,15 +212,15 @@ func (s *RecreateDeploymentStrategy) scaleAndWait(deployment *kapi.ReplicationCo // hookExecutor knows how to execute a deployment lifecycle hook. type hookExecutor interface { - Execute(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, label string) error + Execute(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, suffix, label string) error } // hookExecutorImpl is a pluggable hookExecutor. type hookExecutorImpl struct { - executeFunc func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, label string) error + executeFunc func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, suffix, label string) error } // Execute executes the provided lifecycle hook -func (i *hookExecutorImpl) Execute(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, label string) error { - return i.executeFunc(hook, deployment, label) +func (i *hookExecutorImpl) Execute(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, suffix, label string) error { + return i.executeFunc(hook, deployment, suffix, label) } diff --git a/pkg/deploy/strategy/recreate/recreate_test.go b/pkg/deploy/strategy/recreate/recreate_test.go index 3a4d7c683452..7bb6b7f2f861 100644 --- a/pkg/deploy/strategy/recreate/recreate_test.go +++ b/pkg/deploy/strategy/recreate/recreate_test.go @@ -1,6 +1,7 @@ package recreate import ( + "bytes" "fmt" "testing" "time" @@ -20,8 +21,9 @@ import ( func TestRecreate_initialDeployment(t *testing.T) { var deployment *kapi.ReplicationController scaler := &scalertest.FakeScaler{} - strategy := &RecreateDeploymentStrategy{ + out: &bytes.Buffer{}, + errOut: &bytes.Buffer{}, decoder: kapi.Codecs.UniversalDecoder(), retryTimeout: 1 * time.Second, retryPeriod: 1 * time.Millisecond, @@ -40,13 +42,10 @@ func TestRecreate_initialDeployment(t *testing.T) { t.Fatalf("unexpected deploy error: %#v", err) } - if e, a := 2, len(scaler.Events); e != a { + if e, a := 1, len(scaler.Events); e != a { t.Fatalf("expected %d scale calls, got %d", e, a) } - if e, a := uint(1), scaler.Events[0].Size; e != a { - t.Errorf("expected scale up to %d, got %d", e, a) - } - if e, a := uint(3), scaler.Events[1].Size; e != a { + if e, a := uint(3), scaler.Events[0].Size; e != a { t.Errorf("expected scale up to %d, got %d", e, a) } } @@ -59,6 +58,8 @@ func TestRecreate_deploymentPreHookSuccess(t *testing.T) { hookExecuted := false strategy := &RecreateDeploymentStrategy{ + out: &bytes.Buffer{}, + errOut: &bytes.Buffer{}, decoder: kapi.Codecs.UniversalDecoder(), retryTimeout: 1 * time.Second, retryPeriod: 1 * time.Millisecond, @@ -67,7 +68,7 @@ func TestRecreate_deploymentPreHookSuccess(t *testing.T) { }, getUpdateAcceptor: getUpdateAcceptor, hookExecutor: &hookExecutorImpl{ - executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, label string) error { + executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, suffix, label string) error { hookExecuted = true return nil }, @@ -91,6 +92,8 @@ func TestRecreate_deploymentPreHookFail(t *testing.T) { scaler := &scalertest.FakeScaler{} strategy := &RecreateDeploymentStrategy{ + out: &bytes.Buffer{}, + errOut: &bytes.Buffer{}, decoder: kapi.Codecs.UniversalDecoder(), retryTimeout: 1 * time.Second, retryPeriod: 1 * time.Millisecond, @@ -99,7 +102,7 @@ func TestRecreate_deploymentPreHookFail(t *testing.T) { }, getUpdateAcceptor: getUpdateAcceptor, hookExecutor: &hookExecutorImpl{ - executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, label string) error { + executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, suffix, label string) error { return fmt.Errorf("hook execution failure") }, }, @@ -123,6 +126,8 @@ func TestRecreate_deploymentMidHookSuccess(t *testing.T) { hookExecuted := false strategy := &RecreateDeploymentStrategy{ + out: &bytes.Buffer{}, + errOut: &bytes.Buffer{}, decoder: kapi.Codecs.UniversalDecoder(), retryTimeout: 1 * time.Second, retryPeriod: 1 * time.Millisecond, @@ -131,7 +136,7 @@ func TestRecreate_deploymentMidHookSuccess(t *testing.T) { }, getUpdateAcceptor: getUpdateAcceptor, hookExecutor: &hookExecutorImpl{ - executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, label string) error { + executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, suffix, label string) error { hookExecuted = true return nil }, @@ -155,6 +160,8 @@ func TestRecreate_deploymentMidHookFail(t *testing.T) { scaler := &scalertest.FakeScaler{} strategy := &RecreateDeploymentStrategy{ + out: &bytes.Buffer{}, + errOut: &bytes.Buffer{}, decoder: kapi.Codecs.UniversalDecoder(), retryTimeout: 1 * time.Second, retryPeriod: 1 * time.Millisecond, @@ -163,7 +170,7 @@ func TestRecreate_deploymentMidHookFail(t *testing.T) { }, getUpdateAcceptor: getUpdateAcceptor, hookExecutor: &hookExecutorImpl{ - executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, label string) error { + executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, suffix, label string) error { return fmt.Errorf("hook execution failure") }, }, @@ -186,6 +193,8 @@ func TestRecreate_deploymentPostHookSuccess(t *testing.T) { hookExecuted := false strategy := &RecreateDeploymentStrategy{ + out: &bytes.Buffer{}, + errOut: &bytes.Buffer{}, decoder: kapi.Codecs.UniversalDecoder(), retryTimeout: 1 * time.Second, retryPeriod: 1 * time.Millisecond, @@ -194,7 +203,7 @@ func TestRecreate_deploymentPostHookSuccess(t *testing.T) { }, getUpdateAcceptor: getUpdateAcceptor, hookExecutor: &hookExecutorImpl{ - executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, label string) error { + executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, suffix, label string) error { hookExecuted = true return nil }, @@ -219,6 +228,8 @@ func TestRecreate_deploymentPostHookFail(t *testing.T) { hookExecuted := false strategy := &RecreateDeploymentStrategy{ + out: &bytes.Buffer{}, + errOut: &bytes.Buffer{}, decoder: kapi.Codecs.UniversalDecoder(), retryTimeout: 1 * time.Second, retryPeriod: 1 * time.Millisecond, @@ -227,7 +238,7 @@ func TestRecreate_deploymentPostHookFail(t *testing.T) { }, getUpdateAcceptor: getUpdateAcceptor, hookExecutor: &hookExecutorImpl{ - executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, label string) error { + executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, suffix, label string) error { hookExecuted = true return fmt.Errorf("post hook failure") }, @@ -249,6 +260,8 @@ func TestRecreate_acceptorSuccess(t *testing.T) { scaler := &scalertest.FakeScaler{} strategy := &RecreateDeploymentStrategy{ + out: &bytes.Buffer{}, + errOut: &bytes.Buffer{}, decoder: kapi.Codecs.UniversalDecoder(), retryTimeout: 1 * time.Second, retryPeriod: 1 * time.Millisecond, @@ -266,8 +279,9 @@ func TestRecreate_acceptorSuccess(t *testing.T) { }, } - deployment, _ = deployutil.MakeDeployment(deploytest.OkDeploymentConfig(1), kapi.Codecs.LegacyCodec(registered.GroupOrDie(kapi.GroupName).GroupVersions[0])) - err := strategy.DeployWithAcceptor(nil, deployment, 2, acceptor) + oldDeployment, _ := deployutil.MakeDeployment(deploytest.OkDeploymentConfig(1), kapi.Codecs.LegacyCodec(registered.GroupOrDie(kapi.GroupName).GroupVersions[0])) + deployment, _ = deployutil.MakeDeployment(deploytest.OkDeploymentConfig(2), kapi.Codecs.LegacyCodec(registered.GroupOrDie(kapi.GroupName).GroupVersions[0])) + err := strategy.DeployWithAcceptor(oldDeployment, deployment, 2, acceptor) if err != nil { t.Fatalf("unexpected deploy error: %#v", err) } @@ -280,7 +294,7 @@ func TestRecreate_acceptorSuccess(t *testing.T) { t.Fatalf("expected %d scale calls, got %d", e, a) } if e, a := uint(1), scaler.Events[0].Size; e != a { - t.Errorf("expected scale up to %d, got %d", e, a) + t.Errorf("expected scale down to %d, got %d", e, a) } if e, a := uint(2), scaler.Events[1].Size; e != a { t.Errorf("expected scale up to %d, got %d", e, a) @@ -292,6 +306,8 @@ func TestRecreate_acceptorFail(t *testing.T) { scaler := &scalertest.FakeScaler{} strategy := &RecreateDeploymentStrategy{ + out: &bytes.Buffer{}, + errOut: &bytes.Buffer{}, decoder: kapi.Codecs.UniversalDecoder(), retryTimeout: 1 * time.Second, retryPeriod: 1 * time.Millisecond, @@ -307,8 +323,9 @@ func TestRecreate_acceptorFail(t *testing.T) { }, } - deployment, _ = deployutil.MakeDeployment(deploytest.OkDeploymentConfig(1), kapi.Codecs.LegacyCodec(registered.GroupOrDie(kapi.GroupName).GroupVersions[0])) - err := strategy.DeployWithAcceptor(nil, deployment, 2, acceptor) + oldDeployment, _ := deployutil.MakeDeployment(deploytest.OkDeploymentConfig(1), kapi.Codecs.LegacyCodec(registered.GroupOrDie(kapi.GroupName).GroupVersions[0])) + deployment, _ = deployutil.MakeDeployment(deploytest.OkDeploymentConfig(2), kapi.Codecs.LegacyCodec(registered.GroupOrDie(kapi.GroupName).GroupVersions[0])) + err := strategy.DeployWithAcceptor(oldDeployment, deployment, 2, acceptor) if err == nil { t.Fatalf("expected a deployment failure") } diff --git a/pkg/deploy/strategy/rolling/rolling.go b/pkg/deploy/strategy/rolling/rolling.go index 299e537588a5..690922e69c22 100644 --- a/pkg/deploy/strategy/rolling/rolling.go +++ b/pkg/deploy/strategy/rolling/rolling.go @@ -1,12 +1,13 @@ package rolling import ( + "bytes" "fmt" + "io" + "io/ioutil" "os" "time" - "github.com/golang/glog" - kapi "k8s.io/kubernetes/pkg/api" kerrors "k8s.io/kubernetes/pkg/api/errors" kclient "k8s.io/kubernetes/pkg/client/unversioned" @@ -43,6 +44,10 @@ const DefaultApiRetryTimeout = 10 * time.Second // [1] https://github.com/kubernetes/kubernetes/pull/7183 // [2] https://github.com/kubernetes/kubernetes/issues/7851 type RollingDeploymentStrategy struct { + // out and errOut control where output is sent during the strategy + out, errOut io.Writer + // until is a condition that, if reached, will cause the strategy to exit early + until string // initialStrategy is used when there are no prior deployments. initialStrategy acceptingDeploymentStrategy // client is used to deal with ReplicationControllers. @@ -78,8 +83,17 @@ type acceptingDeploymentStrategy interface { const AcceptorInterval = 1 * time.Second // NewRollingDeploymentStrategy makes a new RollingDeploymentStrategy. -func NewRollingDeploymentStrategy(namespace string, client kclient.Interface, tags client.ImageStreamTagsNamespacer, decoder runtime.Decoder, initialStrategy acceptingDeploymentStrategy) *RollingDeploymentStrategy { +func NewRollingDeploymentStrategy(namespace string, client kclient.Interface, tags client.ImageStreamTagsNamespacer, decoder runtime.Decoder, initialStrategy acceptingDeploymentStrategy, out, errOut io.Writer, until string) *RollingDeploymentStrategy { + if out == nil { + out = ioutil.Discard + } + if errOut == nil { + errOut = ioutil.Discard + } return &RollingDeploymentStrategy{ + out: out, + errOut: errOut, + until: until, decoder: decoder, initialStrategy: initialStrategy, client: client, @@ -92,7 +106,7 @@ func NewRollingDeploymentStrategy(namespace string, client kclient.Interface, ta }, hookExecutor: stratsupport.NewHookExecutor(client, tags, os.Stdout, decoder), getUpdateAcceptor: func(timeout time.Duration) strat.UpdateAcceptor { - return stratsupport.NewAcceptNewlyObservedReadyPods(client, timeout, AcceptorInterval) + return stratsupport.NewAcceptNewlyObservedReadyPods(out, client, timeout, AcceptorInterval) }, } } @@ -114,10 +128,9 @@ func (s *RollingDeploymentStrategy) Deploy(from *kapi.ReplicationController, to if from == nil { // Execute any pre-hook. if params.Pre != nil { - if err := s.hookExecutor.Execute(params.Pre, to, deployapi.PreHookPodSuffix); err != nil { + if err := s.hookExecutor.Execute(params.Pre, to, deployapi.PreHookPodSuffix, "pre"); err != nil { return fmt.Errorf("Pre hook failed: %s", err) } - glog.Infof("Pre hook finished") } // Execute the delegate strategy. @@ -128,10 +141,9 @@ func (s *RollingDeploymentStrategy) Deploy(from *kapi.ReplicationController, to // Execute any post-hook. Errors are logged and ignored. if params.Post != nil { - if err := s.hookExecutor.Execute(params.Post, to, deployapi.PostHookPodSuffix); err != nil { + if err := s.hookExecutor.Execute(params.Post, to, deployapi.PostHookPodSuffix, "post"); err != nil { return fmt.Errorf("post hook failed: %s", err) } - glog.Infof("Post hook finished") } // All done. @@ -141,10 +153,17 @@ func (s *RollingDeploymentStrategy) Deploy(from *kapi.ReplicationController, to // Prepare for a rolling update. // Execute any pre-hook. if params.Pre != nil { - if err := s.hookExecutor.Execute(params.Pre, to, deployapi.PreHookPodSuffix); err != nil { + if err := s.hookExecutor.Execute(params.Pre, to, deployapi.PreHookPodSuffix, "pre"); err != nil { return fmt.Errorf("pre hook failed: %s", err) } - glog.Infof("Pre hook finished") + } + + if s.until == "pre" { + return strat.NewConditionReachedErr("pre hook succeeded") + } + + if s.until == "0%" { + return strat.NewConditionReachedErr("Reached 0% (before rollout)") } // HACK: Assign the source ID annotation that the rolling updater expects, @@ -155,23 +174,23 @@ func (s *RollingDeploymentStrategy) Deploy(from *kapi.ReplicationController, to err = wait.Poll(s.apiRetryPeriod, s.apiRetryTimeout, func() (done bool, err error) { existing, err := s.client.ReplicationControllers(to.Namespace).Get(to.Name) if err != nil { - msg := fmt.Sprintf("couldn't look up deployment %s: %s", deployutil.LabelForDeployment(to), err) + msg := fmt.Sprintf("couldn't look up deployment %s: %s", to.Name, err) if kerrors.IsNotFound(err) { return false, fmt.Errorf("%s", msg) } // Try again. - glog.Infof(msg) + fmt.Fprintln(s.errOut, "error:", msg) return false, nil } if _, hasSourceId := existing.Annotations[sourceIdAnnotation]; !hasSourceId { existing.Annotations[sourceIdAnnotation] = fmt.Sprintf("%s:%s", from.Name, from.ObjectMeta.UID) if _, err := s.client.ReplicationControllers(existing.Namespace).Update(existing); err != nil { - msg := fmt.Sprintf("couldn't assign source annotation to deployment %s: %v", deployutil.LabelForDeployment(existing), err) + msg := fmt.Sprintf("couldn't assign source annotation to deployment %s: %v", existing.Name, err) if kerrors.IsNotFound(err) { return false, fmt.Errorf("%s", msg) } // Try again. - glog.Infof(msg) + fmt.Fprintln(s.errOut, "error:", msg) return false, nil } } @@ -196,7 +215,7 @@ func (s *RollingDeploymentStrategy) Deploy(from *kapi.ReplicationController, to // Perform a rolling update. rollingConfig := &kubectl.RollingUpdaterConfig{ - Out: &rollingUpdaterWriter{}, + Out: &rollingUpdaterWriter{w: s.out}, OldRc: from, NewRc: to, UpdatePeriod: time.Duration(*params.UpdatePeriodSeconds) * time.Second, @@ -205,41 +224,62 @@ func (s *RollingDeploymentStrategy) Deploy(from *kapi.ReplicationController, to CleanupPolicy: kubectl.PreserveRollingUpdateCleanupPolicy, MaxSurge: params.MaxSurge, MaxUnavailable: params.MaxUnavailable, + OnProgress: func(oldRc, newRc *kapi.ReplicationController, percentage int) error { + if expect, ok := strat.Percentage(s.until); ok && percentage >= expect { + return strat.NewConditionReachedErr(fmt.Sprintf("Reached %s (currently %d%%)", s.until, percentage)) + } + return nil + }, } - err = s.rollingUpdate(rollingConfig) - if err != nil { + if err := s.rollingUpdate(rollingConfig); err != nil { return err } // Execute any post-hook. if params.Post != nil { - if err := s.hookExecutor.Execute(params.Post, to, deployapi.PostHookPodSuffix); err != nil { + if err := s.hookExecutor.Execute(params.Post, to, deployapi.PostHookPodSuffix, "post"); err != nil { return fmt.Errorf("post hook failed: %s", err) } - glog.Info("Post hook finished") } return nil } // rollingUpdaterWriter is an io.Writer that delegates to glog. -type rollingUpdaterWriter struct{} +type rollingUpdaterWriter struct { + w io.Writer + called bool +} func (w *rollingUpdaterWriter) Write(p []byte) (n int, err error) { - glog.Info(fmt.Sprintf("RollingUpdater: %s", p)) - return len(p), nil + n = len(p) + if bytes.HasPrefix(p, []byte("Continuing update with ")) { + return n, nil + } + if bytes.HasSuffix(p, []byte("\n")) { + p = p[:len(p)-1] + } + for _, line := range bytes.Split(p, []byte("\n")) { + if w.called { + fmt.Fprintln(w.w, " ", string(line)) + } else { + w.called = true + fmt.Fprintln(w.w, "-->", string(line)) + } + } + return n, nil } // hookExecutor knows how to execute a deployment lifecycle hook. type hookExecutor interface { - Execute(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, label string) error + Execute(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, suffix, label string) error } // hookExecutorImpl is a pluggable hookExecutor. type hookExecutorImpl struct { - executeFunc func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, label string) error + executeFunc func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, suffix, label string) error } // Execute executes the provided lifecycle hook -func (i *hookExecutorImpl) Execute(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, label string) error { - return i.executeFunc(hook, deployment, label) +func (i *hookExecutorImpl) Execute(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, suffix, label string) error { + return i.executeFunc(hook, deployment, suffix, label) } diff --git a/pkg/deploy/strategy/rolling/rolling_test.go b/pkg/deploy/strategy/rolling/rolling_test.go index d2d730527624..2e9e5c32ba7b 100644 --- a/pkg/deploy/strategy/rolling/rolling_test.go +++ b/pkg/deploy/strategy/rolling/rolling_test.go @@ -1,6 +1,7 @@ package rolling import ( + "bytes" "fmt" "testing" "time" @@ -43,6 +44,7 @@ func TestRolling_deployInitial(t *testing.T) { config := deploytest.OkDeploymentConfig(1) config.Spec.Strategy = deploytest.OkRollingStrategy() deployment, _ := deployutil.MakeDeployment(config, kapi.Codecs.LegacyCodec(registered.GroupOrDie(kapi.GroupName).GroupVersions[0])) + strategy.out, strategy.errOut = &bytes.Buffer{}, &bytes.Buffer{} err := strategy.Deploy(nil, deployment, 2) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -96,6 +98,7 @@ func TestRolling_deployRolling(t *testing.T) { apiRetryTimeout: 10 * time.Millisecond, } + strategy.out, strategy.errOut = &bytes.Buffer{}, &bytes.Buffer{} err := strategy.Deploy(latest, deployment, 2) if err != nil { t.Fatalf("unexpected error: %v", err) @@ -171,7 +174,7 @@ func TestRolling_deployRollingHooks(t *testing.T) { return nil }, hookExecutor: &hookExecutorImpl{ - executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, label string) error { + executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, suffix, label string) error { return hookError }, }, @@ -200,6 +203,7 @@ func TestRolling_deployRollingHooks(t *testing.T) { if tc.hookShouldFail { hookError = fmt.Errorf("hook failure") } + strategy.out, strategy.errOut = &bytes.Buffer{}, &bytes.Buffer{} err := strategy.Deploy(latest, deployment, 2) if err != nil && tc.deploymentShouldFail { t.Logf("got expected error: %v", err) @@ -230,7 +234,7 @@ func TestRolling_deployInitialHooks(t *testing.T) { return nil }, hookExecutor: &hookExecutorImpl{ - executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, label string) error { + executeFunc: func(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, suffix, label string) error { return hookError }, }, @@ -258,6 +262,7 @@ func TestRolling_deployInitialHooks(t *testing.T) { if tc.hookShouldFail { hookError = fmt.Errorf("hook failure") } + strategy.out, strategy.errOut = &bytes.Buffer{}, &bytes.Buffer{} err := strategy.Deploy(nil, deployment, 2) if err != nil && tc.deploymentShouldFail { t.Logf("got expected error: %v", err) diff --git a/pkg/deploy/strategy/support/lifecycle.go b/pkg/deploy/strategy/support/lifecycle.go index 958956629356..2e67c6dc41cc 100644 --- a/pkg/deploy/strategy/support/lifecycle.go +++ b/pkg/deploy/strategy/support/lifecycle.go @@ -36,8 +36,8 @@ type HookExecutor struct { podClient HookExecutorPodClient // tags allows setting image stream tags tags client.ImageStreamTagsNamespacer - // podLogDestination is where hook pod logs should be written to. - podLogDestination io.Writer + // out is where hook pod logs should be written to. + out io.Writer // podLogStream provides a reader for a pod's logs. podLogStream func(namespace, name string, opts *kapi.PodLogOptions) (io.ReadCloser, error) // decoder is used for encoding/decoding. @@ -45,7 +45,7 @@ type HookExecutor struct { } // NewHookExecutor makes a HookExecutor from a client. -func NewHookExecutor(client kclient.PodsNamespacer, tags client.ImageStreamTagsNamespacer, podLogDestination io.Writer, decoder runtime.Decoder) *HookExecutor { +func NewHookExecutor(client kclient.PodsNamespacer, tags client.ImageStreamTagsNamespacer, out io.Writer, decoder runtime.Decoder) *HookExecutor { return &HookExecutor{ tags: tags, podClient: &HookExecutorPodClientImpl{ @@ -59,20 +59,20 @@ func NewHookExecutor(client kclient.PodsNamespacer, tags client.ImageStreamTagsN podLogStream: func(namespace, name string, opts *kapi.PodLogOptions) (io.ReadCloser, error) { return client.Pods(namespace).GetLogs(name, opts).Stream() }, - podLogDestination: podLogDestination, - decoder: decoder, + out: out, + decoder: decoder, } } -// Execute executes hook in the context of deployment. The label is used to +// Execute executes hook in the context of deployment. The suffix is used to // distinguish the kind of hook (e.g. pre, post). -func (e *HookExecutor) Execute(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, label string) error { +func (e *HookExecutor) Execute(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, suffix, label string) error { var err error switch { case len(hook.TagImages) > 0: - err = e.tagImages(hook, deployment, label) + err = e.tagImages(hook, deployment, suffix, label) case hook.ExecNewPod != nil: - err = e.executeExecNewPod(hook, deployment, label) + err = e.executeExecNewPod(hook, deployment, suffix, label) } if err == nil { @@ -82,9 +82,9 @@ func (e *HookExecutor) Execute(hook *deployapi.LifecycleHook, deployment *kapi.R // Retry failures are treated the same as Abort. switch hook.FailurePolicy { case deployapi.LifecycleHookFailurePolicyAbort, deployapi.LifecycleHookFailurePolicyRetry: - return fmt.Errorf("Hook failed, aborting: %s", err) + return fmt.Errorf("%s hook failed, aborting deployment: %s", label, err) case deployapi.LifecycleHookFailurePolicyIgnore: - glog.Infof("Hook failed, ignoring: %s", err) + fmt.Fprintf(e.out, "warning: %s hook failed, deployment will continue: %v\n", label, err) return nil default: return err @@ -104,7 +104,7 @@ func findContainerImage(rc *kapi.ReplicationController, containerName string) (s } // tagImages tags images from the deployment -func (e *HookExecutor) tagImages(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, label string) error { +func (e *HookExecutor) tagImages(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, suffix, label string) error { var errs []error for _, action := range hook.TagImages { value, ok := findContainerImage(deployment, action.ContainerName) @@ -131,7 +131,7 @@ func (e *HookExecutor) tagImages(hook *deployapi.LifecycleHook, deployment *kapi errs = append(errs, err) continue } - glog.Infof("Tagged %q into %s/%s", value, action.To.Namespace, action.To.Name) + fmt.Fprintf(e.out, "--> %s: Tagged %q into %s/%s\n", label, value, action.To.Namespace, action.To.Name) } return utilerrors.NewAggregate(errs) @@ -146,26 +146,34 @@ func (e *HookExecutor) tagImages(hook *deployapi.LifecycleHook, deployment *kapi // * Environment (hook keys take precedence) // * Working directory // * Resources -func (e *HookExecutor) executeExecNewPod(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, label string) error { +func (e *HookExecutor) executeExecNewPod(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, suffix, label string) error { config, err := deployutil.DecodeDeploymentConfig(deployment, e.decoder) if err != nil { return err } // Build a pod spec from the hook config and deployment - podSpec, err := makeHookPod(hook, deployment, &config.Spec.Strategy, label) + podSpec, err := makeHookPod(hook, deployment, &config.Spec.Strategy, suffix) if err != nil { return err } + // Track whether the pod has already run to completion and avoid showing logs + // or the Success message twice. + completed, created := false, false + // Try to create the pod. pod, err := e.podClient.CreatePod(deployment.Namespace, podSpec) if err != nil { if !kerrors.IsAlreadyExists(err) { - return fmt.Errorf("couldn't create lifecycle pod for %s: %v", deployutil.LabelForDeployment(deployment), err) + return fmt.Errorf("couldn't create lifecycle pod for %s: %v", deployment.Name, err) } + completed = true + pod = podSpec + pod.Namespace = deployment.Namespace } else { - glog.V(0).Infof("Created lifecycle pod %s/%s for deployment %s", pod.Namespace, pod.Name, deployutil.LabelForDeployment(deployment)) + created = true + fmt.Fprintf(e.out, "--> %s: Running hook pod ...\n", label) } stopChannel := make(chan struct{}) @@ -178,16 +186,28 @@ func (e *HookExecutor) executeExecNewPod(hook *deployapi.LifecycleHook, deployme var once sync.Once wg := &sync.WaitGroup{} wg.Add(1) - glog.V(0).Infof("Watching logs for hook pod %s/%s while awaiting completion", pod.Namespace, pod.Name) waitLoop: for { updatedPod = nextPod() switch updatedPod.Status.Phase { case kapi.PodRunning: + completed = false go once.Do(func() { e.readPodLogs(pod, wg) }) case kapi.PodSucceeded, kapi.PodFailed: + if completed { + if updatedPod.Status.Phase == kapi.PodSucceeded { + fmt.Fprintf(e.out, "--> %s: Hook pod already succeeded\n", label) + } + wg.Done() + break waitLoop + } + if !created { + fmt.Fprintf(e.out, "--> %s: Hook pod is already running ...\n", label) + } go once.Do(func() { e.readPodLogs(pod, wg) }) break waitLoop + default: + completed = false } } // The pod is finished, wait for all logs to be consumed before returning. @@ -195,10 +215,15 @@ waitLoop: if updatedPod.Status.Phase == kapi.PodFailed { return fmt.Errorf(updatedPod.Status.Message) } + // Only show this message if we created the pod ourselves, or we saw + // the pod in a running or pending state. + if !completed { + fmt.Fprintf(e.out, "--> %s: Success\n", label) + } return nil } -// readPodLogs streams logs from pod to podLogDestination. It signals wg when +// readPodLogs streams logs from pod to out. It signals wg when // done. func (e *HookExecutor) readPodLogs(pod *kapi.Pod, wg *sync.WaitGroup) { defer wg.Done() @@ -209,21 +234,18 @@ func (e *HookExecutor) readPodLogs(pod *kapi.Pod, wg *sync.WaitGroup) { } logStream, err := e.podLogStream(pod.Namespace, pod.Name, opts) if err != nil || logStream == nil { - glog.V(0).Infof("Warning: couldn't get log stream for lifecycle pod %s/%s: %s", pod.Namespace, pod.Name, err) + fmt.Fprintf(e.out, "warning: Unable to retrieve hook logs from %s: %v\n", pod.Name, err) return } // Read logs. defer logStream.Close() - written, err := io.Copy(e.podLogDestination, logStream) - if err != nil { - glog.V(0).Infof("Finished reading logs for hook pod %s/%s (%d bytes): %s", pod.Namespace, pod.Name, written, err) - } else { - glog.V(0).Infof("Finished reading logs for hook pod %s/%s", pod.Namespace, pod.Name) + if _, err := io.Copy(e.out, logStream); err != nil { + fmt.Fprintf(e.out, "\nwarning: Unable to read all logs from %s, continuing: %v\n", pod.Name, err) } } // makeHookPod makes a pod spec from a hook and deployment. -func makeHookPod(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, strategy *deployapi.DeploymentStrategy, label string) (*kapi.Pod, error) { +func makeHookPod(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationController, strategy *deployapi.DeploymentStrategy, suffix string) (*kapi.Pod, error) { exec := hook.ExecNewPod var baseContainer *kapi.Container for _, container := range deployment.Spec.Template.Spec.Containers { @@ -298,12 +320,12 @@ func makeHookPod(hook *deployapi.LifecycleHook, deployment *kapi.ReplicationCont pod := &kapi.Pod{ ObjectMeta: kapi.ObjectMeta{ - Name: namer.GetPodName(deployment.Name, label), + Name: namer.GetPodName(deployment.Name, suffix), Annotations: map[string]string{ deployapi.DeploymentAnnotation: deployment.Name, }, Labels: map[string]string{ - deployapi.DeploymentPodTypeLabel: label, + deployapi.DeploymentPodTypeLabel: suffix, deployapi.DeployerPodForDeploymentLabel: deployment.Name, }, }, @@ -383,8 +405,9 @@ func NewPodWatch(client kclient.PodsNamespacer, namespace, name, resourceVersion // NewAcceptNewlyObservedReadyPods makes a new AcceptNewlyObservedReadyPods // from a real client. -func NewAcceptNewlyObservedReadyPods(kclient kclient.PodsNamespacer, timeout time.Duration, interval time.Duration) *AcceptNewlyObservedReadyPods { +func NewAcceptNewlyObservedReadyPods(out io.Writer, kclient kclient.PodsNamespacer, timeout time.Duration, interval time.Duration) *AcceptNewlyObservedReadyPods { return &AcceptNewlyObservedReadyPods{ + out: out, timeout: timeout, interval: interval, acceptedPods: sets.NewString(), @@ -423,6 +446,7 @@ func NewAcceptNewlyObservedReadyPods(kclient kclient.PodsNamespacer, timeout tim // Note that this struct is stateful and intended for use with a single // deployment and should be discarded and recreated between deployments. type AcceptNewlyObservedReadyPods struct { + out io.Writer // getDeploymentPodStore should return a Store containing all the pods for // the named deployment, and a channel to stop whatever process is feeding // the store. @@ -443,7 +467,11 @@ func (c *AcceptNewlyObservedReadyPods) Accept(deployment *kapi.ReplicationContro defer close(stopStore) // Start checking for pod updates. - glog.V(0).Infof("Waiting %.f seconds for pods owned by deployment %q to become ready (checking every %.f seconds; %d pods previously accepted)", c.timeout.Seconds(), deployutil.LabelForDeployment(deployment), c.interval.Seconds(), c.acceptedPods.Len()) + if c.acceptedPods.Len() > 0 { + fmt.Fprintf(c.out, "--> Waiting up to %s for pods in deployment %s to become ready (%d pods previously accepted)\n", c.timeout, deployment.Name, c.acceptedPods.Len()) + } else { + fmt.Fprintf(c.out, "--> Waiting up to %s for pods in deployment %s to become ready\n", c.timeout, deployment.Name) + } err := wait.Poll(c.interval, c.timeout, func() (done bool, err error) { // Check for pod readiness. unready := sets.NewString() @@ -464,20 +492,19 @@ func (c *AcceptNewlyObservedReadyPods) Accept(deployment *kapi.ReplicationContro } // Check to see if we're done. if unready.Len() == 0 { - glog.V(0).Infof("All pods ready for %s", deployutil.LabelForDeployment(deployment)) return true, nil } // Otherwise, try again later. - glog.V(4).Infof("Still waiting for %d pods to become ready for deployment %s", unready.Len(), deployutil.LabelForDeployment(deployment)) + glog.V(4).Infof("Still waiting for %d pods to become ready for deployment %s", unready.Len(), deployment.Name) return false, nil }) // Handle acceptance failure. if err != nil { if err == wait.ErrWaitTimeout { - return fmt.Errorf("pods for deployment %q took longer than %.f seconds to become ready", deployutil.LabelForDeployment(deployment), c.timeout.Seconds()) + return fmt.Errorf("pods for deployment %q took longer than %.f seconds to become ready", deployment.Name, c.timeout.Seconds()) } - return fmt.Errorf("pod readiness check failed for deployment %q: %v", deployutil.LabelForDeployment(deployment), err) + return fmt.Errorf("pod readiness check failed for deployment %q: %v", deployment.Name, err) } return nil } diff --git a/pkg/deploy/strategy/support/lifecycle_test.go b/pkg/deploy/strategy/support/lifecycle_test.go index ee084c2c19c4..b26a5d591d32 100644 --- a/pkg/deploy/strategy/support/lifecycle_test.go +++ b/pkg/deploy/strategy/support/lifecycle_test.go @@ -47,7 +47,7 @@ func TestHookExecutor_executeExecNewCreatePodFailure(t *testing.T) { decoder: kapi.Codecs.UniversalDecoder(), } - err := executor.executeExecNewPod(hook, deployment, "hook") + err := executor.executeExecNewPod(hook, deployment, "hook", "test") if err == nil { t.Fatalf("expected an error") @@ -80,20 +80,20 @@ func TestHookExecutor_executeExecNewPodSucceeded(t *testing.T) { return func() *kapi.Pod { return createdPod } }, }, - podLogDestination: podLogs, + out: podLogs, podLogStream: func(namespace, name string, opts *kapi.PodLogOptions) (io.ReadCloser, error) { return ioutil.NopCloser(strings.NewReader("test")), nil }, decoder: kapi.Codecs.UniversalDecoder(), } - err := executor.executeExecNewPod(hook, deployment, "hook") + err := executor.executeExecNewPod(hook, deployment, "hook", "test") if err != nil { t.Fatalf("unexpected error: %s", err) } - if e, a := "test", podLogs.String(); e != a { + if e, a := "--> test: Running hook pod ...\ntest--> test: Success\n", podLogs.String(); e != a { t.Fatalf("expected pod logs to be %q, got %q", e, a) } @@ -132,14 +132,14 @@ func TestHookExecutor_executeExecNewPodFailed(t *testing.T) { return func() *kapi.Pod { return createdPod } }, }, - podLogDestination: ioutil.Discard, + out: ioutil.Discard, podLogStream: func(namespace, name string, opts *kapi.PodLogOptions) (io.ReadCloser, error) { return nil, fmt.Errorf("can't access logs") }, decoder: kapi.Codecs.UniversalDecoder(), } - err := executor.executeExecNewPod(hook, deployment, "hook") + err := executor.executeExecNewPod(hook, deployment, "hook", "test") if err == nil { t.Fatalf("expected an error, got none") @@ -516,6 +516,7 @@ func TestAcceptNewlyObservedReadyPods_scenarios(t *testing.T) { deployment, _ := deployutil.MakeDeployment(deploytest.OkDeploymentConfig(1), kapi.Codecs.LegacyCodec(deployapi.SchemeGroupVersion)) deployment.Spec.Replicas = 1 + acceptor.out = &bytes.Buffer{} err := acceptor.Accept(deployment) if s.accepted { diff --git a/pkg/deploy/util/util.go b/pkg/deploy/util/util.go index 4a5d37f85b54..c449d0e273b6 100644 --- a/pkg/deploy/util/util.go +++ b/pkg/deploy/util/util.go @@ -229,6 +229,9 @@ func MakeDeployment(config *deployapi.DeploymentConfig, codec runtime.Codec) (*a }, }, } + if value, ok := config.Annotations[deployapi.DeploymentIgnorePodAnnotation]; ok { + deployment.Annotations[deployapi.DeploymentIgnorePodAnnotation] = value + } return deployment, nil } diff --git a/test/extended/deployments/deployments.go b/test/extended/deployments/deployments.go index f4378db7db16..fc82b9898f15 100644 --- a/test/extended/deployments/deployments.go +++ b/test/extended/deployments/deployments.go @@ -27,6 +27,7 @@ var _ = g.Describe("deploymentconfigs", func() { var ( deploymentFixture = exutil.FixturePath("..", "extended", "fixtures", "test-deployment-test.yaml") simpleDeploymentFixture = exutil.FixturePath("..", "extended", "fixtures", "deployment-simple.yaml") + customDeploymentFixture = exutil.FixturePath("..", "extended", "fixtures", "custom-deployment.yaml") oc = exutil.NewCLI("cli-deployment", exutil.KubeConfigPath()) ) @@ -141,8 +142,8 @@ var _ = g.Describe("deploymentconfigs", func() { o.Expect(err).NotTo(o.HaveOccurred()) g.By(fmt.Sprintf("checking the logs for substrings\n%s", out)) o.Expect(out).To(o.ContainSubstring("deployment-test-1 to 2")) - o.Expect(out).To(o.ContainSubstring("Pre hook finished")) - o.Expect(out).To(o.ContainSubstring("Deployment deployment-test-1 successfully made active")) + o.Expect(out).To(o.ContainSubstring("--> pre: Success")) + o.Expect(out).To(o.ContainSubstring("--> Success")) g.By("verifying the deployment is marked complete and scaled to zero") o.Expect(waitForLatestCondition(oc, "deployment-test", deploymentRunTimeout, deploymentReachedCompletion)).NotTo(o.HaveOccurred()) @@ -177,13 +178,38 @@ var _ = g.Describe("deploymentconfigs", func() { o.Expect(err).NotTo(o.HaveOccurred()) g.By(fmt.Sprintf("checking the logs for substrings\n%s", out)) o.Expect(out).To(o.ContainSubstring(fmt.Sprintf("deployment-test-%d up to 1", i+2))) - o.Expect(out).To(o.ContainSubstring("Pre hook finished")) + o.Expect(out).To(o.ContainSubstring("--> pre: Success")) + o.Expect(out).To(o.ContainSubstring("test pre hook executed")) + o.Expect(out).To(o.ContainSubstring("--> Success")) g.By("verifying the deployment is marked complete and scaled to zero") o.Expect(waitForLatestCondition(oc, "deployment-test", deploymentRunTimeout, deploymentReachedCompletion)).NotTo(o.HaveOccurred()) } }) }) + + g.Describe("with custom deployments", func() { + g.It("should run the custom deployment steps [Conformance]", func() { + out, err := oc.Run("create").Args("-f", customDeploymentFixture).Output() + o.Expect(err).NotTo(o.HaveOccurred()) + + o.Expect(waitForLatestCondition(oc, "custom-deployment", deploymentRunTimeout, deploymentRunning)).NotTo(o.HaveOccurred()) + + out, err = oc.Run("logs").Args("-f", "dc/custom-deployment").Output() + o.Expect(err).NotTo(o.HaveOccurred()) + g.By(fmt.Sprintf("checking the logs for substrings\n%s", out)) + o.Expect(out).To(o.ContainSubstring("--> pre: Running hook pod ...")) + o.Expect(out).To(o.ContainSubstring("test pre hook executed")) + o.Expect(out).To(o.ContainSubstring("--> Scaling custom-deployment-1 to 2")) + o.Expect(out).To(o.ContainSubstring("--> Reached 50%")) + o.Expect(out).To(o.ContainSubstring("Halfway")) + o.Expect(out).To(o.ContainSubstring("Finished")) + o.Expect(out).To(o.ContainSubstring("--> Success")) + + g.By("verifying the deployment is marked complete") + o.Expect(waitForLatestCondition(oc, "custom-deployment", deploymentRunTimeout, deploymentReachedCompletion)).NotTo(o.HaveOccurred()) + }) + }) }) func deploymentStatuses(rcs []kapi.ReplicationController) []string { diff --git a/test/extended/fixtures/custom-deployment.yaml b/test/extended/fixtures/custom-deployment.yaml new file mode 100644 index 000000000000..d66493bad227 --- /dev/null +++ b/test/extended/fixtures/custom-deployment.yaml @@ -0,0 +1,43 @@ +apiVersion: v1 +kind: DeploymentConfig +metadata: + name: custom-deployment +spec: + replicas: 2 + selector: + name: custom-deployment + strategy: + type: Rolling + rollingParams: + pre: + failurePolicy: Abort + execNewPod: + containerName: myapp + command: + - /bin/echo + - test pre hook executed + customParams: + command: + - /bin/sh + - -c + - | + set -e + openshift-deploy --until=50% + echo Halfway + openshift-deploy + echo Finished + template: + metadata: + labels: + name: custom-deployment + spec: + terminationGracePeriodSeconds: 0 + containers: + - image: "docker.io/centos:centos7" + imagePullPolicy: IfNotPresent + name: myapp + command: + - /bin/sleep + - "100" + triggers: + - type: ConfigChange diff --git a/test/extended/fixtures/test-deployment-test.yaml b/test/extended/fixtures/test-deployment-test.yaml index e0fd23533276..bde53b721883 100644 --- a/test/extended/fixtures/test-deployment-test.yaml +++ b/test/extended/fixtures/test-deployment-test.yaml @@ -21,6 +21,7 @@ spec: labels: name: deployment-test spec: + terminationGracePeriodSeconds: 0 containers: - image: "docker.io/centos:centos7" imagePullPolicy: IfNotPresent diff --git a/test/integration/imagestream_test.go b/test/integration/imagestream_test.go index a3fe4ab9b7df..26e91e7c7a50 100644 --- a/test/integration/imagestream_test.go +++ b/test/integration/imagestream_test.go @@ -328,7 +328,7 @@ func TestImageStreamTagLifecycleHook(t *testing.T) { }, }, }, - "test", + "test", "test", ) if err != nil { t.Fatal(err) @@ -366,7 +366,7 @@ func TestImageStreamTagLifecycleHook(t *testing.T) { }, }, }, - "test", + "test", "test", ) if err != nil { t.Fatal(err) @@ -398,7 +398,7 @@ func TestImageStreamTagLifecycleHook(t *testing.T) { }, }, }, - "test", + "test", "test", ) if err != nil { t.Fatal(err)