Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Improve visor conn/snapshot/file abstractions #52

Merged
merged 5 commits into from

3 participants

@cloudhead
  • Get rid of (*Snapshot).Conn()
  • Use new snapshot methods instead of conn
  • Coordinator types embed Path instead of Snapshot
  • The 'Path' type wraps a Snapshot, while providing prefixed operations.
@cloudhead cloudhead was assigned
@xla
Owner

:shipit:

@kesselborn
Owner

:+1:

@cloudhead cloudhead merged commit 5492a0d into from
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jul 17, 2012
  1. @cloudhead
  2. @cloudhead
  3. @cloudhead

    Coordinator types embed Path instead of Snapshot

    cloudhead authored
    The 'Path' type wraps a Snapshot, while providing prefixed
    operations.
  4. @cloudhead

    Add path.go ;)

    cloudhead authored
  5. @cloudhead
This page is out of date. Refresh to see the latest.
View
45 app.go
@@ -20,7 +20,7 @@ const SERVICE_PROC_DEFAULT = "web"
type Env map[string]string
type App struct {
- Snapshot
+ Path
Name string
RepoUrl string
Stack Stack
@@ -30,7 +30,8 @@ type App struct {
// NewApp returns a new App given a name, repository url and stack.
func NewApp(name string, repourl string, stack Stack, snapshot Snapshot) (app *App) {
- app = &App{Name: name, RepoUrl: repourl, Stack: stack, Snapshot: snapshot, Env: Env{}}
+ app = &App{Name: name, RepoUrl: repourl, Stack: stack, Env: Env{}}
+ app.Path = Path{snapshot, path.Join(APPS_PATH, app.Name)}
return
}
@@ -49,7 +50,7 @@ func (a *App) FastForward(rev int64) (app *App) {
// Register adds the App to the global process state.
func (a *App) Register() (app *App, err error) {
- exists, _, err := a.conn.Exists(a.Path())
+ exists, _, err := a.conn.Exists(a.Path.Dir)
if err != nil {
return nil, fmt.Errorf("application '%s' is already registered", a.Name)
}
@@ -61,7 +62,7 @@ func (a *App) Register() (app *App, err error) {
a.DeployType = DEPLOY_LXC
}
- attrs := &File{a.Snapshot, a.Path() + "/attrs", map[string]interface{}{
+ attrs := &File{a.Snapshot, a.Path.Prefix("attrs"), map[string]interface{}{
"repo-url": a.RepoUrl,
"stack": string(a.Stack),
"deploy-type": a.DeployType,
@@ -79,7 +80,7 @@ func (a *App) Register() (app *App, err error) {
}
}
- rev, err := a.setPath("registered", time.Now().UTC().String())
+ rev, err := a.Set("registered", time.Now().UTC().String())
if err != nil {
return
}
@@ -91,12 +92,12 @@ func (a *App) Register() (app *App, err error) {
// Unregister removes the App form the global process state.
func (a *App) Unregister() error {
- return a.conn.Del(a.Path(), a.Rev)
+ return a.Del("/")
}
// EnvironmentVars returns all set variables for this app as a map.
func (a *App) EnvironmentVars() (vars Env, err error) {
- varNames, err := a.conn.Getdir(a.Path()+"/env", a.Rev)
+ varNames, err := a.Getdir(a.Path.Prefix("env"))
vars = Env{}
@@ -125,7 +126,7 @@ func (a *App) EnvironmentVars() (vars Env, err error) {
// GetEnvironmentVar returns the value stored for the given key.
func (a *App) GetEnvironmentVar(k string) (value string, err error) {
k = strings.Replace(k, "_", "-", -1)
- val, _, err := a.conn.Get(a.Path()+"/env/"+k, &a.Rev)
+ val, _, err := a.Get("env/" + k)
if err != nil {
return
}
@@ -136,7 +137,7 @@ func (a *App) GetEnvironmentVar(k string) (value string, err error) {
// SetEnvironmentVar stores the value for the given key.
func (a *App) SetEnvironmentVar(k string, v string) (app *App, err error) {
- rev, err := a.setPath("env/"+strings.Replace(k, "_", "-", -1), v)
+ rev, err := a.Set("env/"+strings.Replace(k, "_", "-", -1), v)
if err != nil {
return
}
@@ -149,7 +150,7 @@ func (a *App) SetEnvironmentVar(k string, v string) (app *App, err error) {
// DelEnvironmentVar removes the env variable for the given key.
func (a *App) DelEnvironmentVar(k string) (app *App, err error) {
- err = a.delPath("env/" + k)
+ err = a.Del("env/" + k)
if err != nil {
return
}
@@ -159,14 +160,14 @@ func (a *App) DelEnvironmentVar(k string) (app *App, err error) {
// GetProcTypes returns all registered ProcTypes for the App
func (a *App) GetProcTypes() (ptys []*ProcType, err error) {
- p := path.Join(a.Path(), PROCS_PATH)
+ p := a.Path.Prefix(PROCS_PATH)
exists, _, err := a.conn.Exists(p)
if err != nil || !exists {
return
}
- pNames, err := a.conn.Getdir(p, a.Snapshot.FastForward(-1).Rev)
+ pNames, err := a.FastForward(-1).Getdir(p)
if err != nil {
return
}
@@ -193,27 +194,11 @@ func (a *App) Inspect() string {
return fmt.Sprintf("%#v", a)
}
-// Path returns the path for the App in the global process state.
-func (a *App) Path() (p string) {
- return path.Join(APPS_PATH, a.Name)
-}
-
-func (a *App) prefixPath(p string) string {
- return path.Join(a.Path(), p)
-}
-
-func (a *App) setPath(k string, v string) (rev int64, err error) {
- return a.conn.Set(a.prefixPath(k), a.Rev, []byte(v))
-}
-func (a *App) delPath(k string) error {
- return a.conn.Del(a.prefixPath(k), a.Rev)
-}
-
// GetApp fetches an app with the given name.
func GetApp(s Snapshot, name string) (app *App, err error) {
app = NewApp(name, "", "", s)
- f, err := Get(s, app.Path()+"/attrs", new(JSONCodec))
+ f, err := Get(s, app.Path.Prefix("attrs"), new(JSONCodec))
if err != nil {
return nil, err
}
@@ -234,7 +219,7 @@ func Apps(s Snapshot) (apps []*App, err error) {
return
}
- names, err := s.conn.Getdir(APPS_PATH, s.FastForward(-1).Rev)
+ names, err := s.FastForward(-1).Getdir(APPS_PATH)
if err != nil {
return
}
View
10 app_test.go
@@ -22,7 +22,7 @@ func appSetup(name string) (app *App) {
panic(err)
}
- app = &App{Name: name, RepoUrl: "git://cat.git", Stack: "whiskers", Snapshot: s, Env: Env{}}
+ app = NewApp(name, "git://cat.git", "whiskers", s)
app = app.FastForward(rev)
return
@@ -31,7 +31,7 @@ func appSetup(name string) (app *App) {
func TestAppRegistration(t *testing.T) {
app := appSetup("lolcatapp")
- check, _, err := app.conn.Exists(app.Path())
+ check, _, err := app.conn.Exists(app.Path.Dir)
if err != nil {
t.Error(err)
return
@@ -46,7 +46,7 @@ func TestAppRegistration(t *testing.T) {
t.Error(err)
return
}
- check, _, err = app2.conn.Exists(app.Path())
+ check, _, err = app2.conn.Exists(app.Path.Dir)
if err != nil {
t.Error(err)
return
@@ -104,7 +104,7 @@ func TestAppUnregistration(t *testing.T) {
return
}
- check, _, err := app.conn.Exists(app.Path())
+ check, _, err := app.conn.Exists(app.Path.Dir)
if err != nil {
t.Error(err)
}
@@ -215,7 +215,7 @@ func TestApps(t *testing.T) {
names := []string{"cat", "dog", "lol"}
for i := range names {
- a := NewApp(names[i], "zebra", "joke", app.Snapshot)
+ a := NewApp(names[i], "zebra", "joke", app.Path.Snapshot)
_, err := a.Register()
if err != nil {
t.Error(err)
View
2  cmd/app-services.go
@@ -37,7 +37,7 @@ func runAppServices(cmd *Command, args []string) {
os.Exit(2)
}
- proxies, err := app.Conn().Getdir("/proxies", app.Snapshot.Rev)
+ proxies, err := app.Snapshot.Getdir("/proxies")
if err != nil {
fmt.Fprintf(os.Stderr, "Error fetching proxy addresses %s\n", err.Error())
os.Exit(2)
View
7 conn.go
@@ -41,7 +41,12 @@ func (c *Conn) Stat(path string) (len int, pathrev int64, err error) {
// Exists returns true or false depending on if the path exists
func (c *Conn) Exists(path string) (exists bool, pathrev int64, err error) {
- _, pathrev, err = c.conn.Stat(c.prefixPath(path), nil)
+ return c.ExistsRev(path, nil)
+}
+
+// Exists returns true or false depending on if the path exists
+func (c *Conn) ExistsRev(path string, rev *int64) (exists bool, pathrev int64, err error) {
+ _, pathrev, err = c.conn.Stat(c.prefixPath(path), rev)
if err != nil {
return
}
View
2  event_test.go
@@ -27,7 +27,7 @@ func eventSetup(name string) (s Snapshot, app *App, l chan *Event) {
s = s.FastForward(rev)
- app = &App{Name: name, RepoUrl: "git://" + name, Stack: Stack(name + "stack"), Snapshot: s}
+ app = NewApp(name, "git://"+name, Stack(name+"stack"), s)
l = make(chan *Event)
return
View
2  file.go
@@ -44,7 +44,7 @@ func (f *File) FastForward(rev int64) *File {
// Del deletes a file
func (f *File) Del() error {
- return f.conn.Del(f.Path, f.Rev)
+ return f.Snapshot.Del(f.Path)
}
// Create creates a file from its Value attribute
View
38 instance.go
@@ -49,7 +49,7 @@ func (i InstanceInfo) LogString() string {
// An Instance represents a running process of a specific type.
type Instance struct {
- Snapshot
+ Path
ProcType *ProcType // ProcType the instance belongs to
Revision *Revision
Addr *net.TCPAddr // TCP address of the running instance
@@ -68,7 +68,8 @@ func NewInstance(pty *ProcType, rev *Revision, addr string, snapshot Snapshot) (
ProcType: pty,
Revision: rev,
State: InsStateInitial,
- Snapshot: snapshot}
+ }
+ ins.Path = Path{snapshot, "/instances/" + ins.Id()}
return
}
@@ -80,12 +81,14 @@ func (i *Instance) FastForward(rev int64) *Instance {
}
func (i *Instance) createSnapshot(rev int64) Snapshotable {
- return &Instance{Addr: i.Addr, State: i.State, ProcType: i.ProcType, Revision: i.Revision, Snapshot: Snapshot{rev, i.conn}}
+ tmp := *i
+ tmp.Snapshot = Snapshot{rev, i.conn}
+ return &tmp
}
// Register registers an instance with the registry.
func (i *Instance) Register() (instance *Instance, err error) {
- exists, _, err := i.conn.Exists(i.Path())
+ exists, _, err := i.conn.Exists(i.Path.Dir)
if err != nil {
return
}
@@ -93,17 +96,17 @@ func (i *Instance) Register() (instance *Instance, err error) {
return nil, ErrKeyConflict
}
- rev, err := i.conn.Set(i.Path()+"/info", i.Rev, []byte(i.String()))
+ rev, err := i.Set("info", i.String())
if err != nil {
return i, err
}
- rev, err = i.conn.Set(i.Path()+"/state", i.Rev, []byte(i.State))
+ rev, err = i.Set("state", string(i.State))
if err != nil {
return i, err
}
- now := []byte(time.Now().UTC().String())
+ now := time.Now().UTC().String()
- rev, err = i.conn.Set(i.ProcType.InstancePath(i.Id()), i.Rev, now)
+ rev, err = i.Snapshot.Set(i.ProcType.InstancePath(i.Id()), now)
instance = i.FastForward(rev)
return
@@ -111,33 +114,29 @@ func (i *Instance) Register() (instance *Instance, err error) {
// Unregister unregisters an instance with the registry.
func (i *Instance) Unregister() (err error) {
- rev := i.Rev
-
- err = i.conn.Del(i.ProcType.InstancePath(i.Id()), rev)
+ err = i.Snapshot.Del(i.ProcType.InstancePath(i.Id()))
if err != nil {
return
}
- err = i.conn.Del(i.Path(), rev)
+ err = i.Del("/")
return
}
func (i *InstanceInfo) Unregister(s Snapshot) (err error) {
- rev := s.Rev
-
p := path.Join(APPS_PATH, i.AppName, PROCS_PATH, string(i.ProcessName), INSTANCES_PATH, i.Name)
- err = s.conn.Del(p, rev)
+ err = s.Del(p)
if err != nil {
return
}
- err = s.conn.Del(path.Join(INSTANCES_PATH, i.Name), rev)
+ err = s.Del(path.Join(INSTANCES_PATH, i.Name))
return
}
// UpdateState updates the instance's state file in
// the coordinator to the given value.
func (i *Instance) UpdateState(s State) (ins *Instance, err error) {
- newrev, err := i.conn.Set(i.Path()+"/state", i.Rev, []byte(s))
+ newrev, err := i.Set("state", string(s))
if err != nil {
return
}
@@ -147,11 +146,6 @@ func (i *Instance) UpdateState(s State) (ins *Instance, err error) {
return
}
-// Path returns the instance's directory path in the registry.
-func (i *Instance) Path() (path string) {
- return "/instances/" + i.Id()
-}
-
func (i *Instance) Id() string {
return strings.Replace(strings.Replace(i.Addr.String(), ".", "-", -1), ":", "-", -1)
}
View
8 instance_test.go
@@ -54,7 +54,7 @@ func instanceSetup(addr string, pType ProcessName) (ins *Instance) {
func TestInstanceRegister(t *testing.T) {
ins := instanceSetup("localhost:12345", "web")
- check, _, err := ins.conn.Exists(ins.Path())
+ check, _, err := ins.conn.Exists(ins.Path.Dir)
if err != nil {
t.Error(err)
}
@@ -67,7 +67,7 @@ func TestInstanceRegister(t *testing.T) {
t.Error(err)
}
- check, _, err = ins.conn.Exists(ins.Path())
+ check, _, err = ins.conn.Exists(ins.Path.Dir)
if err != nil {
t.Error(err)
}
@@ -123,7 +123,7 @@ func TestInstanceUnregister(t *testing.T) {
t.Error(err)
}
- check, _, err := ins.conn.Exists(ins.Path())
+ check, _, err := ins.conn.Exists(ins.Path.Dir)
if err != nil {
t.Error(err)
}
@@ -153,7 +153,7 @@ func TestInstanceUpdateState(t *testing.T) {
t.Error("Instance wasn't fast forwarded")
}
- val, _, err := newIns.conn.Get(newIns.Path()+"/state", &newIns.Rev)
+ val, _, err := newIns.conn.Get(newIns.Path.Prefix("state"), &newIns.Rev)
if err != nil {
t.Error(err)
}
View
34 path.go
@@ -0,0 +1,34 @@
+package visor
+
+type Path struct {
+ Snapshot
+ Dir string
+}
+
+func (p *Path) Get(key string) (string, int64, error) {
+ return p.Snapshot.Get(p.Prefix(key))
+}
+
+func (p *Path) Set(key string, val string) (int64, error) {
+ return p.Snapshot.Set(p.Prefix(key), val)
+}
+
+func (p *Path) Del(key string) error {
+ return p.Snapshot.Del(p.Prefix(key))
+}
+
+func (p *Path) Prefix(path string, paths ...string) (result string) {
+ if path == "/" {
+ result = p.Dir
+ } else {
+ result = p.Dir + "/" + path
+ }
+ for _, p := range paths {
+ result += "/" + p
+ }
+ return
+}
+
+func (p *Path) String() (dir string) {
+ return p.Dir
+}
View
41 proctype.go
@@ -8,13 +8,12 @@ package visor
import (
"errors"
"fmt"
- "path"
"time"
)
// ProcType represents a process type with a certain scale.
type ProcType struct {
- Snapshot
+ Path
Name ProcessName
App *App
Port int
@@ -23,7 +22,13 @@ type ProcType struct {
const PROCS_PATH = "procs"
func NewProcType(app *App, name ProcessName, s Snapshot) *ProcType {
- return &ProcType{Name: name, App: app, Snapshot: s}
+ return &ProcType{
+ Name: name,
+ App: app,
+ Path: Path{
+ s, app.Path.Prefix(PROCS_PATH, string(name)),
+ },
+ }
}
func (p *ProcType) createSnapshot(rev int64) Snapshotable {
@@ -40,7 +45,7 @@ func (p *ProcType) FastForward(rev int64) *ProcType {
// Register registers a proctype with the registry.
func (p *ProcType) Register() (ptype *ProcType, err error) {
- exists, _, err := p.conn.Exists(p.Path())
+ exists, _, err := p.conn.Exists(p.Path.Dir)
if err != nil {
return
}
@@ -53,14 +58,14 @@ func (p *ProcType) Register() (ptype *ProcType, err error) {
return nil, errors.New(fmt.Sprintf("couldn't claim port: %s", err.Error()))
}
- port := &File{p.Snapshot, p.Path() + "/port", p.Port, new(IntCodec)}
+ port := &File{p.Snapshot, p.Path.Prefix("port"), p.Port, new(IntCodec)}
port, err = port.Create()
if err != nil {
return p, err
}
- rev, err := p.conn.Set(p.Path()+"/registered", p.Rev, []byte(time.Now().UTC().String()))
+ rev, err := p.Set("registered", time.Now().UTC().String())
if err != nil {
return p, err
@@ -72,19 +77,15 @@ func (p *ProcType) Register() (ptype *ProcType, err error) {
// Unregister unregisters a proctype from the registry.
func (p *ProcType) Unregister() (err error) {
- return p.conn.Del(p.Path(), p.Rev)
-}
-
-func (p *ProcType) Path() string {
- return path.Join(p.App.Path(), PROCS_PATH, string(p.Name))
+ return p.Del("/")
}
-func (p *ProcType) InstancePath(Id string) string {
- return path.Join(p.InstancesPath(), Id)
+func (p *ProcType) InstancePath(id string) string {
+ return p.Path.Prefix(INSTANCES_PATH, id)
}
func (p *ProcType) InstancesPath() string {
- return path.Join(p.Path(), INSTANCES_PATH)
+ return p.Path.Prefix(INSTANCES_PATH)
}
func (p *ProcType) GetInstanceNames() (ins []string, err error) {
@@ -93,7 +94,7 @@ func (p *ProcType) GetInstanceNames() (ins []string, err error) {
return
}
- ins, err = p.conn.Getdir(p.InstancesPath(), p.Snapshot.FastForward(-1).Rev)
+ ins, err = p.FastForward(-1).Getdir(p.InstancesPath())
if err != nil {
return
}
@@ -123,19 +124,15 @@ func (p *ProcType) GetInstanceInfos() (ins []*InstanceInfo, err error) {
// GetProcType fetches a ProcType from the coordinator
func GetProcType(s Snapshot, app *App, name ProcessName) (p *ProcType, err error) {
- path := path.Join(app.Path(), PROCS_PATH, string(name))
+ path := app.Path.Prefix(PROCS_PATH, string(name))
port, err := Get(s, path+"/port", new(IntCodec))
if err != nil {
return
}
+ p = NewProcType(app, name, s)
+ p.Port = port.Value.(int)
- p = &ProcType{
- Name: name,
- Snapshot: s,
- App: app,
- Port: port.Value.(int),
- }
return
}
View
4 proctype_test.go
@@ -41,7 +41,7 @@ func TestProcTypeRegister(t *testing.T) {
t.Error(err)
}
- check, _, err := s.Conn().Exists(pty.Path())
+ check, _, err := s.conn.Exists(pty.Path.Dir)
if err != nil {
t.Error(err)
}
@@ -64,7 +64,7 @@ func TestProcTypeUnregister(t *testing.T) {
t.Error(err)
}
- check, _, err := s.Conn().Exists(pty.Path())
+ check, _, err := s.Exists(pty.Path.Dir)
if check {
t.Errorf("proctype %s is still registered", pty)
}
View
27 revision.go
@@ -7,14 +7,13 @@ package visor
import (
"fmt"
- "path"
"time"
)
// A Revision represents an application revision,
// identifiable by its `ref`.
type Revision struct {
- Snapshot
+ Path
App *App
Ref string
ArchiveUrl string
@@ -24,7 +23,8 @@ const REVS_PATH = "revs"
// NewRevision returns a new instance of Revision.
func NewRevision(app *App, ref string, snapshot Snapshot) (rev *Revision) {
- rev = &Revision{App: app, Ref: ref, Snapshot: snapshot}
+ rev = &Revision{App: app, Ref: ref}
+ rev.Path = Path{snapshot, app.Path.Prefix(REVS_PATH, ref)}
return
}
@@ -43,7 +43,7 @@ func (r *Revision) FastForward(rev int64) *Revision {
// Register registers a new Revision with the registry.
func (r *Revision) Register() (revision *Revision, err error) {
- exists, _, err := r.conn.Exists(r.Path())
+ exists, _, err := r.conn.Exists(r.Path.Dir)
if err != nil {
return
}
@@ -51,11 +51,11 @@ func (r *Revision) Register() (revision *Revision, err error) {
return nil, ErrKeyConflict
}
- rev, err := r.conn.Set(r.Path()+"/archive-url", r.Rev, []byte(r.ArchiveUrl))
+ rev, err := r.Set("archive-url", r.ArchiveUrl)
if err != nil {
return
}
- rev, err = r.conn.Set(r.Path()+"/registered", r.Rev, []byte(time.Now().UTC().String()))
+ rev, err = r.Set("registered", time.Now().UTC().String())
if err != nil {
return
}
@@ -67,11 +67,11 @@ func (r *Revision) Register() (revision *Revision, err error) {
// Unregister unregisters a revision from the registry.
func (r *Revision) Unregister() (err error) {
- return r.conn.Del(r.Path(), r.Rev)
+ return r.Del("/")
}
func (r *Revision) SetArchiveUrl(url string) (revision *Revision, err error) {
- rev, err := r.conn.Set(r.Path()+"/archive-url", r.Rev, []byte(url))
+ rev, err := r.Set("archive-url", url)
if err != nil {
return
}
@@ -79,11 +79,6 @@ func (r *Revision) SetArchiveUrl(url string) (revision *Revision, err error) {
return
}
-// Path returns this.Revision's directory path in the registry.
-func (r *Revision) Path() string {
- return path.Join(r.App.Path(), REVS_PATH, r.Ref)
-}
-
func (r *Revision) String() string {
return fmt.Sprintf("Revision<%s:%s>", r.App.Name, r.Ref)
}
@@ -93,7 +88,7 @@ func (r *Revision) Inspect() string {
}
func GetRevision(s Snapshot, app *App, ref string) (r *Revision, err error) {
- path := app.Path() + "/revs/" + ref
+ path := app.Path.Prefix(REVS_PATH + "/" + ref)
codec := new(StringCodec)
f, err := Get(s, path+"/archive-url", codec)
@@ -102,7 +97,7 @@ func GetRevision(s Snapshot, app *App, ref string) (r *Revision, err error) {
}
r = &Revision{
- Snapshot: s,
+ Path: Path{s, path},
App: app,
Ref: ref,
ArchiveUrl: f.Value.(string),
@@ -133,7 +128,7 @@ func Revisions(s Snapshot) (revisions []*Revision, err error) {
// AppRevisions returns an array of all registered revisions belonging
// to the given application.
func AppRevisions(s Snapshot, app *App) (revisions []*Revision, err error) {
- refs, err := s.conn.Getdir(app.Path()+"/revs", s.Rev)
+ refs, err := s.Getdir(app.Path.Prefix("revs"))
if err != nil {
return
}
View
6 revision_test.go
@@ -34,7 +34,7 @@ func TestRevisionRegister(t *testing.T) {
s, app := revSetup()
rev := NewRevision(app, "stable", app.Snapshot)
- check, _, err := s.conn.Exists(rev.Path())
+ check, _, err := s.conn.Exists(rev.Path.Dir)
if err != nil {
t.Error(err)
return
@@ -50,7 +50,7 @@ func TestRevisionRegister(t *testing.T) {
return
}
- check, _, err = s.conn.Exists(rev.Path())
+ check, _, err = s.conn.Exists(rev.Path.Dir)
if err != nil {
t.Error(err)
}
@@ -78,7 +78,7 @@ func TestRevisionUnregister(t *testing.T) {
t.Error(err)
}
- check, _, err := s.conn.Exists(rev.Path())
+ check, _, err := s.conn.Exists(rev.Path.Dir)
if err != nil {
t.Error(err)
}
View
39 snapshot.go
@@ -6,6 +6,7 @@
package visor
import (
+ "fmt"
"github.com/soundcloud/doozer"
)
@@ -58,8 +59,42 @@ func DialUri(uri string, root string) (s Snapshot, err error) {
return
}
-func (s Snapshot) Conn() *Conn {
- return s.conn
+// Exists checks if the specified path exists at this snapshot's revision
+func (s Snapshot) Exists(path string) (bool, int64, error) {
+ return s.conn.ExistsRev(path, &s.Rev)
+}
+
+// Get returns the value at the specified path, at this snapshot's revision
+func (s Snapshot) Get(path string) (string, int64, error) {
+ val, rev, err := s.conn.Get(path, &s.Rev)
+ return string(val), rev, err
+}
+
+// Getdir returns the list of files in the specified directory, at this snapshot's revision
+func (s Snapshot) Getdir(path string) ([]string, error) {
+ return s.conn.Getdir(path, s.Rev)
+}
+
+// Set sets the specfied path's body to the passed value, at this snapshot's revision
+func (s Snapshot) Set(path string, val string) (int64, error) {
+ return s.conn.Set(path, s.Rev, []byte(val))
+}
+
+// Del deletes the file at the specified path, at this snapshot's revision
+func (s Snapshot) Del(path string) error {
+ return s.conn.Del(path, s.Rev)
+}
+
+// Update checks if the specified path exists, and if so, does a (*Snapshot).Set with the passed value.
+func (s Snapshot) Update(path string, val string) (rev int64, err error) {
+ exists, rev, err := s.Exists(path)
+ if err != nil {
+ return
+ }
+ if !exists {
+ return 0, fmt.Errorf("path %s doesn't exist", path)
+ }
+ return s.Set(path, val)
}
func (s Snapshot) createSnapshot(rev int64) Snapshotable {
View
39 ticket.go
@@ -17,7 +17,7 @@ import (
// Ticket carries instructions to start and stop Instances.
type Ticket struct {
- Snapshot
+ Path
Id int64
AppName string
RevisionName string
@@ -81,9 +81,9 @@ func CreateTicket(appName string, revName string, pName ProcessName, op Operatio
RevisionName: revName,
ProcessName: pName,
Op: op,
- Snapshot: s,
source: nil,
Status: TicketStatusUnClaimed,
+ Path: Path{s, "<invalid-path>"},
}
return t.Create()
}
@@ -108,12 +108,13 @@ func (t *Ticket) Create() (tt *Ticket, err error) {
return
}
t.Id = id
+ t.Path.Dir = path.Join(TICKETS_PATH, strconv.FormatInt(t.Id, 10))
- f, err := CreateFile(t.Snapshot, t.prefixPath("op"), t.toArray(), new(ListCodec))
+ f, err := CreateFile(t.Snapshot, t.Path.Prefix("op"), t.toArray(), new(ListCodec))
if err != nil {
return
}
- f, err = CreateFile(t.Snapshot, t.prefixPath("status"), string(t.Status), new(StringCodec))
+ f, err = CreateFile(t.Snapshot, t.Path.Prefix("status"), string(t.Status), new(StringCodec))
if err == nil {
t.Snapshot = t.Snapshot.FastForward(f.Rev)
}
@@ -126,7 +127,7 @@ func (t *Ticket) Claims() (claims []string, err error) {
if err != nil {
return
}
- claims, err = t.conn.Getdir(t.prefixPath("claims"), rev)
+ claims, err = t.conn.Getdir(t.Path.Prefix("claims"), rev)
if err, ok := err.(*doozer.Error); ok && err.Err == doozer.ErrNoEnt {
claims = []string{}
err = nil
@@ -136,7 +137,7 @@ func (t *Ticket) Claims() (claims []string, err error) {
// Claim locks the Ticket to the specified host.
func (t *Ticket) Claim(host string) (*Ticket, error) {
- status, rev, err := t.conn.Get(t.prefixPath("status"), nil)
+ status, rev, err := t.conn.Get(t.Path.Prefix("status"), nil)
if err != nil {
return t, err
}
@@ -144,7 +145,7 @@ func (t *Ticket) Claim(host string) (*Ticket, error) {
return t, fmt.Errorf("ticket status is '%s'", string(status))
}
- _, err = t.conn.Set(t.prefixPath("status"), rev, []byte(TicketStatusClaimed))
+ _, err = t.conn.Set(t.Path.Prefix("status"), rev, []byte(TicketStatusClaimed))
if err != nil {
return t, err
}
@@ -164,7 +165,7 @@ func (t *Ticket) Unclaim(host string) (t1 *Ticket, err error) {
if !exists {
return t, ErrUnauthorized
}
- status, rev, err := t.conn.Get(t.prefixPath("status"), nil)
+ status, rev, err := t.conn.Get(t.Path.Prefix("status"), nil)
if err != nil {
return t, err
}
@@ -172,7 +173,7 @@ func (t *Ticket) Unclaim(host string) (t1 *Ticket, err error) {
return t, fmt.Errorf("can't unclaim ticket, status is '%s'", status)
}
- rev, err = t.conn.Set(t.prefixPath("status"), rev, []byte(TicketStatusUnClaimed))
+ rev, err = t.conn.Set(t.Path.Prefix("status"), rev, []byte(TicketStatusUnClaimed))
if err != nil {
return t, err
}
@@ -189,7 +190,7 @@ func (t *Ticket) Dead(host string) (t1 *Ticket, err error) {
return t, ErrUnauthorized
}
- rev, err := t.conn.Set(t.prefixPath("status"), -1, []byte(TicketStatusDead))
+ rev, err := t.conn.Set(t.Path.Prefix("status"), -1, []byte(TicketStatusDead))
if err != nil {
return t, err
}
@@ -211,7 +212,7 @@ func (t *Ticket) Done(host string) (err error) {
rev = t.Rev
}
- t.conn.Set(t.prefixPath("status"), rev, []byte(TicketStatusDone))
+ t.conn.Set(t.Path.Prefix("status"), rev, []byte(TicketStatusDone))
if err == nil {
t.Status = TicketStatusDone
}
@@ -228,16 +229,8 @@ func (t *Ticket) IdString() string {
return fmt.Sprintf("TICKET[%d]", t.Id)
}
-func (t *Ticket) Path() string {
- return path.Join(TICKETS_PATH, strconv.FormatInt(t.Id, 10))
-}
-
-func (t *Ticket) prefixPath(aPath string) string {
- return path.Join(t.Path(), aPath)
-}
-
func (t *Ticket) claimPath(host string) string {
- return t.prefixPath("claims/" + host)
+ return t.Path.Prefix("claims", host)
}
func Tickets() ([]Ticket, error) {
@@ -301,7 +294,9 @@ func parseTicket(snapshot Snapshot, ev *doozer.Event, body []byte) (t *Ticket, e
return nil, fmt.Errorf("ticket id %s can't be parsed as an int64", idStr)
}
- f, err := Get(snapshot, path.Join(TICKETS_PATH, idStr, "op"), new(ListCodec))
+ p := path.Join(TICKETS_PATH, idStr)
+
+ f, err := Get(snapshot, path.Join(p, "op"), new(ListCodec))
if err != nil {
return t, err
}
@@ -313,7 +308,7 @@ func parseTicket(snapshot Snapshot, ev *doozer.Event, body []byte) (t *Ticket, e
RevisionName: data[1],
ProcessName: ProcessName(data[2]),
Op: NewOperationType(data[3]),
- Snapshot: snapshot,
+ Path: Path{snapshot, p},
source: ev}
return t, err
}
View
21 ticket_test.go
@@ -63,7 +63,7 @@ func TestTicketCreateTicket(t *testing.T) {
t.Error(err)
}
- b, _, err := s.conn.Get(ticket.Path()+"/op", &ticket.Snapshot.Rev)
+ b, _, err := s.conn.Get(ticket.Path.Prefix("op"), &ticket.Snapshot.Rev)
if err != nil {
t.Error(err)
}
@@ -117,13 +117,14 @@ func TestTicketClaim(t *testing.T) {
func TestTicketUnclaim(t *testing.T) {
s, host := ticketSetup()
id := s.Rev
- ticket := &Ticket{Id: id, AppName: "unclaim", RevisionName: "abcd123", ProcessName: "test", Op: OpStart, Snapshot: s}
+ p := fmt.Sprintf("tickets/%d", id)
+ ticket := &Ticket{Id: id, AppName: "unclaim", RevisionName: "abcd123", ProcessName: "test", Op: OpStart, Path: Path{s, p}}
- rev, err := s.conn.Set("tickets/"+strconv.FormatInt(id, 10)+"/claims/"+host, s.Rev, []byte(host))
+ rev, err := s.conn.Set(p+"/claims/"+host, s.Rev, []byte(host))
if err != nil {
t.Error(err)
}
- rev, err = s.conn.Set("tickets/"+strconv.FormatInt(id, 10)+"/status", s.Rev, []byte("claimed"))
+ rev, err = s.conn.Set(p+"/status", s.Rev, []byte("claimed"))
if err != nil {
t.Error(err)
}
@@ -145,8 +146,8 @@ func TestTicketUnclaim(t *testing.T) {
func TestTicketUnclaimWithWrongLock(t *testing.T) {
s, host := ticketSetup()
- p := "tickets/" + strconv.FormatInt(s.Rev, 10) + "/claims/" + host
- ticket := &Ticket{Id: s.Rev, AppName: "unclaim", RevisionName: "abcd123", ProcessName: "test", Op: OpStart, Snapshot: s}
+ p := fmt.Sprintf("tickets/%d/claims/%s", s.Rev, host)
+ ticket := &Ticket{Id: s.Rev, AppName: "unclaim", RevisionName: "abcd123", ProcessName: "test", Op: OpStart, Path: Path{s, p}}
rev, err := s.conn.Set(p, s.Rev, []byte(host))
if err != nil {
@@ -162,8 +163,8 @@ func TestTicketUnclaimWithWrongLock(t *testing.T) {
func TestTicketDone(t *testing.T) {
s, host := ticketSetup()
- p := "tickets/" + strconv.FormatInt(s.Rev, 10)
- ticket := &Ticket{Id: s.Rev, AppName: "done", RevisionName: "abcd123", ProcessName: "test", Op: OpStart, Snapshot: s}
+ p := fmt.Sprintf("tickets/%d", s.Rev)
+ ticket := &Ticket{Id: s.Rev, AppName: "done", RevisionName: "abcd123", ProcessName: "test", Op: OpStart, Path: Path{s, p}}
rev, err := s.conn.Set(p+"/claims/"+host, s.Rev, []byte(host))
if err != nil {
@@ -187,8 +188,8 @@ func TestTicketDone(t *testing.T) {
func TestTicketDoneWithWrongLock(t *testing.T) {
s, host := ticketSetup()
- p := "tickets/" + strconv.FormatInt(s.Rev, 10)
- ticket := &Ticket{Id: s.Rev, AppName: "done", RevisionName: "abcd123", ProcessName: "test", Op: OpStart, Snapshot: s}
+ p := fmt.Sprintf("tickets/%d", s.Rev)
+ ticket := &Ticket{Id: s.Rev, AppName: "done", RevisionName: "abcd123", ProcessName: "test", Op: OpStart, Path: Path{s, p}}
_, err := s.conn.Set(p+"/claims/"+host, s.Rev, []byte(host))
if err != nil {
View
6 visor.go
@@ -52,13 +52,13 @@ type Stack string
type State string
func Init(s Snapshot) (rev int64, err error) {
- exists, _, err := s.Conn().Exists(START_PORT_PATH)
+ exists, _, err := s.conn.Exists(START_PORT_PATH)
if err != nil {
return
}
if !exists {
- rev, err = s.Conn().Set(START_PORT_PATH, s.Rev, []byte(strconv.Itoa(START_PORT)))
+ rev, err = s.Set(START_PORT_PATH, strconv.Itoa(START_PORT))
if err != nil {
return
}
@@ -114,7 +114,7 @@ func GetScale(app string, revision string, processName string, s Snapshot) (scal
func SetScale(app string, revision string, processName string, factor int, s Snapshot) (rev int64, err error) {
path := path.Join(APPS_PATH, app, REVS_PATH, revision, SCALE_PATH, processName)
- rev, err = s.conn.Set(path, s.Rev, []byte(strconv.Itoa(factor)))
+ rev, err = s.Set(path, strconv.Itoa(factor))
return
}
Something went wrong with that request. Please try again.