Skip to content

Commit

Permalink
Add TaskSetDefinition as required configuration for ECS deployment (#…
Browse files Browse the repository at this point in the history
…2021)

**What this PR does / why we need it**:

**Which issue(s) this PR fixes**:

Fixes #2011 

**Does this PR introduce a user-facing change?**:
<!--
If no, just write "NONE" in the release-note block below.
-->
```release-note
NONE
```

This PR was merged by Kapetanios.
  • Loading branch information
khanhtc1202 committed May 28, 2021
1 parent 0e48cee commit 6549919
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 34 deletions.
1 change: 1 addition & 0 deletions pkg/app/piped/cloudprovider/ecs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
"ecs.go",
"service.go",
"task.go",
"taskset.go",
],
importpath = "github.com/pipe-cd/pipe/pkg/app/piped/cloudprovider/ecs",
visibility = ["//visibility:public"],
Expand Down
9 changes: 4 additions & 5 deletions pkg/app/piped/cloudprovider/ecs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ func (c *client) CreateService(ctx context.Context, service types.Service) (*typ
DeploymentConfiguration: service.DeploymentConfiguration,
EnableECSManagedTags: service.EnableECSManagedTags,
HealthCheckGracePeriodSeconds: service.HealthCheckGracePeriodSeconds,
LoadBalancers: service.LoadBalancers,
PlacementConstraints: service.PlacementConstraints,
PlacementStrategy: service.PlacementStrategy,
PlatformVersion: service.PlatformVersion,
Expand Down Expand Up @@ -134,7 +133,7 @@ func (c *client) RegisterTaskDefinition(ctx context.Context, taskDefinition type
return output.TaskDefinition, nil
}

func (c *client) CreateTaskSet(ctx context.Context, service types.Service, taskDefinition types.TaskDefinition) (*types.TaskSet, error) {
func (c *client) CreateTaskSet(ctx context.Context, service types.Service, taskDefinition types.TaskDefinition, taskSet types.TaskSet) (*types.TaskSet, error) {
if taskDefinition.TaskDefinitionArn == nil {
return nil, fmt.Errorf("failed to create task set of task family %s: no task definition provided", *taskDefinition.Family)
}
Expand All @@ -146,9 +145,9 @@ func (c *client) CreateTaskSet(ctx context.Context, service types.Service, taskD
Scale: &types.Scale{Unit: types.ScaleUnitPercent, Value: float64(100)},
// If you specify the awsvpc network mode, the task is allocated an elastic network interface,
// and you must specify a NetworkConfiguration when run a task with the task definition.
// TODO: Find better way to get those 2 values instead of set it via service def.
NetworkConfiguration: service.NetworkConfiguration,
LaunchType: service.LaunchType,
NetworkConfiguration: taskSet.NetworkConfiguration,
LaunchType: taskSet.LaunchType,
LoadBalancers: taskSet.LoadBalancers,
}
output, err := c.client.CreateTaskSet(ctx, input)
if err != nil {
Expand Down
19 changes: 7 additions & 12 deletions pkg/app/piped/cloudprovider/ecs/ecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,14 @@ import (
"github.com/pipe-cd/pipe/pkg/config"
)

const (
defaultTaskDefinitionFilename = "taskdef.yaml"
defaultserviceDefinitionFilename = "servicedef.yaml"
)

// Client is wrapper of ECS client.
type Client interface {
ServiceExists(ctx context.Context, clusterName string, servicesName string) (bool, error)
CreateService(ctx context.Context, service types.Service) (*types.Service, error)
UpdateService(ctx context.Context, service types.Service) (*types.Service, error)
RegisterTaskDefinition(ctx context.Context, taskDefinition types.TaskDefinition) (*types.TaskDefinition, error)
GetPrimaryTaskSet(ctx context.Context, service types.Service) (*types.TaskSet, error)
CreateTaskSet(ctx context.Context, service types.Service, taskDefinition types.TaskDefinition) (*types.TaskSet, error)
CreateTaskSet(ctx context.Context, service types.Service, taskDefinition types.TaskDefinition, taskSet types.TaskSet) (*types.TaskSet, error)
DeleteTaskSet(ctx context.Context, service types.Service, taskSet types.TaskSet) error
UpdateServicePrimaryTaskSet(ctx context.Context, service types.Service, taskSet types.TaskSet) (*types.TaskSet, error)
}
Expand All @@ -50,22 +45,22 @@ type Registry interface {

// LoadServiceDefinition returns ServiceDefinition object from a given service definition file.
func LoadServiceDefinition(appDir, serviceDefinitionFilename string) (types.Service, error) {
if serviceDefinitionFilename == "" {
serviceDefinitionFilename = defaultserviceDefinitionFilename
}
path := filepath.Join(appDir, serviceDefinitionFilename)
return loadServiceDefinition(path)
}

// LoadTaskDefinition returns TaskDefinition object from a given task definition file.
func LoadTaskDefinition(appDir, taskDefinition string) (types.TaskDefinition, error) {
if taskDefinition == "" {
taskDefinition = defaultTaskDefinitionFilename
}
path := filepath.Join(appDir, taskDefinition)
return loadTaskDefinition(path)
}

// LoadTaskSetDefinition returns TaskSetDefinition object from a given task set definition file.
func LoadTaskSetDefinition(appDir, taskSetDefinition string) (types.TaskSet, error) {
path := filepath.Join(appDir, taskSetDefinition)
return loadTaskSetDefinition(path)
}

type registry struct {
clients map[string]Client
mu sync.RWMutex
Expand Down
40 changes: 40 additions & 0 deletions pkg/app/piped/cloudprovider/ecs/taskset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2021 The PipeCD Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ecs

import (
"io/ioutil"

"sigs.k8s.io/yaml"

"github.com/aws/aws-sdk-go-v2/service/ecs/types"
)

func loadTaskSetDefinition(path string) (types.TaskSet, error) {
data, err := ioutil.ReadFile(path)
if err != nil {
return types.TaskSet{}, err
}
return parseTaskSetDefinition(data)
}

func parseTaskSetDefinition(data []byte) (types.TaskSet, error) {
var obj types.TaskSet
// TODO: Support loading TaskSetDefinition file with JSON format
if err := yaml.Unmarshal(data, &obj); err != nil {
return types.TaskSet{}, err
}
return obj, nil
}
6 changes: 5 additions & 1 deletion pkg/app/piped/executor/ecs/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,12 @@ func (e *deployExecutor) ensureSync(ctx context.Context) model.StageStatus {
if !ok {
return model.StageStatus_STAGE_FAILURE
}
taskSetDefinition, ok := loadTaskSetDefinition(&e.Input, e.deployCfg.Input.TaskSetDefinitionFile, e.deploySource)
if !ok {
return model.StageStatus_STAGE_FAILURE
}

if !sync(ctx, &e.Input, e.cloudProviderName, e.cloudProviderCfg, taskDefinition, servicedefinition) {
if !sync(ctx, &e.Input, e.cloudProviderName, e.cloudProviderCfg, taskDefinition, taskSetDefinition, servicedefinition) {
return model.StageStatus_STAGE_FAILURE
}

Expand Down
31 changes: 20 additions & 11 deletions pkg/app/piped/executor/ecs/ecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,32 @@ func loadServiceDefinition(in *executor.Input, serviceDefinitionFile string, ds
}

func loadTaskDefinition(in *executor.Input, taskDefinitionFile string, ds *deploysource.DeploySource) (types.TaskDefinition, bool) {
in.LogPersister.Infof("Loading service manifest at the %s commit (%s)", ds.RevisionName, ds.RevisionName)
in.LogPersister.Infof("Loading task definition manifest at the %s commit (%s)", ds.RevisionName, ds.RevisionName)

taskDefinition, err := provider.LoadTaskDefinition(ds.AppDir, taskDefinitionFile)
if err != nil {
in.LogPersister.Errorf("Failed to load ECS service definition (%v)", err)
in.LogPersister.Errorf("Failed to load ECS task definition (%v)", err)
return types.TaskDefinition{}, false
}

in.LogPersister.Infof("Successfully loaded the ECS service definition at the %s commit", ds.RevisionName)
in.LogPersister.Infof("Successfully loaded the ECS task definition at the %s commit", ds.RevisionName)
return taskDefinition, true
}

func sync(ctx context.Context, in *executor.Input, cloudProviderName string, cloudProviderCfg *config.CloudProviderECSConfig, taskDefinition types.TaskDefinition, serviceDefinition types.Service) bool {
func loadTaskSetDefinition(in *executor.Input, taskSetDefinitionFile string, ds *deploysource.DeploySource) (types.TaskSet, bool) {
in.LogPersister.Infof("Loading task set definition manifest at the %s commit (%s)", ds.RevisionName, ds.RevisionName)

taskSetDefinition, err := provider.LoadTaskSetDefinition(ds.AppDir, taskSetDefinitionFile)
if err != nil {
in.LogPersister.Errorf("Failed to load task set definition (%v)", err)
return types.TaskSet{}, false
}

in.LogPersister.Infof("Successfully loaded the ECS task set definition at the %s commit", ds.RevisionName)
return taskSetDefinition, true
}

func sync(ctx context.Context, in *executor.Input, cloudProviderName string, cloudProviderCfg *config.CloudProviderECSConfig, taskDefinition types.TaskDefinition, taskSetDefinition types.TaskSet, serviceDefinition types.Service) bool {
in.LogPersister.Infof("Start applying the ECS task definition")
client, err := provider.DefaultRegistry().Client(cloudProviderName, cloudProviderCfg, in.Logger)
if err != nil {
Expand All @@ -101,7 +114,7 @@ func sync(ctx context.Context, in *executor.Input, cloudProviderName string, clo
}

// Build and publish new version of ECS service and task definition.
ok := build(ctx, in, client, taskDefinition, serviceDefinition)
ok := build(ctx, in, client, taskDefinition, taskSetDefinition, serviceDefinition)
if !ok {
in.LogPersister.Errorf("Failed to build new version for ECS %s", *serviceDefinition.ServiceName)
return false
Expand All @@ -111,7 +124,7 @@ func sync(ctx context.Context, in *executor.Input, cloudProviderName string, clo
return true
}

func build(ctx context.Context, in *executor.Input, client provider.Client, taskDefinition types.TaskDefinition, serviceDefinition types.Service) bool {
func build(ctx context.Context, in *executor.Input, client provider.Client, taskDefinition types.TaskDefinition, taskSetDefinition types.TaskSet, serviceDefinition types.Service) bool {
td, err := client.RegisterTaskDefinition(ctx, taskDefinition)
if err != nil {
in.LogPersister.Errorf("Failed to register ECS task definition of family %s: %v", *taskDefinition.Family, err)
Expand Down Expand Up @@ -139,10 +152,6 @@ func build(ctx context.Context, in *executor.Input, client provider.Client, task
}
}

// hack: Set this two value to enable create TaskSet (those values are ignored on Update/Create Service).
service.LaunchType = serviceDefinition.LaunchType
service.NetworkConfiguration = serviceDefinition.NetworkConfiguration

// Get current PRIMARY task set.
prevPrimaryTaskSet, err := client.GetPrimaryTaskSet(ctx, *service)
// Ignore error in case it's not found error, the prevPrimaryTaskSet doesn't exist for newly created Service.
Expand All @@ -152,7 +161,7 @@ func build(ctx context.Context, in *executor.Input, client provider.Client, task
}

// Create a task set in the specified cluster and service.
taskSet, err := client.CreateTaskSet(ctx, *service, *td)
taskSet, err := client.CreateTaskSet(ctx, *service, *td, taskSetDefinition)
if err != nil {
in.LogPersister.Errorf("Failed to create ECS task set %s: %v", *serviceDefinition.ServiceName, err)
return false
Expand Down
10 changes: 7 additions & 3 deletions pkg/app/piped/executor/ecs/rollback.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,19 @@ func (e *rollbackExecutor) ensureRollback(ctx context.Context) model.StageStatus
if !ok {
return model.StageStatus_STAGE_FAILURE
}
taskSetDefinition, ok := loadTaskSetDefinition(&e.Input, deployCfg.Input.TaskSetDefinitionFile, runningDS)
if !ok {
return model.StageStatus_STAGE_FAILURE
}

if !rollback(ctx, &e.Input, cloudProviderName, cloudProviderCfg, taskDefinition, serviceDefinition) {
if !rollback(ctx, &e.Input, cloudProviderName, cloudProviderCfg, taskDefinition, serviceDefinition, taskSetDefinition) {
return model.StageStatus_STAGE_FAILURE
}

return model.StageStatus_STAGE_SUCCESS
}

func rollback(ctx context.Context, in *executor.Input, cloudProviderName string, cloudProviderCfg *config.CloudProviderECSConfig, taskDefinition types.TaskDefinition, serviceDefinition types.Service) bool {
func rollback(ctx context.Context, in *executor.Input, cloudProviderName string, cloudProviderCfg *config.CloudProviderECSConfig, taskDefinition types.TaskDefinition, serviceDefinition types.Service, taskSetDefinition types.TaskSet) bool {
in.LogPersister.Infof("Start rollback the ECS service and task family: %s and %s to original stage", *serviceDefinition.ServiceName, *taskDefinition.Family)
client, err := provider.DefaultRegistry().Client(cloudProviderName, cloudProviderCfg, in.Logger)
if err != nil {
Expand All @@ -107,7 +111,7 @@ func rollback(ctx context.Context, in *executor.Input, cloudProviderName string,
return false
}

if _, err := client.CreateTaskSet(ctx, serviceDefinition, taskDefinition); err != nil {
if _, err := client.CreateTaskSet(ctx, serviceDefinition, taskDefinition, taskSetDefinition); err != nil {
in.LogPersister.Errorf("Failed to create ECS task set %s: %v", *serviceDefinition.ServiceName, err)
return false
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/config/deployment_ecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,13 @@ func (s *ECSDeploymentSpec) Validate() error {
type ECSDeploymentInput struct {
// The name of service definition file placing in application directory.
// Default is servicedef.yaml
ServiceDefinitionFile string `json:"serviceDefinitionFile"`
ServiceDefinitionFile string `json:"serviceDefinitionFile" default:"servicedef.yaml"`
// The name of task definition file placing in application directory.
// Default is taskdef.yaml
TaskDefinitionFile string `json:"taskDefinitionFile"`
TaskDefinitionFile string `json:"taskDefinitionFile" default:"taskdef.yaml"`
// The name of task set definition file placing in application directory.
// Default is tasksetdef.yaml
TaskSetDefinitionFile string `json:"taskSetDefinitionFile" default:"tasksetdef.yaml"`
// Automatically reverts all changes from all stages when one of them failed.
// Default is true.
AutoRollback bool `json:"autoRollback" default:"true"`
Expand Down
1 change: 1 addition & 0 deletions pkg/config/deployment_ecs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func TestECSDeploymentConfig(t *testing.T) {
Input: ECSDeploymentInput{
ServiceDefinitionFile: "/path/to/servicedef.yaml",
TaskDefinitionFile: "/path/to/taskdef.yaml",
TaskSetDefinitionFile: "/path/to/tasksetdef.yaml",
AutoRollback: true,
},
},
Expand Down
1 change: 1 addition & 0 deletions pkg/config/testdata/application/ecs-app.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ spec:
input:
serviceDefinitionFile: /path/to/servicedef.yaml
taskDefinitionFile: /path/to/taskdef.yaml
taskSetDefinitionFile: /path/to/tasksetdef.yaml

0 comments on commit 6549919

Please sign in to comment.