Skip to content

Commit

Permalink
Refactor and redesign insight collector (#1980)
Browse files Browse the repository at this point in the history
**What this PR does / why we need it**:

**Which issue(s) this PR fixes**:

Fixes #

**Does this PR introduce a user-facing change?**:
<!--
If no, just write "NONE" in the release-note block below.
-->
```release-note
NONE
```

This PR was merged by Kapetanios.
  • Loading branch information
nghialv committed May 20, 2021
1 parent a31e638 commit 5e732c7
Show file tree
Hide file tree
Showing 18 changed files with 255 additions and 1,185 deletions.
2 changes: 0 additions & 2 deletions cmd/pipecd/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ go_library(
"//pkg/app/ops/insightcollector:go_default_library",
"//pkg/app/ops/mysqlensurer:go_default_library",
"//pkg/app/ops/orphancommandcleaner:go_default_library",
"//pkg/backoff:go_default_library",
"//pkg/cache/rediscache:go_default_library",
"//pkg/cli:go_default_library",
"//pkg/config:go_default_library",
Expand All @@ -45,7 +44,6 @@ go_library(
"//pkg/version:go_default_library",
"@com_github_dgrijalva_jwt_go//:go_default_library",
"@com_github_nytimes_gziphandler//:go_default_library",
"@com_github_robfig_cron_v3//:go_default_library",
"@com_github_spf13_cobra//:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
"@org_uber_go_zap//:go_default_library",
Expand Down
71 changes: 6 additions & 65 deletions cmd/pipecd/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"net/http"
"time"

"github.com/robfig/cron/v3"
"github.com/spf13/cobra"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand All @@ -30,7 +29,6 @@ import (
"github.com/pipe-cd/pipe/pkg/app/ops/insightcollector"
"github.com/pipe-cd/pipe/pkg/app/ops/mysqlensurer"
"github.com/pipe-cd/pipe/pkg/app/ops/orphancommandcleaner"
"github.com/pipe-cd/pipe/pkg/backoff"
"github.com/pipe-cd/pipe/pkg/cli"
"github.com/pipe-cd/pipe/pkg/config"
"github.com/pipe-cd/pipe/pkg/datastore"
Expand Down Expand Up @@ -61,7 +59,6 @@ func NewOpsCommand() *cobra.Command {
cmd.Flags().IntVar(&s.httpPort, "http-port", s.httpPort, "The port number used to run http server.")
cmd.Flags().IntVar(&s.adminPort, "admin-port", s.adminPort, "The port number used to run a HTTP server for admin tasks such as metrics, healthz.")
cmd.Flags().DurationVar(&s.gracePeriod, "grace-period", s.gracePeriod, "How long to wait for graceful shutdown.")
cmd.Flags().BoolVar(&s.enableInsightCollector, "enableInsightCollector-insight-collector", s.enableInsightCollector, "Enable insight collector.")
cmd.Flags().StringVar(&s.configFile, "config-file", s.configFile, "The path to the configuration file.")
cmd.Flags().StringVar(&s.gcloudPath, "gcloud-path", s.gcloudPath, "The path to the gcloud command executable.")
return cmd
Expand Down Expand Up @@ -126,27 +123,17 @@ func (s *ops) run(ctx context.Context, t cli.Telemetry) error {
}
}()

// Starting orphan commands cleaner
// Start running command cleaner.
cleaner := orphancommandcleaner.NewOrphanCommandCleaner(ds, t.Logger)
group.Go(func() error {
return cleaner.Run(ctx)
})

// Starting a cron job for insight collector.
if s.enableInsightCollector {
insightCfg := cfg.InsightCollector
mode := loadCollectorMode(insightCfg)
collector := insightcollector.NewInsightCollector(ds, fs, mode, t.Logger)

c := cron.New(cron.WithLocation(time.UTC))
_, err := c.AddFunc(insightCfg.Schedule, func() {
s.runDeploymentCollector(ctx, collector, insightCfg, t.Logger)
})
if err != nil {
t.Logger.Error("failed to configure cron job for collecting insight data about deployment", zap.Error(err))
}
c.Start()
}
// Start running insight collector.
ic := insightcollector.NewCollector(ds, fs, cfg.InsightCollector, t.Logger)
group.Go(func() error {
return ic.Run(ctx)
})

// Start running HTTP server.
{
Expand Down Expand Up @@ -187,52 +174,6 @@ func (s *ops) run(ctx context.Context, t cli.Telemetry) error {
return nil
}

func (s *ops) runDeploymentCollector(ctx context.Context, col *insightcollector.InsightCollector, cfg config.ControlPlaneInsightCollector, logger *zap.Logger) {
var doneNewlyCompleted, doneNewlyCreated bool
retry := backoff.NewRetry(
cfg.RetryTime,
backoff.NewConstant(time.Duration(cfg.RetryIntervalHour)*time.Hour),
)

for retry.WaitNext(ctx) {
if !doneNewlyCompleted {
start := time.Now()
if err := col.ProcessNewlyCompletedDeployments(ctx); err != nil {
logger.Error("failed to process the newly completed deployments while accumulating insight data", zap.Error(err))
} else {
logger.Info("successfully processed the newly completed deployments while accumulating insight data", zap.Duration("duration", time.Since(start)))
doneNewlyCompleted = true
}
}

if !doneNewlyCreated {
start := time.Now()
if err := col.ProcessNewlyCreatedDeployments(ctx); err != nil {
logger.Error("failed to process the newly created deployments while accumulating insight data", zap.Error(err))
} else {
logger.Info("successfully processed the newly created deployments while accumulating insight data", zap.Duration("duration", time.Since(start)))
doneNewlyCreated = true
}
}

if doneNewlyCompleted && doneNewlyCreated {
return
}
logger.Info("will do another try to collect insight data")
}
}

func loadCollectorMode(cfg config.ControlPlaneInsightCollector) insightcollector.CollectorMetrics {
metrics := insightcollector.NewCollectorMetrics()
if !cfg.DisabledMetrics.DeploymentFrequency {
metrics.Enable(insightcollector.DevelopmentFrequency)
}
if !cfg.DisabledMetrics.ChangeFailureRate {
metrics.Enable(insightcollector.ChangeFailureRate)
}
return metrics
}

func ensureSQLDatabase(ctx context.Context, cfg *config.ControlPlaneSpec, logger *zap.Logger) error {
mysqlEnsurer, err := mysqlensurer.NewMySQLEnsurer(
cfg.Datastore.MySQLConfig.URL,
Expand Down
12 changes: 4 additions & 8 deletions pkg/app/ops/insightcollector/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,33 @@ go_library(
"applications.go",
"collector.go",
"deployments.go",
"metrics.go",
],
importpath = "github.com/pipe-cd/pipe/pkg/app/ops/insightcollector",
visibility = ["//visibility:public"],
deps = [
"//pkg/backoff:go_default_library",
"//pkg/config:go_default_library",
"//pkg/datastore:go_default_library",
"//pkg/filestore:go_default_library",
"//pkg/insight:go_default_library",
"//pkg/insight/insightstore:go_default_library",
"//pkg/model:go_default_library",
"@com_github_robfig_cron_v3//:go_default_library",
"@org_uber_go_zap//:go_default_library",
],
)

go_test(
name = "go_default_test",
size = "small",
srcs = [
"applications_test.go",
"deployments_test.go",
"metrics_test.go",
],
srcs = ["deployments_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/datastore:go_default_library",
"//pkg/datastore/datastoretest:go_default_library",
"//pkg/filestore:go_default_library",
"//pkg/filestore/filestoretest:go_default_library",
"//pkg/insight:go_default_library",
"//pkg/insight/insightstore:go_default_library",
"//pkg/insight/insightstore/insightstoretest:go_default_library",
"//pkg/model:go_default_library",
"@com_github_golang_mock//gomock:go_default_library",
"@com_github_stretchr_testify//assert:go_default_library",
Expand Down
67 changes: 32 additions & 35 deletions pkg/app/ops/insightcollector/applications.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,86 +22,83 @@ import (
"go.uber.org/zap"

"github.com/pipe-cd/pipe/pkg/datastore"
"github.com/pipe-cd/pipe/pkg/filestore"
"github.com/pipe-cd/pipe/pkg/insight"
"github.com/pipe-cd/pipe/pkg/model"
)

// collectApplicationCount collects application count data.
func (i *InsightCollector) collectApplicationCount(ctx context.Context, apps []*model.Application, target time.Time) error {
func (c *Collector) collectApplicationCount(ctx context.Context, apps []*model.Application, target time.Time) error {
var lastErr error
appmap := groupApplicationsByProjectID(apps)
var updateErr error

for pid, apps := range appmap {
if err := i.updateApplicationCount(ctx, apps, pid, target); err != nil {
updateErr = err
if err := c.updateApplicationCounts(ctx, pid, apps, target); err != nil {
c.logger.Error("failed to update ApplicationCounts data",
zap.String("project", pid),
zap.Error(err),
)
lastErr = err
}
}
return updateErr
return lastErr
}

func (i *InsightCollector) updateApplicationCount(ctx context.Context, apps []*model.Application, pid string, target time.Time) error {
a, err := i.insightstore.LoadApplicationCount(ctx, pid)
func (c *Collector) updateApplicationCounts(ctx context.Context, projectID string, apps []*model.Application, target time.Time) error {
counts, err := insight.MakeApplicationCounts(apps, target)
if err != nil {
if err == filestore.ErrNotFound {
a = insight.NewApplicationCount()
oldestApp := findOldestApplication(apps)
a.AccumulatedFrom = oldestApp.CreatedAt
} else {
return fmt.Errorf("load application count: %w", err)
}
return fmt.Errorf("failed to make application counts: %w", err)
}

a.MigrateApplicationCount()

// update application count
a.UpdateCount(apps)

a.AccumulatedTo = target.Unix()

if err := i.insightstore.PutApplicationCount(ctx, a, pid); err != nil {
return fmt.Errorf("put application count: %w", err)
if err := c.insightstore.PutApplicationCounts(ctx, projectID, counts); err != nil {
return fmt.Errorf("failed to put application counts: %w", err)
}

return nil
}

func (i *InsightCollector) getApplications(ctx context.Context, to time.Time) ([]*model.Application, error) {
func (c *Collector) listApplications(ctx context.Context, to time.Time) ([]*model.Application, error) {
const limit = 100
var cursor string
var applications []*model.Application
maxCreatedAt := to.Unix()

for {
apps, _, err := i.applicationStore.ListApplications(ctx, datastore.ListOptions{
Limit: limit,
apps, next, err := c.applicationStore.ListApplications(ctx, datastore.ListOptions{
Filters: []datastore.ListFilter{
{
Field: "CreatedAt",
Operator: "<",
Value: maxCreatedAt,
Field: "Deleted",
Operator: "==",
Value: false,
},
},
Orders: []datastore.Order{
{
Field: "CreatedAt",
Direction: datastore.Desc,
},
{
Field: "Id",
Direction: datastore.Asc,
},
},
Cursor: cursor,
Limit: limit,
})
if err != nil {
i.logger.Error("failed to fetch applications", zap.Error(err))
return nil, err
}

applications = append(applications, apps...)
if len(apps) < limit {
if next == "" {
break
}
maxCreatedAt = apps[len(apps)-1].CreatedAt
cursor = next
}
return applications, nil
}

// groupApplicationsByProjectID groups applications by projectID
func groupApplicationsByProjectID(applications []*model.Application) map[string][]*model.Application {
apps := map[string][]*model.Application{}
apps := make(map[string][]*model.Application)
for _, a := range applications {
apps[a.ProjectId] = append(apps[a.ProjectId], a)
}
Expand Down

0 comments on commit 5e732c7

Please sign in to comment.