Empower all commands to handle input to stdin #2598

Merged · 2 commits · Oct 7, 2021
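Both diffs below make the same mechanical change: every command's run function now takes a cli.Input instead of the old cli.Telemetry, bundling the logger, telemetry flags, and (per the PR title) stdin into a single value. As a rough sketch of what such a type might look like: Logger, Flags.Metrics, and the two metrics-handler methods are confirmed by the diffs, while the Stdin field and exact struct layout are assumptions, not pipecd's actual definition.

// Hypothetical sketch of the cli.Input type this PR threads through
// all commands. Only Logger and Flags.Metrics are confirmed by the
// diffs below; Stdin and the struct layout are assumptions.
package cli

import (
	"io"

	"go.uber.org/zap"
)

type TelemetryFlags struct {
	Metrics bool // gates the Prometheus interceptors in server.go below
}

type Input struct {
	Logger *zap.Logger
	Flags  TelemetryFlags
	Stdin  io.Reader // lets any command consume piped input
}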
42 changes: 21 additions & 21 deletions cmd/pipecd/ops.go
@@ -76,13 +76,13 @@ func NewOpsCommand() *cobra.Command {
return cmd
}

-func (s *ops) run(ctx context.Context, t cli.Telemetry) error {
+func (s *ops) run(ctx context.Context, input cli.Input) error {
group, ctx := errgroup.WithContext(ctx)

// Load control plane configuration from the specified file.
cfg, err := loadConfig(s.configFile)
if err != nil {
t.Logger.Error("failed to load control-plane configuration",
input.Logger.Error("failed to load control-plane configuration",
zap.String("config-file", s.configFile),
zap.Error(err),
)
@@ -91,8 +91,8 @@ func (s *ops) run(ctx context.Context, t cli.Telemetry) error {

// Prepare sql database.
if cfg.Datastore.Type == model.DataStoreMySQL {
-if err := ensureSQLDatabase(ctx, cfg, t.Logger); err != nil {
-t.Logger.Error("failed to ensure prepare SQL database", zap.Error(err))
+if err := ensureSQLDatabase(ctx, cfg, input.Logger); err != nil {
+input.Logger.Error("failed to ensure prepare SQL database", zap.Error(err))
return err
}
}
@@ -104,94 +104,94 @@ func (s *ops) run(ctx context.Context, t cli.Telemetry) error {
cfg.Datastore.FirestoreConfig.Project,
cfg.Datastore.FirestoreConfig.CredentialsFile,
cfg.Datastore.FirestoreConfig.CollectionNamePrefix,
-t.Logger,
+input.Logger,
)
group.Go(func() error {
return ensurer.CreateIndexes(ctx)
})
}

// Connect to the data store.
-ds, err := createDatastore(ctx, cfg, t.Logger)
+ds, err := createDatastore(ctx, cfg, input.Logger)
if err != nil {
t.Logger.Error("failed to create datastore", zap.Error(err))
input.Logger.Error("failed to create datastore", zap.Error(err))
return err
}
defer func() {
if err := ds.Close(); err != nil {
t.Logger.Error("failed to close datastore client", zap.Error(err))
input.Logger.Error("failed to close datastore client", zap.Error(err))
}
}()

// Connect to the file store.
-fs, err := createFilestore(ctx, cfg, t.Logger)
+fs, err := createFilestore(ctx, cfg, input.Logger)
if err != nil {
t.Logger.Error("failed to create filestore", zap.Error(err))
input.Logger.Error("failed to create filestore", zap.Error(err))
return err
}
defer func() {
if err := fs.Close(); err != nil {
t.Logger.Error("failed to close filestore client", zap.Error(err))
input.Logger.Error("failed to close filestore client", zap.Error(err))
}
}()

// Connect to the cache.
rd := redis.NewRedis(s.cacheAddress, "")
defer func() {
if err := rd.Close(); err != nil {
t.Logger.Error("failed to close redis client", zap.Error(err))
input.Logger.Error("failed to close redis client", zap.Error(err))
}
}()
statCache := rediscache.NewHashCache(rd, defaultPipedStatHashKey)

// Start running staled piped stat cleaner.
{
-cleaner := staledpipedstatcleaner.NewStaledPipedStatCleaner(statCache, t.Logger)
+cleaner := staledpipedstatcleaner.NewStaledPipedStatCleaner(statCache, input.Logger)
group.Go(func() error {
return cleaner.Run(ctx)
})
}

// Start running command cleaner.
{
-cleaner := orphancommandcleaner.NewOrphanCommandCleaner(ds, t.Logger)
+cleaner := orphancommandcleaner.NewOrphanCommandCleaner(ds, input.Logger)
group.Go(func() error {
return cleaner.Run(ctx)
})
}

// Start running planpreview output cleaner.
{
-cleaner := planpreviewoutputcleaner.NewCleaner(fs, t.Logger)
+cleaner := planpreviewoutputcleaner.NewCleaner(fs, input.Logger)
group.Go(func() error {
return cleaner.Run(ctx)
})
}

// Start running insight collector.
-ic := insightcollector.NewCollector(ds, fs, cfg.InsightCollector, t.Logger)
+ic := insightcollector.NewCollector(ds, fs, cfg.InsightCollector, input.Logger)
group.Go(func() error {
return ic.Run(ctx)
})
insightMetricsCollector := insightmetrics.NewInsightMetricsCollector(insightstore.NewStore(fs), datastore.NewProjectStore(ds))

// Start running HTTP server.
{
-handler := handler.NewHandler(s.httpPort, datastore.NewProjectStore(ds), insightstore.NewStore(fs), cfg.SharedSSOConfigs, s.gracePeriod, t.Logger)
+handler := handler.NewHandler(s.httpPort, datastore.NewProjectStore(ds), insightstore.NewStore(fs), cfg.SharedSSOConfigs, s.gracePeriod, input.Logger)
group.Go(func() error {
return handler.Run(ctx)
})
}

-psb := pipedstatsbuilder.NewPipedStatsBuilder(statCache, t.Logger)
+psb := pipedstatsbuilder.NewPipedStatsBuilder(statCache, input.Logger)

// Register all pipecd ops metrics collectors.
reg := registerOpsMetrics(insightMetricsCollector)
// Start running admin server.
{
var (
ver = []byte(version.Get().Version)
-admin = admin.NewAdmin(s.adminPort, s.gracePeriod, t.Logger)
+admin = admin.NewAdmin(s.adminPort, s.gracePeriod, input.Logger)
)

admin.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
@@ -200,7 +200,7 @@ func (s *ops) run(ctx context.Context, t cli.Telemetry) error {
admin.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("ok"))
})
admin.Handle("/metrics", t.CustomMetricsHandlerFor(reg, psb))
admin.Handle("/metrics", input.CustomMetricsHandlerFor(reg, psb))

group.Go(func() error {
return admin.Run(ctx)
@@ -212,7 +212,7 @@ func (s *ops) run(ctx context.Context, t cli.Telemetry) error {
// could trigger the finish of server.
// This ensures that all components are good or no one.
if err := group.Wait(); err != nil {
t.Logger.Error("failed while running", zap.Error(err))
input.Logger.Error("failed while running", zap.Error(err))
return err
}
return nil
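For orientation between the two files: with the new signature, a cobra command's RunE needs an adapter that builds the cli.Input before calling run(ctx, input). In pipecd that plumbing lives in the cli package; the helper below is only an illustrative stand-in under that assumption (including the import path), not the project's actual wiring.

// Hypothetical adapter from run(ctx, input) to cobra's RunE, used as
// cmd.RunE = runWithInput(s.run) inside a constructor like NewOpsCommand.
package main

import (
	"context"

	"github.com/spf13/cobra"
	"go.uber.org/zap"

	"github.com/pipe-cd/pipe/pkg/cli" // import path assumed for illustration
)

func runWithInput(run func(context.Context, cli.Input) error) func(*cobra.Command, []string) error {
	return func(cmd *cobra.Command, args []string) error {
		logger, err := zap.NewProduction()
		if err != nil {
			return err
		}
		defer logger.Sync()

		input := cli.Input{
			Logger: logger,
			Stdin:  cmd.InOrStdin(), // stdin travels with the rest of the input
		}
		return run(cmd.Context(), input)
	}
}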
85 changes: 43 additions & 42 deletions cmd/pipecd/server.go
@@ -132,7 +132,7 @@ func NewServerCommand() *cobra.Command {
return cmd
}

-func (s *server) run(ctx context.Context, t cli.Telemetry) error {
+func (s *server) run(ctx context.Context, input cli.Input) error {
// Register all metrics.
reg := registerMetrics()

@@ -141,52 +141,52 @@ func (s *server) run(ctx context.Context, t cli.Telemetry) error {
// Load control plane configuration from the specified file.
cfg, err := loadConfig(s.configFile)
if err != nil {
t.Logger.Error("failed to load control-plane configuration",
input.Logger.Error("failed to load control-plane configuration",
zap.String("config-file", s.configFile),
zap.Error(err),
)
return err
}
t.Logger.Info("successfully loaded control-plane configuration")
input.Logger.Info("successfully loaded control-plane configuration")

-ds, err := createDatastore(ctx, cfg, t.Logger)
+ds, err := createDatastore(ctx, cfg, input.Logger)
if err != nil {
t.Logger.Error("failed to create datastore", zap.Error(err))
input.Logger.Error("failed to create datastore", zap.Error(err))
return err
}
defer func() {
if err := ds.Close(); err != nil {
t.Logger.Error("failed to close datastore client", zap.Error(err))
input.Logger.Error("failed to close datastore client", zap.Error(err))

}
}()
t.Logger.Info("succesfully connected to data store")
input.Logger.Info("succesfully connected to data store")

-fs, err := createFilestore(ctx, cfg, t.Logger)
+fs, err := createFilestore(ctx, cfg, input.Logger)
if err != nil {
t.Logger.Error("failed to create filestore", zap.Error(err))
input.Logger.Error("failed to create filestore", zap.Error(err))
return err
}
defer func() {
if err := fs.Close(); err != nil {
t.Logger.Error("failed to close filestore client", zap.Error(err))
input.Logger.Error("failed to close filestore client", zap.Error(err))
}
}()
t.Logger.Info("successfully connected to file store")
input.Logger.Info("successfully connected to file store")

rd := redis.NewRedis(s.cacheAddress, "")
defer func() {
if err := rd.Close(); err != nil {
t.Logger.Error("failed to close redis client", zap.Error(err))
input.Logger.Error("failed to close redis client", zap.Error(err))
}
}()
cache := rediscache.NewTTLCache(rd, cfg.Cache.TTLDuration())
-sls := stagelogstore.NewStore(fs, cache, t.Logger)
-alss := applicationlivestatestore.NewStore(fs, cache, t.Logger)
-las := analysisresultstore.NewStore(fs, t.Logger)
-cmds := commandstore.NewStore(ds, cache, t.Logger)
+sls := stagelogstore.NewStore(fs, cache, input.Logger)
+alss := applicationlivestatestore.NewStore(fs, cache, input.Logger)
+las := analysisresultstore.NewStore(fs, input.Logger)
+cmds := commandstore.NewStore(ds, cache, input.Logger)
is := insightstore.NewStore(fs)
-cmdOutputStore := commandoutputstore.NewStore(fs, t.Logger)
+cmdOutputStore := commandoutputstore.NewStore(fs, input.Logger)
statCache := rediscache.NewHashCache(rd, defaultPipedStatHashKey)

// Start a gRPC server for handling PipedAPI requests.
@@ -197,15 +197,15 @@ func (s *server) run(ctx context.Context, t cli.Telemetry) error {
cfg,
datastore.NewProjectStore(ds),
datastore.NewPipedStore(ds),
-t.Logger,
+input.Logger,
)
-service = grpcapi.NewPipedAPI(ctx, ds, sls, alss, las, cmds, statCache, cmdOutputStore, t.Logger)
+service = grpcapi.NewPipedAPI(ctx, ds, sls, alss, las, cmds, statCache, cmdOutputStore, input.Logger)
opts = []rpc.Option{
rpc.WithPort(s.pipedAPIPort),
rpc.WithGracePeriod(s.gracePeriod),
-rpc.WithLogger(t.Logger),
-rpc.WithLogUnaryInterceptor(t.Logger),
-rpc.WithPipedTokenAuthUnaryInterceptor(verifier, t.Logger),
+rpc.WithLogger(input.Logger),
+rpc.WithLogUnaryInterceptor(input.Logger),
+rpc.WithPipedTokenAuthUnaryInterceptor(verifier, input.Logger),
rpc.WithRequestValidationUnaryInterceptor(),
}
)
@@ -215,7 +215,7 @@ func (s *server) run(ctx context.Context, t cli.Telemetry) error {
if s.enableGRPCReflection {
opts = append(opts, rpc.WithGRPCReflection())
}
-if t.Flags.Metrics {
+if input.Flags.Metrics {
opts = append(opts, rpc.WithPrometheusUnaryInterceptor())
}

@@ -231,22 +231,23 @@ func (s *server) run(ctx context.Context, t cli.Telemetry) error {
verifier = apikeyverifier.NewVerifier(
ctx,
datastore.NewAPIKeyStore(ds),
-t.Logger,
+input.Logger,
)
-service = grpcapi.NewAPI(ctx, ds, cmds, cmdOutputStore, cfg.Address, t.Logger)
+
+service = grpcapi.NewAPI(ctx, ds, cmds, cmdOutputStore, cfg.Address, input.Logger)
opts = []rpc.Option{
rpc.WithPort(s.apiPort),
rpc.WithGracePeriod(s.gracePeriod),
-rpc.WithLogger(t.Logger),
-rpc.WithLogUnaryInterceptor(t.Logger),
-rpc.WithAPIKeyAuthUnaryInterceptor(verifier, t.Logger),
+rpc.WithLogger(input.Logger),
+rpc.WithLogUnaryInterceptor(input.Logger),
+rpc.WithAPIKeyAuthUnaryInterceptor(verifier, input.Logger),
rpc.WithRequestValidationUnaryInterceptor(),
}
)
if s.tls {
opts = append(opts, rpc.WithTLS(s.certFile, s.keyFile))
}
-if t.Flags.Metrics {
+if input.Flags.Metrics {
opts = append(opts, rpc.WithPrometheusUnaryInterceptor())
}

@@ -258,25 +259,25 @@ func (s *server) run(ctx context.Context, t cli.Telemetry) error {

encryptDecrypter, err := crypto.NewAESEncryptDecrypter(s.encryptionKeyFile)
if err != nil {
t.Logger.Error("failed to create a new AES EncryptDecrypter", zap.Error(err))
input.Logger.Error("failed to create a new AES EncryptDecrypter", zap.Error(err))
return err
}

// Start a gRPC server for handling WebAPI requests.
{
verifier, err := jwt.NewVerifier(defaultSigningMethod, s.encryptionKeyFile)
if err != nil {
t.Logger.Error("failed to create a new JWT verifier", zap.Error(err))
input.Logger.Error("failed to create a new JWT verifier", zap.Error(err))
return err
}

-service := grpcapi.NewWebAPI(ctx, ds, fs, sls, alss, cmds, is, rd, cfg.ProjectMap(), encryptDecrypter, t.Logger)
+service := grpcapi.NewWebAPI(ctx, ds, fs, sls, alss, cmds, is, rd, cfg.ProjectMap(), encryptDecrypter, input.Logger)
opts := []rpc.Option{
rpc.WithPort(s.webAPIPort),
rpc.WithGracePeriod(s.gracePeriod),
-rpc.WithLogger(t.Logger),
-rpc.WithLogUnaryInterceptor(t.Logger),
-rpc.WithJWTAuthUnaryInterceptor(verifier, webservice.NewRBACAuthorizer(), t.Logger),
+rpc.WithLogger(input.Logger),
+rpc.WithLogUnaryInterceptor(input.Logger),
+rpc.WithJWTAuthUnaryInterceptor(verifier, webservice.NewRBACAuthorizer(), input.Logger),
rpc.WithRequestValidationUnaryInterceptor(),
}
if s.tls {
@@ -285,7 +286,7 @@ func (s *server) run(ctx context.Context, t cli.Telemetry) error {
if s.enableGRPCReflection {
opts = append(opts, rpc.WithGRPCReflection())
}
-if t.Flags.Metrics {
+if input.Flags.Metrics {
opts = append(opts, rpc.WithPrometheusUnaryInterceptor())
}

@@ -301,7 +302,7 @@ func (s *server) run(ctx context.Context, t cli.Telemetry) error {
{
signer, err := jwt.NewSigner(defaultSigningMethod, s.encryptionKeyFile)
if err != nil {
t.Logger.Error("failed to create a new signer", zap.Error(err))
input.Logger.Error("failed to create a new signer", zap.Error(err))
return err
}

@@ -315,23 +316,23 @@ func (s *server) run(ctx context.Context, t cli.Telemetry) error {
cfg.SharedSSOConfigMap(),
datastore.NewProjectStore(ds),
!s.insecureCookie,
-t.Logger,
+input.Logger,
)
httpServer := &http.Server{
Addr: fmt.Sprintf(":%d", s.httpPort),
Handler: h,
}

group.Go(func() error {
-return runHTTPServer(ctx, httpServer, s.gracePeriod, t.Logger)
+return runHTTPServer(ctx, httpServer, s.gracePeriod, input.Logger)
})
}

// Start running admin server.
{
var (
ver = []byte(version.Get().Version)
-admin = admin.NewAdmin(s.adminPort, s.gracePeriod, t.Logger)
+admin = admin.NewAdmin(s.adminPort, s.gracePeriod, input.Logger)
)

admin.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
@@ -340,7 +341,7 @@ func (s *server) run(ctx context.Context, t cli.Telemetry) error {
admin.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("ok"))
})
admin.Handle("/metrics", t.PrometheusMetricsHandlerFor(reg))
admin.Handle("/metrics", input.PrometheusMetricsHandlerFor(reg))

group.Go(func() error {
return admin.Run(ctx)
@@ -352,7 +353,7 @@ func (s *server) run(ctx context.Context, t cli.Telemetry) error {
// could trigger the finish of server.
// This ensures that all components are good or no one.
if err := group.Wait(); err != nil {
t.Logger.Error("failed while running", zap.Error(err))
input.Logger.Error("failed while running", zap.Error(err))
return err
}
return nil
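The payoff of carrying stdin on the input is that any command can consume piped data without a dedicated flag. A minimal sketch under the same Stdin and import-path assumptions as above:

// Hypothetical helper: read a piped secret (e.g. an encryption key)
// from input.Stdin instead of requiring a file flag. Input.Stdin is
// the assumed field from the earlier sketch.
package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/pipe-cd/pipe/pkg/cli" // import path assumed for illustration
)

func readPipedKey(input cli.Input) ([]byte, error) {
	data, err := io.ReadAll(input.Stdin)
	if err != nil {
		return nil, fmt.Errorf("failed to read stdin: %w", err)
	}
	return bytes.TrimSpace(data), nil
}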