Skip to content

Commit

Permalink
Adding a commandline graph pruner
Browse files Browse the repository at this point in the history
Extracting the load/save logic into their own functions

Updating commandline options for pruning graph

Code review changes

Adding items that are in both the To and From lists
  • Loading branch information
bradmwilliams committed Mar 2, 2021
1 parent f1d3175 commit 79f31e9
Show file tree
Hide file tree
Showing 2 changed files with 197 additions and 40 deletions.
17 changes: 15 additions & 2 deletions cmd/release-controller/main.go
Expand Up @@ -89,6 +89,10 @@ type options struct {
softDeleteReleaseTags bool

ReleaseArchitecture string

PruneGraph bool
PrintPrunedGraph string
ConfirmPruneGraph bool
}

func main() {
Expand All @@ -106,6 +110,8 @@ func main() {
ListenAddr: ":8080",

ToolsImageStreamTag: ":tests",

PrintPrunedGraph: PruneGraphPrintSecret,
}
cmd := &cobra.Command{
Run: func(cmd *cobra.Command, arguments []string) {
Expand Down Expand Up @@ -156,6 +162,10 @@ func main() {

flagset.StringVar(&opt.ReleaseArchitecture, "release-architecture", opt.ReleaseArchitecture, "The architecture of the releases to be created (defaults to 'amd64' if not specified).")

flagset.BoolVar(&opt.PruneGraph, "prune-graph", opt.PruneGraph, "Reads the upgrade graph, prunes edges, and prints the result")
flagset.StringVar(&opt.PrintPrunedGraph, "print-pruned-graph", opt.PrintPrunedGraph, "Print the result of pruning the graph. Valid options are: <|secret|debug>. The default, 'secret', is the base64 encoded secret payload. The 'debug' option will pretty print the json payload")
flagset.BoolVar(&opt.ConfirmPruneGraph, "confirm-prune-graph", opt.ConfirmPruneGraph, "Persist the pruned graph")

goFlagSet := flag.NewFlagSet("prowflags", flag.ContinueOnError)
opt.github.AddFlags(goFlagSet)
opt.bugzilla.AddFlags(goFlagSet)
Expand All @@ -175,7 +185,6 @@ func (o *options) Run() error {
if o.validateConfigs != "" {
return validateConfigs(o.validateConfigs)
}

tagParts := strings.Split(o.ToolsImageStreamTag, ":")
if len(tagParts) != 2 || len(tagParts[1]) == 0 {
return fmt.Errorf("--tools-image-stream-tag must be STREAM:TAG or :TAG (default STREAM is the oldest release stream)")
Expand All @@ -196,7 +205,9 @@ func (o *options) Run() error {
if len(o.ReleaseArchitecture) > 0 {
architecture = o.ReleaseArchitecture
}

if len(o.PrintPrunedGraph) > 0 && o.PrintPrunedGraph != PruneGraphPrintSecret && o.PrintPrunedGraph != PruneGraphPrintDebug {
return fmt.Errorf("--print-prune-graph must be \"%s\" or \"%s\"", PruneGraphPrintSecret, PruneGraphPrintDebug)
}
inClusterCfg, err := loadClusterConfig()
if err != nil {
return fmt.Errorf("failed to load incluster config: %w", err)
Expand Down Expand Up @@ -461,6 +472,8 @@ func (o *options) Run() error {
cache.WaitForCacheSync(stopCh, hasSynced...)

switch {
case o.PruneGraph:
return c.pruneGraph(releasesClient.CoreV1().Secrets(releaseNamespace), o.ReleaseNamespaces[0], "release-upgrade-graph", o.PrintPrunedGraph, o.ConfirmPruneGraph)
case o.DryRun:
klog.Infof("Dry run mode (no changes will be made)")

Expand Down
220 changes: 182 additions & 38 deletions cmd/release-controller/upgrades.go
Expand Up @@ -4,8 +4,11 @@ import (
"bytes"
"compress/gzip"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"k8s.io/apimachinery/pkg/labels"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -248,23 +251,7 @@ func (g *UpgradeGraph) Records() []UpgradeRecord {
}

func (g *UpgradeGraph) Save(w io.Writer) error {
records := g.Records()

// put the records into a stable order
sort.Slice(records, func(i, j int) bool {
a, b := records[i], records[j]
if a.To == b.To {
return a.From < b.From
}
return a.To < b.To
})
for _, record := range records {
sort.Slice(record.Results, func(i, j int) bool {
return record.Results[i].URL < record.Results[j].URL
})
}

data, err := json.Marshal(records)
data, err := json.Marshal(g.orderedRecords())
if err != nil {
return err
}
Expand Down Expand Up @@ -295,6 +282,24 @@ func (g *UpgradeGraph) Load(r io.Reader) error {
}

func syncGraphToSecret(graph *UpgradeGraph, update bool, secretClient kv1core.SecretInterface, ns, name string, stopCh <-chan struct{}) {
loadUpgradeGraph(graph, secretClient, ns, name, stopCh)

if !update {
return
}

// wait a bit of time to let any other loops load what they can
time.Sleep(15 * time.Second)

// keep the secret up to date
buf := &bytes.Buffer{}
wait.Until(func() {
buf.Reset()
saveUpgradeGraph(buf, graph, secretClient, ns, name)
}, 5*time.Minute, stopCh)
}

func loadUpgradeGraph(graph *UpgradeGraph, secretClient kv1core.SecretInterface, ns, name string, stopCh <-chan struct{}) {
// read initial state
wait.PollImmediateUntil(5*time.Second, func() (bool, error) {
secret, err := secretClient.Get(context.TODO(), name, metav1.GetOptions{})
Expand All @@ -317,34 +322,173 @@ func syncGraphToSecret(graph *UpgradeGraph, update bool, secretClient kv1core.Se
}
return true, nil
}, stopCh)
}

if !update {
func saveUpgradeGraph(buf *bytes.Buffer, graph *UpgradeGraph, secretClient kv1core.SecretInterface, ns, name string) {
if err := graph.Save(buf); err != nil {
klog.Errorf("Unable to calculate graph state: %v", err)
return
}
secret, err := secretClient.Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
klog.Errorf("Can't read latest secret %s/%s: %v", ns, name, err)
return
}
if secret.Data == nil {
secret.Data = make(map[string][]byte)
}
secret.Data["latest"] = buf.Bytes()
if _, err := secretClient.Update(context.TODO(), secret, metav1.UpdateOptions{}); err != nil {
klog.Errorf("Can't save state to secret %s/%s: %v", ns, name, err)
}
klog.V(2).Infof("Saved upgrade graph state to %s/%s", ns, name)
}

// wait a bit of time to let any other loops load what they can
time.Sleep(15 * time.Second)
func (g *UpgradeGraph) orderedRecords() []UpgradeRecord {
records := g.Records()
// put the records into a stable order
sort.Slice(records, func(i, j int) bool {
a, b := records[i], records[j]
if a.To == b.To {
return a.From < b.From
}
return a.To < b.To
})
for _, record := range records {
sort.Slice(record.Results, func(i, j int) bool {
return record.Results[i].URL < record.Results[j].URL
})
}
return records
}

// keep the secret up to date
buf := &bytes.Buffer{}
wait.Until(func() {
buf.Reset()
if err := graph.Save(buf); err != nil {
klog.Errorf("Unable to calculate graph state: %v", err)
return
const (
PruneGraphPrintSecret = "secret"
PruneGraphPrintDebug = "debug"
)

func (c *Controller) pruneGraph(secretClient kv1core.SecretInterface, ns, name string, printOption string, confirm bool) error {
stopCh := wait.NeverStop

loadUpgradeGraph(c.graph, secretClient, ns, name, stopCh)

imageStreams, err := c.releaseLister.ImageStreams(ns).List(labels.Everything())
if err != nil {
return err
}

var stableTagList []string

for _, imageStream := range imageStreams {
r, ok, err := c.releaseDefinition(imageStream)
if err != nil || !ok {
continue
}
secret, err := secretClient.Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
klog.Errorf("Can't read latest secret %s/%s: %v", ns, name, err)
return
if r.Config.As == releaseConfigModeStable {
for _, tag := range imageStream.Spec.Tags {
stableTagList = append(stableTagList, tag.Name)
}
}
}

// To tags that are not present in the stable tags
toMissingList := make([]string, 0, len(c.graph.to))
for tag := range c.graph.to {
if ! stringInSlice(tag, stableTagList) {
toMissingList = append(toMissingList, tag)
}
}

// From tags that are not present in the stable tags
fromMissingList := make([]string, 0, len(c.graph.from))
for tag := range c.graph.from {
if ! stringInSlice(tag, stableTagList) {
fromMissingList = append(fromMissingList, tag)
}
if secret.Data == nil {
secret.Data = make(map[string][]byte)
}

// To tags that are not in the From tags
pruneTagList := make([]string, 0, len(c.graph.to))
for _, tag := range toMissingList {
if ! stringInSlice(tag, fromMissingList) {
pruneTagList = append(pruneTagList, tag)
}
secret.Data["latest"] = buf.Bytes()
if _, err := secretClient.Update(context.TODO(), secret, metav1.UpdateOptions{}); err != nil {
klog.Errorf("Can't save state to secret %s/%s: %v", ns, name, err)
}

// From tags that are not in the To tags
for _, tag := range fromMissingList {
if ! stringInSlice(tag, toMissingList) {
pruneTagList = append(pruneTagList, tag)
}
klog.V(2).Infof("Saved upgrade graph state to %s/%s", ns, name)
}, 5*time.Minute, stopCh)
}

// Tags that are in both the From and To lists
for _, tag := range toMissingList {
if stringInSlice(tag, fromMissingList) {
pruneTagList = append(pruneTagList, tag)
}
}

klog.V(2).Infof("Pruning %d/%d tags from release controller graph\n", len(pruneTagList), len(c.graph.to))

// Prune graph
c.graph.lock.Lock()
for _, toTag := range pruneTagList {
for fromTag := range c.graph.to[toTag] {
c.graph.removeWithLock(fromTag, toTag)
}
}
c.graph.lock.Unlock()

if confirm {
buf := &bytes.Buffer{}
saveUpgradeGraph(buf, c.graph, secretClient, ns, name)
} else {
switch printOption {
case PruneGraphPrintDebug:
c.graph.prettyPrint()
case PruneGraphPrintSecret:
c.graph.printSecretPayload()
}
}

return nil
}

func (g *UpgradeGraph) removeWithLock(fromTag, toTag string) {
delete(g.to[toTag], fromTag)
if len(g.to[toTag]) == 0 {
delete(g.to, toTag)
}
g.from[fromTag].Delete(toTag)
if g.from[fromTag].Len() == 0 {
delete(g.from, fromTag)
}
}

func (g *UpgradeGraph) prettyPrint() {
json, err := json.MarshalIndent(g.orderedRecords(), "", " ")
if err != nil {
klog.V(1).Infof("Unable to marshal graph: %v", err)
}
fmt.Printf("%s\n", json)
}

func (g *UpgradeGraph) printSecretPayload() {
buf := &bytes.Buffer{}
if err := g.Save(buf); err != nil {
klog.Errorf("Unable to calculate graph state: %v", err)
return
}
str := base64.StdEncoding.EncodeToString(buf.Bytes())
fmt.Println(str)
}

func stringInSlice(a string, list []string) bool {
for _, b := range list {
if b == a {
return true
}
}
return false
}

0 comments on commit 79f31e9

Please sign in to comment.