Skip to content

Commit

Permalink
Make platformProviders works as piped deployment provider (#3815)
Browse files Browse the repository at this point in the history
* Resolve conflict

* Remove PipedSpec.CloudProvider type and make all cloudProviders configuration load as platformProviders

* Update pkg/app/piped/cmd/piped/piped.go

Co-authored-by: nghialv <nghialv2607@gmail.com>

* Remove duplicate

* Fix error

* Piped api update

* Add cloudprovider

* Update unmarshalJSON logic

* Resolve conflict

Co-authored-by: nghialv <nghialv2607@gmail.com>
  • Loading branch information
khanhtc1202 and nghialv committed Jul 25, 2022
1 parent 8bb824a commit 0fb6e72
Show file tree
Hide file tree
Showing 27 changed files with 107 additions and 90 deletions.
18 changes: 9 additions & 9 deletions pkg/app/piped/cmd/piped/piped.go
Expand Up @@ -116,7 +116,7 @@ func NewCommand() *cobra.Command {
cmd.Flags().IntVar(&p.adminPort, "admin-port", p.adminPort, "The port number used to run a HTTP server for admin tasks such as metrics, healthz.")

cmd.Flags().StringVar(&p.toolsDir, "tools-dir", p.toolsDir, "The path to directory where to install needed tools such as kubectl, helm, kustomize.")
cmd.Flags().BoolVar(&p.enableDefaultKubernetesCloudProvider, "enable-default-kubernetes-cloud-provider", p.enableDefaultKubernetesCloudProvider, "Whether the default kubernetes provider is enabled or not.")
cmd.Flags().BoolVar(&p.enableDefaultKubernetesCloudProvider, "enable-default-kubernetes-cloud-provider", p.enableDefaultKubernetesCloudProvider, "Whether the default kubernetes provider is enabled or not. This feature is deprecated.")
cmd.Flags().BoolVar(&p.addLoginUserToPasswd, "add-login-user-to-passwd", p.addLoginUserToPasswd, "Whether to add login user to $HOME/passwd. This is typically for applications running as a random user ID.")
cmd.Flags().DurationVar(&p.gracePeriod, "grace-period", p.gracePeriod, "How long to wait for graceful shutdown.")

Expand Down Expand Up @@ -542,7 +542,7 @@ func (p *piped) loadConfig(ctx context.Context) (*config.PipedSpec, error) {
return nil, fmt.Errorf("wrong configuration kind for piped: %v", cfg.Kind)
}
if p.enableDefaultKubernetesCloudProvider {
cfg.PipedSpec.EnableDefaultKubernetesCloudProvider()
cfg.PipedSpec.EnableDefaultKubernetesPlatformProvider()
}
return cfg.PipedSpec, nil
}
Expand Down Expand Up @@ -638,17 +638,17 @@ func (p *piped) sendPipedMeta(ctx context.Context, client pipedservice.Client, c

var (
req = &pipedservice.ReportPipedMetaRequest{
Version: version.Get().Version,
Config: string(maskedCfg),
Repositories: repos,
CloudProviders: make([]*model.Piped_CloudProvider, 0, len(cfg.CloudProviders)),
Version: version.Get().Version,
Config: string(maskedCfg),
Repositories: repos,
PlatformProviders: make([]*model.Piped_PlatformProvider, 0, len(cfg.PlatformProviders)),
}
retry = pipedservice.NewRetry(5)
)

// Configure the list of specified cloud providers.
for _, cp := range cfg.CloudProviders {
req.CloudProviders = append(req.CloudProviders, &model.Piped_CloudProvider{
// Configure the list of specified platform providers.
for _, cp := range cfg.PlatformProviders {
req.PlatformProviders = append(req.PlatformProviders, &model.Piped_PlatformProvider{
Name: cp.Name,
Type: cp.Type.String(),
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/app/piped/controller/scheduler.go
Expand Up @@ -434,8 +434,8 @@ func (s *scheduler) executeStage(sig executor.StopSignal, ps model.PipelineStage
}

// Check the existence of the specified cloud provider.
if !s.pipedConfig.HasCloudProvider(s.deployment.CloudProvider, s.deployment.Kind) {
lp.Errorf("This piped is not having the specified cloud provider in this deployment: %v", s.deployment.CloudProvider)
if !s.pipedConfig.HasPlatformProvider(s.deployment.PlatformProvider, s.deployment.Kind) {
lp.Errorf("This piped is not having the specified platform provider in this deployment: %v", s.deployment.PlatformProvider)
if err := s.reportStageStatus(ctx, ps.Id, model.StageStatus_STAGE_FAILURE, ps.Requires); err != nil {
s.logger.Error("failed to report stage status", zap.Error(err))
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/app/piped/driftdetector/detector.go
Expand Up @@ -87,14 +87,14 @@ func NewDetector(

d := &detector{
apiClient: apiClient,
detectors: make([]providerDetector, 0, len(cfg.CloudProviders)),
detectors: make([]providerDetector, 0, len(cfg.PlatformProviders)),
syncStates: make(map[string]model.ApplicationSyncState),
logger: logger.Named("drift-detector"),
}

const format = "unable to find live state getter for cloud provider: %s"
const format = "unable to find live state getter for platform provider: %s"

for _, cp := range cfg.CloudProviders {
for _, cp := range cfg.PlatformProviders {
switch cp.Type {
case model.PlatformProviderKubernetes:
sg, ok := stateGetter.KubernetesGetter(cp.Name)
Expand Down
10 changes: 5 additions & 5 deletions pkg/app/piped/executor/cloudrun/cloudrun.go
Expand Up @@ -58,16 +58,16 @@ func loadServiceManifest(in *executor.Input, serviceManifestFile string, ds *dep
return sm, true
}

func findCloudProvider(in *executor.Input) (name string, cfg *config.PlatformProviderCloudRunConfig, found bool) {
name = in.Application.CloudProvider
func findPlatformProvider(in *executor.Input) (name string, cfg *config.PlatformProviderCloudRunConfig, found bool) {
name = in.Application.PlatformProvider
if name == "" {
in.LogPersister.Error("Missing the CloudProvider name in the application configuration")
in.LogPersister.Error("Missing the PlatformProvider name in the application configuration")
return
}

cp, ok := in.PipedConfig.FindCloudProvider(name, model.ApplicationKind_CLOUDRUN)
cp, ok := in.PipedConfig.FindPlatformProvider(name, model.ApplicationKind_CLOUDRUN)
if !ok {
in.LogPersister.Errorf("The specified cloud provider %q was not found in piped configuration", name)
in.LogPersister.Errorf("The specified platform provider %q was not found in piped configuration", name)
return
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/app/piped/executor/cloudrun/deploy.go
Expand Up @@ -52,7 +52,7 @@ func (e *deployExecutor) Execute(sig executor.StopSignal) model.StageStatus {
return model.StageStatus_STAGE_FAILURE
}

cpName, cpCfg, found := findCloudProvider(&e.Input)
cpName, cpCfg, found := findPlatformProvider(&e.Input)
if !found {
return model.StageStatus_STAGE_FAILURE
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/app/piped/executor/cloudrun/rollback.go
Expand Up @@ -34,7 +34,7 @@ func (e *rollbackExecutor) Execute(sig executor.StopSignal) model.StageStatus {
status model.StageStatus
)

cpName, cpCfg, found := findCloudProvider(&e.Input)
cpName, cpCfg, found := findPlatformProvider(&e.Input)
if !found {
return model.StageStatus_STAGE_FAILURE
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/app/piped/executor/ecs/deploy.go
Expand Up @@ -48,7 +48,7 @@ func (e *deployExecutor) Execute(sig executor.StopSignal) model.StageStatus {
}

var found bool
e.cloudProviderName, e.cloudProviderCfg, found = findCloudProvider(&e.Input)
e.cloudProviderName, e.cloudProviderCfg, found = findPlatformProvider(&e.Input)
if !found {
return model.StageStatus_STAGE_FAILURE
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/app/piped/executor/ecs/ecs.go
Expand Up @@ -66,16 +66,16 @@ func Register(r registerer) {
})
}

func findCloudProvider(in *executor.Input) (name string, cfg *config.PlatformProviderECSConfig, found bool) {
name = in.Application.CloudProvider
func findPlatformProvider(in *executor.Input) (name string, cfg *config.PlatformProviderECSConfig, found bool) {
name = in.Application.PlatformProvider
if name == "" {
in.LogPersister.Errorf("Missing the CloudProvider name in the application configuration")
in.LogPersister.Errorf("Missing the PlatformProvider name in the application configuration")
return
}

cp, ok := in.PipedConfig.FindCloudProvider(name, model.ApplicationKind_ECS)
cp, ok := in.PipedConfig.FindPlatformProvider(name, model.ApplicationKind_ECS)
if !ok {
in.LogPersister.Errorf("The specified cloud provider %q was not found in piped configuration", name)
in.LogPersister.Errorf("The specified platform provider %q was not found in piped configuration", name)
return
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/app/piped/executor/ecs/rollback.go
Expand Up @@ -68,7 +68,7 @@ func (e *rollbackExecutor) ensureRollback(ctx context.Context) model.StageStatus
return model.StageStatus_STAGE_FAILURE
}

cloudProviderName, cloudProviderCfg, found := findCloudProvider(&e.Input)
cloudProviderName, cloudProviderCfg, found := findPlatformProvider(&e.Input)
if !found {
return model.StageStatus_STAGE_FAILURE
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/app/piped/executor/kubernetes/applier_group.go
Expand Up @@ -35,7 +35,7 @@ type applierGroup struct {
}

func newApplierGroup(defaultProvider string, appCfg config.KubernetesApplicationSpec, pipedCfg *config.PipedSpec, logger *zap.Logger) (*applierGroup, error) {
cp, ok := pipedCfg.FindCloudProvider(defaultProvider, model.ApplicationKind_KUBERNETES)
cp, ok := pipedCfg.FindPlatformProvider(defaultProvider, model.ApplicationKind_KUBERNETES)
if !ok {
return nil, fmt.Errorf("provider %s was not found", defaultProvider)
}
Expand All @@ -56,7 +56,7 @@ func newApplierGroup(defaultProvider string, appCfg config.KubernetesApplication
continue
}

cp, ok := pipedCfg.FindCloudProvider(r.Provider, model.ApplicationKind_KUBERNETES)
cp, ok := pipedCfg.FindPlatformProvider(r.Provider, model.ApplicationKind_KUBERNETES)
if !ok {
return nil, fmt.Errorf("provider %s specified in resourceRoutes was not found", r.Provider)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/app/piped/executor/kubernetes/kubernetes.go
Expand Up @@ -94,7 +94,7 @@ func (e *deployExecutor) Execute(sig executor.StopSignal) model.StageStatus {
}
}

e.applierGetter, err = newApplierGroup(e.Deployment.CloudProvider, *e.appCfg, e.PipedConfig, e.Logger)
e.applierGetter, err = newApplierGroup(e.Deployment.PlatformProvider, *e.appCfg, e.PipedConfig, e.Logger)
if err != nil {
e.LogPersister.Error(err.Error())
return model.StageStatus_STAGE_FAILURE
Expand Down
2 changes: 1 addition & 1 deletion pkg/app/piped/executor/kubernetes/rollback.go
Expand Up @@ -135,7 +135,7 @@ func (e *rollbackExecutor) ensureRollback(ctx context.Context) model.StageStatus
return model.StageStatus_STAGE_FAILURE
}

ag, err := newApplierGroup(e.Deployment.CloudProvider, *appCfg, e.PipedConfig, e.Logger)
ag, err := newApplierGroup(e.Deployment.PlatformProvider, *appCfg, e.PipedConfig, e.Logger)
if err != nil {
e.LogPersister.Error(err.Error())
return model.StageStatus_STAGE_FAILURE
Expand Down
2 changes: 1 addition & 1 deletion pkg/app/piped/executor/lambda/deploy.go
Expand Up @@ -53,7 +53,7 @@ func (e *deployExecutor) Execute(sig executor.StopSignal) model.StageStatus {
}

var found bool
e.cloudProviderName, e.cloudProviderCfg, found = findCloudProvider(&e.Input)
e.cloudProviderName, e.cloudProviderCfg, found = findPlatformProvider(&e.Input)
if !found {
return model.StageStatus_STAGE_FAILURE
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/app/piped/executor/lambda/lambda.go
Expand Up @@ -55,16 +55,16 @@ func Register(r registerer) {
})
}

func findCloudProvider(in *executor.Input) (name string, cfg *config.PlatformProviderLambdaConfig, found bool) {
name = in.Application.CloudProvider
func findPlatformProvider(in *executor.Input) (name string, cfg *config.PlatformProviderLambdaConfig, found bool) {
name = in.Application.PlatformProvider
if name == "" {
in.LogPersister.Errorf("Missing the CloudProvider name in the application configuration")
in.LogPersister.Errorf("Missing the PlatformProvider name in the application configuration")
return
}

cp, ok := in.PipedConfig.FindCloudProvider(name, model.ApplicationKind_LAMBDA)
cp, ok := in.PipedConfig.FindPlatformProvider(name, model.ApplicationKind_LAMBDA)
if !ok {
in.LogPersister.Errorf("The specified cloud provider %q was not found in piped configuration", name)
in.LogPersister.Errorf("The specified platform provider %q was not found in piped configuration", name)
return
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/app/piped/executor/lambda/rollback.go
Expand Up @@ -65,7 +65,7 @@ func (e *rollbackExecutor) ensureRollback(ctx context.Context) model.StageStatus
return model.StageStatus_STAGE_FAILURE
}

cloudProviderName, cloudProviderCfg, found := findCloudProvider(&e.Input)
cloudProviderName, cloudProviderCfg, found := findPlatformProvider(&e.Input)
if !found {
return model.StageStatus_STAGE_FAILURE
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/app/piped/executor/terraform/deploy.go
Expand Up @@ -34,7 +34,7 @@ type deployExecutor struct {
}

func (e *deployExecutor) Execute(sig executor.StopSignal) model.StageStatus {
cloudProviderCfg, found := findCloudProvider(&e.Input)
providerCfg, found := findPlatformProvider(&e.Input)
if !found {
return model.StageStatus_STAGE_FAILURE
}
Expand All @@ -55,8 +55,8 @@ func (e *deployExecutor) Execute(sig executor.StopSignal) model.StageStatus {
e.repoDir = ds.RepoDir
e.appDir = ds.AppDir

e.vars = make([]string, 0, len(cloudProviderCfg.Vars)+len(e.appCfg.Input.Vars))
e.vars = append(e.vars, cloudProviderCfg.Vars...)
e.vars = make([]string, 0, len(providerCfg.Vars)+len(e.appCfg.Input.Vars))
e.vars = append(e.vars, providerCfg.Vars...)
e.vars = append(e.vars, e.appCfg.Input.Vars...)

var (
Expand Down
6 changes: 3 additions & 3 deletions pkg/app/piped/executor/terraform/rollback.go
Expand Up @@ -52,7 +52,7 @@ func (e *rollbackExecutor) ensureRollback(ctx context.Context) model.StageStatus
return model.StageStatus_STAGE_FAILURE
}

cloudProviderCfg, found := findCloudProvider(&e.Input)
providerCfg, found := findPlatformProvider(&e.Input)
if !found {
return model.StageStatus_STAGE_FAILURE
}
Expand All @@ -74,8 +74,8 @@ func (e *rollbackExecutor) ensureRollback(ctx context.Context) model.StageStatus
return model.StageStatus_STAGE_FAILURE
}

vars := make([]string, 0, len(cloudProviderCfg.Vars)+len(appCfg.Input.Vars))
vars = append(vars, cloudProviderCfg.Vars...)
vars := make([]string, 0, len(providerCfg.Vars)+len(appCfg.Input.Vars))
vars = append(vars, providerCfg.Vars...)
vars = append(vars, appCfg.Input.Vars...)

e.LogPersister.Infof("Start rolling back to the state defined at commit %s", e.Deployment.RunningCommitHash)
Expand Down
10 changes: 5 additions & 5 deletions pkg/app/piped/executor/terraform/terraform.go
Expand Up @@ -80,16 +80,16 @@ func findTerraform(ctx context.Context, version string, lp executor.LogPersister
return path, true
}

func findCloudProvider(in *executor.Input) (cfg *config.PlatformProviderTerraformConfig, found bool) {
var name = in.Application.CloudProvider
func findPlatformProvider(in *executor.Input) (cfg *config.PlatformProviderTerraformConfig, found bool) {
var name = in.Application.PlatformProvider
if name == "" {
in.LogPersister.Error("Missing the CloudProvider name in the application configuration")
in.LogPersister.Error("Missing the PlatformProvider name in the application configuration")
return
}

cp, ok := in.PipedConfig.FindCloudProvider(name, model.ApplicationKind_TERRAFORM)
cp, ok := in.PipedConfig.FindPlatformProvider(name, model.ApplicationKind_TERRAFORM)
if !ok {
in.LogPersister.Errorf("The specified cloud provider %q was not found in piped configuration", name)
in.LogPersister.Errorf("The specified platform provider %q was not found in piped configuration", name)
return
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/app/piped/livestatereporter/reporter.go
Expand Up @@ -58,24 +58,24 @@ type providerReporter interface {

func NewReporter(appLister applicationLister, stateGetter livestatestore.Getter, apiClient apiClient, cfg *config.PipedSpec, logger *zap.Logger) Reporter {
r := &reporter{
reporters: make([]providerReporter, 0, len(cfg.CloudProviders)),
reporters: make([]providerReporter, 0, len(cfg.PlatformProviders)),
logger: logger.Named("live-state-reporter"),
}

for _, cp := range cfg.CloudProviders {
errFmt := fmt.Sprintf("unable to find live state getter for cloud provider: %s", cp.Name)
const errFmt = "unable to find live state getter for platform provider: %s"
for _, cp := range cfg.PlatformProviders {
switch cp.Type {
case model.PlatformProviderKubernetes:
sg, ok := stateGetter.KubernetesGetter(cp.Name)
if !ok {
r.logger.Error(errFmt)
r.logger.Error(fmt.Sprintf(errFmt, cp.Name))
continue
}
r.reporters = append(r.reporters, kubernetes.NewReporter(cp, appLister, sg, apiClient, logger))
case model.PlatformProviderCloudRun:
sg, ok := stateGetter.CloudRunGetter(cp.Name)
if !ok {
r.logger.Error(errFmt)
r.logger.Error(fmt.Sprintf(errFmt, cp.Name))
continue
}
r.reporters = append(r.reporters, cloudrun.NewReporter(cp, appLister, sg, apiClient, logger))
Expand Down
2 changes: 1 addition & 1 deletion pkg/app/piped/livestatestore/livestatestore.go
Expand Up @@ -103,7 +103,7 @@ func NewStore(ctx context.Context, cfg *config.PipedSpec, appLister applicationL
gracePeriod: gracePeriod,
logger: logger,
}
for _, cp := range cfg.CloudProviders {
for _, cp := range cfg.PlatformProviders {
switch cp.Type {
case model.PlatformProviderKubernetes:
store := kubernetes.NewStore(cp.KubernetesConfig, cfg, cp.Name, logger)
Expand Down
4 changes: 2 additions & 2 deletions pkg/app/piped/planpreview/terraformdiff.go
Expand Up @@ -33,9 +33,9 @@ func (b *builder) terraformDiff(
buf *bytes.Buffer,
) (*diffResult, error) {

cp, ok := b.pipedCfg.FindCloudProvider(app.CloudProvider, model.ApplicationKind_TERRAFORM)
cp, ok := b.pipedCfg.FindPlatformProvider(app.PlatformProvider, model.ApplicationKind_TERRAFORM)
if !ok {
err := fmt.Errorf("cloud provider %s was not found in Piped config", app.CloudProvider)
err := fmt.Errorf("platform provider %s was not found in Piped config", app.PlatformProvider)
fmt.Fprintln(buf, err.Error())
return nil, err
}
Expand Down
13 changes: 11 additions & 2 deletions pkg/app/server/grpcapi/piped_api.go
Expand Up @@ -69,7 +69,7 @@ type pipedApiDeploymentChainStore interface {

type pipedApiPipedStore interface {
Get(ctx context.Context, id string) (*model.Piped, error)
UpdateMetadata(ctx context.Context, id, version, config string, cps []*model.Piped_CloudProvider, repos []*model.ApplicationGitRepository, se *model.Piped_SecretEncryption, startedAt int64) error
UpdateMetadata(ctx context.Context, id, version, config string, pps []*model.Piped_PlatformProvider, repos []*model.ApplicationGitRepository, se *model.Piped_SecretEncryption, startedAt int64) error
}

type pipedApiEventStore interface {
Expand Down Expand Up @@ -170,13 +170,22 @@ func (a *PipedAPI) ReportPipedMeta(ctx context.Context, req *pipedservice.Report
return nil, err
}

platformProviders := make([]*model.Piped_PlatformProvider, 0, len(req.CloudProviders)+len(req.PlatformProviders))
for _, cp := range req.CloudProviders {
platformProviders = append(platformProviders, &model.Piped_PlatformProvider{
Name: cp.Name,
Type: cp.Type,
})
}
platformProviders = append(platformProviders, req.PlatformProviders...)

now := time.Now().Unix()
if err = a.pipedStore.UpdateMetadata(
ctx,
pipedID,
req.Version,
req.Config,
req.CloudProviders,
platformProviders,
req.Repositories,
req.SecretEncryption,
now,
Expand Down

0 comments on commit 0fb6e72

Please sign in to comment.