Skip to content

Commit

Permalink
Update controller to use new planner logic (#4825)
Browse files Browse the repository at this point in the history
* Update controller to use new planner logic

Signed-off-by: khanhtc1202 <khanhtc1202@gmail.com>

* Update proto path

Signed-off-by: khanhtc1202 <khanhtc1202@gmail.com>

* Fix typo

Signed-off-by: khanhtc1202 <khanhtc1202@gmail.com>

* Fix typo

Signed-off-by: khanhtc1202 <khanhtc1202@gmail.com>

* Update planner logic to call proto instead of self executing

Signed-off-by: khanhtc1202 <khanhtc1202@gmail.com>

---------

Signed-off-by: khanhtc1202 <khanhtc1202@gmail.com>
  • Loading branch information
khanhtc1202 committed Mar 13, 2024
1 parent a98961d commit 85fbb16
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 84 deletions.
47 changes: 46 additions & 1 deletion pkg/app/pipedv1/cmd/piped/piped.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ import (
"github.com/pipe-cd/pipecd/pkg/app/piped/apistore/eventstore"
"github.com/pipe-cd/pipecd/pkg/app/piped/appconfigreporter"
"github.com/pipe-cd/pipecd/pkg/app/piped/chartrepo"
"github.com/pipe-cd/pipecd/pkg/app/piped/controller"
"github.com/pipe-cd/pipecd/pkg/app/piped/controller/controllermetrics"
"github.com/pipe-cd/pipecd/pkg/app/piped/driftdetector"
"github.com/pipe-cd/pipecd/pkg/app/piped/eventwatcher"
Expand All @@ -63,6 +62,7 @@ import (
"github.com/pipe-cd/pipecd/pkg/app/piped/statsreporter"
"github.com/pipe-cd/pipecd/pkg/app/piped/toolregistry"
"github.com/pipe-cd/pipecd/pkg/app/piped/trigger"
"github.com/pipe-cd/pipecd/pkg/app/pipedv1/controller"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
"github.com/pipe-cd/pipecd/pkg/cache/memorycache"
"github.com/pipe-cd/pipecd/pkg/cli"
Expand Down Expand Up @@ -392,6 +392,12 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
})
}

cfgData, err := p.loadConfigByte(ctx)
if err != nil {
input.Logger.Error("failed to load piped configuration", zap.Error(err))
return err
}

