Skip to content

Commit

Permalink
Merge pull request #15585 from adshmh/13084-statusreconciler-persist-…
Browse files Browse the repository at this point in the history
…state

Persist state in statusreconciler
  • Loading branch information
k8s-ci-robot committed Dec 13, 2019
2 parents 4b77d97 + 2bd9505 commit 014dcb6
Show file tree
Hide file tree
Showing 7 changed files with 510 additions and 32 deletions.
1 change: 1 addition & 0 deletions prow/cmd/status-reconciler/BUILD.bazel
Expand Up @@ -21,6 +21,7 @@ go_library(
visibility = ["//visibility:private"],
deps = [
"//pkg/flagutil:go_default_library",
"//pkg/io:go_default_library",
"//prow/config:go_default_library",
"//prow/config/secret:go_default_library",
"//prow/flagutil:go_default_library",
Expand Down
31 changes: 26 additions & 5 deletions prow/cmd/status-reconciler/main.go
Expand Up @@ -20,15 +20,17 @@ import (
"context"
"flag"
"os"
"time"

"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/test-infra/prow/interrupts"

"k8s.io/test-infra/pkg/flagutil"
"k8s.io/test-infra/pkg/io"
"k8s.io/test-infra/prow/config"
"k8s.io/test-infra/prow/config/secret"
prowflagutil "k8s.io/test-infra/prow/flagutil"
"k8s.io/test-infra/prow/interrupts"
"k8s.io/test-infra/prow/logrusutil"
"k8s.io/test-infra/prow/pjutil"
"k8s.io/test-infra/prow/plugins"
Expand All @@ -53,6 +55,15 @@ type options struct {

tokenBurst int
tokensPerHour int

// The following are used for reading/writing to GCS.
gcsCredentialsFile string
// statusURI is where the status-reconciler stores its last known state, i.e. configuration.
// Can be a /local/path or gs://path/to/object.
// GCS writes will use the bucket's default acl for new objects. Ensure both that
// a) the gcs credentials can write to this bucket
// b) the default acls do not expose any private info
statusURI string
}

func gatherOptions() options {
Expand All @@ -62,6 +73,7 @@ func gatherOptions() options {
fs.StringVar(&o.configPath, "config-path", "/etc/config/config.yaml", "Path to config.yaml.")
fs.StringVar(&o.jobConfigPath, "job-config-path", "", "Path to prow job configs.")
fs.StringVar(&o.pluginConfig, "plugin-config", "/etc/plugins/plugins.yaml", "Path to plugin config file.")
fs.StringVar(&o.statusURI, "status-path", "", "The /local/path or gs://path/to/object to store status controller state. GCS writes will use the default object ACL for the bucket.")

fs.BoolVar(&o.continueOnError, "continue-on-error", false, "Indicates that the migration should continue if context migration fails for an individual PR.")
fs.Var(&o.addedPresubmitBlacklist, "blacklist", "Org or org/repo to ignore new added presubmits for, set more than once to add more.")
Expand Down Expand Up @@ -102,8 +114,6 @@ func main() {
if err := configAgent.Start(o.configPath, o.jobConfigPath); err != nil {
logrus.WithError(err).Fatal("Error starting config agent.")
}
changes := make(chan config.Delta)
configAgent.Subscribe(changes)

secretAgent := &secret.Agent{}
if o.github.TokenPath != "" {
Expand All @@ -130,8 +140,19 @@ func main() {
logrus.WithError(err).Fatal("Error getting kube client.")
}

c := statusreconciler.NewController(o.continueOnError, sets.NewString(o.addedPresubmitBlacklist.Strings()...), prowJobClient, githubClient, configAgent, pluginAgent)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
opener, err := io.NewOpener(ctx, o.gcsCredentialsFile)
if err != nil {
entry := logrus.WithError(err)
if p := o.gcsCredentialsFile; p != "" {
entry = entry.WithField("gcs-credentials-file", p)
}
entry.Fatal("Cannot create opener")
}

c := statusreconciler.NewController(o.continueOnError, sets.NewString(o.addedPresubmitBlacklist.Strings()...), opener, o.configPath, o.jobConfigPath, o.statusURI, prowJobClient, githubClient, pluginAgent)
interrupts.Run(func(ctx context.Context) {
c.Run(ctx, changes)
c.Run(ctx)
})
}
51 changes: 30 additions & 21 deletions prow/config/agent.go
Expand Up @@ -40,46 +40,54 @@ type Agent struct {
subscriptions []DeltaChan
}

// lastConfigModTime reports the most recent modification time across the prow
// config file and, when jobConfig is non-empty, the job config path.
//
// Both paths are inspected with os.Stat, which follows symbolic links (this is
// how ConfigMaps work). If either stat fails, the error is logged and returned
// together with the zero time.
func lastConfigModTime(prowConfig, jobConfig string) (time.Time, error) {
	prowInfo, err := os.Stat(prowConfig)
	if err != nil {
		logrus.WithField("prowConfig", prowConfig).WithError(err).Error("Error loading prow config.")
		return time.Time{}, err
	}
	latest := prowInfo.ModTime()

	// TODO(krzyzacy): allow empty jobConfig till fully migrate config to subdirs
	if jobConfig == "" {
		// Only the prow config contributes when no job config path is given.
		return latest, nil
	}

	jobInfo, err := os.Stat(jobConfig)
	if err != nil {
		logrus.WithField("jobConfig", jobConfig).WithError(err).Error("Error loading job configs.")
		return time.Time{}, err
	}

	// Keep whichever of the two timestamps is newer.
	if jobModTime := jobInfo.ModTime(); jobModTime.After(latest) {
		latest = jobModTime
	}
	return latest, nil
}

// Start will begin polling the config file at the path. If the first load
// fails, Start will return the error and abort. Future load failures will log
// the failure message but continue attempting to load.
func (ca *Agent) Start(prowConfig, jobConfig string) error {
lastModTime, err := lastConfigModTime(prowConfig, jobConfig)
if err != nil {
lastModTime = time.Time{}
}
c, err := Load(prowConfig, jobConfig)
if err != nil {
return err
}
ca.Set(c)
go func() {
var lastModTime time.Time
// Rarely, if two changes happen in the same second, mtime will
// be the same for the second change, and an mtime-based check would
// fail. Reload periodically just in case.
skips := 0
for range time.Tick(1 * time.Second) {
if skips < 600 {
// Check if the file changed to see if it needs to be re-read.
// os.Stat follows symbolic links, which is how ConfigMaps work.
prowStat, err := os.Stat(prowConfig)
recentModTime, err := lastConfigModTime(prowConfig, jobConfig)
if err != nil {
logrus.WithField("prowConfig", prowConfig).WithError(err).Error("Error loading prow config.")
continue
}

recentModTime := prowStat.ModTime()

// TODO(krzyzacy): allow empty jobConfig till fully migrate config to subdirs
if jobConfig != "" {
jobConfigStat, err := os.Stat(jobConfig)
if err != nil {
logrus.WithField("jobConfig", jobConfig).WithError(err).Error("Error loading job configs.")
continue
}

if jobConfigStat.ModTime().After(recentModTime) {
recentModTime = jobConfigStat.ModTime()
}
}

if !recentModTime.After(lastModTime) {
skips++
continue // file hasn't been modified
Expand Down Expand Up @@ -120,6 +128,7 @@ func (ca *Agent) Config() *Config {
}

// Set sets the config. Useful for testing.
// Also used by statusreconciler to load the last known config.
func (ca *Agent) Set(c *Config) {
ca.mut.Lock()
defer ca.mut.Unlock()
Expand Down
11 changes: 10 additions & 1 deletion prow/statusreconciler/BUILD.bazel
Expand Up @@ -5,11 +5,13 @@ go_library(
srcs = [
"controller.go",
"doc.go",
"status.go",
],
importpath = "k8s.io/test-infra/prow/statusreconciler",
visibility = ["//visibility:public"],
deps = [
"//maintenance/migratestatus/migrator:go_default_library",
"//pkg/io:go_default_library",
"//prow/client/clientset/versioned/typed/prowjobs/v1:go_default_library",
"//prow/config:go_default_library",
"//prow/errorutil:go_default_library",
Expand All @@ -19,16 +21,23 @@ go_library(
"//prow/plugins/trigger:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_k8s_apimachinery//pkg/util/sets:go_default_library",
"@io_k8s_sigs_yaml//:go_default_library",
],
)

go_test(
name = "go_default_test",
srcs = ["controller_test.go"],
srcs = [
"controller_test.go",
"status_test.go",
],
embed = [":go_default_library"],
deps = [
"//pkg/io:go_default_library",
"//prow/config:go_default_library",
"//prow/github:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_k8s_api//core/v1:go_default_library",
"@io_k8s_apimachinery//pkg/util/diff:go_default_library",
"@io_k8s_apimachinery//pkg/util/sets:go_default_library",
"@io_k8s_sigs_yaml//:go_default_library",
Expand Down
28 changes: 23 additions & 5 deletions prow/statusreconciler/controller.go
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/test-infra/pkg/io"
prowv1 "k8s.io/test-infra/prow/client/clientset/versioned/typed/prowjobs/v1"
"k8s.io/test-infra/prow/pjutil"

Expand All @@ -36,14 +37,22 @@ import (
)

// NewController constructs a new controller to reconcile statuses on config change
func NewController(continueOnError bool, addedPresubmitBlacklist sets.String, prowJobClient prowv1.ProwJobInterface, githubClient github.Client, configAgent *config.Agent, pluginAgent *plugins.ConfigAgent) *Controller {
func NewController(continueOnError bool, addedPresubmitBlacklist sets.String, opener io.Opener, configPath, jobConfigPath, statusURI string, prowJobClient prowv1.ProwJobInterface, githubClient github.Client, pluginAgent *plugins.ConfigAgent) *Controller {
sc := &statusController{
logger: logrus.WithField("client", "statusController"),
opener: opener,
statusURI: statusURI,
configPath: configPath,
jobConfigPath: jobConfigPath,
}

return &Controller{
continueOnError: continueOnError,
addedPresubmitBlacklist: addedPresubmitBlacklist,
prowJobTriggerer: &kubeProwJobTriggerer{
prowJobClient: prowJobClient,
githubClient: githubClient,
configAgent: configAgent,
configGetter: sc.Config,
pluginAgent: pluginAgent,
},
githubClient: githubClient,
Expand All @@ -55,6 +64,7 @@ func NewController(continueOnError bool, addedPresubmitBlacklist sets.String, pr
githubClient: githubClient,
pluginAgent: pluginAgent,
},
statusClient: sc,
}
}

Expand Down Expand Up @@ -89,7 +99,7 @@ type prowJobTriggerer interface {
type kubeProwJobTriggerer struct {
prowJobClient prowv1.ProwJobInterface
githubClient github.Client
configAgent *config.Agent
configGetter config.Getter
pluginAgent *plugins.ConfigAgent
}

Expand All @@ -103,7 +113,7 @@ func (t *kubeProwJobTriggerer) runAndSkip(pr *github.PullRequest, requestedJobs,
trigger.Client{
GitHubClient: t.githubClient,
ProwJobClient: t.prowJobClient,
Config: t.configAgent.Config(),
Config: t.configGetter(),
Logger: logrus.WithField("client", "trigger"),
},
pr, baseSHA, requestedJobs, skippedJobs, "none", *t.pluginAgent.Config().TriggerFor(org, repo).ElideSkippedContexts,
Expand Down Expand Up @@ -141,11 +151,18 @@ type Controller struct {
githubClient githubClient
statusMigrator statusMigrator
trustedChecker trustedChecker
statusClient statusClient
}

// Run monitors the incoming configuration changes to determine when statuses need to be
// reconciled on PRs in flight when blocking presubmits change
func (c *Controller) Run(ctx context.Context, changes <-chan config.Delta) {
func (c *Controller) Run(ctx context.Context) {
changes, err := c.statusClient.Load()
if err != nil {
logrus.WithError(err).Error("Error loading saved status.")
return
}

for {
select {
case change := <-changes:
Expand All @@ -154,6 +171,7 @@ func (c *Controller) Run(ctx context.Context, changes <-chan config.Delta) {
logrus.WithError(err).Error("Error reconciling statuses.")
}
logrus.WithField("duration", fmt.Sprintf("%v", time.Since(start))).Info("Statuses reconciled")
c.statusClient.Save()
case <-ctx.Done():
logrus.Info("status-reconciler is shutting down...")
return
Expand Down

0 comments on commit 014dcb6

Please sign in to comment.