Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make platformProviders works as piped deployment provider #3815

Merged
merged 9 commits into from Jul 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 9 additions & 9 deletions pkg/app/piped/cmd/piped/piped.go
Expand Up @@ -116,7 +116,7 @@ func NewCommand() *cobra.Command {
cmd.Flags().IntVar(&p.adminPort, "admin-port", p.adminPort, "The port number used to run a HTTP server for admin tasks such as metrics, healthz.")

cmd.Flags().StringVar(&p.toolsDir, "tools-dir", p.toolsDir, "The path to directory where to install needed tools such as kubectl, helm, kustomize.")
cmd.Flags().BoolVar(&p.enableDefaultKubernetesCloudProvider, "enable-default-kubernetes-cloud-provider", p.enableDefaultKubernetesCloudProvider, "Whether the default kubernetes provider is enabled or not.")
cmd.Flags().BoolVar(&p.enableDefaultKubernetesCloudProvider, "enable-default-kubernetes-cloud-provider", p.enableDefaultKubernetesCloudProvider, "Whether the default kubernetes provider is enabled or not. This feature is deprecated.")
cmd.Flags().BoolVar(&p.addLoginUserToPasswd, "add-login-user-to-passwd", p.addLoginUserToPasswd, "Whether to add login user to $HOME/passwd. This is typically for applications running as a random user ID.")
cmd.Flags().DurationVar(&p.gracePeriod, "grace-period", p.gracePeriod, "How long to wait for graceful shutdown.")

Expand Down Expand Up @@ -542,7 +542,7 @@ func (p *piped) loadConfig(ctx context.Context) (*config.PipedSpec, error) {
return nil, fmt.Errorf("wrong configuration kind for piped: %v", cfg.Kind)
}
if p.enableDefaultKubernetesCloudProvider {
cfg.PipedSpec.EnableDefaultKubernetesCloudProvider()
cfg.PipedSpec.EnableDefaultKubernetesPlatformProvider()
}
return cfg.PipedSpec, nil
}
Expand Down Expand Up @@ -638,17 +638,17 @@ func (p *piped) sendPipedMeta(ctx context.Context, client pipedservice.Client, c

var (
req = &pipedservice.ReportPipedMetaRequest{
Version: version.Get().Version,
Config: string(maskedCfg),
Repositories: repos,
CloudProviders: make([]*model.Piped_CloudProvider, 0, len(cfg.CloudProviders)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it ok to remove this field in gRPC?
As far as I see only CloudProvider is being used in the implementation of that RPC.
https://github.com/pipe-cd/pipecd/blob/master/pkg/app/server/grpcapi/piped_api.go#L167-L194

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice catch, the current implementation pass from the test because I updated the ReportPipedMetaRequest previously (in here).

The API update for that is included by another PR

--- a/pkg/app/server/grpcapi/piped_api.go
+++ b/pkg/app/server/grpcapi/piped_api.go
@@ -69,7 +69,7 @@ type pipedApiDeploymentChainStore interface {
 
 type pipedApiPipedStore interface {
        Get(ctx context.Context, id string) (*model.Piped, error)
-       UpdateMetadata(ctx context.Context, id, version, config string, cps []*model.Piped_CloudProvider, repos []*model.ApplicationGitRepository, se *model.Piped_SecretEncryption, startedAt int64) error
+       UpdateMetadata(ctx context.Context, id, version, config string, pps []*model.Piped_PlatformProvider, repos []*model.ApplicationGitRepository, se *model.Piped_SecretEncryption, startedAt int64) error
 }
 
 type pipedApiEventStore interface {
@@ -176,7 +176,7 @@ func (a *PipedAPI) ReportPipedMeta(ctx context.Context, req *pipedservice.Report
                pipedID,
                req.Version,
                req.Config,
-               req.CloudProviders,
+               req.PlatformProviders,
                req.Repositories,
                req.SecretEncryption,
                now,

but you're right, maybe we should include the change for that in this PR as well 🤔

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should include the change for that in this PR as well

Yes, that is better. Or the PR for that API change should be merged before this PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, lets me revise this. Thank you 🙏

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed by 0272ba2

Version: version.Get().Version,
Config: string(maskedCfg),
Repositories: repos,
PlatformProviders: make([]*model.Piped_PlatformProvider, 0, len(cfg.PlatformProviders)),
}
retry = pipedservice.NewRetry(5)
)

// Configure the list of specified cloud providers.
for _, cp := range cfg.CloudProviders {
req.CloudProviders = append(req.CloudProviders, &model.Piped_CloudProvider{
// Configure the list of specified platform providers.
for _, cp := range cfg.PlatformProviders {
req.PlatformProviders = append(req.PlatformProviders, &model.Piped_PlatformProvider{
Name: cp.Name,
Type: cp.Type.String(),
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/app/piped/controller/scheduler.go
Expand Up @@ -434,8 +434,8 @@ func (s *scheduler) executeStage(sig executor.StopSignal, ps model.PipelineStage
}

// Check the existence of the specified cloud provider.
if !s.pipedConfig.HasCloudProvider(s.deployment.CloudProvider, s.deployment.Kind) {
lp.Errorf("This piped is not having the specified cloud provider in this deployment: %v", s.deployment.CloudProvider)
if !s.pipedConfig.HasPlatformProvider(s.deployment.PlatformProvider, s.deployment.Kind) {
lp.Errorf("This piped is not having the specified platform provider in this deployment: %v", s.deployment.PlatformProvider)
if err := s.reportStageStatus(ctx, ps.Id, model.StageStatus_STAGE_FAILURE, ps.Requires); err != nil {
s.logger.Error("failed to report stage status", zap.Error(err))
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/app/piped/driftdetector/detector.go
Expand Up @@ -87,14 +87,14 @@ func NewDetector(

d := &detector{
apiClient: apiClient,
detectors: make([]providerDetector, 0, len(cfg.CloudProviders)),
detectors: make([]providerDetector, 0, len(cfg.PlatformProviders)),
syncStates: make(map[string]model.ApplicationSyncState),
logger: logger.Named("drift-detector"),
}

const format = "unable to find live state getter for cloud provider: %s"
const format = "unable to find live state getter for platform provider: %s"

for _, cp := range cfg.CloudProviders {
for _, cp := range cfg.PlatformProviders {
switch cp.Type {
case model.PlatformProviderKubernetes:
sg, ok := stateGetter.KubernetesGetter(cp.Name)
Expand Down
10 changes: 5 additions & 5 deletions pkg/app/piped/executor/cloudrun/cloudrun.go
Expand Up @@ -58,16 +58,16 @@ func loadServiceManifest(in *executor.Input, serviceManifestFile string, ds *dep
return sm, true
}

func findCloudProvider(in *executor.Input) (name string, cfg *config.PlatformProviderCloudRunConfig, found bool) {
name = in.Application.CloudProvider
func findPlatformProvider(in *executor.Input) (name string, cfg *config.PlatformProviderCloudRunConfig, found bool) {
name = in.Application.PlatformProvider
if name == "" {
in.LogPersister.Error("Missing the CloudProvider name in the application configuration")
in.LogPersister.Error("Missing the PlatformProvider name in the application configuration")
return
}

cp, ok := in.PipedConfig.FindCloudProvider(name, model.ApplicationKind_CLOUDRUN)
cp, ok := in.PipedConfig.FindPlatformProvider(name, model.ApplicationKind_CLOUDRUN)
if !ok {
in.LogPersister.Errorf("The specified cloud provider %q was not found in piped configuration", name)
in.LogPersister.Errorf("The specified platform provider %q was not found in piped configuration", name)
return
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/app/piped/executor/cloudrun/deploy.go
Expand Up @@ -52,7 +52,7 @@ func (e *deployExecutor) Execute(sig executor.StopSignal) model.StageStatus {
return model.StageStatus_STAGE_FAILURE
}

cpName, cpCfg, found := findCloudProvider(&e.Input)
cpName, cpCfg, found := findPlatformProvider(&e.Input)
if !found {
return model.StageStatus_STAGE_FAILURE
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/app/piped/executor/cloudrun/rollback.go
Expand Up @@ -34,7 +34,7 @@ func (e *rollbackExecutor) Execute(sig executor.StopSignal) model.StageStatus {
status model.StageStatus
)

cpName, cpCfg, found := findCloudProvider(&e.Input)
cpName, cpCfg, found := findPlatformProvider(&e.Input)
if !found {
return model.StageStatus_STAGE_FAILURE
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/app/piped/executor/ecs/deploy.go
Expand Up @@ -48,7 +48,7 @@ func (e *deployExecutor) Execute(sig executor.StopSignal) model.StageStatus {
}

var found bool
e.cloudProviderName, e.cloudProviderCfg, found = findCloudProvider(&e.Input)
e.cloudProviderName, e.cloudProviderCfg, found = findPlatformProvider(&e.Input)
if !found {
return model.StageStatus_STAGE_FAILURE
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/app/piped/executor/ecs/ecs.go
Expand Up @@ -66,16 +66,16 @@ func Register(r registerer) {
})
}

func findCloudProvider(in *executor.Input) (name string, cfg *config.PlatformProviderECSConfig, found bool) {
name = in.Application.CloudProvider
func findPlatformProvider(in *executor.Input) (name string, cfg *config.PlatformProviderECSConfig, found bool) {
name = in.Application.PlatformProvider
if name == "" {
in.LogPersister.Errorf("Missing the CloudProvider name in the application configuration")
in.LogPersister.Errorf("Missing the PlatformProvider name in the application configuration")
return
}

cp, ok := in.PipedConfig.FindCloudProvider(name, model.ApplicationKind_ECS)
cp, ok := in.PipedConfig.FindPlatformProvider(name, model.ApplicationKind_ECS)
if !ok {
in.LogPersister.Errorf("The specified cloud provider %q was not found in piped configuration", name)
in.LogPersister.Errorf("The specified platform provider %q was not found in piped configuration", name)
return
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/app/piped/executor/ecs/rollback.go
Expand Up @@ -68,7 +68,7 @@ func (e *rollbackExecutor) ensureRollback(ctx context.Context) model.StageStatus
return model.StageStatus_STAGE_FAILURE
}

cloudProviderName, cloudProviderCfg, found := findCloudProvider(&e.Input)
cloudProviderName, cloudProviderCfg, found := findPlatformProvider(&e.Input)
if !found {
return model.StageStatus_STAGE_FAILURE
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/app/piped/executor/kubernetes/applier_group.go
Expand Up @@ -35,7 +35,7 @@ type applierGroup struct {
}

func newApplierGroup(defaultProvider string, appCfg config.KubernetesApplicationSpec, pipedCfg *config.PipedSpec, logger *zap.Logger) (*applierGroup, error) {
cp, ok := pipedCfg.FindCloudProvider(defaultProvider, model.ApplicationKind_KUBERNETES)
cp, ok := pipedCfg.FindPlatformProvider(defaultProvider, model.ApplicationKind_KUBERNETES)
if !ok {
return nil, fmt.Errorf("provider %s was not found", defaultProvider)
}
Expand All @@ -56,7 +56,7 @@ func newApplierGroup(defaultProvider string, appCfg config.KubernetesApplication
continue
}

cp, ok := pipedCfg.FindCloudProvider(r.Provider, model.ApplicationKind_KUBERNETES)
cp, ok := pipedCfg.FindPlatformProvider(r.Provider, model.ApplicationKind_KUBERNETES)
if !ok {
return nil, fmt.Errorf("provider %s specified in resourceRoutes was not found", r.Provider)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/app/piped/executor/kubernetes/kubernetes.go
Expand Up @@ -94,7 +94,7 @@ func (e *deployExecutor) Execute(sig executor.StopSignal) model.StageStatus {
}
}

e.applierGetter, err = newApplierGroup(e.Deployment.CloudProvider, *e.appCfg, e.PipedConfig, e.Logger)
e.applierGetter, err = newApplierGroup(e.Deployment.PlatformProvider, *e.appCfg, e.PipedConfig, e.Logger)
if err != nil {
e.LogPersister.Error(err.Error())
return model.StageStatus_STAGE_FAILURE
Expand Down
2 changes: 1 addition & 1 deletion pkg/app/piped/executor/kubernetes/rollback.go
Expand Up @@ -135,7 +135,7 @@ func (e *rollbackExecutor) ensureRollback(ctx context.Context) model.StageStatus
return model.StageStatus_STAGE_FAILURE
}

ag, err := newApplierGroup(e.Deployment.CloudProvider, *appCfg, e.PipedConfig, e.Logger)
ag, err := newApplierGroup(e.Deployment.PlatformProvider, *appCfg, e.PipedConfig, e.Logger)
if err != nil {
e.LogPersister.Error(err.Error())
return model.StageStatus_STAGE_FAILURE
Expand Down
2 changes: 1 addition & 1 deletion pkg/app/piped/executor/lambda/deploy.go
Expand Up @@ -53,7 +53,7 @@ func (e *deployExecutor) Execute(sig executor.StopSignal) model.StageStatus {
}

var found bool
e.cloudProviderName, e.cloudProviderCfg, found = findCloudProvider(&e.Input)
e.cloudProviderName, e.cloudProviderCfg, found = findPlatformProvider(&e.Input)
if !found {
return model.StageStatus_STAGE_FAILURE
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/app/piped/executor/lambda/lambda.go
Expand Up @@ -55,16 +55,16 @@ func Register(r registerer) {
})
}

func findCloudProvider(in *executor.Input) (name string, cfg *config.PlatformProviderLambdaConfig, found bool) {
name = in.Application.CloudProvider
func findPlatformProvider(in *executor.Input) (name string, cfg *config.PlatformProviderLambdaConfig, found bool) {
name = in.Application.PlatformProvider
if name == "" {
in.LogPersister.Errorf("Missing the CloudProvider name in the application configuration")
in.LogPersister.Errorf("Missing the PlatformProvider name in the application configuration")
return
}

cp, ok := in.PipedConfig.FindCloudProvider(name, model.ApplicationKind_LAMBDA)
cp, ok := in.PipedConfig.FindPlatformProvider(name, model.ApplicationKind_LAMBDA)
if !ok {
in.LogPersister.Errorf("The specified cloud provider %q was not found in piped configuration", name)
in.LogPersister.Errorf("The specified platform provider %q was not found in piped configuration", name)
return
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/app/piped/executor/lambda/rollback.go
Expand Up @@ -65,7 +65,7 @@ func (e *rollbackExecutor) ensureRollback(ctx context.Context) model.StageStatus
return model.StageStatus_STAGE_FAILURE
}

cloudProviderName, cloudProviderCfg, found := findCloudProvider(&e.Input)
cloudProviderName, cloudProviderCfg, found := findPlatformProvider(&e.Input)
if !found {
return model.StageStatus_STAGE_FAILURE
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/app/piped/executor/terraform/deploy.go
Expand Up @@ -34,7 +34,7 @@ type deployExecutor struct {
}

func (e *deployExecutor) Execute(sig executor.StopSignal) model.StageStatus {
cloudProviderCfg, found := findCloudProvider(&e.Input)
providerCfg, found := findPlatformProvider(&e.Input)
if !found {
return model.StageStatus_STAGE_FAILURE
}
Expand All @@ -55,8 +55,8 @@ func (e *deployExecutor) Execute(sig executor.StopSignal) model.StageStatus {
e.repoDir = ds.RepoDir
e.appDir = ds.AppDir

e.vars = make([]string, 0, len(cloudProviderCfg.Vars)+len(e.appCfg.Input.Vars))
e.vars = append(e.vars, cloudProviderCfg.Vars...)
e.vars = make([]string, 0, len(providerCfg.Vars)+len(e.appCfg.Input.Vars))
e.vars = append(e.vars, providerCfg.Vars...)
e.vars = append(e.vars, e.appCfg.Input.Vars...)

var (
Expand Down
6 changes: 3 additions & 3 deletions pkg/app/piped/executor/terraform/rollback.go
Expand Up @@ -52,7 +52,7 @@ func (e *rollbackExecutor) ensureRollback(ctx context.Context) model.StageStatus
return model.StageStatus_STAGE_FAILURE
}

cloudProviderCfg, found := findCloudProvider(&e.Input)
providerCfg, found := findPlatformProvider(&e.Input)
if !found {
return model.StageStatus_STAGE_FAILURE
}
Expand All @@ -74,8 +74,8 @@ func (e *rollbackExecutor) ensureRollback(ctx context.Context) model.StageStatus
return model.StageStatus_STAGE_FAILURE
}

vars := make([]string, 0, len(cloudProviderCfg.Vars)+len(appCfg.Input.Vars))
vars = append(vars, cloudProviderCfg.Vars...)
vars := make([]string, 0, len(providerCfg.Vars)+len(appCfg.Input.Vars))
vars = append(vars, providerCfg.Vars...)
vars = append(vars, appCfg.Input.Vars...)

e.LogPersister.Infof("Start rolling back to the state defined at commit %s", e.Deployment.RunningCommitHash)
Expand Down
10 changes: 5 additions & 5 deletions pkg/app/piped/executor/terraform/terraform.go
Expand Up @@ -80,16 +80,16 @@ func findTerraform(ctx context.Context, version string, lp executor.LogPersister
return path, true
}

func findCloudProvider(in *executor.Input) (cfg *config.PlatformProviderTerraformConfig, found bool) {
var name = in.Application.CloudProvider
func findPlatformProvider(in *executor.Input) (cfg *config.PlatformProviderTerraformConfig, found bool) {
var name = in.Application.PlatformProvider
if name == "" {
in.LogPersister.Error("Missing the CloudProvider name in the application configuration")
in.LogPersister.Error("Missing the PlatformProvider name in the application configuration")
return
}

cp, ok := in.PipedConfig.FindCloudProvider(name, model.ApplicationKind_TERRAFORM)
cp, ok := in.PipedConfig.FindPlatformProvider(name, model.ApplicationKind_TERRAFORM)
if !ok {
in.LogPersister.Errorf("The specified cloud provider %q was not found in piped configuration", name)
in.LogPersister.Errorf("The specified platform provider %q was not found in piped configuration", name)
return
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/app/piped/livestatereporter/reporter.go
Expand Up @@ -58,24 +58,24 @@ type providerReporter interface {

func NewReporter(appLister applicationLister, stateGetter livestatestore.Getter, apiClient apiClient, cfg *config.PipedSpec, logger *zap.Logger) Reporter {
r := &reporter{
reporters: make([]providerReporter, 0, len(cfg.CloudProviders)),
reporters: make([]providerReporter, 0, len(cfg.PlatformProviders)),
logger: logger.Named("live-state-reporter"),
}

for _, cp := range cfg.CloudProviders {
errFmt := fmt.Sprintf("unable to find live state getter for cloud provider: %s", cp.Name)
const errFmt = "unable to find live state getter for platform provider: %s"
for _, cp := range cfg.PlatformProviders {
switch cp.Type {
case model.PlatformProviderKubernetes:
sg, ok := stateGetter.KubernetesGetter(cp.Name)
if !ok {
r.logger.Error(errFmt)
r.logger.Error(fmt.Sprintf(errFmt, cp.Name))
continue
}
r.reporters = append(r.reporters, kubernetes.NewReporter(cp, appLister, sg, apiClient, logger))
case model.PlatformProviderCloudRun:
sg, ok := stateGetter.CloudRunGetter(cp.Name)
if !ok {
r.logger.Error(errFmt)
r.logger.Error(fmt.Sprintf(errFmt, cp.Name))
continue
}
r.reporters = append(r.reporters, cloudrun.NewReporter(cp, appLister, sg, apiClient, logger))
Expand Down
2 changes: 1 addition & 1 deletion pkg/app/piped/livestatestore/livestatestore.go
Expand Up @@ -103,7 +103,7 @@ func NewStore(ctx context.Context, cfg *config.PipedSpec, appLister applicationL
gracePeriod: gracePeriod,
logger: logger,
}
for _, cp := range cfg.CloudProviders {
for _, cp := range cfg.PlatformProviders {
switch cp.Type {
case model.PlatformProviderKubernetes:
store := kubernetes.NewStore(cp.KubernetesConfig, cfg, cp.Name, logger)
Expand Down
4 changes: 2 additions & 2 deletions pkg/app/piped/planpreview/terraformdiff.go
Expand Up @@ -33,9 +33,9 @@ func (b *builder) terraformDiff(
buf *bytes.Buffer,
) (*diffResult, error) {

cp, ok := b.pipedCfg.FindCloudProvider(app.CloudProvider, model.ApplicationKind_TERRAFORM)
cp, ok := b.pipedCfg.FindPlatformProvider(app.PlatformProvider, model.ApplicationKind_TERRAFORM)
if !ok {
err := fmt.Errorf("cloud provider %s was not found in Piped config", app.CloudProvider)
err := fmt.Errorf("platform provider %s was not found in Piped config", app.PlatformProvider)
fmt.Fprintln(buf, err.Error())
return nil, err
}
Expand Down
13 changes: 11 additions & 2 deletions pkg/app/server/grpcapi/piped_api.go
Expand Up @@ -69,7 +69,7 @@ type pipedApiDeploymentChainStore interface {

type pipedApiPipedStore interface {
Get(ctx context.Context, id string) (*model.Piped, error)
UpdateMetadata(ctx context.Context, id, version, config string, cps []*model.Piped_CloudProvider, repos []*model.ApplicationGitRepository, se *model.Piped_SecretEncryption, startedAt int64) error
UpdateMetadata(ctx context.Context, id, version, config string, pps []*model.Piped_PlatformProvider, repos []*model.ApplicationGitRepository, se *model.Piped_SecretEncryption, startedAt int64) error
}

type pipedApiEventStore interface {
Expand Down Expand Up @@ -170,13 +170,22 @@ func (a *PipedAPI) ReportPipedMeta(ctx context.Context, req *pipedservice.Report
return nil, err
}

platformProviders := make([]*model.Piped_PlatformProvider, 0, len(req.CloudProviders)+len(req.PlatformProviders))
for _, cp := range req.CloudProviders {
platformProviders = append(platformProviders, &model.Piped_PlatformProvider{
Name: cp.Name,
Type: cp.Type,
})
}
platformProviders = append(platformProviders, req.PlatformProviders...)

now := time.Now().Unix()
if err = a.pipedStore.UpdateMetadata(
ctx,
pipedID,
req.Version,
req.Config,
req.CloudProviders,
khanhtc1202 marked this conversation as resolved.
Show resolved Hide resolved
platformProviders,
req.Repositories,
req.SecretEncryption,
now,
Expand Down