From 1b01f908e3ada686024ad144637ea0dd0ab64762 Mon Sep 17 00:00:00 2001 From: James Shubin Date: Sun, 20 Mar 2016 01:17:35 -0400 Subject: [PATCH] Add resource auto grouping Sorry for the size of this patch, I was busy hacking and plumbing away and it got out of hand! I'm allowing this because there doesn't seem to be anyone hacking away on parts of the code that this would break, since the resource code is fairly stable in this change. In particular, it revisits and refreshes some areas of the code that didn't see anything new or innovative since the project first started. I've gotten rid of a lot of cruft, and in particular cleaned up some things that I didn't know how to do better before! Here's hoping I'll continue to learn and have more to improve upon in the future! (Well let's not hope _too_ hard though!) The logical goal of this patch was to make logical grouping of resources possible. For example, it might be more efficient to group three package installations into a single transaction, instead of having to run three separate transactions. This is because a package installation typically has an initial one-time per run cost which shouldn't need to be repeated. Another future goal would be to group file resources sharing a common base path under a common recursive fanotify watcher. Since this depends on fanotify capabilities first, this hasn't been implemented yet, but could be a useful method of reducing the number of separate watches needed, since there is a finite limit. It's worth mentioning that grouping resources typically _reduces_ the parallel execution capability of a particular graph, but depending on the cost/benefit tradeoff, this might be preferential. I'd submit it's almost universally beneficial for pkg resources. This monster patch includes: * the autogroup feature * the grouping interface * a placeholder algorithm * an extensive test case infrastructure to test grouping algorithms * a move of some base resource methods into pgraph refactoring * some config/compile clean ups to remove code duplication * b64 encoding/decoding improvements * a rename of the yaml "res" entries to "kind" (more logical) * some docs * small fixes * and more! --- DOCUMENTATION.md | 19 +- config.go | 492 ++++++++++++++++-------- configwatch.go | 2 +- etcd.go | 10 +- examples/etcd1a.yaml | 2 +- examples/etcd1b.yaml | 2 +- examples/etcd1c.yaml | 2 +- examples/exec1.yaml | 8 +- examples/exec1a.yaml | 4 +- examples/exec1b.yaml | 4 +- examples/exec1c.yaml | 4 +- examples/exec2.yaml | 16 +- examples/file1.yaml | 8 +- examples/graph0.yaml | 4 +- examples/graph10.yaml | 24 +- examples/graph1a.yaml | 4 +- examples/graph1b.yaml | 4 +- examples/graph3a.yaml | 2 +- examples/graph3b.yaml | 2 +- examples/graph3c.yaml | 2 +- examples/graph4.yaml | 2 +- examples/graph5.yaml | 2 +- examples/graph9.yaml | 12 +- examples/svc1.yaml | 8 +- exec.go | 11 +- file.go | 16 +- main.go | 33 +- misc.go | 41 +- misc_test.go | 55 +-- noop.go | 9 +- pgraph.go | 321 ++++++++++++++-- pgraph_test.go | 858 +++++++++++++++++++++++++++++++++++++++++- pkg.go | 9 +- resources.go | 186 +++------ resources_test.go | 105 ++++++ svc.go | 11 +- 36 files changed, 1785 insertions(+), 509 deletions(-) create mode 100644 resources_test.go diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index d6260fb3c..549c710ef 100644 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -32,6 +32,7 @@ along with this program. If not, see . 3. [Setup - Getting started with mgmt](#setup) 4. [Features - All things mgmt can do](#features) * [Autoedges - Automatic resource relationships](#autoedges) + * [Autogrouping - Automatic resource grouping](#autogrouping) 5. [Usage/FAQ - Notes on usage and frequently asked questions](#usage-and-frequently-asked-questions) 6. [Reference - Detailed reference](#reference) * [Graph definition file](#graph-definition-file) @@ -94,8 +95,22 @@ order to handle this situation you can disable autoedges per resource and explicitly declare that you want `my.cnf` to be written to disk before the installation of the `mysql-server` package. -You can disable autoedges for a resource by setting the `autoedge` key for -the meta attributes of a resource to `false`. +You can disable autoedges for a resource by setting the `autoedge` key on +the meta attributes of that resource to `false`. + +###Autogrouping + +Automatic grouping or AutoGroup is the mechanism in mgmt by which it will +automatically group multiple resource vertices into a single one. This is +particularly useful for grouping multiple package resources into a single +resource, since the multiple installations can happen together in a single +transaction, which saves a lot of time because package resources typically have +a large fixed cost to running (downloading and verifying the package repo) and +if they are grouped they share this fixed cost. This grouping feature can be +used for other use cases too. + +You can disable autogrouping for a resource by setting the `autogroup` key on +the meta attributes of that resource to `false`. ##Usage and frequently asked questions (Send your questions as a patch to this FAQ! I'll review it, merge it, and diff --git a/config.go b/config.go index 3465fe7dc..fe42b785a 100644 --- a/config.go +++ b/config.go @@ -23,16 +23,17 @@ import ( "gopkg.in/yaml.v2" "io/ioutil" "log" + "reflect" "strings" ) type collectorResConfig struct { - Res string `yaml:"res"` + Kind string `yaml:"kind"` Pattern string `yaml:"pattern"` // XXX: Not Implemented } type vertexConfig struct { - Res string `yaml:"res"` + Kind string `yaml:"kind"` Name string `yaml:"name"` } @@ -82,105 +83,79 @@ func ParseConfigFromFile(filename string) *GraphConfig { return &config } -// XXX: we need to fix this function so that it either fails without modifying -// the graph, passes successfully and modifies it, or basically panics i guess -// this way an invalid compilation can leave the old graph running, and we we -// don't modify a partial graph. so we really need to validate, and then perform -// whatever actions are necessary -// finding some way to do this on a copy of the graph, and then do a graph diff -// and merge the new data into the old graph would be more appropriate, in -// particular if we can ensure the graph merge can't fail. As for the putting -// of stuff into etcd, we should probably store the operations to complete in -// the new graph, and keep retrying until it succeeds, thus blocking any new -// etcd operations until that time. -func UpdateGraphFromConfig(config *GraphConfig, hostname string, g *Graph, etcdO *EtcdWObject) bool { - - var NoopMap = make(map[string]*Vertex) - var PkgMap = make(map[string]*Vertex) - var FileMap = make(map[string]*Vertex) - var SvcMap = make(map[string]*Vertex) - var ExecMap = make(map[string]*Vertex) +// NewGraphFromConfig returns a new graph from existing input, such as from the +// existing graph, and a GraphConfig struct. +func (g *Graph) NewGraphFromConfig(config *GraphConfig, etcdO *EtcdWObject, hostname string) (*Graph, error) { + + var graph *Graph // new graph to return + if g == nil { // FIXME: how can we check for an empty graph? + graph = NewGraph("Graph") // give graph a default name + } else { + graph = g.Copy() // same vertices, since they're pointers! + } var lookup = make(map[string]map[string]*Vertex) - lookup["noop"] = NoopMap - lookup["pkg"] = PkgMap - lookup["file"] = FileMap - lookup["svc"] = SvcMap - lookup["exec"] = ExecMap //log.Printf("%+v", config) // debug - g.SetName(config.Graph) // set graph name + // TODO: if defined (somehow)... + graph.SetName(config.Graph) // set graph name var keep []*Vertex // list of vertex which are the same in new graph - for _, obj := range config.Resources.Noop { - v := g.GetVertexMatch(obj) - if v == nil { // no match found - obj.Init() - v = NewVertex(obj) - g.AddVertex(v) // call standalone in case not part of an edge - } - NoopMap[obj.Name] = v // used for constructing edges - keep = append(keep, v) // append - } - - for _, obj := range config.Resources.Pkg { - v := g.GetVertexMatch(obj) - if v == nil { // no match found - obj.Init() - v = NewVertex(obj) - g.AddVertex(v) // call standalone in case not part of an edge + // use reflection to avoid duplicating code... better options welcome! + value := reflect.Indirect(reflect.ValueOf(config.Resources)) + vtype := value.Type() + for i := 0; i < vtype.NumField(); i++ { // number of fields in struct + name := vtype.Field(i).Name // string of field name + field := value.FieldByName(name) + iface := field.Interface() // interface type of value + slice := reflect.ValueOf(iface) + // XXX: should we just drop these everywhere and have the kind strings be all lowercase? + kind := FirstToUpper(name) + if DEBUG { + log.Printf("Config: Processing: %v...", kind) } - PkgMap[obj.Name] = v // used for constructing edges - keep = append(keep, v) // append - } - - for _, obj := range config.Resources.File { - // XXX: should we export based on a @@ prefix, or a metaparam - // like exported => true || exported => (host pattern)||(other pattern?) - if strings.HasPrefix(obj.Name, "@@") { // exported resource - // add to etcd storage... - obj.Name = obj.Name[2:] //slice off @@ - if !etcdO.EtcdPut(hostname, obj.Name, "file", obj) { - log.Printf("Problem exporting file resource %v.", obj.Name) - continue + for j := 0; j < slice.Len(); j++ { // loop through resources of same kind + x := slice.Index(j).Interface() + obj, ok := x.(Res) // convert to Res type + if !ok { + return nil, fmt.Errorf("Error: Config: Can't convert: %v of type: %T to Res.", x, x) } - } else { - // XXX: we don't have a way of knowing if any of the - // metaparams are undefined, and as a result to set the - // defaults that we want! I hate the go yaml parser!!! - v := g.GetVertexMatch(obj) - if v == nil { // no match found - obj.Init() - v = NewVertex(obj) - g.AddVertex(v) // call standalone in case not part of an edge + + if _, exists := lookup[kind]; !exists { + lookup[kind] = make(map[string]*Vertex) } - FileMap[obj.Name] = v // used for constructing edges - keep = append(keep, v) // append - } - } + // XXX: should we export based on a @@ prefix, or a metaparam + // like exported => true || exported => (host pattern)||(other pattern?) + if !strings.HasPrefix(obj.GetName(), "@@") { // exported resource + // XXX: we don't have a way of knowing if any of the + // metaparams are undefined, and as a result to set the + // defaults that we want! I hate the go yaml parser!!! + v := graph.GetVertexMatch(obj) + if v == nil { // no match found + obj.Init() + v = NewVertex(obj) + graph.AddVertex(v) // call standalone in case not part of an edge + } + lookup[kind][obj.GetName()] = v // used for constructing edges + keep = append(keep, v) // append - for _, obj := range config.Resources.Svc { - v := g.GetVertexMatch(obj) - if v == nil { // no match found - obj.Init() - v = NewVertex(obj) - g.AddVertex(v) // call standalone in case not part of an edge - } - SvcMap[obj.Name] = v // used for constructing edges - keep = append(keep, v) // append - } + } else { + // XXX: do this in a different function... + // add to etcd storage... + obj.SetName(obj.GetName()[2:]) //slice off @@ + + data, err := ResToB64(obj) + if err != nil { + return nil, fmt.Errorf("Config: Could not encode %v resource: %v, error: %v", kind, obj.GetName(), err) + } - for _, obj := range config.Resources.Exec { - v := g.GetVertexMatch(obj) - if v == nil { // no match found - obj.Init() - v = NewVertex(obj) - g.AddVertex(v) // call standalone in case not part of an edge + if !etcdO.EtcdPut(hostname, obj.GetName(), kind, data) { + return nil, fmt.Errorf("Config: Could not export %v resource: %v", kind, obj.GetName()) + } + } } - ExecMap[obj.Name] = v // used for constructing edges - keep = append(keep, v) // append } // lookup from etcd graph @@ -189,61 +164,111 @@ func UpdateGraphFromConfig(config *GraphConfig, hostname string, g *Graph, etcdO nodes, ok := etcdO.EtcdGet() if ok { for _, t := range config.Collector { - // XXX: use t.Res and optionally t.Pattern to collect from etcd storage - log.Printf("Collect: %v; Pattern: %v", t.Res, t.Pattern) - - for _, x := range etcdO.EtcdGetProcess(nodes, "file") { - var obj *FileRes - if B64ToObj(x, &obj) != true { - log.Printf("Collect: File: %v not collected!", x) + // XXX: should we just drop these everywhere and have the kind strings be all lowercase? + kind := FirstToUpper(t.Kind) + + // use t.Kind and optionally t.Pattern to collect from etcd storage + log.Printf("Collect: %v; Pattern: %v", kind, t.Pattern) + for _, str := range etcdO.EtcdGetProcess(nodes, kind) { + obj, err := B64ToRes(str) + if err != nil { + log.Printf("B64ToRes failed to decode: %v", err) + log.Printf("Collect: %v: not collected!", kind) continue } - if t.Pattern != "" { // XXX: currently the pattern for files can only override the Dirname variable :P - obj.Dirname = t.Pattern + + if t.Pattern != "" { // XXX: simplistic for now + obj.CollectPattern(t.Pattern) // obj.Dirname = t.Pattern } - log.Printf("Collect: File: %v collected!", obj.GetName()) + log.Printf("Collect: %v[%v]: collected!", kind, obj.GetName()) - // XXX: similar to file add code: - v := g.GetVertexMatch(obj) + // XXX: similar to other resource add code: + if _, exists := lookup[kind]; !exists { + lookup[kind] = make(map[string]*Vertex) + } + v := graph.GetVertexMatch(obj) if v == nil { // no match found obj.Init() // initialize go channels or things won't work!!! v = NewVertex(obj) - g.AddVertex(v) // call standalone in case not part of an edge + graph.AddVertex(v) // call standalone in case not part of an edge } - FileMap[obj.GetName()] = v // used for constructing edges - keep = append(keep, v) // append - + lookup[kind][obj.GetName()] = v // used for constructing edges + keep = append(keep, v) // append } } } // get rid of any vertices we shouldn't "keep" (that aren't in new graph) - for _, v := range g.GetVertices() { - if !HasVertex(v, keep) { + for _, v := range graph.GetVertices() { + if !VertexContains(v, keep) { // wait for exit before starting new graph! - v.Res.SendEvent(eventExit, true, false) - g.DeleteVertex(v) + v.SendEvent(eventExit, true, false) + graph.DeleteVertex(v) } } for _, e := range config.Edges { - if _, ok := lookup[e.From.Res]; !ok { - return false + if _, ok := lookup[FirstToUpper(e.From.Kind)]; !ok { + return nil, fmt.Errorf("Can't find 'from' resource!") } - if _, ok := lookup[e.To.Res]; !ok { - return false + if _, ok := lookup[FirstToUpper(e.To.Kind)]; !ok { + return nil, fmt.Errorf("Can't find 'to' resource!") } - if _, ok := lookup[e.From.Res][e.From.Name]; !ok { - return false + if _, ok := lookup[FirstToUpper(e.From.Kind)][e.From.Name]; !ok { + return nil, fmt.Errorf("Can't find 'from' name!") } - if _, ok := lookup[e.To.Res][e.To.Name]; !ok { - return false + if _, ok := lookup[FirstToUpper(e.To.Kind)][e.To.Name]; !ok { + return nil, fmt.Errorf("Can't find 'to' name!") } - g.AddEdge(lookup[e.From.Res][e.From.Name], lookup[e.To.Res][e.To.Name], NewEdge(e.Name)) + graph.AddEdge(lookup[FirstToUpper(e.From.Kind)][e.From.Name], lookup[FirstToUpper(e.To.Kind)][e.To.Name], NewEdge(e.Name)) } - // add auto edges + return graph, nil +} + +// add edges to the vertex in a graph based on if it matches a uuid list +func (g *Graph) addEdgesByMatchingUUIDS(v *Vertex, uuids []ResUUID) []bool { + // search for edges and see what matches! + var result []bool + + // loop through each uuid, and see if it matches any vertex + for _, uuid := range uuids { + var found = false + // uuid is a ResUUID object + for _, vv := range g.GetVertices() { // search + if v == vv { // skip self + continue + } + if DEBUG { + log.Printf("Compile: AutoEdge: Match: %v[%v] with UUID: %v[%v]", vv.Kind(), vv.GetName(), uuid.Kind(), uuid.GetName()) + } + // we must match to an effective UUID for the resource, + // that is to say, the name value of a res is a helpful + // handle, but it is not necessarily a unique identity! + // remember, resources can return multiple UUID's each! + if UUIDExistsInUUIDs(uuid, vv.GetUUIDs()) { + // add edge from: vv -> v + if uuid.Reversed() { + txt := fmt.Sprintf("AutoEdge: %v[%v] -> %v[%v]", vv.Kind(), vv.GetName(), v.Kind(), v.GetName()) + log.Printf("Compile: Adding %v", txt) + g.AddEdge(vv, v, NewEdge(txt)) + } else { // edges go the "normal" way, eg: pkg resource + txt := fmt.Sprintf("AutoEdge: %v[%v] -> %v[%v]", v.Kind(), v.GetName(), vv.Kind(), vv.GetName()) + log.Printf("Compile: Adding %v", txt) + g.AddEdge(v, vv, NewEdge(txt)) + } + found = true + break + } + } + result = append(result, found) + } + return result +} + +// add auto edges to graph +func (g *Graph) AutoEdges() { log.Println("Compile: Adding AutoEdges...") for _, v := range g.GetVertices() { // for each vertexes autoedges if !v.GetMeta().AutoEdge { // is the metaparam true? @@ -269,7 +294,7 @@ func UpdateGraphFromConfig(config *GraphConfig, hostname string, g *Graph, etcdO } // match and add edges - result := g.AddEdgesByMatchingUUIDS(v, uuids) + result := g.addEdgesByMatchingUUIDS(v, uuids) // report back, and find out if we should continue if !autoEdgeObj.Test(result) { @@ -277,48 +302,211 @@ func UpdateGraphFromConfig(config *GraphConfig, hostname string, g *Graph, etcdO } } } +} - return true +// AutoGrouper is the required interface to implement for an autogroup algorithm +type AutoGrouper interface { + // listed in the order these are typically called in... + name() string // friendly identifier + init(*Graph) error // only call once + vertexNext() (*Vertex, *Vertex, error) // mostly algorithmic + vertexCmp(*Vertex, *Vertex) error // can we merge these ? + vertexMerge(*Vertex, *Vertex) (*Vertex, error) // vertex merge fn to use + edgeMerge(*Edge, *Edge) *Edge // edge merge fn to use + vertexTest(bool) (bool, error) // call until false } -// add edges to the vertex in a graph based on if it matches a uuid list -func (g *Graph) AddEdgesByMatchingUUIDS(v *Vertex, uuids []ResUUID) []bool { - // search for edges and see what matches! - var result []bool +// baseGrouper is the base type for implementing the AutoGrouper interface +type baseGrouper struct { + graph *Graph // store a pointer to the graph + vertices []*Vertex // cached list of vertices + i int + j int + done bool +} - // loop through each uuid, and see if it matches any vertex - for _, uuid := range uuids { - var found = false - // uuid is a ResUUID object - for _, vv := range g.GetVertices() { // search - if v == vv { // skip self - continue +// name provides a friendly name for the logs to see +func (ag *baseGrouper) name() string { + return "baseGrouper" +} + +// init is called only once and before using other AutoGrouper interface methods +// the name method is the only exception: call it any time without side effects! +func (ag *baseGrouper) init(g *Graph) error { + if ag.graph != nil { + return fmt.Errorf("The init method has already been called!") + } + ag.graph = g // pointer + ag.vertices = ag.graph.GetVertices() // cache + ag.i = 0 + ag.j = 0 + if len(ag.vertices) == 0 { // empty graph + ag.done = true + return nil + } + return nil +} + +// vertexNext is a simple iterator that loops through vertex (pair) combinations +// an intelligent algorithm would selectively offer only valid pairs of vertices +// these should satisfy logical grouping requirements for the autogroup designs! +// the desired algorithms can override, but keep this method as a base iterator! +func (ag *baseGrouper) vertexNext() (v1, v2 *Vertex, err error) { + // this does a for v... { for w... { return v, w }} but stepwise! + l := len(ag.vertices) + if ag.i < l { + v1 = ag.vertices[ag.i] + } + if ag.j < l { + v2 = ag.vertices[ag.j] + } + + // in case the vertex was deleted + if !ag.graph.HasVertex(v1) { + v1 = nil + } + if !ag.graph.HasVertex(v2) { + v2 = nil + } + + // two nested loops... + if ag.j < l { + ag.j++ + } + if ag.j == l { + ag.j = 0 + if ag.i < l { + ag.i++ + } + if ag.i == l { + ag.done = true + } + } + + return +} + +func (ag *baseGrouper) vertexCmp(v1, v2 *Vertex) error { + if v1 == nil || v2 == nil { + return fmt.Errorf("Vertex is nil!") + } + if v1 == v2 { // skip yourself + return fmt.Errorf("Vertices are the same!") + } + if v1.Kind() != v2.Kind() { // we must group similar kinds + // TODO: maybe future resources won't need this limitation? + return fmt.Errorf("The two resources aren't the same kind!") + } + // someone doesn't want to group! + if !v1.GetMeta().AutoGroup || !v2.GetMeta().AutoGroup { + return fmt.Errorf("One of the autogroup flags is false!") + } + if v1.Res.IsGrouped() { // already grouped! + return fmt.Errorf("Already grouped!") + } + if len(v2.Res.GetGroup()) > 0 { // already has children grouped! + return fmt.Errorf("Already has groups!") + } + if !v1.Res.GroupCmp(v2.Res) { // resource groupcmp failed! + return fmt.Errorf("The GroupCmp failed!") + } + return nil // success +} + +func (ag *baseGrouper) vertexMerge(v1, v2 *Vertex) (v *Vertex, err error) { + // NOTE: it's important to use w.Res instead of w, b/c + // the w by itself is the *Vertex obj, not the *Res obj + // which is contained within it! They both satisfy the + // Res interface, which is why both will compile! :( + err = v1.Res.GroupRes(v2.Res) // GroupRes skips stupid groupings + return // success or fail, and no need to merge the actual vertices! +} + +func (ag *baseGrouper) edgeMerge(e1, e2 *Edge) *Edge { + return e1 // noop +} + +// vertexTest processes the results of the grouping for the algorithm to know +// return an error if something went horribly wrong, and bool false to stop +func (ag *baseGrouper) vertexTest(b bool) (bool, error) { + // NOTE: this particular baseGrouper version doesn't track what happens + // because since we iterate over every pair, we don't care which merge! + if ag.done { + return false, nil + } + return true, nil +} + +type algorithmNameGrouper struct { // XXX rename me! + baseGrouper // "inherit" what we want, and reimplement the rest +} + +func (ag *algorithmNameGrouper) name() string { + log.Fatal("Not implemented!") // XXX + return "algorithmNameGrouper" +} + +func (ag *algorithmNameGrouper) vertexNext() (v1, v2 *Vertex, err error) { + log.Fatal("Not implemented!") // XXX + // NOTE: you can even build this like this: + //v1, v2, err = ag.baseGrouper.vertexNext() // get all iterable pairs + // ... + //ag.baseGrouper.vertexTest(...) + //return + return nil, nil, fmt.Errorf("Not implemented!") +} + +// autoGroup is the mechanical auto group "runner" that runs the interface spec +func (g *Graph) autoGroup(ag AutoGrouper) chan string { + strch := make(chan string) // output log messages here + go func(strch chan string) { + strch <- fmt.Sprintf("Compile: Grouping: Algorithm: %v...", ag.name()) + if err := ag.init(g); err != nil { + log.Fatalf("Error running autoGroup(init): %v", err) + } + + for { + var v, w *Vertex + v, w, err := ag.vertexNext() // get pair to compare + if err != nil { + log.Fatalf("Error running autoGroup(vertexNext): %v", err) } - if DEBUG { - log.Printf("Compile: AutoEdge: Match: %v[%v] with UUID: %v[%v]", vv.Kind(), vv.GetName(), uuid.Kind(), uuid.GetName()) + merged := false + // save names since they change during the runs + vStr := fmt.Sprintf("%s", v) // valid even if it is nil + wStr := fmt.Sprintf("%s", w) + + if err := ag.vertexCmp(v, w); err != nil { // cmp ? + strch <- fmt.Sprintf("Compile: Grouping: !GroupCmp for: %s into %s", wStr, vStr) + + // remove grouped vertex and merge edges (res is safe) + } else if err := g.VertexMerge(v, w, ag.vertexMerge, ag.edgeMerge); err != nil { // merge... + strch <- fmt.Sprintf("Compile: Grouping: !VertexMerge for: %s into %s", wStr, vStr) + + } else { // success! + strch <- fmt.Sprintf("Compile: Grouping: Success for: %s into %s", wStr, vStr) + merged = true // woo } - // we must match to an effective UUID for the resource, - // that is to say, the name value of a res is a helpful - // handle, but it is not necessarily a unique identity! - // remember, resources can return multiple UUID's each! - if UUIDExistsInUUIDs(uuid, vv.GetUUIDs()) { - // add edge from: vv -> v - if uuid.Reversed() { - txt := fmt.Sprintf("AutoEdge: %v[%v] -> %v[%v]", vv.Kind(), vv.GetName(), v.Kind(), v.GetName()) - log.Printf("Compile: Adding %v", txt) - g.AddEdge(vv, v, NewEdge(txt)) - } else { // edges go the "normal" way, eg: pkg resource - txt := fmt.Sprintf("AutoEdge: %v[%v] -> %v[%v]", v.Kind(), v.GetName(), vv.Kind(), vv.GetName()) - log.Printf("Compile: Adding %v", txt) - g.AddEdge(v, vv, NewEdge(txt)) - } - found = true - break + + // did these get used? + if ok, err := ag.vertexTest(merged); err != nil { + log.Fatalf("Error running autoGroup(vertexTest): %v", err) + } else if !ok { + break // done! } } - result = append(result, found) - } + close(strch) + return + }(strch) // call function + return strch +} - return result +// AutoGroup runs the auto grouping on the graph and prints out log messages +func (g *Graph) AutoGroup() { + // receive log messages from channel... + // this allows test cases to avoid printing them when they're unwanted! + for str := range g.autoGroup(&baseGrouper{}) { + log.Println(str) + } } diff --git a/configwatch.go b/configwatch.go index 307498ce0..804005ce5 100644 --- a/configwatch.go +++ b/configwatch.go @@ -138,7 +138,7 @@ func ConfigWatch(file string) chan bool { } case err := <-watcher.Errors: - log.Println("error:", err) + log.Printf("error: %v", err) log.Fatal(err) } diff --git a/etcd.go b/etcd.go index b929fce0c..2e7f347d7 100644 --- a/etcd.go +++ b/etcd.go @@ -207,20 +207,14 @@ func (etcdO *EtcdWObject) EtcdWatch() chan etcdMsg { } // helper function to store our data in etcd -func (etcdO *EtcdWObject) EtcdPut(hostname, key, res string, obj interface{}) bool { +func (etcdO *EtcdWObject) EtcdPut(hostname, key, res string, data string) bool { kapi := etcdO.GetKAPI() - output, ok := ObjToB64(obj) - if !ok { - log.Printf("Etcd: Could not encode %v key.", key) - return false - } - path := fmt.Sprintf("/exported/%s/resources/%s/res", hostname, key) _, err := kapi.Set(etcd_context.Background(), path, res, nil) // XXX validate... path = fmt.Sprintf("/exported/%s/resources/%s/value", hostname, key) - resp, err := kapi.Set(etcd_context.Background(), path, output, nil) + resp, err := kapi.Set(etcd_context.Background(), path, data, nil) if err != nil { if cerr, ok := err.(*etcd.ClusterError); ok { // not running or disconnected diff --git a/examples/etcd1a.yaml b/examples/etcd1a.yaml index 6dd739367..46378d3c4 100644 --- a/examples/etcd1a.yaml +++ b/examples/etcd1a.yaml @@ -13,6 +13,6 @@ resources: i am f2, exported from host A state: exists collect: -- res: file +- kind: file pattern: "/tmp/mgmtA/" edges: [] diff --git a/examples/etcd1b.yaml b/examples/etcd1b.yaml index c232acb4e..ae491180c 100644 --- a/examples/etcd1b.yaml +++ b/examples/etcd1b.yaml @@ -13,6 +13,6 @@ resources: i am f2, exported from host B state: exists collect: -- res: file +- kind: file pattern: "/tmp/mgmtB/" edges: [] diff --git a/examples/etcd1c.yaml b/examples/etcd1c.yaml index 80184451e..2b9bd8cc5 100644 --- a/examples/etcd1c.yaml +++ b/examples/etcd1c.yaml @@ -13,6 +13,6 @@ resources: i am f2, exported from host C state: exists collect: -- res: file +- kind: file pattern: "/tmp/mgmtC/" edges: [] diff --git a/examples/exec1.yaml b/examples/exec1.yaml index ab2f6227a..adb0bfd66 100644 --- a/examples/exec1.yaml +++ b/examples/exec1.yaml @@ -45,15 +45,15 @@ resources: edges: - name: e1 from: - res: exec + kind: exec name: exec1 to: - res: exec + kind: exec name: exec2 - name: e2 from: - res: exec + kind: exec name: exec2 to: - res: exec + kind: exec name: exec3 diff --git a/examples/exec1a.yaml b/examples/exec1a.yaml index f9d3c1f1d..4896429e2 100644 --- a/examples/exec1a.yaml +++ b/examples/exec1a.yaml @@ -25,8 +25,8 @@ resources: edges: - name: e1 from: - res: exec + kind: exec name: exec1 to: - res: exec + kind: exec name: exec2 diff --git a/examples/exec1b.yaml b/examples/exec1b.yaml index 34a0f943f..04f8e0021 100644 --- a/examples/exec1b.yaml +++ b/examples/exec1b.yaml @@ -25,8 +25,8 @@ resources: edges: - name: e1 from: - res: exec + kind: exec name: exec1 to: - res: exec + kind: exec name: exec2 diff --git a/examples/exec1c.yaml b/examples/exec1c.yaml index 59992a061..942dcd42f 100644 --- a/examples/exec1c.yaml +++ b/examples/exec1c.yaml @@ -25,8 +25,8 @@ resources: edges: - name: e1 from: - res: exec + kind: exec name: exec1 to: - res: exec + kind: exec name: exec2 diff --git a/examples/exec2.yaml b/examples/exec2.yaml index 567988b7f..bedec63dc 100644 --- a/examples/exec2.yaml +++ b/examples/exec2.yaml @@ -55,29 +55,29 @@ resources: edges: - name: e1 from: - res: exec + kind: exec name: exec1 to: - res: exec + kind: exec name: exec2 - name: e2 from: - res: exec + kind: exec name: exec1 to: - res: exec + kind: exec name: exec3 - name: e3 from: - res: exec + kind: exec name: exec2 to: - res: exec + kind: exec name: exec4 - name: e4 from: - res: exec + kind: exec name: exec3 to: - res: exec + kind: exec name: exec4 diff --git a/examples/file1.yaml b/examples/file1.yaml index 1dd94c9a8..f69394eea 100644 --- a/examples/file1.yaml +++ b/examples/file1.yaml @@ -27,15 +27,15 @@ resources: edges: - name: e1 from: - res: file + kind: file name: file1 to: - res: file + kind: file name: file2 - name: e2 from: - res: file + kind: file name: file2 to: - res: file + kind: file name: file3 diff --git a/examples/graph0.yaml b/examples/graph0.yaml index 07acecc69..231d0051e 100644 --- a/examples/graph0.yaml +++ b/examples/graph0.yaml @@ -13,8 +13,8 @@ resources: edges: - name: e1 from: - res: noop + kind: noop name: noop1 to: - res: file + kind: file name: file1 diff --git a/examples/graph10.yaml b/examples/graph10.yaml index 41a6cac8c..fc7c7dfd4 100644 --- a/examples/graph10.yaml +++ b/examples/graph10.yaml @@ -86,43 +86,43 @@ resources: edges: - name: e1 from: - res: exec + kind: exec name: exec1 to: - res: exec + kind: exec name: exec4 - name: e2 from: - res: exec + kind: exec name: exec2 to: - res: exec + kind: exec name: exec4 - name: e3 from: - res: exec + kind: exec name: exec3 to: - res: exec + kind: exec name: exec4 - name: e4 from: - res: exec + kind: exec name: exec4 to: - res: exec + kind: exec name: exec5 - name: e5 from: - res: exec + kind: exec name: exec4 to: - res: exec + kind: exec name: exec6 - name: e6 from: - res: exec + kind: exec name: exec4 to: - res: exec + kind: exec name: exec7 diff --git a/examples/graph1a.yaml b/examples/graph1a.yaml index b17302f96..20421b356 100644 --- a/examples/graph1a.yaml +++ b/examples/graph1a.yaml @@ -15,8 +15,8 @@ resources: edges: - name: e1 from: - res: file + kind: file name: file1 to: - res: file + kind: file name: file2 diff --git a/examples/graph1b.yaml b/examples/graph1b.yaml index 4dbd872dd..efedc8004 100644 --- a/examples/graph1b.yaml +++ b/examples/graph1b.yaml @@ -15,8 +15,8 @@ resources: edges: - name: e2 from: - res: file + kind: file name: file2 to: - res: file + kind: file name: file3 diff --git a/examples/graph3a.yaml b/examples/graph3a.yaml index 4e0906ca0..4f32f24b9 100644 --- a/examples/graph3a.yaml +++ b/examples/graph3a.yaml @@ -23,6 +23,6 @@ resources: i am f4, exported from host A state: exists collect: -- res: file +- kind: file pattern: "/tmp/mgmtA/" edges: [] diff --git a/examples/graph3b.yaml b/examples/graph3b.yaml index d7a670f98..f3616497e 100644 --- a/examples/graph3b.yaml +++ b/examples/graph3b.yaml @@ -23,6 +23,6 @@ resources: i am f4, exported from host B state: exists collect: -- res: file +- kind: file pattern: "/tmp/mgmtB/" edges: [] diff --git a/examples/graph3c.yaml b/examples/graph3c.yaml index db60ffee7..82050ab75 100644 --- a/examples/graph3c.yaml +++ b/examples/graph3c.yaml @@ -23,6 +23,6 @@ resources: i am f4, exported from host C state: exists collect: -- res: file +- kind: file pattern: "/tmp/mgmtC/" edges: [] diff --git a/examples/graph4.yaml b/examples/graph4.yaml index 941a2c650..d147cc3c7 100644 --- a/examples/graph4.yaml +++ b/examples/graph4.yaml @@ -13,6 +13,6 @@ resources: i am f3, exported from host A state: exists collect: -- res: file +- kind: file pattern: '' edges: diff --git a/examples/graph5.yaml b/examples/graph5.yaml index 736e4ddea..5cec004d2 100644 --- a/examples/graph5.yaml +++ b/examples/graph5.yaml @@ -8,6 +8,6 @@ resources: i am f1 state: exists collect: -- res: file +- kind: file pattern: '' edges: diff --git a/examples/graph9.yaml b/examples/graph9.yaml index 9a4dc6bee..0e1954932 100644 --- a/examples/graph9.yaml +++ b/examples/graph9.yaml @@ -56,22 +56,22 @@ resources: edges: - name: e1 from: - res: exec + kind: exec name: exec1 to: - res: exec + kind: exec name: exec5 - name: e2 from: - res: exec + kind: exec name: exec2 to: - res: exec + kind: exec name: exec5 - name: e3 from: - res: exec + kind: exec name: exec3 to: - res: exec + kind: exec name: exec5 diff --git a/examples/svc1.yaml b/examples/svc1.yaml index 9fdd6ced6..11e2a733a 100644 --- a/examples/svc1.yaml +++ b/examples/svc1.yaml @@ -16,15 +16,15 @@ resources: edges: - name: e1 from: - res: noop + kind: noop name: noop1 to: - res: file + kind: file name: file1 - name: e2 from: - res: file + kind: file name: file1 to: - res: svc + kind: svc name: purpleidea diff --git a/exec.go b/exec.go index 368edbba3..2f7c15286 100644 --- a/exec.go +++ b/exec.go @@ -20,12 +20,17 @@ package main import ( "bufio" "bytes" + "encoding/gob" "errors" "log" "os/exec" "strings" ) +func init() { + gob.Register(&ExecRes{}) +} + type ExecRes struct { BaseRes `yaml:",inline"` State string `yaml:"state"` // state: exists/present?, absent, (undefined?) @@ -97,7 +102,7 @@ func (obj *ExecRes) BufioChanScanner(scanner *bufio.Scanner) (chan string, chan } // Exec watcher -func (obj *ExecRes) Watch() { +func (obj *ExecRes) Watch(processChan chan struct{}) { if obj.IsWatching() { return } @@ -187,8 +192,8 @@ func (obj *ExecRes) Watch() { if send { send = false // it is okay to invalidate the clean state on poke too - obj.isStateOK = false // something made state dirty - Process(obj) // XXX: rename this function + obj.isStateOK = false // something made state dirty + processChan <- struct{}{} // trigger process } } } diff --git a/file.go b/file.go index d56a3e8d3..bb8c015b1 100644 --- a/file.go +++ b/file.go @@ -22,6 +22,7 @@ import ( "encoding/hex" "gopkg.in/fsnotify.v1" //"github.com/go-fsnotify/fsnotify" // git master of "gopkg.in/fsnotify.v1" + "encoding/gob" "io" "log" "math" @@ -31,6 +32,10 @@ import ( "syscall" ) +func init() { + gob.Register(&FileRes{}) +} + type FileRes struct { BaseRes `yaml:",inline"` Path string `yaml:"path"` // path variable (should default to name) @@ -97,7 +102,7 @@ func (obj *FileRes) Validate() bool { // File watcher for files and directories // Modify with caution, probably important to write some test cases first! // obj.GetPath(): file or directory -func (obj *FileRes) Watch() { +func (obj *FileRes) Watch(processChan chan struct{}) { if obj.IsWatching() { return } @@ -230,7 +235,7 @@ func (obj *FileRes) Watch() { case err := <-watcher.Errors: obj.SetConvergedState(resConvergedNil) // XXX ? - log.Println("error:", err) + log.Printf("error: %v", err) log.Fatal(err) //obj.events <- fmt.Sprintf("file: %v", "error") // XXX: how should we handle errors? @@ -255,7 +260,7 @@ func (obj *FileRes) Watch() { dirty = false obj.isStateOK = false // something made state dirty } - Process(obj) // XXX: rename this function + processChan <- struct{}{} // trigger process } } } @@ -488,3 +493,8 @@ func (obj *FileRes) Compare(res Res) bool { } return true } + +func (obj *FileRes) CollectPattern(pattern string) { + // XXX: currently the pattern for files can only override the Dirname variable :P + obj.Dirname = pattern // XXX: simplistic for now +} diff --git a/main.go b/main.go index fc06b05e7..ec9eb03d4 100644 --- a/main.go +++ b/main.go @@ -63,7 +63,7 @@ func run(c *cli.Context) { converged := make(chan bool) // converged signal log.Printf("This is: %v, version: %v", program, version) log.Printf("Main: Start: %v", start) - G := NewGraph("Graph") // give graph a default name + var G, fullGraph *Graph // exit after `max-runtime` seconds for no reason at all... if i := c.Int("max-runtime"); i > 0 { @@ -102,10 +102,11 @@ func run(c *cli.Context) { if !c.Bool("no-watch") { configchan = ConfigWatch(file) } - log.Printf("Etcd: Starting...") + log.Println("Etcd: Starting...") etcdchan := etcdO.EtcdWatch() first := true // first loop or not for { + log.Println("Main: Waiting...") select { case _ = <-startchan: // kick the loop once at start // pass @@ -134,17 +135,29 @@ func run(c *cli.Context) { } // run graph vertex LOCK... - if !first { // XXX: we can flatten this check out I think - log.Printf("State: %v -> %v", G.SetState(graphPausing), G.GetState()) + if !first { // TODO: we can flatten this check out I think G.Pause() // sync - log.Printf("State: %v -> %v", G.SetState(graphPaused), G.GetState()) } - // build the graph from a config file - // build the graph on events (eg: from etcd) - if !UpdateGraphFromConfig(config, hostname, G, etcdO) { - log.Fatal("Config: We borked the graph.") // XXX + // build graph from yaml file on events (eg: from etcd) + // we need the vertices to be paused to work on them + if newFullgraph, err := fullGraph.NewGraphFromConfig(config, etcdO, hostname); err == nil { // keep references to all original elements + fullGraph = newFullgraph + } else { + log.Printf("Config: Error making new graph from config: %v", err) + // unpause! + if !first { + G.Start(&wg, first) // sync + } + continue } + + G = fullGraph.Copy() // copy to active graph + // XXX: do etcd transaction out here... + G.AutoEdges() // add autoedges; modifies the graph + //G.AutoGroup() // run autogroup; modifies the graph // TODO + // TODO: do we want to do a transitive reduction? + log.Printf("Graph: %v", G) // show graph err := G.ExecGraphviz(c.String("graphviz-filter"), c.String("graphviz")) if err != nil { @@ -159,9 +172,7 @@ func run(c *cli.Context) { // some are not ready yet and the EtcdWatch // loops, we'll cause G.Pause(...) before we // even got going, thus causing nil pointer errors - log.Printf("State: %v -> %v", G.SetState(graphStarting), G.GetState()) G.Start(&wg, first) // sync - log.Printf("State: %v -> %v", G.SetState(graphStarted), G.GetState()) first = false } }() diff --git a/misc.go b/misc.go index 8962f7b5d..1f94ec6da 100644 --- a/misc.go +++ b/misc.go @@ -18,9 +18,6 @@ package main import ( - "bytes" - "encoding/base64" - "encoding/gob" "github.com/godbus/dbus" "path" "sort" @@ -28,6 +25,11 @@ import ( "time" ) +// returns the string with the first character capitalized +func FirstToUpper(str string) string { + return strings.ToUpper(str[0:1]) + str[1:] +} + // return true if a string exists inside a list, otherwise false func StrInList(needle string, haystack []string) bool { for _, x := range haystack { @@ -136,6 +138,9 @@ func Dirname(p string) string { func Basename(p string) string { _, b := path.Split(path.Clean(p)) + if p == "" { + return "" + } if p[len(p)-1:] == "/" { // don't loose the tail slash b += "/" } @@ -265,36 +270,6 @@ func DirifyFileList(fileList []string, removeDirs bool) []string { return result } -// encode an object as base 64, serialize and then base64 encode -func ObjToB64(obj interface{}) (string, bool) { - b := bytes.Buffer{} - e := gob.NewEncoder(&b) - err := e.Encode(obj) - if err != nil { - //log.Println("Gob failed to Encode: ", err) - return "", false - } - return base64.StdEncoding.EncodeToString(b.Bytes()), true -} - -// TODO: is it possible to somehow generically just return the obj? -// decode an object into the waiting obj which you pass a reference to -func B64ToObj(str string, obj interface{}) bool { - bb, err := base64.StdEncoding.DecodeString(str) - if err != nil { - //log.Println("Base64 failed to Decode: ", err) - return false - } - b := bytes.NewBuffer(bb) - d := gob.NewDecoder(b) - err = d.Decode(obj) - if err != nil { - //log.Println("Gob failed to Decode: ", err) - return false - } - return true -} - // special version of time.After that blocks when given a negative integer // when used in a case statement, the timer restarts on each select call to it func TimeAfterOrBlock(t int) <-chan time.Time { diff --git a/misc_test.go b/misc_test.go index 990e643bf..ab4841936 100644 --- a/misc_test.go +++ b/misc_test.go @@ -18,7 +18,6 @@ package main import ( - "fmt" "reflect" "sort" "testing" @@ -58,6 +57,9 @@ func TestMiscT1(t *testing.T) { t.Errorf("Result is incorrect.") } + if Basename("") != "" { // TODO: should this equal something different? + t.Errorf("Result is incorrect.") + } } func TestMiscT2(t *testing.T) { @@ -169,57 +171,6 @@ func TestMiscT5(t *testing.T) { } } -func TestMiscT6(t *testing.T) { - - type foo struct { - Name string `yaml:"name"` - Res string `yaml:"res"` - Value int `yaml:"value"` - } - - obj := foo{"dude", "sweet", 42} - output, ok := ObjToB64(obj) - if ok != true { - t.Errorf("First result should be true.") - } - var data foo - if B64ToObj(output, &data) != true { - t.Errorf("Second result should be true.") - } - // TODO: there is probably a better way to compare these two... - if fmt.Sprintf("%+v\n", obj) != fmt.Sprintf("%+v\n", data) { - t.Errorf("Strings should match.") - } -} - -func TestMiscT7(t *testing.T) { - - type Foo struct { - Name string `yaml:"name"` - Res string `yaml:"res"` - Value int `yaml:"value"` - } - - type bar struct { - Foo `yaml:",inline"` // anonymous struct must be public! - Comment string `yaml:"comment"` - } - - obj := bar{Foo{"dude", "sweet", 42}, "hello world"} - output, ok := ObjToB64(obj) - if ok != true { - t.Errorf("First result should be true.") - } - var data bar - if B64ToObj(output, &data) != true { - t.Errorf("Second result should be true.") - } - // TODO: there is probably a better way to compare these two... - if fmt.Sprintf("%+v\n", obj) != fmt.Sprintf("%+v\n", data) { - t.Errorf("Strings should match.") - } -} - func TestMiscT8(t *testing.T) { r0 := []string{"/"} diff --git a/noop.go b/noop.go index 27b452dc0..591b36f21 100644 --- a/noop.go +++ b/noop.go @@ -18,9 +18,14 @@ package main import ( + "encoding/gob" "log" ) +func init() { + gob.Register(&NoopRes{}) +} + type NoopRes struct { BaseRes `yaml:",inline"` Comment string `yaml:"comment"` // extra field for example purposes @@ -48,7 +53,7 @@ func (obj *NoopRes) Validate() bool { return true } -func (obj *NoopRes) Watch() { +func (obj *NoopRes) Watch(processChan chan struct{}) { if obj.IsWatching() { return } @@ -79,7 +84,7 @@ func (obj *NoopRes) Watch() { send = false // only do this on certain types of events //obj.isStateOK = false // something made state dirty - Process(obj) // XXX: rename this function + processChan <- struct{}{} // trigger process } } } diff --git a/pgraph.go b/pgraph.go index 912b614cf..bb23ecb96 100644 --- a/pgraph.go +++ b/pgraph.go @@ -35,11 +35,11 @@ import ( type graphState int const ( - graphNil graphState = iota - graphStarting - graphStarted - graphPausing - graphPaused + graphStateNil graphState = iota + graphStateStarting + graphStateStarted + graphStatePausing + graphStatePaused ) // The graph abstract data type (ADT) is defined as follows: @@ -55,9 +55,8 @@ type Graph struct { } type Vertex struct { - graph *Graph // store a pointer to the graph it's on - Res // anonymous field - data map[string]string // XXX: currently unused i think, remove? + Res // anonymous field + timestamp int64 // last updated timestamp ? } type Edge struct { @@ -68,7 +67,7 @@ func NewGraph(name string) *Graph { return &Graph{ Name: name, Adjacency: make(map[*Vertex]map[*Vertex]*Edge), - state: graphNil, + state: graphStateNil, } } @@ -84,6 +83,19 @@ func NewEdge(name string) *Edge { } } +// Copy makes a copy of the graph struct +func (g *Graph) Copy() *Graph { + newGraph := &Graph{ + Name: g.Name, + Adjacency: make(map[*Vertex]map[*Vertex]*Edge, len(g.Adjacency)), + state: g.state, + } + for k, v := range g.Adjacency { + newGraph.Adjacency[k] = v // copy + } + return newGraph +} + // returns the name of the graph func (g *Graph) GetName() string { return g.Name @@ -116,13 +128,12 @@ func (g *Graph) SetVertex() { } } -// add a new vertex to the graph -func (g *Graph) AddVertex(v *Vertex) { - if _, exists := g.Adjacency[v]; !exists { - g.Adjacency[v] = make(map[*Vertex]*Edge) - - // store a pointer to the graph it's on for convenience and readability - v.graph = g +// AddVertex uses variadic input to add all listed vertices to the graph +func (g *Graph) AddVertex(xv ...*Vertex) { + for _, v := range xv { + if _, exists := g.Adjacency[v]; !exists { + g.Adjacency[v] = make(map[*Vertex]*Edge) + } } } @@ -136,9 +147,9 @@ func (g *Graph) DeleteVertex(v *Vertex) { // adds a directed edge to the graph from v1 to v2 func (g *Graph) AddEdge(v1, v2 *Vertex, e *Edge) { // NOTE: this doesn't allow more than one edge between two vertexes... - // TODO: is this a problem? - g.AddVertex(v1) - g.AddVertex(v2) + g.AddVertex(v1, v2) // supports adding N vertices now + // TODO: check if an edge exists to avoid overwriting it! + // NOTE: VertexMerge() depends on overwriting it at the moment... g.Adjacency[v1][v2] = e } @@ -198,6 +209,11 @@ func (g *Graph) String() string { return fmt.Sprintf("Vertices(%d), Edges(%d)", g.NumVertices(), g.NumEdges()) } +// String returns the canonical form for a vertex +func (v *Vertex) String() string { + return fmt.Sprintf("%s[%s]", v.Res.Kind(), v.Res.GetName()) +} + // output the graph in graphviz format // https://en.wikipedia.org/wiki/DOT_%28graph_description_language%29 func (g *Graph) Graphviz() (out string) { @@ -281,7 +297,7 @@ func (g *Graph) ExecGraphviz(program, filename string) error { } // return an array (slice) of all directed vertices to vertex v (??? -> v) -// ostimestamp should use this +// OKTimestamp should use this func (g *Graph) IncomingGraphEdges(v *Vertex) []*Vertex { // TODO: we might be able to implement this differently by reversing // the Adjacency graph and then looping through it again... @@ -465,9 +481,105 @@ func (g *Graph) TopologicalSort() (result []*Vertex, ok bool) { // kahn's algori return L, true } -// return a pointer to the graph a vertex is on -func (v *Vertex) GetGraph() *Graph { - return v.graph +// Reachability finds the shortest path in a DAG from a to b, and returns the +// slice of vertices that matched this particular path including both a and b. +// It returns nil if a or b is nil, and returns empty list if no path is found. +// Since there could be more than one possible result for this operation, we +// arbitrarily choose one of the shortest possible. As a result, this should +// actually return a tree if we cared about correctness. +// This operates by a recursive algorithm; a more efficient version is likely. +// If you don't give this function a DAG, you might cause infinite recursion! +func (g *Graph) Reachability(a, b *Vertex) []*Vertex { + if a == nil || b == nil { + return nil + } + vertices := g.OutgoingGraphEdges(a) // what points away from a ? + if len(vertices) == 0 { + return []*Vertex{} // nope + } + if VertexContains(b, vertices) { + return []*Vertex{a, b} // found + } + // TODO: parallelize this with go routines? + var collected = make([][]*Vertex, len(vertices)) + pick := -1 + for i, v := range vertices { + collected[i] = g.Reachability(v, b) // find b by recursion + if l := len(collected[i]); l > 0 { + // pick shortest path + // TODO: technically i should return a tree + if pick < 0 || l < len(collected[pick]) { + pick = i + } + } + } + if pick < 0 { + return []*Vertex{} // nope + } + result := []*Vertex{a} // tack on a + result = append(result, collected[pick]...) + return result +} + +// VertexMerge merges v2 into v1 by reattaching the edges where appropriate, +// and then by deleting v2 from the graph. Since more than one edge between two +// vertices is not allowed, duplicate edges are merged as well. an edge merge +// function can be provided if you'd like to control how you merge the edges! +func (g *Graph) VertexMerge(v1, v2 *Vertex, vertexMergeFn func(*Vertex, *Vertex) (*Vertex, error), edgeMergeFn func(*Edge, *Edge) *Edge) error { + // methodology + // 1) edges between v1 and v2 are removed + //Loop: + for k1 := range g.Adjacency { + for k2 := range g.Adjacency[k1] { + // v1 -> v2 || v2 -> v1 + if (k1 == v1 && k2 == v2) || (k1 == v2 && k2 == v1) { + delete(g.Adjacency[k1], k2) // delete map & edge + // NOTE: if we assume this is a DAG, then we can + // assume only v1 -> v2 OR v2 -> v1 exists, and + // we can break out of these loops immediately! + //break Loop + break + } + } + } + + // 2) edges that point towards v2 from X now point to v1 from X (no dupes) + for _, x := range g.IncomingGraphEdges(v2) { // all to vertex v (??? -> v) + e := g.Adjacency[x][v2] // previous edge + // merge e with ex := g.Adjacency[x][v1] if it exists! + if ex, exists := g.Adjacency[x][v1]; exists && edgeMergeFn != nil { + e = edgeMergeFn(e, ex) + } + g.AddEdge(x, v1, e) // overwrite edge + delete(g.Adjacency[x], v2) // delete old edge + } + + // 3) edges that point from v2 to X now point from v1 to X (no dupes) + for _, x := range g.OutgoingGraphEdges(v2) { // all from vertex v (v -> ???) + e := g.Adjacency[v2][x] // previous edge + // merge e with ex := g.Adjacency[v1][x] if it exists! + if ex, exists := g.Adjacency[v1][x]; exists && edgeMergeFn != nil { + e = edgeMergeFn(e, ex) + } + g.AddEdge(v1, x, e) // overwrite edge + delete(g.Adjacency[v2], x) + } + + // 4) merge and then remove the (now merged/grouped) vertex + if vertexMergeFn != nil { // run vertex merge function + if v, err := vertexMergeFn(v1, v2); err != nil { + return err + } else if v != nil { // replace v1 with the "merged" version... + v1 = v // XXX: will this replace v1 the way we want? + } + } + g.DeleteVertex(v2) // remove grouped vertex + + // 5) creation of a cyclic graph should throw an error + if _, dag := g.TopologicalSort(); !dag { // am i a dag or not? + return fmt.Errorf("Graph is not a dag!") + } + return nil // success } func HeisenbergCount(ch chan *Vertex) int { @@ -479,8 +591,134 @@ func HeisenbergCount(ch chan *Vertex) int { return c } +// GetTimestamp returns the timestamp of a vertex +func (v *Vertex) GetTimestamp() int64 { + return v.timestamp +} + +// UpdateTimestamp updates the timestamp on a vertex and returns the new value +func (v *Vertex) UpdateTimestamp() int64 { + v.timestamp = time.Now().UnixNano() // update + return v.timestamp +} + +// can this element run right now? +func (g *Graph) OKTimestamp(v *Vertex) bool { + // these are all the vertices pointing TO v, eg: ??? -> v + for _, n := range g.IncomingGraphEdges(v) { + // if the vertex has a greater timestamp than any pre-req (n) + // then we can't run right now... + // if they're equal (eg: on init of 0) then we also can't run + // b/c we should let our pre-req's go first... + x, y := v.GetTimestamp(), n.GetTimestamp() + if DEBUG { + log.Printf("%v[%v]: OKTimestamp: (%v) >= %v[%v](%v): !%v", v.Kind(), v.GetName(), x, n.Kind(), n.GetName(), y, x >= y) + } + if x >= y { + return false + } + } + return true +} + +// notify nodes after me in the dependency graph that they need refreshing... +// NOTE: this assumes that this can never fail or need to be rescheduled +func (g *Graph) Poke(v *Vertex, activity bool) { + // these are all the vertices pointing AWAY FROM v, eg: v -> ??? + for _, n := range g.OutgoingGraphEdges(v) { + // XXX: if we're in state event and haven't been cancelled by + // apply, then we can cancel a poke to a child, right? XXX + // XXX: if n.Res.GetState() != resStateEvent { // is this correct? + if true { // XXX + if DEBUG { + log.Printf("%v[%v]: Poke: %v[%v]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) + } + n.SendEvent(eventPoke, false, activity) // XXX: can this be switched to sync? + } else { + if DEBUG { + log.Printf("%v[%v]: Poke: %v[%v]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName()) + } + } + } +} + +// poke the pre-requisites that are stale and need to run before I can run... +func (g *Graph) BackPoke(v *Vertex) { + // these are all the vertices pointing TO v, eg: ??? -> v + for _, n := range g.IncomingGraphEdges(v) { + x, y, s := v.GetTimestamp(), n.GetTimestamp(), n.Res.GetState() + // if the parent timestamp needs poking AND it's not in state + // resStateEvent, then poke it. If the parent is in resStateEvent it + // means that an event is pending, so we'll be expecting a poke + // back soon, so we can safely discard the extra parent poke... + // TODO: implement a stateLT (less than) to tell if something + // happens earlier in the state cycle and that doesn't wrap nil + if x >= y && (s != resStateEvent && s != resStateCheckApply) { + if DEBUG { + log.Printf("%v[%v]: BackPoke: %v[%v]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) + } + n.SendEvent(eventBackPoke, false, false) // XXX: can this be switched to sync? + } else { + if DEBUG { + log.Printf("%v[%v]: BackPoke: %v[%v]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName()) + } + } + } +} + +// XXX: rename this function +func (g *Graph) Process(v *Vertex) { + obj := v.Res + if DEBUG { + log.Printf("%v[%v]: Process()", obj.Kind(), obj.GetName()) + } + obj.SetState(resStateEvent) + var ok = true + var apply = false // did we run an apply? + // is it okay to run dependency wise right now? + // if not, that's okay because when the dependency runs, it will poke + // us back and we will run if needed then! + if g.OKTimestamp(v) { + if DEBUG { + log.Printf("%v[%v]: OKTimestamp(%v)", obj.Kind(), obj.GetName(), v.GetTimestamp()) + } + + obj.SetState(resStateCheckApply) + // if this fails, don't UpdateTimestamp() + stateok, err := obj.CheckApply(true) + if stateok && err != nil { // should never return this way + log.Fatalf("%v[%v]: CheckApply(): %t, %+v", obj.Kind(), obj.GetName(), stateok, err) + } + if DEBUG { + log.Printf("%v[%v]: CheckApply(): %t, %v", obj.Kind(), obj.GetName(), stateok, err) + } + + if !stateok { // if state *was* not ok, we had to have apply'ed + if err != nil { // error during check or apply + ok = false + } else { + apply = true + } + } + + if ok { + // update this timestamp *before* we poke or the poked + // nodes might fail due to having a too old timestamp! + v.UpdateTimestamp() // this was touched... + obj.SetState(resStatePoking) // can't cancel parent poke + g.Poke(v, apply) + } + // poke at our pre-req's instead since they need to refresh/run... + } else { + // only poke at the pre-req's that need to run + go g.BackPoke(v) + } +} + // main kick to start the graph func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue + log.Printf("State: %v -> %v", g.SetState(graphStateStarting), g.GetState()) + defer log.Printf("State: %v -> %v", g.SetState(graphStateStarted), g.GetState()) t, _ := g.TopologicalSort() // TODO: only calculate indegree if `first` is true to save resources indegree := g.InDegree() // compute all of the indegree's @@ -492,7 +730,20 @@ func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue // see: https://ttboj.wordpress.com/2015/07/27/golang-parallelism-issues-causing-too-many-open-files-error/ go func(vv *Vertex) { defer wg.Done() - vv.Res.Watch() + // listen for chan events from Watch() and run + // the Process() function when they're received + // this avoids us having to pass the data into + // the Watch() function about which graph it is + // running on, which isolates things nicely... + chanProcess := make(chan struct{}) + go func() { + for _ = range chanProcess { + // XXX: do we need to ACK so that it's synchronous? + g.Process(vv) + } + }() + vv.Res.Watch(chanProcess) // i block until i end + close(chanProcess) log.Printf("%v[%v]: Exited", vv.Kind(), vv.GetName()) }(v) } @@ -511,7 +762,7 @@ func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue // and not just selectively the subset with no indegree. if (!first) || indegree[v] == 0 { // ensure state is started before continuing on to next vertex - for !v.Res.SendEvent(eventStart, true, false) { + for !v.SendEvent(eventStart, true, false) { if DEBUG { // if SendEvent fails, we aren't up yet log.Printf("%v[%v]: Retrying SendEvent(Start)", v.Kind(), v.GetName()) @@ -525,13 +776,18 @@ func (g *Graph) Start(wg *sync.WaitGroup, first bool) { // start or continue } func (g *Graph) Pause() { + log.Printf("State: %v -> %v", g.SetState(graphStatePausing), g.GetState()) + defer log.Printf("State: %v -> %v", g.SetState(graphStatePaused), g.GetState()) t, _ := g.TopologicalSort() for _, v := range t { // squeeze out the events... - v.Res.SendEvent(eventPause, true, false) + v.SendEvent(eventPause, true, false) } } func (g *Graph) Exit() { + if g == nil { + return + } // empty graph that wasn't populated yet t, _ := g.TopologicalSort() for _, v := range t { // squeeze out the events... // turn off the taps... @@ -539,7 +795,7 @@ func (g *Graph) Exit() { // when we hit the 'default' in the select statement! // XXX: we can do this to quiesce, but it's not necessary now - v.Res.SendEvent(eventExit, true, false) + v.SendEvent(eventExit, true, false) } } @@ -549,6 +805,7 @@ func (g *Graph) SetConvergedCallback(ctimeout int, converged chan bool) { } } +// in array function to test *Vertex in a slice of *Vertices func VertexContains(needle *Vertex, haystack []*Vertex) bool { for _, v := range haystack { if needle == v { @@ -558,16 +815,6 @@ func VertexContains(needle *Vertex, haystack []*Vertex) bool { return false } -// in array function to test *vertices in a slice of *vertices -func HasVertex(v *Vertex, haystack []*Vertex) bool { - for _, r := range haystack { - if v == r { - return true - } - } - return false -} - // reverse a list of vertices func Reverse(vs []*Vertex) []*Vertex { //var out []*Vertex // XXX: golint suggests, but it fails testing diff --git a/pgraph_test.go b/pgraph_test.go index 7c74bbc76..f9722e285 100644 --- a/pgraph_test.go +++ b/pgraph_test.go @@ -20,7 +20,10 @@ package main import ( + "fmt" "reflect" + "sort" + "strings" "testing" ) @@ -254,26 +257,26 @@ func TestPgraphT8(t *testing.T) { v1 := NewVertex(NewNoopRes("v1")) v2 := NewVertex(NewNoopRes("v2")) v3 := NewVertex(NewNoopRes("v3")) - if HasVertex(v1, []*Vertex{v1, v2, v3}) != true { + if VertexContains(v1, []*Vertex{v1, v2, v3}) != true { t.Errorf("Should be true instead of false.") } v4 := NewVertex(NewNoopRes("v4")) v5 := NewVertex(NewNoopRes("v5")) v6 := NewVertex(NewNoopRes("v6")) - if HasVertex(v4, []*Vertex{v5, v6}) != false { + if VertexContains(v4, []*Vertex{v5, v6}) != false { t.Errorf("Should be false instead of true.") } v7 := NewVertex(NewNoopRes("v7")) v8 := NewVertex(NewNoopRes("v8")) v9 := NewVertex(NewNoopRes("v9")) - if HasVertex(v8, []*Vertex{v7, v8, v9}) != true { + if VertexContains(v8, []*Vertex{v7, v8, v9}) != true { t.Errorf("Should be true instead of false.") } v1b := NewVertex(NewNoopRes("v1")) // same value, different objects - if HasVertex(v1b, []*Vertex{v1, v2, v3}) != false { + if VertexContains(v1b, []*Vertex{v1, v2, v3}) != false { t.Errorf("Should be false instead of true.") } } @@ -381,6 +384,211 @@ func TestPgraphT10(t *testing.T) { } } +// empty +func TestPgraphReachability0(t *testing.T) { + { + G := NewGraph("g") + result := G.Reachability(nil, nil) + if result != nil { + t.Logf("Reachability failed!") + str := "Got:" + for _, v := range result { + str += " " + v.Res.GetName() + } + t.Errorf(str) + } + } + { + G := NewGraph("g") + v1 := NewVertex(NewNoopRes("v1")) + v6 := NewVertex(NewNoopRes("v6")) + + result := G.Reachability(v1, v6) + expected := []*Vertex{} + + if !reflect.DeepEqual(result, expected) { + t.Logf("Reachability failed!") + str := "Got:" + for _, v := range result { + str += " " + v.Res.GetName() + } + t.Errorf(str) + } + } + { + G := NewGraph("g") + v1 := NewVertex(NewNoopRes("v1")) + v2 := NewVertex(NewNoopRes("v2")) + v3 := NewVertex(NewNoopRes("v3")) + v4 := NewVertex(NewNoopRes("v4")) + v5 := NewVertex(NewNoopRes("v5")) + v6 := NewVertex(NewNoopRes("v6")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + e3 := NewEdge("e3") + e4 := NewEdge("e4") + e5 := NewEdge("e5") + G.AddEdge(v1, v2, e1) + G.AddEdge(v2, v3, e2) + G.AddEdge(v1, v4, e3) + G.AddEdge(v3, v4, e4) + G.AddEdge(v3, v5, e5) + + result := G.Reachability(v1, v6) + expected := []*Vertex{} + + if !reflect.DeepEqual(result, expected) { + t.Logf("Reachability failed!") + str := "Got:" + for _, v := range result { + str += " " + v.Res.GetName() + } + t.Errorf(str) + } + } +} + +// simple linear path +func TestPgraphReachability1(t *testing.T) { + G := NewGraph("g") + v1 := NewVertex(NewNoopRes("v1")) + v2 := NewVertex(NewNoopRes("v2")) + v3 := NewVertex(NewNoopRes("v3")) + v4 := NewVertex(NewNoopRes("v4")) + v5 := NewVertex(NewNoopRes("v5")) + v6 := NewVertex(NewNoopRes("v6")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + e3 := NewEdge("e3") + e4 := NewEdge("e4") + e5 := NewEdge("e5") + //e6 := NewEdge("e6") + G.AddEdge(v1, v2, e1) + G.AddEdge(v2, v3, e2) + G.AddEdge(v3, v4, e3) + G.AddEdge(v4, v5, e4) + G.AddEdge(v5, v6, e5) + + result := G.Reachability(v1, v6) + expected := []*Vertex{v1, v2, v3, v4, v5, v6} + + if !reflect.DeepEqual(result, expected) { + t.Logf("Reachability failed!") + str := "Got:" + for _, v := range result { + str += " " + v.Res.GetName() + } + t.Errorf(str) + } +} + +// pick one of two correct paths +func TestPgraphReachability2(t *testing.T) { + G := NewGraph("g") + v1 := NewVertex(NewNoopRes("v1")) + v2 := NewVertex(NewNoopRes("v2")) + v3 := NewVertex(NewNoopRes("v3")) + v4 := NewVertex(NewNoopRes("v4")) + v5 := NewVertex(NewNoopRes("v5")) + v6 := NewVertex(NewNoopRes("v6")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + e3 := NewEdge("e3") + e4 := NewEdge("e4") + e5 := NewEdge("e5") + e6 := NewEdge("e6") + G.AddEdge(v1, v2, e1) + G.AddEdge(v1, v3, e2) + G.AddEdge(v2, v4, e3) + G.AddEdge(v3, v4, e4) + G.AddEdge(v4, v5, e5) + G.AddEdge(v5, v6, e6) + + result := G.Reachability(v1, v6) + expected1 := []*Vertex{v1, v2, v4, v5, v6} + expected2 := []*Vertex{v1, v3, v4, v5, v6} + + // !xor test + if reflect.DeepEqual(result, expected1) == reflect.DeepEqual(result, expected2) { + t.Logf("Reachability failed!") + str := "Got:" + for _, v := range result { + str += " " + v.Res.GetName() + } + t.Errorf(str) + } +} + +// pick shortest path +func TestPgraphReachability3(t *testing.T) { + G := NewGraph("g") + v1 := NewVertex(NewNoopRes("v1")) + v2 := NewVertex(NewNoopRes("v2")) + v3 := NewVertex(NewNoopRes("v3")) + v4 := NewVertex(NewNoopRes("v4")) + v5 := NewVertex(NewNoopRes("v5")) + v6 := NewVertex(NewNoopRes("v6")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + e3 := NewEdge("e3") + e4 := NewEdge("e4") + e5 := NewEdge("e5") + e6 := NewEdge("e6") + G.AddEdge(v1, v2, e1) + G.AddEdge(v2, v3, e2) + G.AddEdge(v3, v4, e3) + G.AddEdge(v4, v5, e4) + G.AddEdge(v1, v5, e5) + G.AddEdge(v5, v6, e6) + + result := G.Reachability(v1, v6) + expected := []*Vertex{v1, v5, v6} + + if !reflect.DeepEqual(result, expected) { + t.Logf("Reachability failed!") + str := "Got:" + for _, v := range result { + str += " " + v.Res.GetName() + } + t.Errorf(str) + } +} + +// direct path +func TestPgraphReachability4(t *testing.T) { + G := NewGraph("g") + v1 := NewVertex(NewNoopRes("v1")) + v2 := NewVertex(NewNoopRes("v2")) + v3 := NewVertex(NewNoopRes("v3")) + v4 := NewVertex(NewNoopRes("v4")) + v5 := NewVertex(NewNoopRes("v5")) + v6 := NewVertex(NewNoopRes("v6")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + e3 := NewEdge("e3") + e4 := NewEdge("e4") + e5 := NewEdge("e5") + e6 := NewEdge("e6") + G.AddEdge(v1, v2, e1) + G.AddEdge(v2, v3, e2) + G.AddEdge(v3, v4, e3) + G.AddEdge(v4, v5, e4) + G.AddEdge(v5, v6, e5) + G.AddEdge(v1, v6, e6) + + result := G.Reachability(v1, v6) + expected := []*Vertex{v1, v6} + + if !reflect.DeepEqual(result, expected) { + t.Logf("Reachability failed!") + str := "Got:" + for _, v := range result { + str += " " + v.Res.GetName() + } + t.Errorf(str) + } +} + func TestPgraphT11(t *testing.T) { v1 := NewVertex(NewNoopRes("v1")) v2 := NewVertex(NewNoopRes("v2")) @@ -404,5 +612,647 @@ func TestPgraphT11(t *testing.T) { if rev := Reverse([]*Vertex{v6, v5, v4, v3, v2, v1}); !reflect.DeepEqual(rev, []*Vertex{v1, v2, v3, v4, v5, v6}) { t.Errorf("Reverse of vertex slice failed.") } +} + +type NoopResTest struct { + NoopRes +} +func (obj *NoopResTest) GroupCmp(r Res) bool { + res, ok := r.(*NoopResTest) + if !ok { + return false + } + + // TODO: implement this in vertexCmp for *testBaseGrouper instead? + if strings.Contains(res.Name, ",") { // HACK + return false // element to be grouped is already grouped! + } + + // group if they start with the same letter! (helpful hack for testing) + return obj.Name[0] == res.Name[0] +} + +func NewNoopResTest(name string) *NoopResTest { + obj := &NoopResTest{ + NoopRes: NoopRes{ + BaseRes: BaseRes{ + Name: name, + Meta: MetaParams{ + AutoGroup: true, // always autogroup + }, + }, + }, + } + obj.Init() // optional here in this testing scenario (for now) + return obj +} + +// ListStrCmp compares two lists of strings +func ListStrCmp(a, b []string) bool { + //fmt.Printf("CMP: %v with %v\n", a, b) // debugging + if a == nil && b == nil { + return true + } + if a == nil || b == nil { + return false + } + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} + +// GraphCmp compares the topology of two graphs and returns nil if they're equal +// It also compares if grouped element groups are identical +func GraphCmp(g1, g2 *Graph) error { + if n1, n2 := g1.NumVertices(), g2.NumVertices(); n1 != n2 { + return fmt.Errorf("Graph g1 has %d vertices, while g2 has %d.", n1, n2) + } + if e1, e2 := g1.NumEdges(), g2.NumEdges(); e1 != e2 { + return fmt.Errorf("Graph g1 has %d edges, while g2 has %d.", e1, e2) + } + + var m = make(map[*Vertex]*Vertex) // g1 to g2 vertex correspondence +Loop: + // check vertices + for v1 := range g1.Adjacency { // for each vertex in g1 + + l1 := strings.Split(v1.GetName(), ",") // make list of everyone's names... + for _, x1 := range v1.GetGroup() { + l1 = append(l1, x1.GetName()) // add my contents + } + l1 = StrRemoveDuplicatesInList(l1) // remove duplicates + sort.Strings(l1) + + // inner loop + for v2 := range g2.Adjacency { // does it match in g2 ? + + l2 := strings.Split(v2.GetName(), ",") + for _, x2 := range v2.GetGroup() { + l2 = append(l2, x2.GetName()) + } + l2 = StrRemoveDuplicatesInList(l2) // remove duplicates + sort.Strings(l2) + + // does l1 match l2 ? + if ListStrCmp(l1, l2) { // cmp! + m[v1] = v2 + continue Loop + } + } + return fmt.Errorf("Graph g1, has no match in g2 for: %v", v1.GetName()) + } + // vertices (and groups) match :) + + // check edges + for v1 := range g1.Adjacency { // for each vertex in g1 + v2 := m[v1] // lookup in map to get correspondance + // g1.Adjacency[v1] corresponds to g2.Adjacency[v2] + if e1, e2 := len(g1.Adjacency[v1]), len(g2.Adjacency[v2]); e1 != e2 { + return fmt.Errorf("Graph g1, vertex(%v) has %d edges, while g2, vertex(%v) has %d.", v1.GetName(), e1, v2.GetName(), e2) + } + + for vv1, ee1 := range g1.Adjacency[v1] { + vv2 := m[vv1] + ee2 := g2.Adjacency[v2][vv2] + + // these are edges from v1 -> vv1 via ee1 (graph 1) + // to cmp to edges from v2 -> vv2 via ee2 (graph 2) + + // check: (1) vv1 == vv2 ? (we've already checked this!) + l1 := strings.Split(vv1.GetName(), ",") // make list of everyone's names... + for _, x1 := range vv1.GetGroup() { + l1 = append(l1, x1.GetName()) // add my contents + } + l1 = StrRemoveDuplicatesInList(l1) // remove duplicates + sort.Strings(l1) + + l2 := strings.Split(vv2.GetName(), ",") + for _, x2 := range vv2.GetGroup() { + l2 = append(l2, x2.GetName()) + } + l2 = StrRemoveDuplicatesInList(l2) // remove duplicates + sort.Strings(l2) + + // does l1 match l2 ? + if !ListStrCmp(l1, l2) { // cmp! + return fmt.Errorf("Graph g1 and g2 don't agree on: %v and %v", vv1.GetName(), vv2.GetName()) + } + + // check: (2) ee1 == ee2 + if ee1.Name != ee2.Name { + return fmt.Errorf("Graph g1 edge(%v) doesn't match g2 edge(%v)", ee1.Name, ee2.Name) + } + } + } + + return nil // success! +} + +type testBaseGrouper struct { // FIXME: update me when we've implemented the correct grouping algorithm! + baseGrouper // "inherit" what we want, and reimplement the rest +} + +func (ag *testBaseGrouper) name() string { + return "testBaseGrouper" +} + +func (ag *testBaseGrouper) vertexMerge(v1, v2 *Vertex) (v *Vertex, err error) { + if err := v1.Res.GroupRes(v2.Res); err != nil { // group them first + return nil, err + } + // HACK: update the name so it matches full list of self+grouped + obj := v1.Res + names := strings.Split(obj.GetName(), ",") // load in stored names + for _, n := range obj.GetGroup() { + names = append(names, n.GetName()) // add my contents + } + names = StrRemoveDuplicatesInList(names) // remove duplicates + sort.Strings(names) + obj.SetName(strings.Join(names, ",")) + return // success or fail, and no need to merge the actual vertices! +} + +func (ag *testBaseGrouper) edgeMerge(e1, e2 *Edge) *Edge { + // HACK: update the name so it makes a union of both names + n1 := strings.Split(e1.Name, ",") // load + n2 := strings.Split(e2.Name, ",") // load + names := append(n1, n2...) + names = StrRemoveDuplicatesInList(names) // remove duplicates + sort.Strings(names) + return NewEdge(strings.Join(names, ",")) +} + +func (g *Graph) fullPrint() (str string) { + str += "\n" + for v := range g.Adjacency { + str += fmt.Sprintf("* v: %v\n", v.GetName()) + // TODO: add explicit grouping data? + } + for v1 := range g.Adjacency { + for v2, e := range g.Adjacency[v1] { + str += fmt.Sprintf("* e: %v -> %v # %v\n", v1.GetName(), v2.GetName(), e.Name) + } + } + return +} + +// helper function +func runGraphCmp(t *testing.T, g1, g2 *Graph) { + // FIXME: update me when we've implemented the correct grouping algorithm! + ch := g1.autoGroup(&testBaseGrouper{}) // edits the graph + for _ = range ch { // bleed the channel or it won't run :( + // pass + } + err := GraphCmp(g1, g2) + if err != nil { + t.Logf(" actual (g1): %v%v", g1, g1.fullPrint()) + t.Logf("expected (g2): %v%v", g2, g2.fullPrint()) + t.Logf("Cmp error:") + t.Errorf("%v", err) + } +} + +// all of the following test cases are layed out with the following semantics: +// * vertices which start with the same single letter are considered "like" +// * "like" elements should be merged +// * vertices can have any integer after their single letter "family" type +// * grouped vertices should have a name with a comma separated list of names +// * edges follow the same conventions about grouping + +// empty graph +func TestPgraphGrouping1(t *testing.T) { + g1 := NewGraph("g1") // original graph + g2 := NewGraph("g2") // expected result + runGraphCmp(t, g1, g2) +} + +// single vertex +func TestPgraphGrouping2(t *testing.T) { + g1 := NewGraph("g1") // original graph + { // grouping to limit variable scope + a1 := NewVertex(NewNoopResTest("a1")) + g1.AddVertex(a1) + } + g2 := NewGraph("g2") // expected result + { + a1 := NewVertex(NewNoopResTest("a1")) + g2.AddVertex(a1) + } + runGraphCmp(t, g1, g2) +} + +// two vertices +func TestPgraphGrouping3(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + b1 := NewVertex(NewNoopResTest("b1")) + g1.AddVertex(a1, b1) + } + g2 := NewGraph("g2") // expected result + { + a1 := NewVertex(NewNoopResTest("a1")) + b1 := NewVertex(NewNoopResTest("b1")) + g2.AddVertex(a1, b1) + } + runGraphCmp(t, g1, g2) +} + +// two vertices merge +func TestPgraphGrouping4(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + a2 := NewVertex(NewNoopResTest("a2")) + g1.AddVertex(a1, a2) + } + g2 := NewGraph("g2") // expected result + { + a := NewVertex(NewNoopResTest("a1,a2")) + g2.AddVertex(a) + } + runGraphCmp(t, g1, g2) +} + +// three vertices merge +func TestPgraphGrouping5(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + a2 := NewVertex(NewNoopResTest("a2")) + a3 := NewVertex(NewNoopResTest("a3")) + g1.AddVertex(a1, a2, a3) + } + g2 := NewGraph("g2") // expected result + { + a := NewVertex(NewNoopResTest("a1,a2,a3")) + g2.AddVertex(a) + } + runGraphCmp(t, g1, g2) +} + +// three vertices, two merge +func TestPgraphGrouping6(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + a2 := NewVertex(NewNoopResTest("a2")) + b1 := NewVertex(NewNoopResTest("b1")) + g1.AddVertex(a1, a2, b1) + } + g2 := NewGraph("g2") // expected result + { + a := NewVertex(NewNoopResTest("a1,a2")) + b1 := NewVertex(NewNoopResTest("b1")) + g2.AddVertex(a, b1) + } + runGraphCmp(t, g1, g2) +} + +// four vertices, three merge +func TestPgraphGrouping7(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + a2 := NewVertex(NewNoopResTest("a2")) + a3 := NewVertex(NewNoopResTest("a3")) + b1 := NewVertex(NewNoopResTest("b1")) + g1.AddVertex(a1, a2, a3, b1) + } + g2 := NewGraph("g2") // expected result + { + a := NewVertex(NewNoopResTest("a1,a2,a3")) + b1 := NewVertex(NewNoopResTest("b1")) + g2.AddVertex(a, b1) + } + runGraphCmp(t, g1, g2) +} + +// four vertices, two&two merge +func TestPgraphGrouping8(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + a2 := NewVertex(NewNoopResTest("a2")) + b1 := NewVertex(NewNoopResTest("b1")) + b2 := NewVertex(NewNoopResTest("b2")) + g1.AddVertex(a1, a2, b1, b2) + } + g2 := NewGraph("g2") // expected result + { + a := NewVertex(NewNoopResTest("a1,a2")) + b := NewVertex(NewNoopResTest("b1,b2")) + g2.AddVertex(a, b) + } + runGraphCmp(t, g1, g2) +} + +// five vertices, two&three merge +func TestPgraphGrouping9(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + a2 := NewVertex(NewNoopResTest("a2")) + b1 := NewVertex(NewNoopResTest("b1")) + b2 := NewVertex(NewNoopResTest("b2")) + b3 := NewVertex(NewNoopResTest("b3")) + g1.AddVertex(a1, a2, b1, b2, b3) + } + g2 := NewGraph("g2") // expected result + { + a := NewVertex(NewNoopResTest("a1,a2")) + b := NewVertex(NewNoopResTest("b1,b2,b3")) + g2.AddVertex(a, b) + } + runGraphCmp(t, g1, g2) +} + +// three unique vertices +func TestPgraphGrouping10(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + b1 := NewVertex(NewNoopResTest("b1")) + c1 := NewVertex(NewNoopResTest("c1")) + g1.AddVertex(a1, b1, c1) + } + g2 := NewGraph("g2") // expected result + { + a1 := NewVertex(NewNoopResTest("a1")) + b1 := NewVertex(NewNoopResTest("b1")) + c1 := NewVertex(NewNoopResTest("c1")) + g2.AddVertex(a1, b1, c1) + } + runGraphCmp(t, g1, g2) +} + +// three unique vertices, two merge +func TestPgraphGrouping11(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + b1 := NewVertex(NewNoopResTest("b1")) + b2 := NewVertex(NewNoopResTest("b2")) + c1 := NewVertex(NewNoopResTest("c1")) + g1.AddVertex(a1, b1, b2, c1) + } + g2 := NewGraph("g2") // expected result + { + a1 := NewVertex(NewNoopResTest("a1")) + b := NewVertex(NewNoopResTest("b1,b2")) + c1 := NewVertex(NewNoopResTest("c1")) + g2.AddVertex(a1, b, c1) + } + runGraphCmp(t, g1, g2) +} + +// simple merge 1 +// a1 a2 a1,a2 +// \ / >>> | (arrows point downwards) +// b b +func TestPgraphGrouping12(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + a2 := NewVertex(NewNoopResTest("a2")) + b1 := NewVertex(NewNoopResTest("b1")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + g1.AddEdge(a1, b1, e1) + g1.AddEdge(a2, b1, e2) + } + g2 := NewGraph("g2") // expected result + { + a := NewVertex(NewNoopResTest("a1,a2")) + b1 := NewVertex(NewNoopResTest("b1")) + e := NewEdge("e1,e2") + g2.AddEdge(a, b1, e) + } + runGraphCmp(t, g1, g2) +} + +// simple merge 2 +// b b +// / \ >>> | (arrows point downwards) +// a1 a2 a1,a2 +func TestPgraphGrouping13(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + a2 := NewVertex(NewNoopResTest("a2")) + b1 := NewVertex(NewNoopResTest("b1")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + g1.AddEdge(b1, a1, e1) + g1.AddEdge(b1, a2, e2) + } + g2 := NewGraph("g2") // expected result + { + a := NewVertex(NewNoopResTest("a1,a2")) + b1 := NewVertex(NewNoopResTest("b1")) + e := NewEdge("e1,e2") + g2.AddEdge(b1, a, e) + } + runGraphCmp(t, g1, g2) +} + +// triple merge +// a1 a2 a3 a1,a2,a3 +// \ | / >>> | (arrows point downwards) +// b b +func TestPgraphGrouping14(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + a2 := NewVertex(NewNoopResTest("a2")) + a3 := NewVertex(NewNoopResTest("a3")) + b1 := NewVertex(NewNoopResTest("b1")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + e3 := NewEdge("e3") + g1.AddEdge(a1, b1, e1) + g1.AddEdge(a2, b1, e2) + g1.AddEdge(a3, b1, e3) + } + g2 := NewGraph("g2") // expected result + { + a := NewVertex(NewNoopResTest("a1,a2,a3")) + b1 := NewVertex(NewNoopResTest("b1")) + e := NewEdge("e1,e2,e3") + g2.AddEdge(a, b1, e) + } + runGraphCmp(t, g1, g2) +} + +// chain merge +// a1 a1 +// / \ | +// b1 b2 >>> b1,b2 (arrows point downwards) +// \ / | +// c1 c1 +func TestPgraphGrouping15(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + b1 := NewVertex(NewNoopResTest("b1")) + b2 := NewVertex(NewNoopResTest("b2")) + c1 := NewVertex(NewNoopResTest("c1")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + e3 := NewEdge("e3") + e4 := NewEdge("e4") + g1.AddEdge(a1, b1, e1) + g1.AddEdge(a1, b2, e2) + g1.AddEdge(b1, c1, e3) + g1.AddEdge(b2, c1, e4) + } + g2 := NewGraph("g2") // expected result + { + a1 := NewVertex(NewNoopResTest("a1")) + b := NewVertex(NewNoopResTest("b1,b2")) + c1 := NewVertex(NewNoopResTest("c1")) + e1 := NewEdge("e1,e2") + e2 := NewEdge("e3,e4") + g2.AddEdge(a1, b, e1) + g2.AddEdge(b, c1, e2) + } + runGraphCmp(t, g1, g2) +} + +/* FIXME: uncomment me when we've implemented the correct grouping algorithm! +// reattach 1 (outer) +// a1 a2 a1,a2 +// | / | +// b1 / >>> b1 (arrows point downwards) +// | / | +// c1 c1 +func TestPgraphGrouping16(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + a2 := NewVertex(NewNoopResTest("a2")) + b1 := NewVertex(NewNoopResTest("b1")) + c1 := NewVertex(NewNoopResTest("c1")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + e3 := NewEdge("e3") + g1.AddEdge(a1, b1, e1) + g1.AddEdge(b1, c1, e2) + g1.AddEdge(a2, c1, e3) + } + g2 := NewGraph("g2") // expected result + { + a := NewVertex(NewNoopResTest("a1,a2")) + b1 := NewVertex(NewNoopResTest("b1")) + c1 := NewVertex(NewNoopResTest("c1")) + e1 := NewEdge("e1,e3") + e2 := NewEdge("e2") // TODO: should this be e2,e3 (eg we split e3?) + g2.AddEdge(a, b1, e1) + g2.AddEdge(b1, c1, e2) + } + runGraphCmp(t, g1, g2) +} + +// reattach 2 (inner) +// a1 b2 a1 +// | / | +// b1 / >>> b1,b2 (arrows point downwards) +// | / | +// c1 c1 +func TestPgraphGrouping17(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + b1 := NewVertex(NewNoopResTest("b1")) + b2 := NewVertex(NewNoopResTest("b2")) + c1 := NewVertex(NewNoopResTest("c1")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + e3 := NewEdge("e3") + g1.AddEdge(a1, b1, e1) + g1.AddEdge(b1, c1, e2) + g1.AddEdge(b2, c1, e3) + } + g2 := NewGraph("g2") // expected result + { + a1 := NewVertex(NewNoopResTest("a1")) + b := NewVertex(NewNoopResTest("b1,b2")) + c1 := NewVertex(NewNoopResTest("c1")) + e1 := NewEdge("e1") + e2 := NewEdge("e2,e3") + g2.AddEdge(a1, b, e1) + g2.AddEdge(b, c1, e2) + } + runGraphCmp(t, g1, g2) +} + +// re-attach 3 (double) +// a2 a1 b2 a1,a2 +// \ | / | +// \ b1 / >>> b1,b2 (arrows point downwards) +// \ | / | +// c1 c1 +func TestPgraphGrouping18(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + a2 := NewVertex(NewNoopResTest("a2")) + b1 := NewVertex(NewNoopResTest("b1")) + b2 := NewVertex(NewNoopResTest("b2")) + c1 := NewVertex(NewNoopResTest("c1")) + e1 := NewEdge("e1") + e2 := NewEdge("e2") + e3 := NewEdge("e3") + e4 := NewEdge("e4") + g1.AddEdge(a1, b1, e1) + g1.AddEdge(b1, c1, e2) + g1.AddEdge(a2, c1, e3) + g1.AddEdge(b2, c1, e4) + } + g2 := NewGraph("g2") // expected result + { + a := NewVertex(NewNoopResTest("a1,a2")) + b := NewVertex(NewNoopResTest("b1,b2")) + c1 := NewVertex(NewNoopResTest("c1")) + e1 := NewEdge("e1,e3") + e2 := NewEdge("e2,e4") + g2.AddEdge(a, b, e1) + g2.AddEdge(b, c1, e2) + } + runGraphCmp(t, g1, g2) +} + +// tricky merge, (no change or merge?) +// a1 a1 +// \ >>> \ (arrows point downwards) +// a2 a2 +func TestPgraphGroupingTricky1(t *testing.T) { + g1 := NewGraph("g1") // original graph + { + a1 := NewVertex(NewNoopResTest("a1")) + a2 := NewVertex(NewNoopResTest("a2")) + e1 := NewEdge("e1") + g1.AddEdge(a1, a2, e1) + } + g2 := NewGraph("g2") // expected result ? + { + a1 := NewVertex(NewNoopResTest("a1")) + a2 := NewVertex(NewNoopResTest("a2")) + e1 := NewEdge("e1") + g2.AddEdge(a1, a2, e1) + } + //g3 := NewGraph("g2") // expected result ? + //{ + // a := NewVertex(NewNoopResTest("a1,a2")) + //} + runGraphCmp(t, g1, g2) // TODO: i'm tempted to think this is correct + //runGraphCmp(t, g1, g3) } +*/ diff --git a/pkg.go b/pkg.go index 4dcefd0c0..0ff7eb204 100644 --- a/pkg.go +++ b/pkg.go @@ -19,6 +19,7 @@ package main import ( //"packagekit" // TODO + "encoding/gob" "errors" "fmt" "log" @@ -26,6 +27,10 @@ import ( "strings" ) +func init() { + gob.Register(&PkgRes{}) +} + type PkgRes struct { BaseRes `yaml:",inline"` State string `yaml:"state"` // state: installed, uninstalled, newest, @@ -102,7 +107,7 @@ func (obj *PkgRes) Validate() bool { // use UpdatesChanged signal to watch for changes // TODO: https://github.com/hughsie/PackageKit/issues/109 // TODO: https://github.com/hughsie/PackageKit/issues/110 -func (obj *PkgRes) Watch() { +func (obj *PkgRes) Watch(processChan chan struct{}) { if obj.IsWatching() { return } @@ -168,7 +173,7 @@ func (obj *PkgRes) Watch() { dirty = false obj.isStateOK = false // something made state dirty } - Process(obj) // XXX: rename this function + processChan <- struct{}{} // trigger process } } } diff --git a/resources.go b/resources.go index c06463eb2..2390f9fad 100644 --- a/resources.go +++ b/resources.go @@ -18,9 +18,11 @@ package main import ( + "bytes" + "encoding/base64" + "encoding/gob" "fmt" "log" - "time" ) //go:generate stringer -type=resState -output=resstate_stringer.go @@ -73,27 +75,24 @@ type MetaParams struct { // everything here only needs to be implemented once, in the BaseRes type Base interface { GetName() string // can't be named "Name()" because of struct field + SetName(string) Kind() string GetMeta() MetaParams SetVertex(*Vertex) SetConvergedCallback(ctimeout int, converged chan bool) - SendEvent(eventName, bool, bool) bool IsWatching() bool SetWatching(bool) GetConvergedState() resConvergedState SetConvergedState(resConvergedState) GetState() resState SetState(resState) - GetTimestamp() int64 - UpdateTimestamp() int64 - OKTimestamp() bool - Poke(bool) - BackPoke() - GroupCmp(Res) bool // TODO: is there a better name for this? - GroupRes(Res) error // group resource (arg) into self - IsGrouped() bool // am I grouped? - SetGrouped(bool) // set grouped bool - GetGroup() []Res // return everyone grouped inside me + SendEvent(eventName, bool, bool) bool + ReadEvent(*Event) (bool, bool) // TODO: optional here? + GroupCmp(Res) bool // TODO: is there a better name for this? + GroupRes(Res) error // group resource (arg) into self + IsGrouped() bool // am I grouped? + SetGrouped(bool) // set grouped bool + GetGroup() []Res // return everyone grouped inside me SetGroup([]Res) } @@ -103,17 +102,17 @@ type Res interface { Init() //Validate() bool // TODO: this might one day be added GetUUIDs() []ResUUID // most resources only return one - Watch() + Watch(chan struct{}) // send on channel to signal process() events CheckApply(bool) (bool, error) AutoEdges() AutoEdge Compare(Res) bool + CollectPattern(string) // XXX: temporary until Res collection is more advanced } type BaseRes struct { Name string `yaml:"name"` Meta MetaParams `yaml:"meta"` // struct of all the metaparams kind string - timestamp int64 // last updated timestamp ? events chan Event vertex *Vertex state resState @@ -168,11 +167,15 @@ func (obj *BaseRes) Init() { obj.events = make(chan Event) // unbuffered chan size to avoid stale events } -// this method gets used by all the resources, if we have one of (obj NoopRes) it would get overridden in that case! +// this method gets used by all the resources func (obj *BaseRes) GetName() string { return obj.Name } +func (obj *BaseRes) SetName(name string) { + obj.Name = name +} + // return the kind of resource this is func (obj *BaseRes) Kind() string { return obj.kind @@ -224,87 +227,6 @@ func (obj *BaseRes) SetState(state resState) { obj.state = state } -// GetTimestamp returns the timestamp of a vertex -func (obj *BaseRes) GetTimestamp() int64 { - return obj.timestamp -} - -// UpdateTimestamp updates the timestamp on a vertex and returns the new value -func (obj *BaseRes) UpdateTimestamp() int64 { - obj.timestamp = time.Now().UnixNano() // update - return obj.timestamp -} - -// can this element run right now? -func (obj *BaseRes) OKTimestamp() bool { - v := obj.GetVertex() - g := v.GetGraph() - // these are all the vertices pointing TO v, eg: ??? -> v - for _, n := range g.IncomingGraphEdges(v) { - // if the vertex has a greater timestamp than any pre-req (n) - // then we can't run right now... - // if they're equal (eg: on init of 0) then we also can't run - // b/c we should let our pre-req's go first... - x, y := obj.GetTimestamp(), n.Res.GetTimestamp() - if DEBUG { - log.Printf("%v[%v]: OKTimestamp: (%v) >= %v[%v](%v): !%v", obj.Kind(), obj.GetName(), x, n.Kind(), n.GetName(), y, x >= y) - } - if x >= y { - return false - } - } - return true -} - -// notify nodes after me in the dependency graph that they need refreshing... -// NOTE: this assumes that this can never fail or need to be rescheduled -func (obj *BaseRes) Poke(activity bool) { - v := obj.GetVertex() - g := v.GetGraph() - // these are all the vertices pointing AWAY FROM v, eg: v -> ??? - for _, n := range g.OutgoingGraphEdges(v) { - // XXX: if we're in state event and haven't been cancelled by - // apply, then we can cancel a poke to a child, right? XXX - // XXX: if n.Res.GetState() != resStateEvent { // is this correct? - if true { // XXX - if DEBUG { - log.Printf("%v[%v]: Poke: %v[%v]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) - } - n.SendEvent(eventPoke, false, activity) // XXX: can this be switched to sync? - } else { - if DEBUG { - log.Printf("%v[%v]: Poke: %v[%v]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName()) - } - } - } -} - -// poke the pre-requisites that are stale and need to run before I can run... -func (obj *BaseRes) BackPoke() { - v := obj.GetVertex() - g := v.GetGraph() - // these are all the vertices pointing TO v, eg: ??? -> v - for _, n := range g.IncomingGraphEdges(v) { - x, y, s := obj.GetTimestamp(), n.Res.GetTimestamp(), n.Res.GetState() - // if the parent timestamp needs poking AND it's not in state - // resStateEvent, then poke it. If the parent is in resStateEvent it - // means that an event is pending, so we'll be expecting a poke - // back soon, so we can safely discard the extra parent poke... - // TODO: implement a stateLT (less than) to tell if something - // happens earlier in the state cycle and that doesn't wrap nil - if x >= y && (s != resStateEvent && s != resStateCheckApply) { - if DEBUG { - log.Printf("%v[%v]: BackPoke: %v[%v]", v.Kind(), v.GetName(), n.Kind(), n.GetName()) - } - n.SendEvent(eventBackPoke, false, false) // XXX: can this be switched to sync? - } else { - if DEBUG { - log.Printf("%v[%v]: BackPoke: %v[%v]: Skipped!", v.Kind(), v.GetName(), n.Kind(), n.GetName()) - } - } - } -} - // push an event into the message queue for a particular vertex func (obj *BaseRes) SendEvent(event eventName, sync bool, activity bool) bool { // TODO: isn't this race-y ? @@ -394,50 +316,38 @@ func (obj *BaseRes) SetGroup(g []Res) { obj.grouped = g } -// XXX: rename this function -func Process(obj Res) { - if DEBUG { - log.Printf("%v[%v]: Process()", obj.Kind(), obj.GetName()) - } - obj.SetState(resStateEvent) - var ok = true - var apply = false // did we run an apply? - // is it okay to run dependency wise right now? - // if not, that's okay because when the dependency runs, it will poke - // us back and we will run if needed then! - if obj.OKTimestamp() { - if DEBUG { - log.Printf("%v[%v]: OKTimestamp(%v)", obj.Kind(), obj.GetName(), obj.GetTimestamp()) - } +func (obj *BaseRes) CollectPattern(pattern string) { + // XXX: default method is empty +} - obj.SetState(resStateCheckApply) - // if this fails, don't UpdateTimestamp() - stateok, err := obj.CheckApply(true) - if stateok && err != nil { // should never return this way - log.Fatalf("%v[%v]: CheckApply(): %t, %+v", obj.Kind(), obj.GetName(), stateok, err) - } - if DEBUG { - log.Printf("%v[%v]: CheckApply(): %t, %v", obj.Kind(), obj.GetName(), stateok, err) - } +// ResToB64 encodes a resource to a base64 encoded string (after serialization) +func ResToB64(res Res) (string, error) { + b := bytes.Buffer{} + e := gob.NewEncoder(&b) + err := e.Encode(&res) // pass with & + if err != nil { + return "", fmt.Errorf("Gob failed to encode: %v", err) + } + return base64.StdEncoding.EncodeToString(b.Bytes()), nil +} - if !stateok { // if state *was* not ok, we had to have apply'ed - if err != nil { // error during check or apply - ok = false - } else { - apply = true - } - } +// B64ToRes decodes a resource from a base64 encoded string (after deserialization) +func B64ToRes(str string) (Res, error) { + var output interface{} + bb, err := base64.StdEncoding.DecodeString(str) + if err != nil { + return nil, fmt.Errorf("Base64 failed to decode: %v", err) + } + b := bytes.NewBuffer(bb) + d := gob.NewDecoder(b) + err = d.Decode(&output) // pass with & + if err != nil { + return nil, fmt.Errorf("Gob failed to decode: %v", err) + } + res, ok := output.(Res) + if !ok { + return nil, fmt.Errorf("Output %v is not a Res", res) - if ok { - // update this timestamp *before* we poke or the poked - // nodes might fail due to having a too old timestamp! - obj.UpdateTimestamp() // this was touched... - obj.SetState(resStatePoking) // can't cancel parent poke - obj.Poke(apply) - } - // poke at our pre-req's instead since they need to refresh/run... - } else { - // only poke at the pre-req's that need to run - go obj.BackPoke() } + return res, nil } diff --git a/resources_test.go b/resources_test.go new file mode 100644 index 000000000..70f13c7c7 --- /dev/null +++ b/resources_test.go @@ -0,0 +1,105 @@ +// Mgmt +// Copyright (C) 2013-2016+ James Shubin and the project contributors +// Written by James Shubin and the project contributors +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +package main + +import ( + "bytes" + "encoding/base64" + "encoding/gob" + "testing" +) + +func TestMiscEncodeDecode1(t *testing.T) { + var err error + //gob.Register( &NoopRes{} ) // happens in noop.go : init() + //gob.Register( &FileRes{} ) // happens in file.go : init() + // ... + + // encode + var input interface{} = &FileRes{} + b1 := bytes.Buffer{} + e := gob.NewEncoder(&b1) + err = e.Encode(&input) // pass with & + if err != nil { + t.Errorf("Gob failed to Encode: %v", err) + } + str := base64.StdEncoding.EncodeToString(b1.Bytes()) + + // decode + var output interface{} + bb, err := base64.StdEncoding.DecodeString(str) + if err != nil { + t.Errorf("Base64 failed to Decode: %v", err) + } + b2 := bytes.NewBuffer(bb) + d := gob.NewDecoder(b2) + err = d.Decode(&output) // pass with & + if err != nil { + t.Errorf("Gob failed to Decode: %v", err) + } + + res1, ok := input.(Res) + if !ok { + t.Errorf("Input %v is not a Res", res1) + return + } + res2, ok := output.(Res) + if !ok { + t.Errorf("Output %v is not a Res", res2) + return + } + if !res1.Compare(res2) { + t.Error("The input and output Res values do not match!") + } +} + +func TestMiscEncodeDecode2(t *testing.T) { + var err error + //gob.Register( &NoopRes{} ) // happens in noop.go : init() + //gob.Register( &FileRes{} ) // happens in file.go : init() + // ... + + // encode + var input Res = &FileRes{} + + b64, err := ResToB64(input) + if err != nil { + t.Errorf("Can't encode: %v", err) + return + } + + output, err := B64ToRes(b64) + if err != nil { + t.Errorf("Can't decode: %v", err) + return + } + + res1, ok := input.(Res) + if !ok { + t.Errorf("Input %v is not a Res", res1) + return + } + res2, ok := output.(Res) + if !ok { + t.Errorf("Output %v is not a Res", res2) + return + } + if !res1.Compare(res2) { + t.Error("The input and output Res values do not match!") + } +} diff --git a/svc.go b/svc.go index 635a66a9d..69f0a3092 100644 --- a/svc.go +++ b/svc.go @@ -20,6 +20,7 @@ package main import ( + "encoding/gob" "errors" "fmt" systemd "github.com/coreos/go-systemd/dbus" // change namespace @@ -28,6 +29,10 @@ import ( "log" ) +func init() { + gob.Register(&SvcRes{}) +} + type SvcRes struct { BaseRes `yaml:",inline"` State string `yaml:"state"` // state: running, stopped, undefined @@ -62,7 +67,7 @@ func (obj *SvcRes) Validate() bool { } // Service watcher -func (obj *SvcRes) Watch() { +func (obj *SvcRes) Watch(processChan chan struct{}) { if obj.IsWatching() { return } @@ -189,7 +194,7 @@ func (obj *SvcRes) Watch() { case err := <-subErrors: obj.SetConvergedState(resConvergedNil) // XXX ? - log.Println("error:", err) + log.Printf("error: %v", err) log.Fatal(err) //vertex.events <- fmt.Sprintf("svc: %v", "error") // XXX: how should we handle errors? @@ -210,7 +215,7 @@ func (obj *SvcRes) Watch() { dirty = false obj.isStateOK = false // something made state dirty } - Process(obj) // XXX: rename this function + processChan <- struct{}{} // trigger process } }