// Start running deployment controller.
{
c := controller.NewController(
Expand All @@ -405,6 +411,7 @@ func (p *piped) run(ctx context.Context, input cli.Input) (runErr error) {
notifier,
decrypter,
cfg,
cfgData,
appManifestsCache,
p.gracePeriod,
input.Logger,
Expand Down Expand Up @@ -641,6 +648,44 @@ func (p *piped) loadConfig(ctx context.Context) (*config.PipedSpec, error) {
return nil, fmt.Errorf("one of config-file, config-gcp-secret or config-aws-secret must be set")
}

// loadConfig reads the Piped configuration data from the specified source.
func (p *piped) loadConfigByte(ctx context.Context) ([]byte, error) {
// HACK: When the version of cobra is updated to >=v1.8.0, this should be replaced with https://pkg.go.dev/github.com/spf13/cobra#Command.MarkFlagsMutuallyExclusive.
if err := p.hasTooManyConfigFlags(); err != nil {
return nil, err
}

if p.configFile != "" {
return os.ReadFile(p.configFile)
}

if p.configData != "" {
data, err := base64.StdEncoding.DecodeString(p.configData)
if err != nil {
return nil, fmt.Errorf("the given config-data isn't base64 encoded: %w", err)
}
return data, nil
}

if p.configGCPSecret != "" {
data, err := p.getConfigDataFromSecretManager(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load config from SecretManager (%w)", err)
}
return data, nil
}

if p.configAWSSecret != "" {
data, err := p.getConfigDataFromAWSSecretsManager(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load config from AWS Secrets Manager (%w)", err)
}
return data, nil
}

return nil, fmt.Errorf("one of config-file, config-gcp-secret or config-aws-secret must be set")
}

func (p *piped) initializeSecretDecrypter(cfg *config.PipedSpec) (crypto.Decrypter, error) {
sm := cfg.SecretManagement
if sm == nil {
Expand Down
23 changes: 16 additions & 7 deletions pkg/app/pipedv1/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,15 +106,17 @@ var (

type controller struct {
apiClient apiClient
pluginRegistry PluginRegistry
gitClient gitClient
deploymentLister deploymentLister
commandLister commandLister
applicationLister applicationLister
liveResourceLister liveResourceLister
analysisResultStore analysisResultStore
notifier notifier
secretDecrypter secretDecrypter
pipedConfig *config.PipedSpec
pipedConfig []byte
secretDecrypter secretDecrypter // TODO: Remove this
pipedCfg *config.PipedSpec // TODO: Remove this, use pipedConfig instead
appManifestsCache cache.Cache
logPersister logpersister.Persister

Expand Down Expand Up @@ -155,7 +157,8 @@ func NewController(
analysisResultStore analysisResultStore,
notifier notifier,
sd secretDecrypter,
pipedConfig *config.PipedSpec,
pipedCfg *config.PipedSpec,
pipedConfig []byte,
appManifestsCache cache.Cache,
gracePeriod time.Duration,
logger *zap.Logger,
Expand All @@ -167,6 +170,7 @@ func NewController(
)
return &controller{
apiClient: apiClient,
pluginRegistry: DefaultPluginRegistry(),
gitClient: gitClient,
deploymentLister: deploymentLister,
commandLister: commandLister,
Expand All @@ -176,6 +180,7 @@ func NewController(
notifier: notifier,
secretDecrypter: sd,
appManifestsCache: appManifestsCache,
pipedCfg: pipedCfg,
pipedConfig: pipedConfig,
logPersister: lp,

Expand Down Expand Up @@ -468,17 +473,21 @@ func (c *controller) startNewPlanner(ctx context.Context, d *model.Deployment) (
}
}

pluginClient, ok := c.pluginRegistry.Plugin(d.Kind)
if !ok {
logger.Error("no plugin client for the application kind", zap.String("kind", d.Kind.String()))
return nil, fmt.Errorf("no plugin client for the application kind %s", d.Kind.String())
}

planner := newPlanner(
d,
commitHash,
configFilename,
workingDir,
pluginClient,
c.apiClient,
c.gitClient,
c.notifier,
c.secretDecrypter,
c.pipedConfig,
c.appManifestsCache,
c.logger,
)

Expand Down Expand Up @@ -621,7 +630,7 @@ func (c *controller) startNewScheduler(ctx context.Context, d *model.Deployment)
c.logPersister,
c.notifier,
c.secretDecrypter,
c.pipedConfig,
c.pipedCfg,
c.appManifestsCache,
c.logger,
)
Expand Down
110 changes: 34 additions & 76 deletions pkg/app/pipedv1/controller/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,41 @@ import (
"context"
"encoding/json"
"fmt"
"path/filepath"
"time"

"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/pipe-cd/pipecd/pkg/app/piped/controller/controllermetrics"
"github.com/pipe-cd/pipecd/pkg/app/piped/deploysource"
"github.com/pipe-cd/pipecd/pkg/app/piped/metadatastore"
pln "github.com/pipe-cd/pipecd/pkg/app/piped/planner"
"github.com/pipe-cd/pipecd/pkg/app/piped/planner/registry"
"github.com/pipe-cd/pipecd/pkg/app/server/service/pipedservice"
"github.com/pipe-cd/pipecd/pkg/cache"
"github.com/pipe-cd/pipecd/pkg/config"
"github.com/pipe-cd/pipecd/pkg/model"
"github.com/pipe-cd/pipecd/pkg/regexpool"
"github.com/pipe-cd/pipecd/pkg/plugin/api/v1alpha1/platform"
)

// What planner does:
// - Wait until there is no PLANNED or RUNNING deployment
// - Pick the oldest PENDING deployment to plan its pipeline
// - Compare with the last successful commit
// - Decide the pipeline should be executed (scale, progressive, rollback)
// - Update the pipeline stages and change the deployment status to PLANNED
type planner struct {
// Readonly deployment model.
deployment *model.Deployment
lastSuccessfulCommitHash string
lastSuccessfulConfigFilename string
workingDir string
apiClient apiClient
gitClient gitClient
metadataStore metadatastore.MetadataStore
notifier notifier
secretDecrypter secretDecrypter
plannerRegistry registry.Registry
pipedConfig *config.PipedSpec
appManifestsCache cache.Cache
logger *zap.Logger
pipedConfig []byte

// The pluginClient is used to call pluggin that actually
// performs planning deployment.
pluginClient platform.PlatformPluginClient

// The apiClient is used to report the deployment status.
apiClient apiClient

// The notifier and metadataStore are used for
// notification features.
notifier notifier
metadataStore metadatastore.MetadataStore

// TODO: Find a way to show log from pluggin's planner
logger *zap.Logger

done atomic.Bool
doneTimestamp time.Time
Expand All @@ -72,12 +68,10 @@ func newPlanner(
lastSuccessfulCommitHash string,
lastSuccessfulConfigFilename string,
workingDir string,
pluginClient platform.PlatformPluginClient,
apiClient apiClient,
gitClient gitClient,
notifier notifier,
sd secretDecrypter,
pipedConfig *config.PipedSpec,
appManifestsCache cache.Cache,
pipedConfig []byte,
logger *zap.Logger,
) *planner {

Expand All @@ -94,14 +88,11 @@ func newPlanner(
lastSuccessfulCommitHash: lastSuccessfulCommitHash,
lastSuccessfulConfigFilename: lastSuccessfulConfigFilename,
workingDir: workingDir,
pluginClient: pluginClient,
apiClient: apiClient,
gitClient: gitClient,
metadataStore: metadatastore.NewMetadataStore(apiClient, d),
notifier: notifier,
secretDecrypter: sd,
pipedConfig: pipedConfig,
plannerRegistry: registry.DefaultRegistry(),
appManifestsCache: appManifestsCache,
doneDeploymentStatus: d.Status,
cancelledCh: make(chan *model.ReportableCommand, 1),
nowFunc: time.Now,
Expand Down Expand Up @@ -142,6 +133,11 @@ func (p *planner) Cancel(cmd model.ReportableCommand) {
close(p.cancelledCh)
}

// What planner does:
// - Wait until there is no PLANNED or RUNNING deployment
// - Pick the oldest PENDING deployment to plan its pipeline
// - <*> Perform planning a deployment by calling the pluggin's planner
// - Update the deployment status to PLANNED or not based on the result
func (p *planner) Run(ctx context.Context) error {
p.logger.Info("start running planner")

Expand All @@ -150,56 +146,19 @@ func (p *planner) Run(ctx context.Context) error {
p.done.Store(true)
}()

repoCfg := config.PipedRepository{
RepoID: p.deployment.GitPath.Repo.Id,
Remote: p.deployment.GitPath.Repo.Remote,
Branch: p.deployment.GitPath.Repo.Branch,
}

in := pln.Input{
ApplicationID: p.deployment.ApplicationId,
ApplicationName: p.deployment.ApplicationName,
GitPath: *p.deployment.GitPath,
Trigger: *p.deployment.Trigger,
MostRecentSuccessfulCommitHash: p.lastSuccessfulCommitHash,
PipedConfig: p.pipedConfig,
AppManifestsCache: p.appManifestsCache,
RegexPool: regexpool.DefaultPool(),
GitClient: p.gitClient,
Logger: p.logger,
}

in.TargetDSP = deploysource.NewProvider(
filepath.Join(p.workingDir, "target-deploysource"),
deploysource.NewGitSourceCloner(p.gitClient, repoCfg, "target", p.deployment.Trigger.Commit.Hash),
*p.deployment.GitPath,
p.secretDecrypter,
)

if p.lastSuccessfulCommitHash != "" {
gp := *p.deployment.GitPath
gp.ConfigFilename = p.lastSuccessfulConfigFilename

in.RunningDSP = deploysource.NewProvider(
filepath.Join(p.workingDir, "running-deploysource"),
deploysource.NewGitSourceCloner(p.gitClient, repoCfg, "running", p.lastSuccessfulCommitHash),
gp,
p.secretDecrypter,
)
}

defer func() {
controllermetrics.UpdateDeploymentStatus(p.deployment, p.doneDeploymentStatus)
}()

planner, ok := p.plannerRegistry.Planner(p.deployment.Kind)
if !ok {
p.doneDeploymentStatus = model.DeploymentStatus_DEPLOYMENT_FAILURE
p.reportDeploymentFailed(ctx, "Unable to find the planner for this application kind")
return fmt.Errorf("unable to find the planner for application %v", p.deployment.Kind)
in := &platform.BuildPlanRequest{
Deployment: p.deployment,
WorkingDir: p.workingDir,
LastSuccessfulCommitHash: p.lastSuccessfulCommitHash,
LastSuccessfulConfigFileName: p.lastSuccessfulConfigFilename,
PipedConfig: p.pipedConfig,
}

out, err := planner.Plan(ctx, in)
out, err := p.pluginClient.BuildPlan(ctx, in)

// If the deployment was already cancelled, we ignore the plan result.
select {
Expand All @@ -219,10 +178,10 @@ func (p *planner) Run(ctx context.Context) error {
}

p.doneDeploymentStatus = model.DeploymentStatus_DEPLOYMENT_PLANNED
return p.reportDeploymentPlanned(ctx, out)
return p.reportDeploymentPlanned(ctx, out.Plan)
}

func (p *planner) reportDeploymentPlanned(ctx context.Context, out pln.Output) error {
func (p *planner) reportDeploymentPlanned(ctx context.Context, out *platform.DeploymentPlan) error {
var (
err error
retry = pipedservice.NewRetry(10)
Expand All @@ -232,7 +191,6 @@ func (p *planner) reportDeploymentPlanned(ctx context.Context, out pln.Output) e
StatusReason: "The deployment has been planned",
RunningCommitHash: p.lastSuccessfulCommitHash,
RunningConfigFilename: p.lastSuccessfulConfigFilename,
Version: out.Version,
Versions: out.Versions,
Stages: out.Stages,
DeploymentChainId: p.deployment.DeploymentChainId,
Expand Down
Loading

0 comments on commit 85fbb16

Please sign in to comment.