Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist state in statusreconciler #15585

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions prow/cmd/status-reconciler/BUILD.bazel
Expand Up @@ -21,6 +21,7 @@ go_library(
visibility = ["//visibility:private"],
deps = [
"//pkg/flagutil:go_default_library",
"//pkg/io:go_default_library",
"//prow/config:go_default_library",
"//prow/config/secret:go_default_library",
"//prow/flagutil:go_default_library",
Expand Down
31 changes: 26 additions & 5 deletions prow/cmd/status-reconciler/main.go
Expand Up @@ -20,15 +20,17 @@ import (
"context"
"flag"
"os"
"time"

"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/test-infra/prow/interrupts"

"k8s.io/test-infra/pkg/flagutil"
"k8s.io/test-infra/pkg/io"
"k8s.io/test-infra/prow/config"
"k8s.io/test-infra/prow/config/secret"
prowflagutil "k8s.io/test-infra/prow/flagutil"
"k8s.io/test-infra/prow/interrupts"
"k8s.io/test-infra/prow/logrusutil"
"k8s.io/test-infra/prow/pjutil"
"k8s.io/test-infra/prow/plugins"
Expand All @@ -53,6 +55,15 @@ type options struct {

tokenBurst int
tokensPerHour int

// The following are used for reading/writing to GCS.
gcsCredentialsFile string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these two and their associated flags something that could be DRYed out into a struct and embedded in all the controllers that do something similar? (tide, deck, etc)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds great. Could I submit this change as a separate PR once this one is merged, as it will change multiple controllers?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@adshmh I think we forgot to expose this as a flag in the first place 🤦‍♂️

Perhaps we can fix that with the patch to share this code between components that mean to write to GCS?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching that. Yes, I will include that in the patch.

// statusURI is where status-reconciler stores its last known state, i.e. configuration.
// Can be a /local/path or gs://path/to/object.
// GCS writes will use the bucket's default acl for new objects. Ensure both that
// a) the gcs credentials can write to this bucket
// b) the default acls do not expose any private info
statusURI string
}

func gatherOptions() options {
Expand All @@ -62,6 +73,7 @@ func gatherOptions() options {
fs.StringVar(&o.configPath, "config-path", "/etc/config/config.yaml", "Path to config.yaml.")
fs.StringVar(&o.jobConfigPath, "job-config-path", "", "Path to prow job configs.")
fs.StringVar(&o.pluginConfig, "plugin-config", "/etc/plugins/plugins.yaml", "Path to plugin config file.")
fs.StringVar(&o.statusURI, "status-path", "", "The /local/path or gs://path/to/object to store status controller state. GCS writes will use the default object ACL for the bucket.")

fs.BoolVar(&o.continueOnError, "continue-on-error", false, "Indicates that the migration should continue if context migration fails for an individual PR.")
fs.Var(&o.addedPresubmitBlacklist, "blacklist", "Org or org/repo to ignore new added presubmits for, set more than once to add more.")
Expand Down Expand Up @@ -102,8 +114,6 @@ func main() {
if err := configAgent.Start(o.configPath, o.jobConfigPath); err != nil {
logrus.WithError(err).Fatal("Error starting config agent.")
}
changes := make(chan config.Delta)
configAgent.Subscribe(changes)

secretAgent := &secret.Agent{}
if o.github.TokenPath != "" {
Expand All @@ -130,8 +140,19 @@ func main() {
logrus.WithError(err).Fatal("Error getting kube client.")
}

c := statusreconciler.NewController(o.continueOnError, sets.NewString(o.addedPresubmitBlacklist.Strings()...), prowJobClient, githubClient, configAgent, pluginAgent)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
opener, err := io.NewOpener(ctx, o.gcsCredentialsFile)
if err != nil {
entry := logrus.WithError(err)
if p := o.gcsCredentialsFile; p != "" {
entry = entry.WithField("gcs-credentials-file", p)
}
entry.Fatal("Cannot create opener")
}

c := statusreconciler.NewController(o.continueOnError, sets.NewString(o.addedPresubmitBlacklist.Strings()...), opener, o.configPath, o.jobConfigPath, o.statusURI, prowJobClient, githubClient, pluginAgent)
interrupts.Run(func(ctx context.Context) {
c.Run(ctx, changes)
c.Run(ctx)
})
}
51 changes: 30 additions & 21 deletions prow/config/agent.go
Expand Up @@ -40,46 +40,54 @@ type Agent struct {
subscriptions []DeltaChan
}

// lastConfigModTime reports the most recent modification time across the prow
// config path and, when jobConfig is non-empty, the job config path. It is
// used to decide whether the configuration needs to be re-read.
//
// If either path cannot be stat'ed, the failure is logged and returned
// together with a zero time.Time.
func lastConfigModTime(prowConfig, jobConfig string) (time.Time, error) {
	// os.Stat follows symbolic links, which is how ConfigMaps work.
	prowInfo, err := os.Stat(prowConfig)
	if err != nil {
		logrus.WithField("prowConfig", prowConfig).WithError(err).Error("Error loading prow config.")
		return time.Time{}, err
	}
	latest := prowInfo.ModTime()

	// TODO(krzyzacy): allow empty jobConfig till fully migrate config to subdirs
	if jobConfig == "" {
		return latest, nil
	}

	jobInfo, err := os.Stat(jobConfig)
	if err != nil {
		logrus.WithField("jobConfig", jobConfig).WithError(err).Error("Error loading job configs.")
		return time.Time{}, err
	}

	// Keep whichever of the two paths was modified last.
	if jobTime := jobInfo.ModTime(); jobTime.After(latest) {
		latest = jobTime
	}
	return latest, nil
}

// Start will begin polling the config file at the path. If the first load
// fails, Start will return the error and abort. Future load failures will log
// the failure message but continue attempting to load.
func (ca *Agent) Start(prowConfig, jobConfig string) error {
lastModTime, err := lastConfigModTime(prowConfig, jobConfig)
if err != nil {
lastModTime = time.Time{}
}
c, err := Load(prowConfig, jobConfig)
if err != nil {
return err
}
ca.Set(c)
go func() {
var lastModTime time.Time
// Rarely, if two changes happen in the same second, mtime will
// be the same for the second change, and an mtime-based check would
// fail. Reload periodically just in case.
skips := 0
for range time.Tick(1 * time.Second) {
if skips < 600 {
// Check if the file changed to see if it needs to be re-read.
// os.Stat follows symbolic links, which is how ConfigMaps work.
prowStat, err := os.Stat(prowConfig)
recentModTime, err := lastConfigModTime(prowConfig, jobConfig)
if err != nil {
logrus.WithField("prowConfig", prowConfig).WithError(err).Error("Error loading prow config.")
continue
}

recentModTime := prowStat.ModTime()

// TODO(krzyzacy): allow empty jobConfig till fully migrate config to subdirs
if jobConfig != "" {
jobConfigStat, err := os.Stat(jobConfig)
if err != nil {
logrus.WithField("jobConfig", jobConfig).WithError(err).Error("Error loading job configs.")
continue
}

if jobConfigStat.ModTime().After(recentModTime) {
recentModTime = jobConfigStat.ModTime()
}
}

if !recentModTime.After(lastModTime) {
skips++
continue // file hasn't been modified
Expand Down Expand Up @@ -120,6 +128,7 @@ func (ca *Agent) Config() *Config {
}

// Set sets the config. Useful for testing.
// Also used by statusreconciler to load the last known config.
func (ca *Agent) Set(c *Config) {
ca.mut.Lock()
defer ca.mut.Unlock()
Expand Down
11 changes: 10 additions & 1 deletion prow/statusreconciler/BUILD.bazel
Expand Up @@ -5,11 +5,13 @@ go_library(
srcs = [
"controller.go",
"doc.go",
"status.go",
],
importpath = "k8s.io/test-infra/prow/statusreconciler",
visibility = ["//visibility:public"],
deps = [
"//maintenance/migratestatus/migrator:go_default_library",
"//pkg/io:go_default_library",
"//prow/client/clientset/versioned/typed/prowjobs/v1:go_default_library",
"//prow/config:go_default_library",
"//prow/errorutil:go_default_library",
Expand All @@ -19,16 +21,23 @@ go_library(
"//prow/plugins/trigger:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_k8s_apimachinery//pkg/util/sets:go_default_library",
"@io_k8s_sigs_yaml//:go_default_library",
],
)

go_test(
name = "go_default_test",
srcs = ["controller_test.go"],
srcs = [
"controller_test.go",
"status_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/io:go_default_library",
"//prow/config:go_default_library",
"//prow/github:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_k8s_api//core/v1:go_default_library",
"@io_k8s_apimachinery//pkg/util/diff:go_default_library",
"@io_k8s_apimachinery//pkg/util/sets:go_default_library",
"@io_k8s_sigs_yaml//:go_default_library",
Expand Down
28 changes: 23 additions & 5 deletions prow/statusreconciler/controller.go
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/test-infra/pkg/io"
prowv1 "k8s.io/test-infra/prow/client/clientset/versioned/typed/prowjobs/v1"
"k8s.io/test-infra/prow/pjutil"

Expand All @@ -36,14 +37,22 @@ import (
)

// NewController constructs a new controller to reconcile statuses on config change
func NewController(continueOnError bool, addedPresubmitBlacklist sets.String, prowJobClient prowv1.ProwJobInterface, githubClient github.Client, configAgent *config.Agent, pluginAgent *plugins.ConfigAgent) *Controller {
func NewController(continueOnError bool, addedPresubmitBlacklist sets.String, opener io.Opener, configPath, jobConfigPath, statusURI string, prowJobClient prowv1.ProwJobInterface, githubClient github.Client, pluginAgent *plugins.ConfigAgent) *Controller {
sc := &statusController{
logger: logrus.WithField("client", "statusController"),
opener: opener,
statusURI: statusURI,
configPath: configPath,
jobConfigPath: jobConfigPath,
}

return &Controller{
continueOnError: continueOnError,
addedPresubmitBlacklist: addedPresubmitBlacklist,
prowJobTriggerer: &kubeProwJobTriggerer{
prowJobClient: prowJobClient,
githubClient: githubClient,
configAgent: configAgent,
configGetter: sc.Config,
pluginAgent: pluginAgent,
},
githubClient: githubClient,
Expand All @@ -55,6 +64,7 @@ func NewController(continueOnError bool, addedPresubmitBlacklist sets.String, pr
githubClient: githubClient,
pluginAgent: pluginAgent,
},
statusClient: sc,
}
}

Expand Down Expand Up @@ -89,7 +99,7 @@ type prowJobTriggerer interface {
type kubeProwJobTriggerer struct {
prowJobClient prowv1.ProwJobInterface
githubClient github.Client
configAgent *config.Agent
configGetter config.Getter
pluginAgent *plugins.ConfigAgent
}

Expand All @@ -103,7 +113,7 @@ func (t *kubeProwJobTriggerer) runAndSkip(pr *github.PullRequest, requestedJobs,
trigger.Client{
GitHubClient: t.githubClient,
ProwJobClient: t.prowJobClient,
Config: t.configAgent.Config(),
Config: t.configGetter(),
Logger: logrus.WithField("client", "trigger"),
},
pr, baseSHA, requestedJobs, skippedJobs, "none", *t.pluginAgent.Config().TriggerFor(org, repo).ElideSkippedContexts,
Expand Down Expand Up @@ -141,11 +151,18 @@ type Controller struct {
githubClient githubClient
statusMigrator statusMigrator
trustedChecker trustedChecker
statusClient statusClient
}

// Run monitors the incoming configuration changes to determine when statuses need to be
// reconciled on PRs in flight when blocking presubmits change
func (c *Controller) Run(ctx context.Context, changes <-chan config.Delta) {
func (c *Controller) Run(ctx context.Context) {
changes, err := c.statusClient.Load()
if err != nil {
logrus.WithError(err).Error("Error loading saved status.")
return
}

for {
select {
case change := <-changes:
Expand All @@ -154,6 +171,7 @@ func (c *Controller) Run(ctx context.Context, changes <-chan config.Delta) {
logrus.WithError(err).Error("Error reconciling statuses.")
}
logrus.WithField("duration", fmt.Sprintf("%v", time.Since(start))).Info("Statuses reconciled")
c.statusClient.Save()
case <-ctx.Done():
logrus.Info("status-reconciler is shutting down...")
return
Expand Down