Empower all commands to handle input to stdin (#2598)
**What this PR does / why we need it**:
This PR makes it possible to pass input via stdin to all commands.
As part of that, it renames `cli.Telemetry` to `cli.Input`. The logger was never really telemetry-specific, so the rename is a natural fit, and it gives us the flexibility to propagate a variety of other inputs to each command.

The initial motivation for this change is to let us pass a plaintext file to the `pipectl encrypt` command via stdin.
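
For illustration only, here is a minimal sketch of what the renamed input type could look like. The field names `Logger`, `Flags`, and `Stdin` and the `TelemetryFlags` type are assumptions made for this example, not the exact definition in `pkg/cli`:

```go
// Hypothetical sketch of the renamed type; the real cli.Input lives in pkg/cli and may differ.
package cli

import (
	"io"

	"go.uber.org/zap"
)

// Input bundles everything a command's run function receives,
// replacing the old Telemetry struct (logger + metrics flags)
// and adding a reader for data piped to stdin.
type Input struct {
	Logger *zap.Logger
	Flags  TelemetryFlags
	Stdin  io.Reader
}

// TelemetryFlags is assumed here to carry the existing metrics-related flags.
type TelemetryFlags struct {
	Metrics bool
}
```

With something along these lines, a command such as `pipectl encrypt` could read its plaintext from `input.Stdin` when no file is given, e.g. `cat plain.txt | pipectl encrypt ...` (remaining flags omitted here).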

**Which issue(s) this PR fixes**:

Fixes #

**Does this PR introduce a user-facing change?**:
<!--
If no, just write "NONE" in the release-note block below.
-->
```release-note
NONE
```

This PR was merged by Kapetanios.
nakabonne committed Oct 7, 2021
1 parent bb437d0 commit 4d3d2a7
Showing 17 changed files with 163 additions and 160 deletions.
42 changes: 21 additions & 21 deletions cmd/pipecd/ops.go
@@ -76,13 +76,13 @@ func NewOpsCommand() *cobra.Command {
     return cmd
 }
 
-func (s *ops) run(ctx context.Context, t cli.Telemetry) error {
+func (s *ops) run(ctx context.Context, input cli.Input) error {
     group, ctx := errgroup.WithContext(ctx)
 
     // Load control plane configuration from the specified file.
     cfg, err := loadConfig(s.configFile)
     if err != nil {
-        t.Logger.Error("failed to load control-plane configuration",
+        input.Logger.Error("failed to load control-plane configuration",
             zap.String("config-file", s.configFile),
             zap.Error(err),
         )
@@ -91,8 +91,8 @@ func (s *ops) run(ctx context.Context, t cli.Telemetry) error {
 
     // Prepare sql database.
     if cfg.Datastore.Type == model.DataStoreMySQL {
-        if err := ensureSQLDatabase(ctx, cfg, t.Logger); err != nil {
-            t.Logger.Error("failed to ensure prepare SQL database", zap.Error(err))
+        if err := ensureSQLDatabase(ctx, cfg, input.Logger); err != nil {
+            input.Logger.Error("failed to ensure prepare SQL database", zap.Error(err))
             return err
         }
     }
@@ -104,94 +104,94 @@ func (s *ops) run(ctx context.Context, t cli.Telemetry) error {
             cfg.Datastore.FirestoreConfig.Project,
             cfg.Datastore.FirestoreConfig.CredentialsFile,
             cfg.Datastore.FirestoreConfig.CollectionNamePrefix,
-            t.Logger,
+            input.Logger,
         )
         group.Go(func() error {
             return ensurer.CreateIndexes(ctx)
         })
     }
 
     // Connect to the data store.
-    ds, err := createDatastore(ctx, cfg, t.Logger)
+    ds, err := createDatastore(ctx, cfg, input.Logger)
     if err != nil {
-        t.Logger.Error("failed to create datastore", zap.Error(err))
+        input.Logger.Error("failed to create datastore", zap.Error(err))
         return err
     }
     defer func() {
         if err := ds.Close(); err != nil {
-            t.Logger.Error("failed to close datastore client", zap.Error(err))
+            input.Logger.Error("failed to close datastore client", zap.Error(err))
         }
     }()
 
     // Connect to the file store.
-    fs, err := createFilestore(ctx, cfg, t.Logger)
+    fs, err := createFilestore(ctx, cfg, input.Logger)
     if err != nil {
-        t.Logger.Error("failed to create filestore", zap.Error(err))
+        input.Logger.Error("failed to create filestore", zap.Error(err))
         return err
     }
     defer func() {
         if err := fs.Close(); err != nil {
-            t.Logger.Error("failed to close filestore client", zap.Error(err))
+            input.Logger.Error("failed to close filestore client", zap.Error(err))
         }
     }()
 
     // Connect to the cache.
     rd := redis.NewRedis(s.cacheAddress, "")
     defer func() {
         if err := rd.Close(); err != nil {
-            t.Logger.Error("failed to close redis client", zap.Error(err))
+            input.Logger.Error("failed to close redis client", zap.Error(err))
         }
     }()
     statCache := rediscache.NewHashCache(rd, defaultPipedStatHashKey)
 
     // Start running staled piped stat cleaner.
     {
-        cleaner := staledpipedstatcleaner.NewStaledPipedStatCleaner(statCache, t.Logger)
+        cleaner := staledpipedstatcleaner.NewStaledPipedStatCleaner(statCache, input.Logger)
         group.Go(func() error {
             return cleaner.Run(ctx)
         })
     }
 
     // Start running command cleaner.
     {
-        cleaner := orphancommandcleaner.NewOrphanCommandCleaner(ds, t.Logger)
+        cleaner := orphancommandcleaner.NewOrphanCommandCleaner(ds, input.Logger)
         group.Go(func() error {
             return cleaner.Run(ctx)
         })
     }
 
     // Start running planpreview output cleaner.
     {
-        cleaner := planpreviewoutputcleaner.NewCleaner(fs, t.Logger)
+        cleaner := planpreviewoutputcleaner.NewCleaner(fs, input.Logger)
         group.Go(func() error {
             return cleaner.Run(ctx)
         })
     }
 
     // Start running insight collector.
-    ic := insightcollector.NewCollector(ds, fs, cfg.InsightCollector, t.Logger)
+    ic := insightcollector.NewCollector(ds, fs, cfg.InsightCollector, input.Logger)
     group.Go(func() error {
         return ic.Run(ctx)
     })
     insightMetricsCollector := insightmetrics.NewInsightMetricsCollector(insightstore.NewStore(fs), datastore.NewProjectStore(ds))
 
     // Start running HTTP server.
     {
-        handler := handler.NewHandler(s.httpPort, datastore.NewProjectStore(ds), insightstore.NewStore(fs), cfg.SharedSSOConfigs, s.gracePeriod, t.Logger)
+        handler := handler.NewHandler(s.httpPort, datastore.NewProjectStore(ds), insightstore.NewStore(fs), cfg.SharedSSOConfigs, s.gracePeriod, input.Logger)
         group.Go(func() error {
             return handler.Run(ctx)
         })
     }
 
-    psb := pipedstatsbuilder.NewPipedStatsBuilder(statCache, t.Logger)
+    psb := pipedstatsbuilder.NewPipedStatsBuilder(statCache, input.Logger)
 
     // Register all pipecd ops metrics collectors.
     reg := registerOpsMetrics(insightMetricsCollector)
     // Start running admin server.
     {
         var (
             ver = []byte(version.Get().Version)
-            admin = admin.NewAdmin(s.adminPort, s.gracePeriod, t.Logger)
+            admin = admin.NewAdmin(s.adminPort, s.gracePeriod, input.Logger)
         )
 
         admin.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
@@ -200,7 +200,7 @@ func (s *ops) run(ctx context.Context, t cli.Telemetry) error {
         admin.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
             w.Write([]byte("ok"))
         })
-        admin.Handle("/metrics", t.CustomMetricsHandlerFor(reg, psb))
+        admin.Handle("/metrics", input.CustomMetricsHandlerFor(reg, psb))
 
         group.Go(func() error {
             return admin.Run(ctx)
@@ -212,7 +212,7 @@ func (s *ops) run(ctx context.Context, t cli.Telemetry) error {
     // could trigger the finish of server.
     // This ensures that all components are good or no one.
     if err := group.Wait(); err != nil {
-        t.Logger.Error("failed while running", zap.Error(err))
+        input.Logger.Error("failed while running", zap.Error(err))
         return err
     }
     return nil
85 changes: 43 additions & 42 deletions cmd/pipecd/server.go
@@ -132,7 +132,7 @@ func NewServerCommand() *cobra.Command {
     return cmd
 }
 
-func (s *server) run(ctx context.Context, t cli.Telemetry) error {
+func (s *server) run(ctx context.Context, input cli.Input) error {
     // Register all metrics.
     reg := registerMetrics()
 
@@ -141,52 +141,52 @@ func (s *server) run(ctx context.Context, t cli.Telemetry) error {
     // Load control plane configuration from the specified file.
     cfg, err := loadConfig(s.configFile)
     if err != nil {
-        t.Logger.Error("failed to load control-plane configuration",
+        input.Logger.Error("failed to load control-plane configuration",
             zap.String("config-file", s.configFile),
             zap.Error(err),
         )
         return err
     }
-    t.Logger.Info("successfully loaded control-plane configuration")
+    input.Logger.Info("successfully loaded control-plane configuration")
 
-    ds, err := createDatastore(ctx, cfg, t.Logger)
+    ds, err := createDatastore(ctx, cfg, input.Logger)
     if err != nil {
-        t.Logger.Error("failed to create datastore", zap.Error(err))
+        input.Logger.Error("failed to create datastore", zap.Error(err))
         return err
     }
     defer func() {
         if err := ds.Close(); err != nil {
-            t.Logger.Error("failed to close datastore client", zap.Error(err))
+            input.Logger.Error("failed to close datastore client", zap.Error(err))
 
         }
     }()
-    t.Logger.Info("succesfully connected to data store")
+    input.Logger.Info("succesfully connected to data store")
 
-    fs, err := createFilestore(ctx, cfg, t.Logger)
+    fs, err := createFilestore(ctx, cfg, input.Logger)
     if err != nil {
-        t.Logger.Error("failed to create filestore", zap.Error(err))
+        input.Logger.Error("failed to create filestore", zap.Error(err))
         return err
     }
     defer func() {
         if err := fs.Close(); err != nil {
-            t.Logger.Error("failed to close filestore client", zap.Error(err))
+            input.Logger.Error("failed to close filestore client", zap.Error(err))
         }
     }()
-    t.Logger.Info("successfully connected to file store")
+    input.Logger.Info("successfully connected to file store")
 
     rd := redis.NewRedis(s.cacheAddress, "")
     defer func() {
         if err := rd.Close(); err != nil {
-            t.Logger.Error("failed to close redis client", zap.Error(err))
+            input.Logger.Error("failed to close redis client", zap.Error(err))
         }
     }()
     cache := rediscache.NewTTLCache(rd, cfg.Cache.TTLDuration())
-    sls := stagelogstore.NewStore(fs, cache, t.Logger)
-    alss := applicationlivestatestore.NewStore(fs, cache, t.Logger)
-    las := analysisresultstore.NewStore(fs, t.Logger)
-    cmds := commandstore.NewStore(ds, cache, t.Logger)
+    sls := stagelogstore.NewStore(fs, cache, input.Logger)
+    alss := applicationlivestatestore.NewStore(fs, cache, input.Logger)
+    las := analysisresultstore.NewStore(fs, input.Logger)
+    cmds := commandstore.NewStore(ds, cache, input.Logger)
     is := insightstore.NewStore(fs)
-    cmdOutputStore := commandoutputstore.NewStore(fs, t.Logger)
+    cmdOutputStore := commandoutputstore.NewStore(fs, input.Logger)
     statCache := rediscache.NewHashCache(rd, defaultPipedStatHashKey)
 
     // Start a gRPC server for handling PipedAPI requests.
@@ -197,15 +197,15 @@ func (s *server) run(ctx context.Context, t cli.Telemetry) error {
                 cfg,
                 datastore.NewProjectStore(ds),
                 datastore.NewPipedStore(ds),
-                t.Logger,
+                input.Logger,
             )
-            service = grpcapi.NewPipedAPI(ctx, ds, sls, alss, las, cmds, statCache, cmdOutputStore, t.Logger)
+            service = grpcapi.NewPipedAPI(ctx, ds, sls, alss, las, cmds, statCache, cmdOutputStore, input.Logger)
             opts = []rpc.Option{
                 rpc.WithPort(s.pipedAPIPort),
                 rpc.WithGracePeriod(s.gracePeriod),
-                rpc.WithLogger(t.Logger),
-                rpc.WithLogUnaryInterceptor(t.Logger),
-                rpc.WithPipedTokenAuthUnaryInterceptor(verifier, t.Logger),
+                rpc.WithLogger(input.Logger),
+                rpc.WithLogUnaryInterceptor(input.Logger),
+                rpc.WithPipedTokenAuthUnaryInterceptor(verifier, input.Logger),
                 rpc.WithRequestValidationUnaryInterceptor(),
             }
         )
@@ -215,7 +215,7 @@ func (s *server) run(ctx context.Context, t cli.Telemetry) error {
         if s.enableGRPCReflection {
             opts = append(opts, rpc.WithGRPCReflection())
         }
-        if t.Flags.Metrics {
+        if input.Flags.Metrics {
             opts = append(opts, rpc.WithPrometheusUnaryInterceptor())
         }
 
@@ -231,22 +231,23 @@ func (s *server) run(ctx context.Context, t cli.Telemetry) error {
             verifier = apikeyverifier.NewVerifier(
                 ctx,
                 datastore.NewAPIKeyStore(ds),
-                t.Logger,
+                input.Logger,
             )
-            service = grpcapi.NewAPI(ctx, ds, cmds, cmdOutputStore, cfg.Address, t.Logger)
+
+            service = grpcapi.NewAPI(ctx, ds, cmds, cmdOutputStore, cfg.Address, input.Logger)
             opts = []rpc.Option{
                 rpc.WithPort(s.apiPort),
                 rpc.WithGracePeriod(s.gracePeriod),
-                rpc.WithLogger(t.Logger),
-                rpc.WithLogUnaryInterceptor(t.Logger),
-                rpc.WithAPIKeyAuthUnaryInterceptor(verifier, t.Logger),
+                rpc.WithLogger(input.Logger),
+                rpc.WithLogUnaryInterceptor(input.Logger),
+                rpc.WithAPIKeyAuthUnaryInterceptor(verifier, input.Logger),
                 rpc.WithRequestValidationUnaryInterceptor(),
             }
         )
         if s.tls {
             opts = append(opts, rpc.WithTLS(s.certFile, s.keyFile))
         }
-        if t.Flags.Metrics {
+        if input.Flags.Metrics {
             opts = append(opts, rpc.WithPrometheusUnaryInterceptor())
         }
 
@@ -258,25 +258,25 @@ func (s *server) run(ctx context.Context, t cli.Telemetry) error {
 
     encryptDecrypter, err := crypto.NewAESEncryptDecrypter(s.encryptionKeyFile)
     if err != nil {
-        t.Logger.Error("failed to create a new AES EncryptDecrypter", zap.Error(err))
+        input.Logger.Error("failed to create a new AES EncryptDecrypter", zap.Error(err))
        return err
     }
 
     // Start a gRPC server for handling WebAPI requests.
     {
         verifier, err := jwt.NewVerifier(defaultSigningMethod, s.encryptionKeyFile)
         if err != nil {
-            t.Logger.Error("failed to create a new JWT verifier", zap.Error(err))
+            input.Logger.Error("failed to create a new JWT verifier", zap.Error(err))
             return err
         }
 
-        service := grpcapi.NewWebAPI(ctx, ds, fs, sls, alss, cmds, is, rd, cfg.ProjectMap(), encryptDecrypter, t.Logger)
+        service := grpcapi.NewWebAPI(ctx, ds, fs, sls, alss, cmds, is, rd, cfg.ProjectMap(), encryptDecrypter, input.Logger)
         opts := []rpc.Option{
             rpc.WithPort(s.webAPIPort),
             rpc.WithGracePeriod(s.gracePeriod),
-            rpc.WithLogger(t.Logger),
-            rpc.WithLogUnaryInterceptor(t.Logger),
-            rpc.WithJWTAuthUnaryInterceptor(verifier, webservice.NewRBACAuthorizer(), t.Logger),
+            rpc.WithLogger(input.Logger),
+            rpc.WithLogUnaryInterceptor(input.Logger),
+            rpc.WithJWTAuthUnaryInterceptor(verifier, webservice.NewRBACAuthorizer(), input.Logger),
             rpc.WithRequestValidationUnaryInterceptor(),
         }
         if s.tls {
@@ -285,7 +285,7 @@ func (s *server) run(ctx context.Context, t cli.Telemetry) error {
         if s.enableGRPCReflection {
             opts = append(opts, rpc.WithGRPCReflection())
         }
-        if t.Flags.Metrics {
+        if input.Flags.Metrics {
             opts = append(opts, rpc.WithPrometheusUnaryInterceptor())
         }
 
@@ -301,7 +301,7 @@ func (s *server) run(ctx context.Context, t cli.Telemetry) error {
     {
         signer, err := jwt.NewSigner(defaultSigningMethod, s.encryptionKeyFile)
         if err != nil {
-            t.Logger.Error("failed to create a new signer", zap.Error(err))
+            input.Logger.Error("failed to create a new signer", zap.Error(err))
             return err
         }
 
@@ -315,23 +315,23 @@ func (s *server) run(ctx context.Context, t cli.Telemetry) error {
             cfg.SharedSSOConfigMap(),
             datastore.NewProjectStore(ds),
             !s.insecureCookie,
-            t.Logger,
+            input.Logger,
         )
         httpServer := &http.Server{
             Addr: fmt.Sprintf(":%d", s.httpPort),
             Handler: h,
         }
 
         group.Go(func() error {
-            return runHTTPServer(ctx, httpServer, s.gracePeriod, t.Logger)
+            return runHTTPServer(ctx, httpServer, s.gracePeriod, input.Logger)
         })
     }
 
     // Start running admin server.
     {
         var (
             ver = []byte(version.Get().Version)
-            admin = admin.NewAdmin(s.adminPort, s.gracePeriod, t.Logger)
+            admin = admin.NewAdmin(s.adminPort, s.gracePeriod, input.Logger)
         )
 
         admin.HandleFunc("/version", func(w http.ResponseWriter, r *http.Request) {
@@ -340,7 +341,7 @@ func (s *server) run(ctx context.Context, t cli.Telemetry) error {
         admin.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
             w.Write([]byte("ok"))
         })
-        admin.Handle("/metrics", t.PrometheusMetricsHandlerFor(reg))
+        admin.Handle("/metrics", input.PrometheusMetricsHandlerFor(reg))
 
         group.Go(func() error {
             return admin.Run(ctx)
@@ -352,7 +353,7 @@ func (s *server) run(ctx context.Context, t cli.Telemetry) error {
     // could trigger the finish of server.
     // This ensures that all components are good or no one.
     if err := group.Wait(); err != nil {
-        t.Logger.Error("failed while running", zap.Error(err))
+        input.Logger.Error("failed while running", zap.Error(err))
         return err
     }
     return nil
