diff --git a/component/all/process.go b/component/all/process.go index f4127ee10c5..a7f3629b423 100644 --- a/component/all/process.go +++ b/component/all/process.go @@ -6,6 +6,7 @@ package all import ( "github.com/juju/cmd" "github.com/juju/errors" + "github.com/juju/names" "github.com/juju/juju/apiserver/common" "github.com/juju/juju/process" @@ -13,6 +14,7 @@ import ( "github.com/juju/juju/process/api/server" "github.com/juju/juju/process/context" "github.com/juju/juju/process/plugin" + procstate "github.com/juju/juju/process/state" "github.com/juju/juju/state" "github.com/juju/juju/worker/uniter/runner" "github.com/juju/juju/worker/uniter/runner/jujuc" @@ -22,6 +24,7 @@ type workloadProcesses struct{} func (c workloadProcesses) registerForServer() error { c.registerHookContext() + c.registerState() return nil } @@ -111,3 +114,13 @@ func (workloadProcesses) registerHookContextCommands() { return cmd }) } + +func (c workloadProcesses) registerState() { + newUnitProcesses := func(persist state.Persistence, unit names.UnitTag, charm names.CharmTag) (state.UnitProcesses, error) { + return procstate.NewUnitProcesses(persist, unit, &charm), nil + } + newProcessDefinitions := func(persist state.Persistence, charm names.CharmTag) (state.ProcessDefinitions, error) { + return procstate.NewDefinitions(persist, charm), nil + } + state.SetProcessesComponent(newUnitProcesses, newProcessDefinitions) +} diff --git a/process/api/server/hookcontext.go b/process/api/server/hookcontext.go index 56eed2380e7..24bcc527608 100644 --- a/process/api/server/hookcontext.go +++ b/process/api/server/hookcontext.go @@ -3,6 +3,8 @@ package server +// TODO(ericsnow) Eliminate the apiserver/common import if possible. + import ( "github.com/juju/errors" "github.com/juju/loggo" @@ -59,8 +61,7 @@ func (a HookContextAPI) RegisterProcesses(args api.RegisterProcessesArgs) (api.P for _, apiProc := range args.Processes { info := api.API2Proc(apiProc) res := api.ProcessResult{ - ID: info.Name + "/" + info.Details.ID, - //ID: info.ID(), + ID: info.ID(), } if err := st.Register(info); err != nil { res.Error = common.ServerError(errors.Trace(err)) @@ -94,12 +95,7 @@ func (a HookContextAPI) ListProcesses(args api.ListProcessesArgs) (api.ListProce if len(ids) == 0 { for _, proc := range procs { - id := proc.Name - if proc.Details.ID != "" { - id += "/" + proc.Details.ID - } - ids = append(ids, id) - //ids = append(ids, info.ID()) + ids = append(ids, proc.ID()) } } @@ -114,8 +110,7 @@ func (a HookContextAPI) ListProcesses(args api.ListProcessesArgs) (api.ListProce if proc.Details.ID != "" { procID += "/" + proc.Details.ID } - //if id == proc.ID() { - if id == procID { + if id == proc.ID() { res.Info = api.Proc2api(proc) found = true break diff --git a/process/info.go b/process/info.go index 266ba0509af..9666fd3a6ab 100644 --- a/process/info.go +++ b/process/info.go @@ -4,7 +4,9 @@ package process import ( + "fmt" "reflect" + "strings" "github.com/juju/errors" "gopkg.in/juju/charm.v5" @@ -24,6 +26,8 @@ type Info struct { Details Details } +// TODO(ericsnow) Eliminate NewInfoUnvalidated. + // NewInfoUnvalidated builds a new Info object with the provided // values. The returned Info may be invalid if the given values cause // that result. The Validate method can be used to check. @@ -36,12 +40,36 @@ func NewInfoUnvalidated(name, procType string) *Info { } } +// ID composes a unique ID for the process (relative to the unit/charm). +func (info Info) ID() string { + id := info.Process.Name + if info.Details.ID != "" { + id = fmt.Sprintf("%s/%s", id, info.Details.ID) + } + return id +} + +// ParseID extracts the process name and details ID from the provided string. +func ParseID(id string) (string, string) { + parts := strings.SplitN(id, "/", 2) + if len(parts) == 2 { + return parts[0], parts[1] + } + return id, "" +} + // Validate checks the process info to ensure it is correct. func (info Info) Validate() error { if err := info.Process.Validate(); err != nil { return errors.Trace(err) } + if !reflect.DeepEqual(info.Details, Details{}) { + if err := info.Details.Validate(); err != nil { + return errors.Trace(err) + } + } + return nil } diff --git a/process/info_test.go b/process/info_test.go index 5e700a30ec0..e6378102340 100644 --- a/process/info_test.go +++ b/process/info_test.go @@ -6,6 +6,7 @@ package process_test import ( jc "github.com/juju/testing/checkers" gc "gopkg.in/check.v1" + "gopkg.in/juju/charm.v5" "github.com/juju/juju/process" "github.com/juju/juju/testing" @@ -17,30 +18,91 @@ type infoSuite struct { var _ = gc.Suite(&infoSuite{}) +func (s *infoSuite) newInfo(name, procType string) *process.Info { + return &process.Info{ + Process: charm.Process{ + Name: name, + Type: procType, + }, + } +} + +func (s *infoSuite) TestIDFull(c *gc.C) { + info := s.newInfo("a-proc", "docker") + info.Details.ID = "my-proc" + id := info.ID() + + c.Check(id, gc.Equals, "a-proc/my-proc") +} + +func (s *infoSuite) TestIDMissingDetailsID(c *gc.C) { + info := s.newInfo("a-proc", "docker") + id := info.ID() + + c.Check(id, gc.Equals, "a-proc") +} + +func (s *infoSuite) TestIDNameOnly(c *gc.C) { + info := s.newInfo("a-proc", "docker") + id := info.ID() + + c.Check(id, gc.Equals, "a-proc") +} + +func (s *infoSuite) TestParseIDFull(c *gc.C) { + name, id := process.ParseID("a-proc/my-proc") + + c.Check(name, gc.Equals, "a-proc") + c.Check(id, gc.Equals, "my-proc") +} + +func (s *infoSuite) TestParseIDNameOnly(c *gc.C) { + name, id := process.ParseID("a-proc") + + c.Check(name, gc.Equals, "a-proc") + c.Check(id, gc.Equals, "") +} + +func (s *infoSuite) TestParseIDExtras(c *gc.C) { + name, id := process.ParseID("somecharm/0/a-proc/my-proc") + + c.Check(name, gc.Equals, "somecharm") + c.Check(id, gc.Equals, "0/a-proc/my-proc") +} + func (s *infoSuite) TestValidateOkay(c *gc.C) { - info := process.NewInfoUnvalidated("a proc", "docker") + info := s.newInfo("a proc", "docker") err := info.Validate() c.Check(err, jc.ErrorIsNil) } func (s *infoSuite) TestValidateBadMetadata(c *gc.C) { - info := process.NewInfoUnvalidated("a proc", "") + info := s.newInfo("a proc", "") err := info.Validate() c.Check(err, gc.ErrorMatches, ".*type: name is required") } +func (s *infoSuite) TestValidateBadDetails(c *gc.C) { + info := s.newInfo("a proc", "docker") + info.Details.ID = "my-proc" + err := info.Validate() + + c.Check(err, gc.ErrorMatches, ".*Label cannot be empty.*") +} + func (s *infoSuite) TestIsRegisteredTrue(c *gc.C) { - info := process.NewInfoUnvalidated("a proc", "docker") + info := s.newInfo("a proc", "docker") info.Details.ID = "abc123" info.Details.Status.Label = "running" isRegistered := info.IsRegistered() + c.Check(isRegistered, jc.IsTrue) } func (s *infoSuite) TestIsRegisteredFalse(c *gc.C) { - info := process.NewInfoUnvalidated("a proc", "docker") + info := s.newInfo("a proc", "docker") isRegistered := info.IsRegistered() c.Check(isRegistered, jc.IsFalse) diff --git a/process/persistence/base_test.go b/process/persistence/base_test.go new file mode 100644 index 00000000000..35ffcc00d0c --- /dev/null +++ b/process/persistence/base_test.go @@ -0,0 +1,87 @@ +// Copyright 2015 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package persistence_test + +import ( + "fmt" + + "github.com/juju/names" + gitjujutesting "github.com/juju/testing" + "github.com/juju/utils" + gc "gopkg.in/check.v1" + "gopkg.in/juju/charm.v5" + + "github.com/juju/juju/process" + "github.com/juju/juju/testing" +) + +type baseProcessesSuite struct { + testing.BaseSuite + + stub *gitjujutesting.Stub + charm names.CharmTag + unit names.UnitTag +} + +func (s *baseProcessesSuite) SetUpTest(c *gc.C) { + s.BaseSuite.SetUpTest(c) + + s.stub = &gitjujutesting.Stub{} + s.charm = names.NewCharmTag("local:series/dummy-1") + s.unit = names.NewUnitTag("a-unit/0") +} + +func (s *baseProcessesSuite) setUnit(id string) { + if id == "" { + s.unit = names.UnitTag{} + } else { + s.unit = names.NewUnitTag(id) + } +} + +func (s *baseProcessesSuite) setCharm(id string) { + if id == "" { + s.charm = names.CharmTag{} + } else { + s.charm = names.NewCharmTag(id) + } +} + +func (s *baseProcessesSuite) newDefinitions(pType string, names ...string) []charm.Process { + var definitions []charm.Process + for _, name := range names { + definitions = append(definitions, charm.Process{ + Name: name, + Type: pType, + }) + } + return definitions +} + +func (s *baseProcessesSuite) newProcesses(pType string, names ...string) []process.Info { + var ids []string + for i, name := range names { + name, id := process.ParseID(name) + names[i] = name + if id == "" { + id = fmt.Sprintf("%s-%s", name, utils.MustNewUUID()) + } + ids = append(ids, id) + } + + var processes []process.Info + for i, definition := range s.newDefinitions(pType, names...) { + id := ids[i] + processes = append(processes, process.Info{ + Process: definition, + Details: process.Details{ + ID: id, + Status: process.Status{ + Label: "running", + }, + }, + }) + } + return processes +} diff --git a/process/persistence/mongo.go b/process/persistence/mongo.go new file mode 100644 index 00000000000..936089f6ee5 --- /dev/null +++ b/process/persistence/mongo.go @@ -0,0 +1,560 @@ +// Copyright 2015 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +// TODO(ericsnow) Move this to a subpackage and split it up? + +package persistence + +import ( + "fmt" + "strings" + + "github.com/juju/errors" + "github.com/juju/names" + "gopkg.in/juju/charm.v5" + "gopkg.in/mgo.v2/bson" + "gopkg.in/mgo.v2/txn" + + "github.com/juju/juju/process" +) + +const workloadProcessesC = "workloadprocesses" + +// TODO(ericsnow) These two functions are copied from state. + +func unitGlobalKey(name string) string { + return "u#" + name + "#charm" +} + +func charmGlobalKey(charmURL *charm.URL) string { + return "c#" + charmURL.String() +} + +// TOOD(ericsnow) Move the methods under their own type. + +func (pp Persistence) indexDefinitionDocs(ids []string) (map[interface{}]ProcessDefinitionDoc, error) { + var docs []ProcessDefinitionDoc + query := bson.D{{"$in", ids}} + if err := pp.allID(query, &docs); err != nil { + return nil, errors.Trace(err) + } + indexed := make(map[interface{}]ProcessDefinitionDoc) + for _, doc := range docs { + indexed[doc.DocID] = doc + } + return indexed, nil +} + +func (pp Persistence) extractProc(id string, definitionDocs map[string]ProcessDefinitionDoc, launchDocs map[string]ProcessLaunchDoc, procDocs map[string]ProcessDoc) (*process.Info, int) { + missing := 0 + name, _ := process.ParseID(id) + definitionDoc, ok := definitionDocs[name] + if !ok { + missing += 1 + } + launchDoc, ok := launchDocs[id] + if !ok { + missing += 2 + } + procDoc, ok := procDocs[id] + if !ok { + missing += 4 + } + if missing > 0 { + return nil, missing + } + + doc := processInfoDoc{ + definition: definitionDoc, + launch: launchDoc, + proc: procDoc, + } + info := doc.info() + return &info, 0 +} + +func (pp Persistence) checkRecords(id string) (bool, error) { + missing := 0 + _, err := pp.definition(id) + if errors.IsNotFound(err) { + missing += 1 + } else if err != nil { + return false, errors.Trace(err) + } + _, err = pp.launch(id) + if errors.IsNotFound(err) { + missing += 2 + } else if err != nil { + return false, errors.Trace(err) + } + _, err = pp.proc(id) + if errors.IsNotFound(err) { + missing += 4 + } else if err != nil { + return false, errors.Trace(err) + } + if missing > 0 { + if missing < 7 { + return false, errors.Errorf("found inconsistent records for process %q", id) + } + return false, nil + } + return true, nil +} + +func dropEnvUUID(id string) string { + fullID := id + parts := strings.SplitN(fullID, ":", 2) + if len(parts) == 2 { + if names.IsValidEnvironment(parts[0]) { + fullID = parts[1] + } + } + return fullID +} + +// TODO(ericsnow) Factor most of the below into a processesCollection type. + +func (pp Persistence) one(id string, doc interface{}) error { + return errors.Trace(pp.st.One(workloadProcessesC, id, doc)) +} + +func (pp Persistence) all(query bson.D, docs interface{}) error { + return errors.Trace(pp.st.All(workloadProcessesC, query, docs)) +} + +func (pp Persistence) allID(query bson.D, docs interface{}) error { + if query != nil { + query = bson.D{{"_id", query}} + } + return errors.Trace(pp.all(query, docs)) +} + +func (pp Persistence) definitionID(id string) string { + name, _ := process.ParseID(id) + // The URL will always parse successfully. + charmURL, _ := charm.ParseURL(pp.charm.Id()) + return fmt.Sprintf("%s#%s", charmGlobalKey(charmURL), name) +} + +func (pp Persistence) processID(id string) string { + return fmt.Sprintf("%s#%s", unitGlobalKey(pp.unit.Id()), id) +} + +func (pp Persistence) launchID(id string) string { + return pp.processID(id) + "#launch" +} + +func (pp Persistence) newInsertDefinitionOp(definition charm.Process) txn.Op { + doc := pp.newProcessDefinitionDoc(definition) + return txn.Op{ + C: workloadProcessesC, + Id: doc.DocID, + Assert: txn.DocMissing, + Insert: doc, + } +} + +func (pp Persistence) newInsertProcessOps(info process.Info) []txn.Op { + var ops []txn.Op + ops = append(ops, pp.newInsertLaunchOp(info)) + ops = append(ops, pp.newInsertProcOp(info)) + return ops +} + +func (pp Persistence) newInsertLaunchOp(info process.Info) txn.Op { + doc := pp.newLaunchDoc(info) + return txn.Op{ + C: workloadProcessesC, + Id: doc.DocID, + Assert: txn.DocMissing, + Insert: doc, + } +} + +func (pp Persistence) newInsertProcOp(info process.Info) txn.Op { + doc := pp.newProcessDoc(info) + return txn.Op{ + C: workloadProcessesC, + Id: doc.DocID, + Assert: txn.DocMissing, + Insert: doc, + } +} + +func (pp Persistence) newSetRawStatusOps(id string, status process.Status) []txn.Op { + id = pp.processID(id) + return []txn.Op{{ + C: workloadProcessesC, + Id: id, + Assert: txn.DocExists, + }, { + C: workloadProcessesC, + Id: id, + Assert: IsAliveDoc, + Update: bson.D{{"$set", bson.D{{"pluginstatus", status.Label}}}}, + }} +} + +func (pp Persistence) newRemoveProcessOps(id string) []txn.Op { + var ops []txn.Op + ops = append(ops, pp.newRemoveLaunchOp(id)) + ops = append(ops, pp.newRemoveProcOps(id)...) + return ops +} + +func (pp Persistence) newRemoveLaunchOp(id string) txn.Op { + id = pp.launchID(id) + return txn.Op{ + C: workloadProcessesC, + Id: id, + Assert: txn.DocExists, + Remove: true, + } +} + +func (pp Persistence) newRemoveProcOps(id string) []txn.Op { + id = pp.processID(id) + return []txn.Op{{ + C: workloadProcessesC, + Id: id, + Assert: IsAliveDoc, + }, { + C: workloadProcessesC, + Id: id, + Assert: txn.DocExists, + Remove: true, + }} +} + +type processInfoDoc struct { + definition ProcessDefinitionDoc + launch ProcessLaunchDoc + proc ProcessDoc +} + +func (d processInfoDoc) info() process.Info { + info := d.proc.info() + + info.Process = d.definition.definition() + + rawStatus := info.Details.Status + info.Details = d.launch.details() + info.Details.Status = rawStatus + + return info +} + +// ProcessDefinitionDoc is the document for process definitions. +type ProcessDefinitionDoc struct { + DocID string `bson:"_id"` + EnvUUID string `bson:"env-uuid"` + DocKind string `bson:"dockind"` + + UnitID string `bson:"unitid"` + + Name string `bson:"name"` + Description string `bson:"description"` + Type string `bson:"type"` + TypeOptions map[string]string `bson:"typeoptions"` + Command string `bson:"command"` + Image string `bson:"image"` + Ports []string `bson:"ports"` + Volumes []string `bson:"volumes"` + EnvVars map[string]string `bson:"envvars"` +} + +func (d ProcessDefinitionDoc) definition() charm.Process { + definition := charm.Process{ + Name: d.Name, + Description: d.Description, + Type: d.Type, + Command: d.Command, + Image: d.Image, + } + + if len(d.TypeOptions) > 0 { + definition.TypeOptions = d.TypeOptions + } + + if len(d.EnvVars) > 0 { + definition.EnvVars = d.EnvVars + } + + if len(d.Ports) > 0 { + ports := make([]charm.ProcessPort, len(d.Ports)) + for i, raw := range d.Ports { + p := ports[i] + fmt.Sscanf(raw, "%d:%d:%s", &p.External, &p.Internal, &p.Endpoint) + } + definition.Ports = ports + } + + if len(d.Volumes) > 0 { + volumes := make([]charm.ProcessVolume, len(d.Volumes)) + for i, raw := range d.Volumes { + v := volumes[i] + fmt.Sscanf(raw, "%s:%s:%s:%s", &v.ExternalMount, &v.InternalMount, &v.Mode, &v.Name) + } + definition.Volumes = volumes + } + + return definition +} + +func (pp Persistence) newProcessDefinitionDoc(definition charm.Process) *ProcessDefinitionDoc { + id := pp.definitionID(definition.Name) + + var ports []string + for _, p := range definition.Ports { + // TODO(ericsnow) Ensure p.Endpoint is in state? + ports = append(ports, fmt.Sprintf("%d:%d:%s", p.External, p.Internal, p.Endpoint)) + } + + var volumes []string + for _, v := range definition.Volumes { + // TODO(ericsnow) Ensure v.Name is in state? + volumes = append(volumes, fmt.Sprintf("%s:%s:%s:%s", v.ExternalMount, v.InternalMount, v.Mode, v.Name)) + } + + return &ProcessDefinitionDoc{ + DocID: id, + UnitID: pp.unit.Id(), + DocKind: "definition", + + Name: definition.Name, + Description: definition.Description, + Type: definition.Type, + TypeOptions: definition.TypeOptions, + Command: definition.Command, + Image: definition.Image, + Ports: ports, + Volumes: volumes, + EnvVars: definition.EnvVars, + } +} + +func (pp Persistence) definition(id string) (*ProcessDefinitionDoc, error) { + id = pp.definitionID(id) + + var doc ProcessDefinitionDoc + if err := pp.one(id, &doc); err != nil { + return nil, errors.Trace(err) + } + return &doc, nil +} + +func (pp Persistence) allDefinitions() (map[string]ProcessDefinitionDoc, error) { + var docs []ProcessDefinitionDoc + query := bson.D{{"dockind", "definition"}} + if err := pp.all(query, &docs); err != nil { + return nil, errors.Trace(err) + } + + results := make(map[string]ProcessDefinitionDoc) + for _, doc := range docs { + parts := strings.Split(doc.DocID, "#") + id := parts[len(parts)-1] + results[id] = doc + } + return results, nil +} + +func (pp Persistence) definitions(ids []string) (map[string]ProcessDefinitionDoc, error) { + fullIDs := make([]string, len(ids)) + idMap := make(map[string]string, len(ids)) + for i, id := range ids { + fullID := pp.definitionID(id) + fullIDs[i] = fullID + name, _ := process.ParseID(id) + idMap[fullID] = name + } + + var docs []ProcessDefinitionDoc + query := bson.D{{"$in", fullIDs}} + if err := pp.allID(query, &docs); err != nil { + return nil, errors.Trace(err) + } + + results := make(map[string]ProcessDefinitionDoc) + for _, doc := range docs { + fullID := dropEnvUUID(doc.DocID) + id := idMap[fullID] + results[id] = doc + } + return results, nil +} + +// ProcessLaunchDoc is the document for process launch details. +type ProcessLaunchDoc struct { + DocID string `bson:"_id"` + EnvUUID string `bson:"env-uuid"` + DocKind string `bson:"dockind"` + + PluginID string `bson:"pluginid"` + RawStatus string `bson:"rawstatus"` +} + +func (d ProcessLaunchDoc) details() process.Details { + return process.Details{ + ID: d.PluginID, + Status: process.Status{ + Label: d.RawStatus, + }, + } +} + +func (pp Persistence) newLaunchDoc(info process.Info) *ProcessLaunchDoc { + id := pp.launchID(info.ID()) + return &ProcessLaunchDoc{ + DocID: id, + DocKind: "launch", + + PluginID: info.Details.ID, + RawStatus: info.Details.Status.Label, + } +} + +func (pp Persistence) launch(id string) (*ProcessLaunchDoc, error) { + id = pp.launchID(id) + + var doc ProcessLaunchDoc + if err := pp.one(id, &doc); err != nil { + return nil, errors.Trace(err) + } + return &doc, nil +} + +func (pp Persistence) allLaunches() (map[string]ProcessLaunchDoc, error) { + var docs []ProcessLaunchDoc + query := bson.D{{"dockind", "launch"}} + if err := pp.all(query, &docs); err != nil { + return nil, errors.Trace(err) + } + + results := make(map[string]ProcessLaunchDoc) + for _, doc := range docs { + parts := strings.Split(doc.DocID, "#") + id := parts[len(parts)-2] + results[id] = doc + } + return results, nil +} + +func (pp Persistence) launches(ids []string) (map[string]ProcessLaunchDoc, error) { + fullIDs := make([]string, len(ids)) + idMap := make(map[string]string, len(ids)) + for i, id := range ids { + fullID := pp.launchID(id) + fullIDs[i] = fullID + idMap[fullID] = id + } + + var docs []ProcessLaunchDoc + query := bson.D{{"$in", fullIDs}} + if err := pp.allID(query, &docs); err != nil { + return nil, errors.Trace(err) + } + + results := make(map[string]ProcessLaunchDoc) + for _, doc := range docs { + fullID := dropEnvUUID(doc.DocID) + id := idMap[fullID] + results[id] = doc + } + return results, nil +} + +// TODO(ericsnow) The life stuff here is just a temporary hack. + +type Life int8 // a mirror of state.Life. + +const ( + Alive Life = iota + Dying + Dead +) + +var IsAliveDoc = bson.D{{"life", Alive}} + +// ProcessDoc is the top-level document for processes. +type ProcessDoc struct { + DocID string `bson:"_id"` + EnvUUID string `bson:"env-uuid"` + DocKind string `bson:"dockind"` + + Life Life `bson:"life"` + PluginStatus string `bson:"pluginstatus"` +} + +func (d ProcessDoc) info() process.Info { + return process.Info{ + Details: process.Details{ + Status: process.Status{ + Label: d.PluginStatus, + }, + }, + } +} + +func (pp Persistence) newProcessDoc(info process.Info) *ProcessDoc { + id := pp.processID(info.ID()) + + return &ProcessDoc{ + DocID: id, + DocKind: "process", + + Life: Alive, + PluginStatus: info.Details.Status.Label, + } +} + +func (pp Persistence) proc(id string) (*ProcessDoc, error) { + id = pp.processID(id) + + var doc ProcessDoc + if err := pp.one(id, &doc); err != nil { + return nil, errors.Trace(err) + } + return &doc, nil +} + +func (pp Persistence) allProcs() (map[string]ProcessDoc, error) { + var docs []ProcessDoc + query := bson.D{{"dockind", "process"}} + if err := pp.all(query, &docs); err != nil { + return nil, errors.Trace(err) + } + + results := make(map[string]ProcessDoc) + for _, doc := range docs { + parts := strings.Split(doc.DocID, "#") + id := parts[len(parts)-1] + results[id] = doc + } + return results, nil +} + +func (pp Persistence) procs(ids []string) (map[string]ProcessDoc, error) { + fullIDs := make([]string, len(ids)) + idMap := make(map[string]string, len(ids)) + for i, id := range ids { + fullID := pp.processID(id) + fullIDs[i] = fullID + idMap[fullID] = id + } + + var docs []ProcessDoc + query := bson.D{{"$in", fullIDs}} + if err := pp.allID(query, &docs); err != nil { + return nil, errors.Trace(err) + } + + results := make(map[string]ProcessDoc) + for _, doc := range docs { + fullID := dropEnvUUID(doc.DocID) + id := idMap[fullID] + results[id] = doc + } + return results, nil +} diff --git a/process/persistence/package_test.go b/process/persistence/package_test.go new file mode 100644 index 00000000000..61114a31f96 --- /dev/null +++ b/process/persistence/package_test.go @@ -0,0 +1,14 @@ +// Copyright 2015 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package persistence_test + +import ( + "testing" + + gc "gopkg.in/check.v1" +) + +func Test(t *testing.T) { + gc.TestingT(t) +} diff --git a/process/persistence/processes.go b/process/persistence/processes.go new file mode 100644 index 00000000000..23f4d473e16 --- /dev/null +++ b/process/persistence/processes.go @@ -0,0 +1,288 @@ +// Copyright 2015 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package persistence + +// TODO(ericsnow) Eliminate the mongo-related imports here. + +import ( + "fmt" + "reflect" + + "github.com/juju/errors" + "github.com/juju/names" + jujutxn "github.com/juju/txn" + "gopkg.in/juju/charm.v5" + "gopkg.in/mgo.v2/txn" + + "github.com/juju/juju/process" +) + +// TODO(ericsnow) Implement persistence using a TXN abstraction (used +// in the business logic) with ops factories available from the +// persistence layer. + +// PersistenceBase exposes the core persistence functionality needed +// for workload processes. +type PersistenceBase interface { + // One populates doc with the document corresponding to the given + // ID. Missing documents result in errors.NotFound. + One(collName, id string, doc interface{}) error + // All populates docs with the list of the documents corresponding + // to the provided query. + All(collName string, query, docs interface{}) error + // Run runs the transaction generated by the provided factory + // function. It may be retried several times. + Run(transactions jujutxn.TransactionSource) error +} + +// Persistence exposes the high-level persistence functionality +// related to workload processes in Juju. +type Persistence struct { + st PersistenceBase + charm names.CharmTag + unit names.UnitTag +} + +// NewPersistence builds a new Persistence based on the provided info. +func NewPersistence(st PersistenceBase, charm *names.CharmTag, unit *names.UnitTag) *Persistence { + pp := &Persistence{ + st: st, + } + if charm != nil { + pp.charm = *charm + } + if unit != nil { + pp.unit = *unit + } + return pp +} + +// EnsureDefinitions checks persistence to see if records for the +// definitions are already there. If not then they are added. If so then +// they are checked to be sure they match those provided. The lists of +// names for those that already exist and that don't match are returned. +func (pp Persistence) EnsureDefinitions(definitions ...charm.Process) ([]string, []string, error) { + var found []string + var mismatched []string + + if len(definitions) == 0 { + return found, mismatched, nil + } + + var ids []string + var ops []txn.Op + for _, definition := range definitions { + ids = append(ids, pp.definitionID(definition.Name)) + ops = append(ops, pp.newInsertDefinitionOp(definition)) + } + buildTxn := func(attempt int) ([]txn.Op, error) { + if attempt > 0 { + // The last attempt aborted so clear out any ops that failed + // the DocMissing assertion and try again. + found = []string{} + mismatched = []string{} + indexed, err := pp.indexDefinitionDocs(ids) + if err != nil { + return nil, errors.Trace(err) + } + + var okOps []txn.Op + for _, op := range ops { + if existing, ok := indexed[op.Id]; !ok { + okOps = append(okOps, op) + } else { // Otherwise the op is dropped. + id := fmt.Sprintf("%s", op.Id) + found = append(found, id) + requested, ok := op.Insert.(*ProcessDefinitionDoc) + if !ok { + return nil, errors.Errorf("inserting invalid type %T", op.Insert) + } + if !reflect.DeepEqual(requested.definition(), existing.definition()) { + mismatched = append(mismatched, id) + } + } + } + if len(okOps) == 0 { + return nil, jujutxn.ErrNoOperations + } + ops = okOps + } + return ops, nil + } + if err := pp.st.Run(buildTxn); err != nil { + return nil, nil, errors.Trace(err) + } + + return found, mismatched, nil +} + +// Insert adds records for the process to persistence. If the process +// is already there then false gets returned (true if inserted). +// Existing records are not checked for consistency. +func (pp Persistence) Insert(info process.Info) (bool, error) { + var okay bool + var ops []txn.Op + // TODO(ericsnow) Add unitPersistence.newEnsureAliveOp(pp.unit)? + // TODO(ericsnow) Add pp.newEnsureDefinitionOp(info.Process)? + ops = append(ops, pp.newInsertProcessOps(info)...) + buildTxn := func(attempt int) ([]txn.Op, error) { + if attempt > 0 { + // One of the records already exists. + okay = false + return nil, jujutxn.ErrNoOperations + } + okay = true + return ops, nil + } + if err := pp.st.Run(buildTxn); err != nil { + return false, errors.Trace(err) + } + return okay, nil +} + +// SetStatus updates the raw status for the identified process in +// persistence. The return value corresponds to whether or not the +// record was found in persistence. Any other problem results in +// an error. The process is not checked for inconsistent records. +func (pp Persistence) SetStatus(id string, status process.Status) (bool, error) { + var found bool + var ops []txn.Op + // TODO(ericsnow) Add unitPersistence.newEnsureAliveOp(pp.unit)? + ops = append(ops, pp.newSetRawStatusOps(id, status)...) + buildTxn := func(attempt int) ([]txn.Op, error) { + if attempt > 0 { + _, err := pp.proc(id) + if errors.IsNotFound(err) { + found = false + return nil, jujutxn.ErrNoOperations + } else if err != nil { + return nil, errors.Trace(err) + } + // We ignore the request since the proc is dying. + // TODO(ericsnow) Ensure that procDoc.Status != state.Alive? + return nil, jujutxn.ErrNoOperations + } + found = true + return ops, nil + } + if err := pp.st.Run(buildTxn); err != nil { + return false, errors.Trace(err) + } + return found, nil +} + +// List builds the list of processes found in persistence which match +// the provided IDs. The lists of IDs with missing records is also +// returned. Inconsistent records result in errors.NotValid. +func (pp Persistence) List(ids ...string) ([]process.Info, []string, error) { + var missing []string + + // TODO(ericsnow) Ensure that the unit is Alive? + // TODO(ericsnow) fix race that exists between the 3 calls + definitionDocs, err := pp.definitions(ids) + if err != nil { + return nil, nil, errors.Trace(err) + } + launchDocs, err := pp.launches(ids) + if err != nil { + return nil, nil, errors.Trace(err) + } + procDocs, err := pp.procs(ids) + if err != nil { + return nil, nil, errors.Trace(err) + } + + var results []process.Info + for _, id := range ids { + proc, missingCount := pp.extractProc(id, definitionDocs, launchDocs, procDocs) + if missingCount > 0 { + if missingCount < 7 { + return nil, nil, errors.Errorf("found inconsistent records for process %q", id) + } + missing = append(missing, id) + continue + } + results = append(results, *proc) + } + return results, missing, nil +} + +// ListAll builds the list of all processes found in persistence. +// Inconsistent records result in errors.NotValid. +func (pp Persistence) ListAll() ([]process.Info, error) { + // TODO(ericsnow) Ensure that the unit is Alive? + // TODO(ericsnow) fix race that exists between the 3 calls + definitionDocs, err := pp.allDefinitions() + if err != nil { + return nil, errors.Trace(err) + } + launchDocs, err := pp.allLaunches() + if err != nil { + return nil, errors.Trace(err) + } + procDocs, err := pp.allProcs() + if err != nil { + return nil, errors.Trace(err) + } + + if len(launchDocs) > len(procDocs) { + return nil, errors.Errorf("found inconsistent records (extra launch docs)") + } + + var results []process.Info + for id := range procDocs { + proc, missingCount := pp.extractProc(id, definitionDocs, launchDocs, procDocs) + if missingCount > 0 { + return nil, errors.Errorf("found inconsistent records for process %q", id) + } + results = append(results, *proc) + } + for name, doc := range definitionDocs { + matched := false + for _, proc := range results { + if name == proc.Name { + matched = true + break + } + } + if !matched { + results = append(results, process.Info{ + Process: doc.definition(), + }) + } + } + return results, nil +} + +// TODO(ericsnow) Add procs to state/cleanup.go. + +// TODO(ericsnow) How to ensure they are completely removed from state? + +// Remove removes all records associated with the identified process +// from persistence. Also returned is whether or not the process was +// found. If the records for the process are not consistent then +// errors.NotValid is returned. +func (pp Persistence) Remove(id string) (bool, error) { + var found bool + var ops []txn.Op + // TODO(ericsnow) Add unitPersistence.newEnsureAliveOp(pp.unit)? + ops = append(ops, pp.newRemoveProcessOps(id)...) + buildTxn := func(attempt int) ([]txn.Op, error) { + if attempt > 0 { + okay, err := pp.checkRecords(id) + if err != nil { + return nil, errors.Trace(err) + } + // If okay is true, it must be dying. + found = okay + return nil, jujutxn.ErrNoOperations + } + found = true + return ops, nil + } + if err := pp.st.Run(buildTxn); err != nil { + return false, errors.Trace(err) + } + return found, nil +} diff --git a/process/persistence/processes_test.go b/process/persistence/processes_test.go new file mode 100644 index 00000000000..39d1e29ab10 --- /dev/null +++ b/process/persistence/processes_test.go @@ -0,0 +1,1066 @@ +// Copyright 2015 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package persistence_test + +import ( + "github.com/juju/errors" + gitjujutesting "github.com/juju/testing" + jc "github.com/juju/testing/checkers" + jujutxn "github.com/juju/txn" + gc "gopkg.in/check.v1" + "gopkg.in/juju/charm.v5" + "gopkg.in/mgo.v2/bson" + "gopkg.in/mgo.v2/txn" + + "github.com/juju/juju/process" + "github.com/juju/juju/process/persistence" +) + +var _ = gc.Suite(&procsPersistenceSuite{}) + +type procsPersistenceSuite struct { + baseProcessesSuite + + state *fakeStatePersistence +} + +func (s *procsPersistenceSuite) SetUpTest(c *gc.C) { + s.baseProcessesSuite.SetUpTest(c) + + s.state = &fakeStatePersistence{Stub: s.stub} +} + +type processesPersistence interface { + EnsureDefinitions(definitions ...charm.Process) ([]string, []string, error) + Insert(info process.Info) (bool, error) + SetStatus(id string, status process.Status) (bool, error) + List(ids ...string) ([]process.Info, []string, error) + ListAll() ([]process.Info, error) + Remove(id string) (bool, error) +} + +func (s *procsPersistenceSuite) newPersistence() processesPersistence { + return persistence.NewPersistence(s.state, &s.charm, &s.unit) +} + +type processInfoDoc struct { + definition *persistence.ProcessDefinitionDoc + launch *persistence.ProcessLaunchDoc + proc *persistence.ProcessDoc +} + +func (s *procsPersistenceSuite) setDocs(procs ...process.Info) []processInfoDoc { + var results []processInfoDoc + var docs []interface{} + for _, proc := range procs { + doc := processInfoDoc{} + + doc.definition = &persistence.ProcessDefinitionDoc{ + DocID: "c#" + s.charm.Id() + "#" + proc.Name, + DocKind: "definition", + Name: proc.Name, + Type: proc.Type, + UnitID: s.unit.Id(), + } + docs = append(docs, doc.definition) + + if proc.Details.ID != "" { + doc.launch = &persistence.ProcessLaunchDoc{ + DocID: "u#" + s.unit.Id() + "#charm#" + proc.ID() + "#launch", + DocKind: "launch", + PluginID: proc.Details.ID, + RawStatus: proc.Details.Status.Label, + } + doc.proc = &persistence.ProcessDoc{ + DocID: "u#" + s.unit.Id() + "#charm#" + proc.ID(), + DocKind: "process", + Life: 0, + PluginStatus: proc.Details.Status.Label, + } + docs = append(docs, doc.launch, doc.proc) + } + + results = append(results, doc) + } + s.state.setDocs(docs...) + return results +} + +func (s *procsPersistenceSuite) TestEnsureDefininitionsCharmAndUnit(c *gc.C) { + definitions := s.newDefinitions("docker", "procA") + s.setUnit("a-unit/0") + + pp := s.newPersistence() + found, mismatched, err := pp.EnsureDefinitions(definitions...) + c.Assert(err, jc.ErrorIsNil) + + c.Check(found, gc.HasLen, 0) + c.Check(mismatched, gc.HasLen, 0) + s.stub.CheckCallNames(c, "Run") + s.state.checkOps(c, [][]txn.Op{{ + { + C: "workloadprocesses", + Id: "c#local:series/dummy-1#procA", + Assert: txn.DocMissing, + Insert: &persistence.ProcessDefinitionDoc{ + DocID: "c#local:series/dummy-1#procA", + DocKind: "definition", + UnitID: "a-unit/0", + Name: "procA", + Type: "docker", + }, + }, + }}) +} + +func (s *procsPersistenceSuite) TestEnsureDefininitionsCharmOnly(c *gc.C) { + definitions := s.newDefinitions("docker", "procA") + s.setUnit("") + + pp := s.newPersistence() + found, mismatched, err := pp.EnsureDefinitions(definitions...) + c.Assert(err, jc.ErrorIsNil) + + c.Check(found, gc.HasLen, 0) + c.Check(mismatched, gc.HasLen, 0) + s.stub.CheckCallNames(c, "Run") + s.state.checkOps(c, [][]txn.Op{{ + { + C: "workloadprocesses", + Id: "c#local:series/dummy-1#procA", + Assert: txn.DocMissing, + Insert: &persistence.ProcessDefinitionDoc{ + DocID: "c#local:series/dummy-1#procA", + DocKind: "definition", + Name: "procA", + Type: "docker", + }, + }, + }}) +} + +func (s *procsPersistenceSuite) TestEnsureDefininitionsMultiple(c *gc.C) { + definitions := s.newDefinitions("docker", "procA", "procB") + s.setUnit("") + + pp := s.newPersistence() + found, mismatched, err := pp.EnsureDefinitions(definitions...) + c.Assert(err, jc.ErrorIsNil) + + c.Check(found, gc.HasLen, 0) + c.Check(mismatched, gc.HasLen, 0) + s.stub.CheckCallNames(c, "Run") + s.state.checkOps(c, [][]txn.Op{{ + { + C: "workloadprocesses", + Id: "c#local:series/dummy-1#procA", + Assert: txn.DocMissing, + Insert: &persistence.ProcessDefinitionDoc{ + DocID: "c#local:series/dummy-1#procA", + DocKind: "definition", + Name: "procA", + Type: "docker", + }, + }, { + C: "workloadprocesses", + Id: "c#local:series/dummy-1#procB", + Assert: txn.DocMissing, + Insert: &persistence.ProcessDefinitionDoc{ + DocID: "c#local:series/dummy-1#procB", + DocKind: "definition", + Name: "procB", + Type: "docker", + }, + }, + }}) +} + +func (s *procsPersistenceSuite) TestEnsureDefininitionsEmpty(c *gc.C) { + pp := s.newPersistence() + found, mismatched, err := pp.EnsureDefinitions() + c.Assert(err, jc.ErrorIsNil) + + c.Check(found, gc.HasLen, 0) + c.Check(mismatched, gc.HasLen, 0) + s.stub.CheckCallNames(c) + c.Check(s.state.ops, gc.HasLen, 0) +} + +func (s *procsPersistenceSuite) TestEnsureDefininitionsFailed(c *gc.C) { + failure := errors.Errorf("") + s.stub.SetErrors(failure) + definitions := s.newDefinitions("docker", "procA") + s.setUnit("") + + pp := s.newPersistence() + _, _, err := pp.EnsureDefinitions(definitions...) + + c.Check(errors.Cause(err), gc.Equals, failure) +} + +func (s *procsPersistenceSuite) TestEnsureDefininitionsFound(c *gc.C) { + s.stub.SetErrors(txn.ErrAborted) + definitions := s.newDefinitions("docker", "procA") + s.setUnit("") + expected := &persistence.ProcessDefinitionDoc{ + DocID: "c#local:series/dummy-1#procA", + DocKind: "definition", + Name: "procA", + Type: "docker", + } + s.state.setDocs(expected) + + pp := s.newPersistence() + found, mismatched, err := pp.EnsureDefinitions(definitions...) + c.Assert(err, jc.ErrorIsNil) + + c.Check(found, jc.DeepEquals, []string{ + "c#local:series/dummy-1#procA", + }) + c.Check(mismatched, gc.HasLen, 0) + s.stub.CheckCallNames(c, "Run", "All") + s.state.checkOps(c, [][]txn.Op{{ + { + C: "workloadprocesses", + Id: "c#local:series/dummy-1#procA", + Assert: txn.DocMissing, + Insert: expected, + }, + }}) +} + +func (s *procsPersistenceSuite) TestEnsureDefininitionsMismatched(c *gc.C) { + s.stub.SetErrors(txn.ErrAborted) + definitions := s.newDefinitions("kvm", "procA") + s.setUnit("") + doc := &persistence.ProcessDefinitionDoc{ + DocID: "c#local:series/dummy-1#procA", + DocKind: "definition", + Name: "procA", + Type: "docker", + } + s.state.setDocs(doc) + + pp := s.newPersistence() + found, mismatched, err := pp.EnsureDefinitions(definitions...) + c.Assert(err, jc.ErrorIsNil) + + c.Check(found, jc.DeepEquals, []string{ + "c#local:series/dummy-1#procA", + }) + c.Check(mismatched, jc.DeepEquals, []string{ + "c#local:series/dummy-1#procA", + }) + s.stub.CheckCallNames(c, "Run", "All") + s.state.checkOps(c, [][]txn.Op{{ + { + C: "workloadprocesses", + Id: "c#local:series/dummy-1#procA", + Assert: txn.DocMissing, + Insert: &persistence.ProcessDefinitionDoc{ + DocID: "c#local:series/dummy-1#procA", + DocKind: "definition", + Name: "procA", + Type: "kvm", + }, + }, + }}) +} + +func (s *procsPersistenceSuite) TestEnsureDefininitionsMixed(c *gc.C) { + s.stub.SetErrors(txn.ErrAborted) + definitions := s.newDefinitions("kvm", "procA") + definitions = append(definitions, s.newDefinitions("docker", "procB", "procC")...) + s.setUnit("a-unit/0") + doc := &persistence.ProcessDefinitionDoc{ + DocID: "c#local:series/dummy-1#procA", + DocKind: "definition", + Name: "procA", + UnitID: "a-unit/0", + Type: "docker", + } + expected := &persistence.ProcessDefinitionDoc{ + DocID: "c#local:series/dummy-1#procB", + DocKind: "definition", + Name: "procB", + UnitID: "a-unit/0", + Type: "docker", + } + s.state.setDocs(doc, expected) + + pp := s.newPersistence() + found, mismatched, err := pp.EnsureDefinitions(definitions...) + c.Assert(err, jc.ErrorIsNil) + + c.Check(found, jc.DeepEquals, []string{ + "c#local:series/dummy-1#procA", + "c#local:series/dummy-1#procB", + }) + c.Check(mismatched, jc.DeepEquals, []string{ + "c#local:series/dummy-1#procA", + }) + s.stub.CheckCallNames(c, "Run", "All") + s.state.checkOps(c, [][]txn.Op{{ + // first attempt + { + C: "workloadprocesses", + Id: "c#local:series/dummy-1#procA", + Assert: txn.DocMissing, + Insert: &persistence.ProcessDefinitionDoc{ + DocID: "c#local:series/dummy-1#procA", + DocKind: "definition", + Name: "procA", + UnitID: "a-unit/0", + Type: "kvm", + }, + }, + { + C: "workloadprocesses", + Id: "c#local:series/dummy-1#procB", + Assert: txn.DocMissing, + Insert: &persistence.ProcessDefinitionDoc{ + DocID: "c#local:series/dummy-1#procB", + DocKind: "definition", + Name: "procB", + UnitID: "a-unit/0", + Type: "docker", + }, + }, + { + C: "workloadprocesses", + Id: "c#local:series/dummy-1#procC", + Assert: txn.DocMissing, + Insert: &persistence.ProcessDefinitionDoc{ + DocID: "c#local:series/dummy-1#procC", + DocKind: "definition", + Name: "procC", + UnitID: "a-unit/0", + Type: "docker", + }, + }, + }, { + // second attempt + { + C: "workloadprocesses", + Id: "c#local:series/dummy-1#procC", + Assert: txn.DocMissing, + Insert: &persistence.ProcessDefinitionDoc{ + DocID: "c#local:series/dummy-1#procC", + DocKind: "definition", + Name: "procC", + UnitID: "a-unit/0", + Type: "docker", + }, + }, + }}) +} + +func (s *procsPersistenceSuite) TestInsertOkay(c *gc.C) { + proc := s.newProcesses("docker", "procA/procA-xyz")[0] + + pp := s.newPersistence() + okay, err := pp.Insert(proc) + c.Assert(err, jc.ErrorIsNil) + + c.Check(okay, jc.IsTrue) + s.stub.CheckCallNames(c, "Run") + s.state.checkOps(c, [][]txn.Op{{ + { + C: "workloadprocesses", + Id: "u#a-unit/0#charm#procA/procA-xyz#launch", + Assert: txn.DocMissing, + Insert: &persistence.ProcessLaunchDoc{ + DocID: "u#a-unit/0#charm#procA/procA-xyz#launch", + DocKind: "launch", + PluginID: "procA-xyz", + RawStatus: "running", + }, + }, + { + C: "workloadprocesses", + Id: "u#a-unit/0#charm#procA/procA-xyz", + Assert: txn.DocMissing, + Insert: &persistence.ProcessDoc{ + DocID: "u#a-unit/0#charm#procA/procA-xyz", + DocKind: "process", + Life: 0, + PluginStatus: "running", + }, + }, + // TODO(ericsnow) This op will be there once we add definitions. + //{ + // C: "workloadprocesses", + // Id: "c#local:series/dummy-1#procA", + // Assert: txn.DocMissing, + // Insert: &persistence.ProcessDefinitionDoc{ + // DocID: "c#local:series/dummy-1#procA", + // DocKind: "definition", + // Name: "procA", + // Type: "docker", + // }, + //}, + }}) +} + +func (s *procsPersistenceSuite) TestInsertDefinitionExists(c *gc.C) { + expected := &persistence.ProcessDefinitionDoc{ + DocID: "c#local:series/dummy-1#procA", + DocKind: "definition", + Name: "procA", + Type: "docker", + } + s.state.setDocs(expected) + proc := s.newProcesses("docker", "procA/procA-xyz")[0] + + pp := s.newPersistence() + okay, err := pp.Insert(proc) + c.Assert(err, jc.ErrorIsNil) + + c.Check(okay, jc.IsTrue) + s.stub.CheckCallNames(c, "Run") + s.state.checkOps(c, [][]txn.Op{{ + { + C: "workloadprocesses", + Id: "u#a-unit/0#charm#procA/procA-xyz#launch", + Assert: txn.DocMissing, + Insert: &persistence.ProcessLaunchDoc{ + DocID: "u#a-unit/0#charm#procA/procA-xyz#launch", + DocKind: "launch", + PluginID: "procA-xyz", + RawStatus: "running", + }, + }, + { + C: "workloadprocesses", + Id: "u#a-unit/0#charm#procA/procA-xyz", + Assert: txn.DocMissing, + Insert: &persistence.ProcessDoc{ + DocID: "u#a-unit/0#charm#procA/procA-xyz", + DocKind: "process", + Life: 0, + PluginStatus: "running", + }, + }, + }}) +} + +func (s *procsPersistenceSuite) TestInsertDefinitionMismatch(c *gc.C) { + expected := &persistence.ProcessDefinitionDoc{ + DocID: "c#local:series/dummy-1#procA", + DocKind: "definition", + Name: "procA", + Type: "docker", + } + s.state.setDocs(expected) + proc := s.newProcesses("kvm", "procA/procA-xyz")[0] + + pp := s.newPersistence() + okay, err := pp.Insert(proc) + // TODO(ericsnow) Should this fail instead? + c.Assert(err, jc.ErrorIsNil) + + c.Check(okay, jc.IsTrue) + s.stub.CheckCallNames(c, "Run") + s.state.checkOps(c, [][]txn.Op{{ + { + C: "workloadprocesses", + Id: "u#a-unit/0#charm#procA/procA-xyz#launch", + Assert: txn.DocMissing, + Insert: &persistence.ProcessLaunchDoc{ + DocID: "u#a-unit/0#charm#procA/procA-xyz#launch", + DocKind: "launch", + PluginID: "procA-xyz", + RawStatus: "running", + }, + }, + { + C: "workloadprocesses", + Id: "u#a-unit/0#charm#procA/procA-xyz", + Assert: txn.DocMissing, + Insert: &persistence.ProcessDoc{ + DocID: "u#a-unit/0#charm#procA/procA-xyz", + DocKind: "process", + Life: 0, + PluginStatus: "running", + }, + }, + }}) +} + +func (s *procsPersistenceSuite) TestInsertAlreadyExists(c *gc.C) { + proc := s.newProcesses("docker", "procA/procA-xyz")[0] + s.setDocs(proc) + s.stub.SetErrors(txn.ErrAborted) + + pp := s.newPersistence() + okay, err := pp.Insert(proc) + c.Assert(err, jc.ErrorIsNil) + + c.Check(okay, jc.IsFalse) + s.stub.CheckCallNames(c, "Run") + s.state.checkOps(c, [][]txn.Op{{ + { + C: "workloadprocesses", + Id: "u#a-unit/0#charm#procA/procA-xyz#launch", + Assert: txn.DocMissing, + Insert: &persistence.ProcessLaunchDoc{ + DocID: "u#a-unit/0#charm#procA/procA-xyz#launch", + DocKind: "launch", + PluginID: "procA-xyz", + RawStatus: "running", + }, + }, + { + C: "workloadprocesses", + Id: "u#a-unit/0#charm#procA/procA-xyz", + Assert: txn.DocMissing, + Insert: &persistence.ProcessDoc{ + DocID: "u#a-unit/0#charm#procA/procA-xyz", + DocKind: "process", + Life: 0, + PluginStatus: "running", + }, + }, + }}) +} + +func (s *procsPersistenceSuite) TestInsertFailed(c *gc.C) { + failure := errors.Errorf("") + s.stub.SetErrors(failure) + proc := s.newProcesses("docker", "procA")[0] + + pp := s.newPersistence() + _, err := pp.Insert(proc) + + c.Check(errors.Cause(err), gc.Equals, failure) +} + +func (s *procsPersistenceSuite) TestSetStatusOkay(c *gc.C) { + proc := s.newProcesses("docker", "procA/procA-xyz")[0] + s.setDocs(proc) + newStatus := process.Status{Label: "still running"} + + pp := s.newPersistence() + okay, err := pp.SetStatus("procA/procA-xyz", newStatus) + c.Assert(err, jc.ErrorIsNil) + + c.Check(okay, jc.IsTrue) + s.stub.CheckCallNames(c, "Run") + s.state.checkOps(c, [][]txn.Op{{ + { + C: "workloadprocesses", + Id: "u#a-unit/0#charm#procA/procA-xyz", + Assert: txn.DocExists, + }, { + C: "workloadprocesses", + Id: "u#a-unit/0#charm#procA/procA-xyz", + Assert: persistence.IsAliveDoc, + Update: bson.D{ + {"$set", bson.D{{"pluginstatus", "still running"}}}, + }, + }, + }}) +} + +func (s *procsPersistenceSuite) TestSetStatusMissing(c *gc.C) { + s.stub.SetErrors(txn.ErrAborted) + newStatus := process.Status{Label: "still running"} + + pp := s.newPersistence() + okay, err := pp.SetStatus("procA/procA-xyz", newStatus) + c.Assert(err, jc.ErrorIsNil) + + c.Check(okay, jc.IsFalse) + s.stub.CheckCallNames(c, "Run", "One") + s.state.checkOps(c, [][]txn.Op{{ + { + C: "workloadprocesses", + Id: "u#a-unit/0#charm#procA/procA-xyz", + Assert: txn.DocExists, + }, { + C: "workloadprocesses", + Id: "u#a-unit/0#charm#procA/procA-xyz", + Assert: persistence.IsAliveDoc, + Update: bson.D{ + {"$set", bson.D{{"pluginstatus", "still running"}}}, + }, + }, + }}) +} + +func (s *procsPersistenceSuite) TestSetStatusDying(c *gc.C) { + proc := s.newProcesses("docker", "procA/procA-xyz")[0] + docs := s.setDocs(proc) + docs[0].proc.Life = persistence.Dying + s.stub.SetErrors(txn.ErrAborted) + newStatus := process.Status{Label: "still running"} + + pp := s.newPersistence() + okay, err := pp.SetStatus("procA/procA-xyz", newStatus) + c.Assert(err, jc.ErrorIsNil) + + c.Check(okay, jc.IsTrue) + s.stub.CheckCallNames(c, "Run", "One") + s.state.checkOps(c, [][]txn.Op{{ + { + C: "workloadprocesses", + Id: "u#a-unit/0#charm#procA/procA-xyz", + Assert: txn.DocExists, + }, { + C: "workloadprocesses", + Id: "u#a-unit/0#charm#procA/procA-xyz", + Assert: persistence.IsAliveDoc, + Update: bson.D{ + {"$set", bson.D{{"pluginstatus", "still running"}}}, + }, + }, + }}) +} + +func (s *procsPersistenceSuite) TestSetStatusFailed(c *gc.C) { + proc := s.newProcesses("docker", "procA/procA-xyz")[0] + s.setDocs(proc) + failure := errors.Errorf("") + s.stub.SetErrors(failure) + + pp := s.newPersistence() + _, err := pp.SetStatus("procA/procA-xyz", process.Status{Label: "still running"}) + + c.Check(errors.Cause(err), gc.Equals, failure) +} + +func (s *procsPersistenceSuite) TestListOkay(c *gc.C) { + existing := s.newProcesses("docker", "procA/xyz", "procB/abc") + s.setDocs(existing...) + + pp := s.newPersistence() + procs, missing, err := pp.List("procA/xyz") + c.Assert(err, jc.ErrorIsNil) + + s.stub.CheckCallNames(c, "All", "All", "All") + c.Check(s.state.ops, gc.HasLen, 0) + c.Check(procs, jc.DeepEquals, []process.Info{existing[0]}) + c.Check(missing, gc.HasLen, 0) +} + +func (s *procsPersistenceSuite) TestListSomeMissing(c *gc.C) { + existing := s.newProcesses("docker", "procA/xyz", "procB/abc") + s.setDocs(existing...) + + pp := s.newPersistence() + procs, missing, err := pp.List("procB/abc", "procC/123") + c.Assert(err, jc.ErrorIsNil) + + s.stub.CheckCallNames(c, "All", "All", "All") + c.Check(s.state.ops, gc.HasLen, 0) + c.Check(procs, jc.DeepEquals, []process.Info{existing[1]}) + c.Check(missing, jc.DeepEquals, []string{"procC/123"}) +} + +func (s *procsPersistenceSuite) TestListEmpty(c *gc.C) { + pp := s.newPersistence() + procs, missing, err := pp.List("procA/xyz") + c.Assert(err, jc.ErrorIsNil) + + s.stub.CheckCallNames(c, "All", "All", "All") + c.Check(s.state.ops, gc.HasLen, 0) + c.Check(procs, gc.HasLen, 0) + c.Check(missing, jc.DeepEquals, []string{"procA/xyz"}) +} + +func (s *procsPersistenceSuite) TestListInconsistent(c *gc.C) { + existing := s.newProcesses("docker", "procA/xyz", "procB/abc") + s.setDocs(existing...) + delete(s.state.docs, "u#a-unit/0#charm#procA/xyz#launch") + + pp := s.newPersistence() + _, _, err := pp.List("procA/xyz") + + c.Check(err, gc.ErrorMatches, "found inconsistent records .*") +} + +func (s *procsPersistenceSuite) TestListFailure(c *gc.C) { + failure := errors.Errorf("") + s.stub.SetErrors(failure) + + pp := s.newPersistence() + _, _, err := pp.List() + + c.Check(errors.Cause(err), gc.Equals, failure) +} + +func (s *procsPersistenceSuite) TestListAllOkay(c *gc.C) { + existing := s.newProcesses("docker", "procA/xyz", "procB/abc") + s.setDocs(existing...) + + pp := s.newPersistence() + procs, err := pp.ListAll() + c.Assert(err, jc.ErrorIsNil) + + s.stub.CheckCallNames(c, "All", "All", "All") + c.Check(s.state.ops, gc.HasLen, 0) + c.Check(procs, jc.DeepEquals, existing) +} + +func (s *procsPersistenceSuite) TestListAllEmpty(c *gc.C) { + pp := s.newPersistence() + procs, err := pp.ListAll() + c.Assert(err, jc.ErrorIsNil) + + s.stub.CheckCallNames(c, "All", "All", "All") + c.Check(s.state.ops, gc.HasLen, 0) + c.Check(procs, gc.HasLen, 0) +} + +func (s *procsPersistenceSuite) TestListAllIncludeCharmDefined(c *gc.C) { + s.state.setDocs(&persistence.ProcessDefinitionDoc{ + DocID: "c#local:series/dummy-1#procA", + DocKind: "definition", + Name: "procA", + Type: "docker", + }) + existing := s.newProcesses("docker", "procB/abc", "procC/xyz") + s.setDocs(existing...) + + pp := s.newPersistence() + procs, err := pp.ListAll() + c.Assert(err, jc.ErrorIsNil) + + s.stub.CheckCallNames(c, "All", "All", "All") + c.Check(s.state.ops, gc.HasLen, 0) + existing = append(existing, process.Info{ + Process: charm.Process{ + Name: "procA", + Type: "docker", + }, + }) + c.Check(procs, jc.DeepEquals, existing) +} + +func (s *procsPersistenceSuite) TestListAllInconsistent(c *gc.C) { + existing := s.newProcesses("docker", "procA/xyz", "procB/abc") + s.setDocs(existing...) + delete(s.state.docs, "u#a-unit/0#charm#procA/xyz#launch") + + pp := s.newPersistence() + _, err := pp.ListAll() + + c.Check(err, gc.ErrorMatches, "found inconsistent records .*") +} + +func (s *procsPersistenceSuite) TestListAllFailed(c *gc.C) { + failure := errors.Errorf("") + s.stub.SetErrors(failure) + + pp := s.newPersistence() + _, err := pp.ListAll() + + c.Check(errors.Cause(err), gc.Equals, failure) +} + +func (s *procsPersistenceSuite) TestRemoveOkay(c *gc.C) { + proc := s.newProcesses("docker", "procA/xyz")[0] + s.setDocs(proc) + + pp := s.newPersistence() + found, err := pp.Remove("procA/xyz") + c.Assert(err, jc.ErrorIsNil) + + c.Check(found, jc.IsTrue) + s.stub.CheckCallNames(c, "Run") + s.state.checkOps(c, [][]txn.Op{{ + { + C: "workloadprocesses", + Id: "u#a-unit/0#charm#procA/xyz#launch", + Assert: txn.DocExists, + Remove: true, + }, { + C: "workloadprocesses", + Id: "u#a-unit/0#charm#procA/xyz", + Assert: persistence.IsAliveDoc, + }, { + C: "workloadprocesses", + Id: "u#a-unit/0#charm#procA/xyz", + Assert: txn.DocExists, + Remove: true, + }, + }}) +} + +func (s *procsPersistenceSuite) TestRemoveMissing(c *gc.C) { + s.stub.SetErrors(txn.ErrAborted) + + pp := s.newPersistence() + found, err := pp.Remove("procA/xyz") + c.Assert(err, jc.ErrorIsNil) + + c.Check(found, jc.IsFalse) + s.stub.CheckCallNames(c, "Run", "One", "One", "One") + s.state.checkOps(c, [][]txn.Op{{ + { + C: "workloadprocesses", + Id: "u#a-unit/0#charm#procA/xyz#launch", + Assert: txn.DocExists, + Remove: true, + }, { + C: "workloadprocesses", + Id: "u#a-unit/0#charm#procA/xyz", + Assert: persistence.IsAliveDoc, + }, { + C: "workloadprocesses", + Id: "u#a-unit/0#charm#procA/xyz", + Assert: txn.DocExists, + Remove: true, + }, + }}) +} + +func (s *procsPersistenceSuite) TestRemoveDying(c *gc.C) { + proc := s.newProcesses("docker", "procA/xyz")[0] + docs := s.setDocs(proc) + docs[0].proc.Life = persistence.Dying + + pp := s.newPersistence() + found, err := pp.Remove("procA/xyz") + c.Assert(err, jc.ErrorIsNil) + + c.Check(found, jc.IsTrue) + s.stub.CheckCallNames(c, "Run") + s.state.checkOps(c, [][]txn.Op{{ + { + C: "workloadprocesses", + Id: "u#a-unit/0#charm#procA/xyz#launch", + Assert: txn.DocExists, + Remove: true, + }, { + C: "workloadprocesses", + Id: "u#a-unit/0#charm#procA/xyz", + Assert: persistence.IsAliveDoc, + }, { + C: "workloadprocesses", + Id: "u#a-unit/0#charm#procA/xyz", + Assert: txn.DocExists, + Remove: true, + }, + }}) +} + +func (s *procsPersistenceSuite) TestRemoveInconsistent(c *gc.C) { + proc := s.newProcesses("docker", "procA/xyz")[0] + s.setDocs(proc) + delete(s.state.docs, "u#a-unit/0#charm#procA/xyz#launch") + s.stub.SetErrors(txn.ErrAborted) + + pp := s.newPersistence() + _, err := pp.Remove("procA/xyz") + + c.Check(err, gc.ErrorMatches, "found inconsistent records .*") +} + +func (s *procsPersistenceSuite) TestRemoveFailed(c *gc.C) { + failure := errors.Errorf("") + s.stub.SetErrors(failure) + + pp := s.newPersistence() + _, err := pp.Remove("procA/xyz") + + c.Check(errors.Cause(err), gc.Equals, failure) +} + +type fakeStatePersistence struct { + *gitjujutesting.Stub + + docs map[string]interface{} + definitionDocs []string + launchDocs []string + procDocs []string + ops [][]txn.Op +} + +func (sp *fakeStatePersistence) setDocs(docs ...interface{}) { + if sp.docs == nil { + sp.docs = make(map[string]interface{}) + } + for _, doc := range docs { + var id string + switch doc := doc.(type) { + case *persistence.ProcessDefinitionDoc: + id = doc.DocID + sp.definitionDocs = append(sp.definitionDocs, id) + case *persistence.ProcessLaunchDoc: + id = doc.DocID + sp.launchDocs = append(sp.launchDocs, id) + case *persistence.ProcessDoc: + id = doc.DocID + sp.procDocs = append(sp.procDocs, id) + default: + panic(doc) + } + if id == "" { + panic(doc) + } + sp.docs[id] = doc + } +} + +func (sp fakeStatePersistence) checkOps(c *gc.C, expected [][]txn.Op) { + if len(sp.ops) != len(expected) { + c.Check(sp.ops, jc.DeepEquals, expected) + return + } + + for i, ops := range sp.ops { + c.Logf(" -- txn attempt %d --\n", i) + expectedRun := expected[i] + if len(ops) != len(expectedRun) { + c.Check(ops, jc.DeepEquals, expectedRun) + continue + } + for j, op := range ops { + c.Logf(" \n", j) + c.Check(op, jc.DeepEquals, expectedRun[j]) + } + } +} + +func (sp fakeStatePersistence) One(collName, id string, doc interface{}) error { + sp.AddCall("One", collName, id, doc) + if err := sp.NextErr(); err != nil { + return errors.Trace(err) + } + + if len(sp.docs) == 0 { + return errors.NotFoundf(id) + } + found, ok := sp.docs[id] + if !ok { + return errors.NotFoundf(id) + } + + switch doc := doc.(type) { + case *persistence.ProcessDefinitionDoc: + expected := found.(*persistence.ProcessDefinitionDoc) + *doc = *expected + case *persistence.ProcessLaunchDoc: + expected := found.(*persistence.ProcessLaunchDoc) + *doc = *expected + case *persistence.ProcessDoc: + expected := found.(*persistence.ProcessDoc) + *doc = *expected + default: + panic(doc) + } + + return nil +} + +func (sp fakeStatePersistence) All(collName string, query, docs interface{}) error { + sp.AddCall("All", collName, query, docs) + if err := sp.NextErr(); err != nil { + return errors.Trace(err) + } + + var ids []string + elems := query.(bson.D) + if len(elems) != 1 { + panic("bad query") + } + switch elems[0].Name { + case "_id": + elems = elems[0].Value.(bson.D) + if len(elems) != 1 || elems[0].Name != "$in" { + panic(query) + panic("bad query") + } + for _, id := range elems[0].Value.([]string) { + ids = append(ids, id) + } + case "dockind": + switch elems[0].Value.(string) { + case "definition": + for _, id := range sp.definitionDocs { + ids = append(ids, id) + } + case "launch": + for _, id := range sp.launchDocs { + ids = append(ids, id) + } + case "process": + for _, id := range sp.procDocs { + ids = append(ids, id) + } + } + default: + panic(query) + } + + switch docs := docs.(type) { + case *[]persistence.ProcessDefinitionDoc: + var found []persistence.ProcessDefinitionDoc + for _, id := range ids { + doc, ok := sp.docs[id] + if !ok { + continue + } + found = append(found, *doc.(*persistence.ProcessDefinitionDoc)) + } + *docs = found + case *[]persistence.ProcessLaunchDoc: + var found []persistence.ProcessLaunchDoc + for _, id := range ids { + doc, ok := sp.docs[id] + if !ok { + continue + } + found = append(found, *doc.(*persistence.ProcessLaunchDoc)) + } + *docs = found + case *[]persistence.ProcessDoc: + var found []persistence.ProcessDoc + for _, id := range ids { + doc, ok := sp.docs[id] + if !ok { + continue + } + found = append(found, *doc.(*persistence.ProcessDoc)) + } + *docs = found + default: + panic(docs) + } + return nil +} + +func (sp *fakeStatePersistence) Run(transactions jujutxn.TransactionSource) error { + sp.AddCall("Run", transactions) + + // See transactionRunner.Run in github.com/juju/txn. + for i := 0; ; i++ { + const nrRetries = 3 + if i >= nrRetries { + return jujutxn.ErrExcessiveContention + } + + // Get the ops. + ops, err := transactions(i) + if err == jujutxn.ErrTransientFailure { + continue + } + if err == jujutxn.ErrNoOperations { + break + } + if err != nil { + return err + } + + // "run" the ops. + sp.ops = append(sp.ops, ops) + if err := sp.NextErr(); err == nil { + return nil + } else if err != txn.ErrAborted { + return err + } + } + return nil +} diff --git a/process/state/base_test.go b/process/state/base_test.go new file mode 100644 index 00000000000..65599737640 --- /dev/null +++ b/process/state/base_test.go @@ -0,0 +1,243 @@ +// Copyright 2015 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package state_test + +import ( + "fmt" + "reflect" + + "github.com/juju/errors" + "github.com/juju/names" + gitjujutesting "github.com/juju/testing" + jc "github.com/juju/testing/checkers" + "github.com/juju/utils" + gc "gopkg.in/check.v1" + "gopkg.in/juju/charm.v5" + + "github.com/juju/juju/process" + "github.com/juju/juju/testing" +) + +type baseProcessesSuite struct { + testing.BaseSuite + + stub *gitjujutesting.Stub + persist *fakeProcsPersistence + charm names.CharmTag +} + +func (s *baseProcessesSuite) SetUpTest(c *gc.C) { + s.BaseSuite.SetUpTest(c) + + s.stub = &gitjujutesting.Stub{} + s.persist = &fakeProcsPersistence{Stub: s.stub} + s.charm = names.NewCharmTag("local:series/dummy-1") +} + +func (s *baseProcessesSuite) newDefinitions(pType string, names ...string) []charm.Process { + var definitions []charm.Process + for _, name := range names { + definitions = append(definitions, charm.Process{ + Name: name, + Type: pType, + }) + } + return definitions +} + +func (s *baseProcessesSuite) newProcesses(pType string, names ...string) []process.Info { + var ids []string + for i, name := range names { + name, id := process.ParseID(name) + names[i] = name + if id == "" { + id = fmt.Sprintf("%s-%s", name, utils.MustNewUUID()) + } + ids = append(ids, id) + } + + var processes []process.Info + for i, definition := range s.newDefinitions(pType, names...) { + id := ids[i] + processes = append(processes, process.Info{ + Process: definition, + Details: process.Details{ + ID: id, + Status: process.Status{ + Label: "running", + }, + }, + }) + } + return processes +} + +type fakeProcsPersistence struct { + *gitjujutesting.Stub + definitions map[string]*charm.Process + procs map[string]*process.Info + inconsistent []string +} + +func (s *fakeProcsPersistence) checkDefinitions(c *gc.C, expectedList []charm.Process) { + c.Check(s.definitions, gc.HasLen, len(expectedList)) + for _, expected := range expectedList { + definition, ok := s.definitions[expected.Name] + if !ok { + c.Errorf("definition %q not found", expected.Name) + } else { + c.Check(definition, jc.DeepEquals, &expected) + } + } +} + +func (s *fakeProcsPersistence) checkProcesses(c *gc.C, expectedList []process.Info) { + c.Check(s.procs, gc.HasLen, len(expectedList)) + for _, expected := range expectedList { + proc, ok := s.procs[expected.ID()] + if !ok { + c.Errorf("process %q not found", expected.ID()) + } else { + c.Check(proc, jc.DeepEquals, &expected) + } + + definition, ok := s.definitions[expected.Name] + if !ok { + c.Errorf("definition %q not found", expected.Name) + } else { + c.Check(definition, jc.DeepEquals, &expected.Process) + } + } +} + +func (s *fakeProcsPersistence) setDefinitions(definitions ...*charm.Process) { + if s.definitions == nil { + s.definitions = make(map[string]*charm.Process) + } + for _, definition := range definitions { + s.definitions[definition.Name] = definition + } +} + +func (s *fakeProcsPersistence) setProcesses(procs ...*process.Info) { + if s.procs == nil { + s.procs = make(map[string]*process.Info) + } + for _, proc := range procs { + s.procs[proc.ID()] = proc + } +} + +func (s *fakeProcsPersistence) EnsureDefinitions(definitions ...charm.Process) ([]string, []string, error) { + s.AddCall("EnsureDefinitions", definitions) + if err := s.NextErr(); err != nil { + return nil, nil, errors.Trace(err) + } + + var existing []string + var mismatched []string + for _, definition := range definitions { + if added, ok := s.ensureDefinition(definition); !added { + existing = append(existing, definition.Name) + if !ok { + mismatched = append(mismatched, definition.Name) + } + } + } + return existing, mismatched, nil +} + +func (s *fakeProcsPersistence) ensureDefinition(definition charm.Process) (bool, bool) { + if expected, ok := s.definitions[definition.Name]; ok { + if !reflect.DeepEqual(&definition, expected) { + return false, false + } + return false, true + } else { + s.setDefinitions(&definition) + return true, true + } +} + +func (s *fakeProcsPersistence) Insert(info process.Info) (bool, error) { + s.AddCall("Insert", info) + if err := s.NextErr(); err != nil { + return false, errors.Trace(err) + } + + if _, ok := s.procs[info.ID()]; ok { + return false, nil + } + s.setProcesses(&info) + return true, nil +} + +func (s *fakeProcsPersistence) SetStatus(id string, status process.Status) (bool, error) { + s.AddCall("SetStatus", id, status) + if err := s.NextErr(); err != nil { + return false, errors.Trace(err) + } + + proc, ok := s.procs[id] + if !ok { + return false, nil + } + proc.Details.Status = status + return true, nil +} + +func (s *fakeProcsPersistence) List(ids ...string) ([]process.Info, []string, error) { + s.AddCall("List", ids) + if err := s.NextErr(); err != nil { + return nil, nil, errors.Trace(err) + } + + var procs []process.Info + var missing []string + for _, id := range ids { + if proc, ok := s.procs[id]; !ok { + missing = append(missing, id) + } else { + for _, inconsistent := range s.inconsistent { + if id == inconsistent { + return nil, nil, errors.NotValidf(id) + } + } + procs = append(procs, *proc) + } + } + return procs, missing, nil +} + +func (s *fakeProcsPersistence) ListAll() ([]process.Info, error) { + s.AddCall("ListAll") + if err := s.NextErr(); err != nil { + return nil, errors.Trace(err) + } + + var procs []process.Info + for _, proc := range s.procs { + procs = append(procs, *proc) + } + return procs, nil +} + +func (s *fakeProcsPersistence) Remove(id string) (bool, error) { + s.AddCall("Remove", id) + if err := s.NextErr(); err != nil { + return false, errors.Trace(err) + } + + if _, ok := s.procs[id]; !ok { + return false, nil + } + for _, inconsistent := range s.inconsistent { + if id == inconsistent { + return false, errors.NotValidf(id) + } + } + delete(s.procs, id) + // TODO(ericsnow) Remove definition if appropriate. + return true, nil +} diff --git a/process/state/definitions.go b/process/state/definitions.go new file mode 100644 index 00000000000..dd1234128bd --- /dev/null +++ b/process/state/definitions.go @@ -0,0 +1,50 @@ +// Copyright 2015 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package state + +import ( + "github.com/juju/errors" + "github.com/juju/names" + "gopkg.in/juju/charm.v5" + + "github.com/juju/juju/process/persistence" +) + +// The persistence methods needed for workload processes in state. +type definitionsPersistence interface { + EnsureDefinitions(definitions ...charm.Process) ([]string, []string, error) +} + +// Definitions provides the definition-related functionality +// needed by state. +type Definitions struct { + // Persist is the persistence layer that will be used. + Persist definitionsPersistence +} + +// NewDefinitions builds a new Definitions for the charm. +func NewDefinitions(st persistence.PersistenceBase, charm names.CharmTag) *Definitions { + persist := persistence.NewPersistence(st, &charm, nil) + return &Definitions{ + Persist: persist, + } +} + +// EnsureDefined makes sure that all the provided definitions exist in +// state. So either they are there already or they get added. +func (pd Definitions) EnsureDefined(definitions ...charm.Process) error { + for _, definition := range definitions { + if err := definition.Validate(); err != nil { + return errors.NewNotValid(err, "bad definition") + } + } + _, mismatched, err := pd.Persist.EnsureDefinitions(definitions...) + if err != nil { + return errors.Trace(err) + } + if len(mismatched) > 0 { + return errors.NotValidf("mismatched definitions for %v", mismatched) + } + return nil +} diff --git a/process/state/definitions_test.go b/process/state/definitions_test.go new file mode 100644 index 00000000000..b64227d95f8 --- /dev/null +++ b/process/state/definitions_test.go @@ -0,0 +1,72 @@ +// Copyright 2015 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package state_test + +import ( + "github.com/juju/errors" + jc "github.com/juju/testing/checkers" + gc "gopkg.in/check.v1" + "gopkg.in/juju/charm.v5" + + "github.com/juju/juju/process/state" +) + +var _ = gc.Suite(&processDefinitionsSuite{}) + +type processDefinitionsSuite struct { + baseProcessesSuite +} + +func (s *processDefinitionsSuite) TestEnsureDefinedOkay(c *gc.C) { + definitions := s.newDefinitions("docker", "procA", "procB") + pd := state.Definitions{Persist: s.persist} + err := pd.EnsureDefined(definitions...) + c.Assert(err, jc.ErrorIsNil) + + s.stub.CheckCallNames(c, "EnsureDefinitions") + s.persist.checkDefinitions(c, definitions) +} + +func (s *processDefinitionsSuite) TestEnsureDefinedNoOp(c *gc.C) { + pd := state.Definitions{Persist: s.persist} + err := pd.EnsureDefined() + c.Assert(err, jc.ErrorIsNil) + + s.stub.CheckCallNames(c, "EnsureDefinitions") + c.Check(s.persist.definitions, gc.HasLen, 0) +} + +func (s *processDefinitionsSuite) TestEnsureDefinedBadDefinition(c *gc.C) { + definitions := s.newDefinitions("docker", "procA", "procB") + definitions = append(definitions, charm.Process{}) + pd := state.Definitions{Persist: s.persist} + err := pd.EnsureDefined(definitions...) + + c.Check(err, jc.Satisfies, errors.IsNotValid) +} + +func (s *processDefinitionsSuite) TestEnsureDefinedMatched(c *gc.C) { + same := charm.Process{Name: "procA", Type: "docker"} + s.persist.setDefinitions(&same) + + definitions := s.newDefinitions("docker", "procA", "procB") + pd := state.Definitions{Persist: s.persist} + err := pd.EnsureDefined(definitions...) + c.Assert(err, jc.ErrorIsNil) + + s.persist.checkDefinitions(c, definitions) +} + +func (s *processDefinitionsSuite) TestEnsureDefinedMismatched(c *gc.C) { + same := charm.Process{Name: "procA", Type: "docker"} + different := charm.Process{Name: "procB", Type: "kvm"} + s.persist.setDefinitions(&same, &different) + + definitions := s.newDefinitions("docker", "procA", "procB", "procC") + definitions = append(definitions, same) + pd := state.Definitions{Persist: s.persist} + err := pd.EnsureDefined(definitions...) + + c.Check(err, jc.Satisfies, errors.IsNotValid) +} diff --git a/process/state/package_test.go b/process/state/package_test.go new file mode 100644 index 00000000000..acedb40ebfa --- /dev/null +++ b/process/state/package_test.go @@ -0,0 +1,14 @@ +// Copyright 2015 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package state_test + +import ( + "testing" + + gc "gopkg.in/check.v1" +) + +func Test(t *testing.T) { + gc.TestingT(t) +} diff --git a/process/state/processes.go b/process/state/processes.go new file mode 100644 index 00000000000..64a3ca306aa --- /dev/null +++ b/process/state/processes.go @@ -0,0 +1,121 @@ +// Copyright 2015 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package state + +import ( + "github.com/juju/errors" + "github.com/juju/names" + "gopkg.in/juju/charm.v5" + + "github.com/juju/juju/process" + "github.com/juju/juju/process/persistence" +) + +// TODO(ericsnow) Add names.ProcessTag and use it here? + +// TODO(ericsnow) We need a worker to clean up dying procs. + +// TODO(ericsnow) Export ProcessesPersistence? + +// The persistence methods needed for workload processes in state. +type processesPersistence interface { + EnsureDefinitions(definitions ...charm.Process) ([]string, []string, error) + Insert(info process.Info) (bool, error) + SetStatus(id string, status process.Status) (bool, error) + List(ids ...string) ([]process.Info, []string, error) + ListAll() ([]process.Info, error) + Remove(id string) (bool, error) +} + +// UnitProcesses provides the functionality related to a unit's +// processes, as needed by state. +type UnitProcesses struct { + // Persist is the persistence layer that will be used. + Persist processesPersistence + // Unit identifies the unit associated with the processes. + Unit names.UnitTag +} + +// NewUnitProcesses builds a UnitProcesses for a charm/unit. +func NewUnitProcesses(st persistence.PersistenceBase, unit names.UnitTag, charm *names.CharmTag) *UnitProcesses { + persist := persistence.NewPersistence(st, charm, &unit) + return &UnitProcesses{ + Persist: persist, + Unit: unit, + } +} + +// Register adds the provided process info to state. +func (ps UnitProcesses) Register(info process.Info) error { + if err := info.Validate(); err != nil { + return errors.NewNotValid(err, "bad process info") + } + + _, mismatched, err := ps.Persist.EnsureDefinitions(info.Process) + if err != nil { + return errors.Trace(err) + } + if len(mismatched) > 0 { + return errors.NotValidf("mismatched definition for %q", info.Name) + } + + ok, err := ps.Persist.Insert(info) + if err != nil { + // TODO(ericsnow) Remove the definition we may have just added? + return errors.Trace(err) + } + if !ok { + // TODO(ericsnow) Remove the definition we may have just added? + return errors.NotValidf("process %s (already in state)", info.ID()) + } + + return nil +} + +// SetStatus updates the raw status for the identified process to the +// provided value. +func (ps UnitProcesses) SetStatus(id string, status process.Status) error { + found, err := ps.Persist.SetStatus(id, status) + if err != nil { + return errors.Trace(err) + } + if !found { + return errors.NotFoundf(id) + } + return nil +} + +// List builds the list of process information for the provided process +// IDs. If none are provided then the list contains the info for all +// workload processes associated with the unit. Missing processes +// are ignored. +func (ps UnitProcesses) List(ids ...string) ([]process.Info, error) { + if len(ids) == 0 { + results, err := ps.Persist.ListAll() + if err != nil { + return nil, errors.Trace(err) + } + return results, nil + } + + results, _, err := ps.Persist.List(ids...) + if err != nil { + return nil, errors.Trace(err) + } + // TODO(ericsnow) Ensure that the number returned matches the + // number expected. + return results, nil +} + +// Unregister removes the identified process from state. It does not +// trigger the actual destruction of the process. +func (ps UnitProcesses) Unregister(id string) error { + // If the record wasn't found then we're already done. + _, err := ps.Persist.Remove(id) + if err != nil { + return errors.Trace(err) + } + // TODO(ericsnow) Remove unit-based definition when no procs left. + return nil +} diff --git a/process/state/processes_test.go b/process/state/processes_test.go new file mode 100644 index 00000000000..5bd6cdcdeb3 --- /dev/null +++ b/process/state/processes_test.go @@ -0,0 +1,210 @@ +// Copyright 2015 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package state_test + +import ( + "github.com/juju/errors" + jc "github.com/juju/testing/checkers" + gc "gopkg.in/check.v1" + "gopkg.in/juju/charm.v5" + + "github.com/juju/juju/process" + "github.com/juju/juju/process/state" +) + +var _ = gc.Suite(&unitProcessesSuite{}) + +type unitProcessesSuite struct { + baseProcessesSuite +} + +func (s *unitProcessesSuite) TestRegisterOkay(c *gc.C) { + procs := s.newProcesses("docker", "procA") + proc := procs[0] + + ps := state.UnitProcesses{Persist: s.persist} + err := ps.Register(proc) + c.Assert(err, jc.ErrorIsNil) + + s.stub.CheckCallNames(c, "EnsureDefinitions", "Insert") + s.persist.checkProcesses(c, procs) +} + +func (s *unitProcessesSuite) TestRegisterInvalid(c *gc.C) { + proc := s.newProcesses("", "procA")[0] + + ps := state.UnitProcesses{Persist: s.persist} + err := ps.Register(proc) + + c.Check(err, jc.Satisfies, errors.IsNotValid) +} + +func (s *unitProcessesSuite) TestRegisterEnsureDefinitionFailed(c *gc.C) { + failure := errors.Errorf("") + s.stub.SetErrors(failure) + proc := s.newProcesses("docker", "procA")[0] + + ps := state.UnitProcesses{Persist: s.persist} + err := ps.Register(proc) + + c.Check(errors.Cause(err), gc.Equals, failure) +} + +func (s *unitProcessesSuite) TestRegisterMismatchedDefinition(c *gc.C) { + s.persist.setDefinitions(&charm.Process{Name: "procA", Type: "kvm"}) + proc := s.newProcesses("docker", "procA")[0] + + ps := state.UnitProcesses{Persist: s.persist} + err := ps.Register(proc) + + c.Check(err, jc.Satisfies, errors.IsNotValid) +} + +func (s *unitProcessesSuite) TestRegisterInsertFailed(c *gc.C) { + failure := errors.Errorf("") + s.stub.SetErrors(nil, failure) + proc := s.newProcesses("docker", "procA")[0] + + ps := state.UnitProcesses{Persist: s.persist} + err := ps.Register(proc) + + c.Check(errors.Cause(err), gc.Equals, failure) +} + +func (s *unitProcessesSuite) TestRegisterAlreadyExists(c *gc.C) { + proc := s.newProcesses("docker", "procA")[0] + s.persist.setProcesses(&proc) + + ps := state.UnitProcesses{Persist: s.persist} + err := ps.Register(proc) + + c.Check(err, jc.Satisfies, errors.IsNotValid) +} + +func (s *unitProcessesSuite) TestSetStatusOkay(c *gc.C) { + proc := s.newProcesses("docker", "procA")[0] + s.persist.setProcesses(&proc) + status := process.Status{ + Label: "okay", + } + + ps := state.UnitProcesses{Persist: s.persist} + err := ps.SetStatus(proc.ID(), status) + c.Assert(err, jc.ErrorIsNil) + + s.stub.CheckCallNames(c, "SetStatus") + c.Check(s.persist.procs[proc.ID()].Details.Status, jc.DeepEquals, status) +} + +func (s *unitProcessesSuite) TestSetStatusFailed(c *gc.C) { + failure := errors.Errorf("") + s.stub.SetErrors(failure) + proc := s.newProcesses("docker", "procA")[0] + s.persist.setProcesses(&proc) + status := process.Status{ + Label: "okay", + } + + ps := state.UnitProcesses{Persist: s.persist} + err := ps.SetStatus(proc.ID(), status) + + c.Check(errors.Cause(err), gc.Equals, failure) +} + +func (s *unitProcessesSuite) TestSetStatusMissing(c *gc.C) { + status := process.Status{ + Label: "okay", + } + + ps := state.UnitProcesses{Persist: s.persist} + err := ps.SetStatus("some-proc", status) + + c.Check(err, jc.Satisfies, errors.IsNotFound) +} + +func (s *unitProcessesSuite) TestListOkay(c *gc.C) { + proc1 := s.newProcesses("docker", "procA")[0] + proc2 := s.newProcesses("docker", "procB")[0] + s.persist.setProcesses(&proc1, &proc2) + + ps := state.UnitProcesses{Persist: s.persist} + procs, err := ps.List(proc1.ID()) + c.Assert(err, jc.ErrorIsNil) + + s.stub.CheckCallNames(c, "List") + c.Check(procs, jc.DeepEquals, []process.Info{proc1}) +} + +func (s *unitProcessesSuite) TestListAll(c *gc.C) { + expected := s.newProcesses("docker", "procA", "procB") + s.persist.setProcesses(&expected[0], &expected[1]) + + ps := state.UnitProcesses{Persist: s.persist} + procs, err := ps.List() + c.Assert(err, jc.ErrorIsNil) + + s.stub.CheckCallNames(c, "ListAll") + c.Check(procs, gc.HasLen, 2) + if procs[0].Name == "procA" { + c.Check(procs[0], jc.DeepEquals, expected[0]) + c.Check(procs[1], jc.DeepEquals, expected[1]) + } else { + c.Check(procs[0], jc.DeepEquals, expected[1]) + c.Check(procs[1], jc.DeepEquals, expected[0]) + } +} + +func (s *unitProcessesSuite) TestListFailed(c *gc.C) { + failure := errors.Errorf("") + s.stub.SetErrors(failure) + + ps := state.UnitProcesses{Persist: s.persist} + _, err := ps.List() + + s.stub.CheckCallNames(c, "ListAll") + c.Check(errors.Cause(err), gc.Equals, failure) +} + +func (s *unitProcessesSuite) TestListMissing(c *gc.C) { + proc := s.newProcesses("docker", "procA")[0] + s.persist.setProcesses(&proc) + + ps := state.UnitProcesses{Persist: s.persist} + procs, err := ps.List(proc.ID(), "procB/xyz") + c.Assert(err, jc.ErrorIsNil) + + c.Check(procs, jc.DeepEquals, []process.Info{proc}) +} + +func (s *unitProcessesSuite) TestUnregisterOkay(c *gc.C) { + proc := s.newProcesses("docker", "procA")[0] + s.persist.setProcesses(&proc) + + ps := state.UnitProcesses{Persist: s.persist} + err := ps.Unregister(proc.ID()) + c.Assert(err, jc.ErrorIsNil) + + s.stub.CheckCallNames(c, "Remove") + c.Check(s.persist.procs, gc.HasLen, 0) +} + +func (s *unitProcessesSuite) TestUnregisterMissing(c *gc.C) { + ps := state.UnitProcesses{Persist: s.persist} + err := ps.Unregister("procA/xyz") + c.Assert(err, jc.ErrorIsNil) + + s.stub.CheckCallNames(c, "Remove") + c.Check(s.persist.procs, gc.HasLen, 0) +} + +func (s *unitProcessesSuite) TestUnregisterFailed(c *gc.C) { + failure := errors.Errorf("") + s.stub.SetErrors(failure) + + ps := state.UnitProcesses{Persist: s.persist} + err := ps.Unregister("procA/xyz") + + s.stub.CheckCallNames(c, "Remove") + c.Check(errors.Cause(err), gc.Equals, failure) +} diff --git a/state/collections.go b/state/collections.go index 8487177743f..05fdf8b77c2 100644 --- a/state/collections.go +++ b/state/collections.go @@ -98,6 +98,7 @@ var multiEnvCollections = set.NewStrings( unitsC, volumesC, volumeAttachmentsC, + workloadProcessesC, ) func newStateCollection(coll *mgo.Collection, envUUID string) stateCollection { @@ -235,8 +236,11 @@ func (c *envStateCollection) mungeQuery(inq interface{}) bson.D { for _, elem := range inq { switch elem.Name { case "_id": + // TODO(ericsnow) We should be making a copy of elem. if id, ok := elem.Value.(string); ok { elem.Value = addEnvUUID(c.envUUID, id) + } else if subquery, ok := elem.Value.(bson.D); ok { + elem.Value = c.mungeIDSubQuery(subquery) } case "env-uuid": panic("env-uuid is added automatically and should not be provided") @@ -252,6 +256,30 @@ func (c *envStateCollection) mungeQuery(inq interface{}) bson.D { return outq } +// TODO(ericsnow) Is it okay to add support for $in? + +func (c *envStateCollection) mungeIDSubQuery(inq bson.D) bson.D { + var outq bson.D + for _, elem := range inq { + newElem := elem // copied + switch elem.Name { + case "$in": + ids, ok := elem.Value.([]string) + if !ok { + panic("$in requires []string") + } + var fullIDs []string + for _, id := range ids { + fullID := addEnvUUID(c.envUUID, id) + fullIDs = append(fullIDs, fullID) + } + newElem.Value = fullIDs + } + outq = append(outq, newElem) + } + return outq +} + func addEnvUUID(envUUID, id string) string { prefix := envUUID + ":" if strings.HasPrefix(id, prefix) { diff --git a/state/persistence.go b/state/persistence.go new file mode 100644 index 00000000000..3213893103e --- /dev/null +++ b/state/persistence.go @@ -0,0 +1,66 @@ +// Copyright 2015 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package state + +import ( + "github.com/juju/errors" + jujutxn "github.com/juju/txn" + "gopkg.in/mgo.v2" +) + +// Persistence exposes persistence-layer functionality of State. +type Persistence interface { + // One populates doc with the document corresponding to the given + // ID. Missing documents result in errors.NotFound. + One(collName, id string, doc interface{}) error + // All populates docs with the list of the documents corresponding + // to the provided query. + All(collName string, query, docs interface{}) error + // Run runs the transaction generated by the provided factory + // function. It may be retried several times. + Run(transactions jujutxn.TransactionSource) error +} + +type statePersistence struct { + st *State +} + +// newPersistence builds a new StatePersistence that wraps State. +func (st *State) newPersistence() Persistence { + return &statePersistence{st: st} +} + +// One gets the identified document from the collection. +func (sp statePersistence) One(collName, id string, doc interface{}) error { + coll, closeColl := sp.st.getCollection(collName) + defer closeColl() + + err := coll.FindId(id).One(doc) + if err == mgo.ErrNotFound { + return errors.NotFoundf(id) + } + if err != nil { + return errors.Trace(err) + } + return nil +} + +// All gets all documents from the collection matching the query. +func (sp statePersistence) All(collName string, query, docs interface{}) error { + coll, closeColl := sp.st.getCollection(collName) + defer closeColl() + + if err := coll.Find(query).All(docs); err != nil { + return errors.Trace(err) + } + return nil +} + +// Run runs the transaction produced by the provided factory function. +func (sp statePersistence) Run(transactions jujutxn.TransactionSource) error { + if err := sp.st.run(transactions); err != nil { + return errors.Trace(err) + } + return nil +} diff --git a/state/processes.go b/state/processes.go new file mode 100644 index 00000000000..9d72d4d0e73 --- /dev/null +++ b/state/processes.go @@ -0,0 +1,130 @@ +// Copyright 2015 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package state + +import ( + "github.com/juju/errors" + "github.com/juju/names" + "gopkg.in/juju/charm.v5" + + "github.com/juju/juju/process" +) + +// TODO(ericsnow) Track juju-level status in the status collection. + +// UnitProcesses exposes high-level interaction with workload processes +// for a unit. +type UnitProcesses interface { + // Register registers a workload process in state. + Register(info process.Info) error + // SetStatus sets the raw status of a workload process. + SetStatus(id string, status process.Status) error + // List builds the list of workload processes registered for + // the given unit and IDs. If no IDs are provided then all + // registered processes for the unit are returned. + List(ids ...string) ([]process.Info, error) + // Unregister marks the identified process as unregistered. + Unregister(id string) error +} + +// ProcessDefiniitions provides the state functionality related to +// workload process definitions. +type ProcessDefinitions interface { + // EnsureDefined adds the definitions to state if they aren't there + // already. If they are there then it verfies that the existing + // definitions match the provided ones. + EnsureDefined(definitions ...charm.Process) error +} + +// TODO(ericsnow) Use a more generic component registration mechanism? + +type newUnitProcessesFunc func(persist Persistence, unit names.UnitTag, charm names.CharmTag) (UnitProcesses, error) + +type newProcessDefinitionsFunc func(persist Persistence, charm names.CharmTag) (ProcessDefinitions, error) + +var ( + newUnitProcesses newUnitProcessesFunc + newProcessDefinitions newProcessDefinitionsFunc +) + +// SetProcessesComponent registers the functions that provide the state +// functionality related to workload processes. +func SetProcessesComponent(upFunc newUnitProcessesFunc, pdFunc newProcessDefinitionsFunc) { + newUnitProcesses = upFunc + newProcessDefinitions = pdFunc +} + +type unitProcesses struct { + UnitProcesses + charm *Charm + st *State +} + +// UnitProcesses exposes interaction with workload processes in state +// for a the given unit. +func (st *State) UnitProcesses(unit names.UnitTag) (UnitProcesses, error) { + if newUnitProcesses == nil { + return nil, errors.Errorf("unit processes not supported") + } + + // TODO(ericsnow) State.unitCharm is sometimes wrong... + // TODO(ericsnow) Do we really need the charm tag? + charm, err := st.unitCharm(unit) + if err != nil { + return nil, errors.Trace(err) + } + charmTag := charm.Tag().(names.CharmTag) + + persist := st.newPersistence() + unitProcs, err := newUnitProcesses(persist, unit, charmTag) + if err != nil { + return nil, errors.Trace(err) + } + + return &unitProcesses{ + // TODO(ericsnow) Eliminate the dependency on process/state? + UnitProcesses: unitProcs, + charm: charm, + st: st, + }, nil +} + +// List implements UnitProcesses. It also ensures that all of the +// process definitions in the charm's metadata are added to state. +func (up *unitProcesses) List(ids ...string) ([]process.Info, error) { + if len(ids) == 0 { + // TODO(ericsnow) Instead call st.defineProcesses when a charm is added? + if err := up.st.defineProcesses(up.charm.Tag().(names.CharmTag), *up.charm.Meta()); err != nil { + return nil, errors.Trace(err) + } + } + + return up.UnitProcesses.List(ids...) +} + +// TODO(ericsnow) DestroyProcess: Mark the proc as Dying. + +// defineProcesses adds the workload process definitions from the provided +// charm metadata to state. +func (st *State) defineProcesses(charmTag names.CharmTag, meta charm.Meta) error { + if newProcessDefinitions == nil { + return errors.Errorf("process definitions not supported") + } + + var definitions []charm.Process + for _, definition := range meta.Processes { + definitions = append(definitions, definition) + } + + persist := st.newPersistence() + pd, err := newProcessDefinitions(persist, charmTag) + if err != nil { + return errors.Trace(err) + } + + if err := pd.EnsureDefined(definitions...); err != nil { + return errors.Trace(err) + } + return nil +} diff --git a/state/processes_test.go b/state/processes_test.go new file mode 100644 index 00000000000..8a62a4fb3bf --- /dev/null +++ b/state/processes_test.go @@ -0,0 +1,142 @@ +// Copyright 2015 Canonical Ltd. +// Licensed under the AGPLv3, see LICENCE file for details. + +package state_test + +import ( + "github.com/juju/names" + jc "github.com/juju/testing/checkers" + gc "gopkg.in/check.v1" + "gopkg.in/juju/charm.v5" + + "github.com/juju/juju/component/all" + "github.com/juju/juju/process" +) + +func init() { + if err := all.RegisterForServer(); err != nil { + panic(err) + } +} + +var _ = gc.Suite(&unitProcessesSuite{}) + +type unitProcessesSuite struct { + ConnSuite +} + +const metaYAML = ` +name: a-charm +summary: a charm... +description: a charm... +processes: + procA: + type: docker + command: do-something cool + image: spam/eggs + env: + IMPORTANT: YES +` + +func (s *unitProcessesSuite) addUnit(c *gc.C, charmName, serviceName, meta string) (names.CharmTag, names.UnitTag) { + ch := s.AddTestingCharm(c, charmName) + ch = s.AddMetaCharm(c, charmName, meta, 2) + + svc := s.AddTestingService(c, serviceName, ch) + unit, err := svc.AddUnit() + c.Assert(err, jc.ErrorIsNil) + + charmTag := ch.Tag().(names.CharmTag) + unitTag := unit.UnitTag() + return charmTag, unitTag +} + +func (s *unitProcessesSuite) TestFunctional(c *gc.C) { + _, unitTag := s.addUnit(c, "dummy", "a-service", metaYAML) + + st, err := s.State.UnitProcesses(unitTag) + c.Assert(err, jc.ErrorIsNil) + + procs, err := st.List() + c.Assert(err, jc.ErrorIsNil) + c.Check(procs, jc.DeepEquals, []process.Info{{ + Process: charm.Process{ + Name: "procA", + Type: "docker", + Command: "do-something cool", + Image: "spam/eggs", + EnvVars: map[string]string{ + // TODO(erisnow) YAML coerces YES into true... + "IMPORTANT": "true", + }, + }, + }}) + + info := process.Info{ + Process: charm.Process{ + Name: "procA", + Type: "docker", + Command: "do-something cool", + Image: "spam/eggs", + EnvVars: map[string]string{ + "IMPORTANT": "true", + }, + }, + Details: process.Details{ + ID: "xyz", + Status: process.Status{ + Label: "running", + }, + }, + } + err = st.Register(info) + c.Assert(err, jc.ErrorIsNil) + + procs, err = st.List() + c.Assert(err, jc.ErrorIsNil) + c.Check(procs, jc.DeepEquals, []process.Info{info}) + + procs, err = st.List("procA/xyz") + c.Assert(err, jc.ErrorIsNil) + c.Check(procs, jc.DeepEquals, []process.Info{info}) + + err = st.SetStatus("procA/xyz", process.Status{Label: "still running"}) + c.Assert(err, jc.ErrorIsNil) + + procs, err = st.List("procA/xyz") + c.Assert(err, jc.ErrorIsNil) + c.Check(procs, jc.DeepEquals, []process.Info{{ + Process: charm.Process{ + Name: "procA", + Type: "docker", + Command: "do-something cool", + Image: "spam/eggs", + EnvVars: map[string]string{ + "IMPORTANT": "true", + }, + }, + Details: process.Details{ + ID: "xyz", + Status: process.Status{ + Label: "still running", + }, + }, + }}) + + err = st.Unregister("procA/xyz") + c.Assert(err, jc.ErrorIsNil) + + procs, err = st.List() + c.Assert(err, jc.ErrorIsNil) + c.Check(procs, jc.DeepEquals, []process.Info{{ + Process: charm.Process{ + Name: "procA", + Type: "docker", + Command: "do-something cool", + Image: "spam/eggs", + EnvVars: map[string]string{ + "IMPORTANT": "true", + }, + }, + }}) +} diff --git a/state/state.go b/state/state.go index c37e3613236..5b0a881de0d 100644 --- a/state/state.go +++ b/state/state.go @@ -118,6 +118,10 @@ const ( // _id field of each collection is a concatenation of multiple fields // that form a compound index. userenvnameC = "userenvname" + + // workloadProcessesC is the name of the collection holding info + // about workload processes. + workloadProcessesC = "workloadprocesses" // TODO(ericsnow) Alias the const? ) // State represents the state of an environment diff --git a/state/unit.go b/state/unit.go index f89942ba085..caffe53fd8d 100644 --- a/state/unit.go +++ b/state/unit.go @@ -1061,6 +1061,30 @@ func (u *Unit) SetCharmURL(curl *charm.URL) error { return err } +func (u *Unit) charm() (*Charm, error) { + s, err := u.Service() + if err != nil { + return nil, errors.Annotatef(err, "getting service for unit %v", u.Tag().Id()) + } + ch, _, err := s.Charm() + if err != nil { + return nil, errors.Annotatef(err, "getting charm for unit %q", u.Tag().Id()) + } + return ch, nil +} + +func (st *State) unitCharm(unitTag names.UnitTag) (*Charm, error) { + u, err := st.Unit(unitTag.Id()) + if err != nil { + return nil, errors.Trace(err) + } + ch, err := u.charm() + if err != nil { + return nil, errors.Trace(err) + } + return ch, nil +} + // AgentPresence returns whether the respective remote agent is alive. func (u *Unit) AgentPresence() (bool, error) { return u.st.pwatcher.Alive(u.globalAgentKey())