Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor and redesign insight collector #1980

Merged
merged 1 commit into from
May 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 0 additions & 2 deletions cmd/pipecd/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ go_library(
"//pkg/app/ops/insightcollector:go_default_library",
"//pkg/app/ops/mysqlensurer:go_default_library",
"//pkg/app/ops/orphancommandcleaner:go_default_library",
"//pkg/backoff:go_default_library",
"//pkg/cache/rediscache:go_default_library",
"//pkg/cli:go_default_library",
"//pkg/config:go_default_library",
Expand All @@ -45,7 +44,6 @@ go_library(
"//pkg/version:go_default_library",
"@com_github_dgrijalva_jwt_go//:go_default_library",
"@com_github_nytimes_gziphandler//:go_default_library",
"@com_github_robfig_cron_v3//:go_default_library",
"@com_github_spf13_cobra//:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
"@org_uber_go_zap//:go_default_library",
Expand Down
71 changes: 6 additions & 65 deletions cmd/pipecd/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"net/http"
"time"

"github.com/robfig/cron/v3"
"github.com/spf13/cobra"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand All @@ -30,7 +29,6 @@ import (
"github.com/pipe-cd/pipe/pkg/app/ops/insightcollector"
"github.com/pipe-cd/pipe/pkg/app/ops/mysqlensurer"
"github.com/pipe-cd/pipe/pkg/app/ops/orphancommandcleaner"
"github.com/pipe-cd/pipe/pkg/backoff"
"github.com/pipe-cd/pipe/pkg/cli"
"github.com/pipe-cd/pipe/pkg/config"
"github.com/pipe-cd/pipe/pkg/datastore"
Expand Down Expand Up @@ -61,7 +59,6 @@ func NewOpsCommand() *cobra.Command {
cmd.Flags().IntVar(&s.httpPort, "http-port", s.httpPort, "The port number used to run http server.")
cmd.Flags().IntVar(&s.adminPort, "admin-port", s.adminPort, "The port number used to run a HTTP server for admin tasks such as metrics, healthz.")
cmd.Flags().DurationVar(&s.gracePeriod, "grace-period", s.gracePeriod, "How long to wait for graceful shutdown.")
cmd.Flags().BoolVar(&s.enableInsightCollector, "enableInsightCollector-insight-collector", s.enableInsightCollector, "Enable insight collector.")
cmd.Flags().StringVar(&s.configFile, "config-file", s.configFile, "The path to the configuration file.")
cmd.Flags().StringVar(&s.gcloudPath, "gcloud-path", s.gcloudPath, "The path to the gcloud command executable.")
return cmd
Expand Down Expand Up @@ -126,27 +123,17 @@ func (s *ops) run(ctx context.Context, t cli.Telemetry) error {
}
}()

// Starting orphan commands cleaner
// Start running command cleaner.
cleaner := orphancommandcleaner.NewOrphanCommandCleaner(ds, t.Logger)
group.Go(func() error {
return cleaner.Run(ctx)
})

// Starting a cron job for insight collector.
if s.enableInsightCollector {
insightCfg := cfg.InsightCollector
mode := loadCollectorMode(insightCfg)
collector := insightcollector.NewInsightCollector(ds, fs, mode, t.Logger)

c := cron.New(cron.WithLocation(time.UTC))
_, err := c.AddFunc(insightCfg.Schedule, func() {
s.runDeploymentCollector(ctx, collector, insightCfg, t.Logger)
})
if err != nil {
t.Logger.Error("failed to configure cron job for collecting insight data about deployment", zap.Error(err))
}
c.Start()
}
// Start running insight collector.
ic := insightcollector.NewCollector(ds, fs, cfg.InsightCollector, t.Logger)
group.Go(func() error {
return ic.Run(ctx)
})

// Start running HTTP server.
{
Expand Down Expand Up @@ -187,52 +174,6 @@ func (s *ops) run(ctx context.Context, t cli.Telemetry) error {
return nil
}

func (s *ops) runDeploymentCollector(ctx context.Context, col *insightcollector.InsightCollector, cfg config.ControlPlaneInsightCollector, logger *zap.Logger) {
var doneNewlyCompleted, doneNewlyCreated bool
retry := backoff.NewRetry(
cfg.RetryTime,
backoff.NewConstant(time.Duration(cfg.RetryIntervalHour)*time.Hour),
)

for retry.WaitNext(ctx) {
if !doneNewlyCompleted {
start := time.Now()
if err := col.ProcessNewlyCompletedDeployments(ctx); err != nil {
logger.Error("failed to process the newly completed deployments while accumulating insight data", zap.Error(err))
} else {
logger.Info("successfully processed the newly completed deployments while accumulating insight data", zap.Duration("duration", time.Since(start)))
doneNewlyCompleted = true
}
}

if !doneNewlyCreated {
start := time.Now()
if err := col.ProcessNewlyCreatedDeployments(ctx); err != nil {
logger.Error("failed to process the newly created deployments while accumulating insight data", zap.Error(err))
} else {
logger.Info("successfully processed the newly created deployments while accumulating insight data", zap.Duration("duration", time.Since(start)))
doneNewlyCreated = true
}
}

if doneNewlyCompleted && doneNewlyCreated {
return
}
logger.Info("will do another try to collect insight data")
}
}

func loadCollectorMode(cfg config.ControlPlaneInsightCollector) insightcollector.CollectorMetrics {
metrics := insightcollector.NewCollectorMetrics()
if !cfg.DisabledMetrics.DeploymentFrequency {
metrics.Enable(insightcollector.DevelopmentFrequency)
}
if !cfg.DisabledMetrics.ChangeFailureRate {
metrics.Enable(insightcollector.ChangeFailureRate)
}
return metrics
}

func ensureSQLDatabase(ctx context.Context, cfg *config.ControlPlaneSpec, logger *zap.Logger) error {
mysqlEnsurer, err := mysqlensurer.NewMySQLEnsurer(
cfg.Datastore.MySQLConfig.URL,
Expand Down
12 changes: 4 additions & 8 deletions pkg/app/ops/insightcollector/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,33 @@ go_library(
"applications.go",
"collector.go",
"deployments.go",
"metrics.go",
],
importpath = "github.com/pipe-cd/pipe/pkg/app/ops/insightcollector",
visibility = ["//visibility:public"],
deps = [
"//pkg/backoff:go_default_library",
"//pkg/config:go_default_library",
"//pkg/datastore:go_default_library",
"//pkg/filestore:go_default_library",
"//pkg/insight:go_default_library",
"//pkg/insight/insightstore:go_default_library",
"//pkg/model:go_default_library",
"@com_github_robfig_cron_v3//:go_default_library",
"@org_uber_go_zap//:go_default_library",
],
)

go_test(
name = "go_default_test",
size = "small",
srcs = [
"applications_test.go",
"deployments_test.go",
"metrics_test.go",
],
srcs = ["deployments_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/datastore:go_default_library",
"//pkg/datastore/datastoretest:go_default_library",
"//pkg/filestore:go_default_library",
"//pkg/filestore/filestoretest:go_default_library",
"//pkg/insight:go_default_library",
"//pkg/insight/insightstore:go_default_library",
"//pkg/insight/insightstore/insightstoretest:go_default_library",
"//pkg/model:go_default_library",
"@com_github_golang_mock//gomock:go_default_library",
"@com_github_stretchr_testify//assert:go_default_library",
Expand Down
67 changes: 32 additions & 35 deletions pkg/app/ops/insightcollector/applications.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,86 +22,83 @@ import (
"go.uber.org/zap"

"github.com/pipe-cd/pipe/pkg/datastore"
"github.com/pipe-cd/pipe/pkg/filestore"
"github.com/pipe-cd/pipe/pkg/insight"
"github.com/pipe-cd/pipe/pkg/model"
)

// collectApplicationCount collects application count data.
func (i *InsightCollector) collectApplicationCount(ctx context.Context, apps []*model.Application, target time.Time) error {
func (c *Collector) collectApplicationCount(ctx context.Context, apps []*model.Application, target time.Time) error {
var lastErr error
appmap := groupApplicationsByProjectID(apps)
var updateErr error

for pid, apps := range appmap {
if err := i.updateApplicationCount(ctx, apps, pid, target); err != nil {
updateErr = err
if err := c.updateApplicationCounts(ctx, pid, apps, target); err != nil {
c.logger.Error("failed to update ApplicationCounts data",
zap.String("project", pid),
zap.Error(err),
)
lastErr = err
}
}
return updateErr
return lastErr
}

func (i *InsightCollector) updateApplicationCount(ctx context.Context, apps []*model.Application, pid string, target time.Time) error {
a, err := i.insightstore.LoadApplicationCount(ctx, pid)
func (c *Collector) updateApplicationCounts(ctx context.Context, projectID string, apps []*model.Application, target time.Time) error {
counts, err := insight.MakeApplicationCounts(apps, target)
if err != nil {
if err == filestore.ErrNotFound {
a = insight.NewApplicationCount()
oldestApp := findOldestApplication(apps)
a.AccumulatedFrom = oldestApp.CreatedAt
} else {
return fmt.Errorf("load application count: %w", err)
}
return fmt.Errorf("failed to make application counts: %w", err)
}

a.MigrateApplicationCount()

// update application count
a.UpdateCount(apps)

a.AccumulatedTo = target.Unix()

if err := i.insightstore.PutApplicationCount(ctx, a, pid); err != nil {
return fmt.Errorf("put application count: %w", err)
if err := c.insightstore.PutApplicationCounts(ctx, projectID, counts); err != nil {
return fmt.Errorf("failed to put application counts: %w", err)
}

return nil
}

func (i *InsightCollector) getApplications(ctx context.Context, to time.Time) ([]*model.Application, error) {
func (c *Collector) listApplications(ctx context.Context, to time.Time) ([]*model.Application, error) {
nghialv marked this conversation as resolved.
Show resolved Hide resolved
const limit = 100
var cursor string
var applications []*model.Application
maxCreatedAt := to.Unix()

for {
apps, _, err := i.applicationStore.ListApplications(ctx, datastore.ListOptions{
Limit: limit,
apps, next, err := c.applicationStore.ListApplications(ctx, datastore.ListOptions{
Filters: []datastore.ListFilter{
{
Field: "CreatedAt",
Operator: "<",
Value: maxCreatedAt,
Field: "Deleted",
Operator: "==",
Value: false,
},
},
Orders: []datastore.Order{
{
Field: "CreatedAt",
Direction: datastore.Desc,
},
{
Field: "Id",
Direction: datastore.Asc,
},
},
Cursor: cursor,
Limit: limit,
})
if err != nil {
i.logger.Error("failed to fetch applications", zap.Error(err))
return nil, err
}

applications = append(applications, apps...)
if len(apps) < limit {
if next == "" {
break
}
maxCreatedAt = apps[len(apps)-1].CreatedAt
cursor = next
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🆒 😂

}
return applications, nil
}

// groupApplicationsByProjectID groups applications by projectID
func groupApplicationsByProjectID(applications []*model.Application) map[string][]*model.Application {
apps := map[string][]*model.Application{}
apps := make(map[string][]*model.Application)
for _, a := range applications {
apps[a.ProjectId] = append(apps[a.ProjectId], a)
}
Expand Down