From 0fb314e5ecfedefe7753ba02c2bcf95c697b49e4 Mon Sep 17 00:00:00 2001 From: Brad Williams Date: Thu, 25 Feb 2021 23:08:07 -0500 Subject: [PATCH] Adding a commandline graph pruner Extracting the load/save logic into their own functions Updating commandline options for pruning graph Code review changes --- cmd/release-controller/main.go | 17 ++- cmd/release-controller/upgrades.go | 213 ++++++++++++++++++++++++----- 2 files changed, 190 insertions(+), 40 deletions(-) diff --git a/cmd/release-controller/main.go b/cmd/release-controller/main.go index 1c9002992..e8863facd 100644 --- a/cmd/release-controller/main.go +++ b/cmd/release-controller/main.go @@ -89,6 +89,10 @@ type options struct { softDeleteReleaseTags bool ReleaseArchitecture string + + PruneGraph bool + PrintPrunedGraph string + ConfirmPruneGraph bool } func main() { @@ -106,6 +110,8 @@ func main() { ListenAddr: ":8080", ToolsImageStreamTag: ":tests", + + PrintPrunedGraph: PruneGraphPrintSecret, } cmd := &cobra.Command{ Run: func(cmd *cobra.Command, arguments []string) { @@ -156,6 +162,10 @@ func main() { flagset.StringVar(&opt.ReleaseArchitecture, "release-architecture", opt.ReleaseArchitecture, "The architecture of the releases to be created (defaults to 'amd64' if not specified).") + flagset.BoolVar(&opt.PruneGraph, "prune-graph", opt.PruneGraph, "Reads the upgrade graph, prunes edges, and prints the result") + flagset.StringVar(&opt.PrintPrunedGraph, "print-pruned-graph", opt.PrintPrunedGraph, "Print the result of pruning the graph. Valid options are: <|secret|debug>. The default, 'secret', is the base64 encoded secret payload. The 'debug' option will pretty print the json payload") + flagset.BoolVar(&opt.ConfirmPruneGraph, "confirm-prune-graph", opt.ConfirmPruneGraph, "Persist the pruned graph") + goFlagSet := flag.NewFlagSet("prowflags", flag.ContinueOnError) opt.github.AddFlags(goFlagSet) opt.bugzilla.AddFlags(goFlagSet) @@ -175,7 +185,6 @@ func (o *options) Run() error { if o.validateConfigs != "" { return validateConfigs(o.validateConfigs) } - tagParts := strings.Split(o.ToolsImageStreamTag, ":") if len(tagParts) != 2 || len(tagParts[1]) == 0 { return fmt.Errorf("--tools-image-stream-tag must be STREAM:TAG or :TAG (default STREAM is the oldest release stream)") @@ -196,7 +205,9 @@ func (o *options) Run() error { if len(o.ReleaseArchitecture) > 0 { architecture = o.ReleaseArchitecture } - + if len(o.PrintPrunedGraph) > 0 && o.PrintPrunedGraph != PruneGraphPrintSecret && o.PrintPrunedGraph != PruneGraphPrintDebug { + return fmt.Errorf("--print-prune-graph must be \"%s\" or \"%s\"", PruneGraphPrintSecret, PruneGraphPrintDebug) + } inClusterCfg, err := loadClusterConfig() if err != nil { return fmt.Errorf("failed to load incluster config: %w", err) @@ -461,6 +472,8 @@ func (o *options) Run() error { cache.WaitForCacheSync(stopCh, hasSynced...) switch { + case o.PruneGraph: + return c.pruneGraph(releasesClient.CoreV1().Secrets(releaseNamespace), o.ReleaseNamespaces[0], "release-upgrade-graph", o.PrintPrunedGraph, o.ConfirmPruneGraph) case o.DryRun: klog.Infof("Dry run mode (no changes will be made)") diff --git a/cmd/release-controller/upgrades.go b/cmd/release-controller/upgrades.go index 53fbdc420..0eb5a4451 100644 --- a/cmd/release-controller/upgrades.go +++ b/cmd/release-controller/upgrades.go @@ -4,8 +4,11 @@ import ( "bytes" "compress/gzip" "context" + "encoding/base64" "encoding/json" + "fmt" "io" + "k8s.io/apimachinery/pkg/labels" "sort" "sync" "time" @@ -248,23 +251,7 @@ func (g *UpgradeGraph) Records() []UpgradeRecord { } func (g *UpgradeGraph) Save(w io.Writer) error { - records := g.Records() - - // put the records into a stable order - sort.Slice(records, func(i, j int) bool { - a, b := records[i], records[j] - if a.To == b.To { - return a.From < b.From - } - return a.To < b.To - }) - for _, record := range records { - sort.Slice(record.Results, func(i, j int) bool { - return record.Results[i].URL < record.Results[j].URL - }) - } - - data, err := json.Marshal(records) + data, err := json.Marshal(g.orderedRecords()) if err != nil { return err } @@ -295,6 +282,24 @@ func (g *UpgradeGraph) Load(r io.Reader) error { } func syncGraphToSecret(graph *UpgradeGraph, update bool, secretClient kv1core.SecretInterface, ns, name string, stopCh <-chan struct{}) { + loadUpgradeGraph(graph, secretClient, ns, name, stopCh) + + if !update { + return + } + + // wait a bit of time to let any other loops load what they can + time.Sleep(15 * time.Second) + + // keep the secret up to date + buf := &bytes.Buffer{} + wait.Until(func() { + buf.Reset() + saveUpgradeGraph(buf, graph, secretClient, ns, name) + }, 5*time.Minute, stopCh) +} + +func loadUpgradeGraph(graph *UpgradeGraph, secretClient kv1core.SecretInterface, ns, name string, stopCh <-chan struct{}) { // read initial state wait.PollImmediateUntil(5*time.Second, func() (bool, error) { secret, err := secretClient.Get(context.TODO(), name, metav1.GetOptions{}) @@ -317,34 +322,166 @@ func syncGraphToSecret(graph *UpgradeGraph, update bool, secretClient kv1core.Se } return true, nil }, stopCh) +} - if !update { +func saveUpgradeGraph(buf *bytes.Buffer, graph *UpgradeGraph, secretClient kv1core.SecretInterface, ns, name string) { + if err := graph.Save(buf); err != nil { + klog.Errorf("Unable to calculate graph state: %v", err) + return + } + secret, err := secretClient.Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + klog.Errorf("Can't read latest secret %s/%s: %v", ns, name, err) return } + if secret.Data == nil { + secret.Data = make(map[string][]byte) + } + secret.Data["latest"] = buf.Bytes() + if _, err := secretClient.Update(context.TODO(), secret, metav1.UpdateOptions{}); err != nil { + klog.Errorf("Can't save state to secret %s/%s: %v", ns, name, err) + } + klog.V(2).Infof("Saved upgrade graph state to %s/%s", ns, name) +} - // wait a bit of time to let any other loops load what they can - time.Sleep(15 * time.Second) +func (g *UpgradeGraph) orderedRecords() []UpgradeRecord { + records := g.Records() + // put the records into a stable order + sort.Slice(records, func(i, j int) bool { + a, b := records[i], records[j] + if a.To == b.To { + return a.From < b.From + } + return a.To < b.To + }) + for _, record := range records { + sort.Slice(record.Results, func(i, j int) bool { + return record.Results[i].URL < record.Results[j].URL + }) + } + return records +} - // keep the secret up to date - buf := &bytes.Buffer{} - wait.Until(func() { - buf.Reset() - if err := graph.Save(buf); err != nil { - klog.Errorf("Unable to calculate graph state: %v", err) - return +const ( + PruneGraphPrintSecret = "secret" + PruneGraphPrintDebug = "debug" +) + +func (c *Controller) pruneGraph(secretClient kv1core.SecretInterface, ns, name string, printOption string, confirm bool) error { + stopCh := wait.NeverStop + + loadUpgradeGraph(c.graph, secretClient, ns, name, stopCh) + + imageStreams, err := c.releaseLister.ImageStreams(ns).List(labels.Everything()) + if err != nil { + return err + } + + var stableTagList []string + + for _, imageStream := range imageStreams { + r, ok, err := c.releaseDefinition(imageStream) + if err != nil || !ok { + continue } - secret, err := secretClient.Get(context.TODO(), name, metav1.GetOptions{}) - if err != nil { - klog.Errorf("Can't read latest secret %s/%s: %v", ns, name, err) - return + if r.Config.Name == "4-stable" { + for _, tag := range imageStream.Spec.Tags { + stableTagList = append(stableTagList, tag.Name) + } + } + } + + // To tags that are not present in the stable tags + toMissingList := make([]string, 0, len(c.graph.to)) + for tag := range c.graph.to { + if ! stringInSlice(tag, stableTagList) { + toMissingList = append(toMissingList, tag) } - if secret.Data == nil { - secret.Data = make(map[string][]byte) + } + + // From tags that are not present in the stable tags + fromMissingList := make([]string, 0, len(c.graph.from)) + for tag := range c.graph.from { + if ! stringInSlice(tag, stableTagList) { + fromMissingList = append(fromMissingList, tag) } - secret.Data["latest"] = buf.Bytes() - if _, err := secretClient.Update(context.TODO(), secret, metav1.UpdateOptions{}); err != nil { - klog.Errorf("Can't save state to secret %s/%s: %v", ns, name, err) + } + + // To tags that are not in the From tags + pruneTagList := make([]string, 0, len(c.graph.to)) + for _, tag := range toMissingList { + if ! stringInSlice(tag, fromMissingList) { + pruneTagList = append(pruneTagList, tag) } - klog.V(2).Infof("Saved upgrade graph state to %s/%s", ns, name) - }, 5*time.Minute, stopCh) + } + + // From tags that are not in the To tags + for _, tag := range fromMissingList { + if ! stringInSlice(tag, toMissingList) { + pruneTagList = append(pruneTagList, tag) + } + } + + klog.V(2).Infof("Pruning %d/%d tags from release controller graph\n", len(pruneTagList), len(c.graph.to)) + + // Prune graph + c.graph.lock.Lock() + for _, toTag := range pruneTagList { + for fromTag := range c.graph.to[toTag] { + c.graph.removeWithLock(fromTag, toTag) + } + } + c.graph.lock.Unlock() + + if confirm { + buf := &bytes.Buffer{} + saveUpgradeGraph(buf, c.graph, secretClient, ns, name) + } else { + switch printOption { + case PruneGraphPrintDebug: + c.graph.prettyPrint() + case PruneGraphPrintSecret: + c.graph.printSecretPayload() + } + } + + return nil +} + +func (g *UpgradeGraph) removeWithLock(fromTag, toTag string) { + delete(g.to[toTag], fromTag) + if len(g.to[toTag]) == 0 { + delete(g.to, toTag) + } + g.from[fromTag].Delete(toTag) + if g.from[fromTag].Len() == 0 { + delete(g.from, fromTag) + } +} + +func (g *UpgradeGraph) prettyPrint() { + json, err := json.MarshalIndent(g.orderedRecords(), "", " ") + if err != nil { + klog.V(1).Infof("Unable to marshal graph: %v", err) + } + fmt.Printf("%s\n", json) +} + +func (g *UpgradeGraph) printSecretPayload() { + buf := &bytes.Buffer{} + if err := g.Save(buf); err != nil { + klog.Errorf("Unable to calculate graph state: %v", err) + return + } + str := base64.StdEncoding.EncodeToString(buf.Bytes()) + fmt.Println(str) +} + +func stringInSlice(a string, list []string) bool { + for _, b := range list { + if b == a { + return true + } + } + return false }