Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

create an explicit interface for a service #34

Merged
merged 5 commits into from

3 participants

@kesselborn
Owner

currently, bazooka-pm and bazooka-proxy services register themselves in the coordinator via the generic file interface. We need to have an extra function for it that we can use for other services like bazooka-repomgr and eventually for something like mysql as well.
Those services need to have the possibility to store env vars on them as well -- just as apps.
Values that are currently saved in /bazooka/meta can all be saved in services:

  • irc
  • storage
  • yammer

...

@kesselborn kesselborn was assigned
@xla

Needs further discussion.

@xla

Assumption is that every service returns one or more addresses. $ADDR consists of ip and port.

visor service get $SERVICE_NAME $PATH
visor service set $SERVICE_NAME $PATH $ADDR
visor service del $SERVICE_NAME $PATH $ADDR
@xla xla was assigned
@xla
xla commented

This is the first step to have discovery via dns based on coordinator information. We start with a simple representation of a service. A service consists of a name and a list of addresses. Through exposing this interface via the cli it should give enough flexibility to interact with it from non-bazooka controlled systems and use it programmatically from other components.

The underlying tree structure looks like this:

/$root
  /services
    /$service_name
      /addrs
        1.2.3.4 = $utc_timestamp
        5.6.7.8 = $utc_timestamp

This structure should give enough flexibility to add more important information like port.

TODO

  • Build service interface
  • Expose a cli
  • Apps should be represented by service entries
  • Implement service events
  • Add more attributes to service endpoints(port, priority, weight)
service.go
((74 lines not shown))
+ rev, err := s.Set(path.Join(ADDRS_PATH, addr), time.Now().UTC().String())
+ if err != nil {
+ return
+ }
+
+ srv = s.FastForward(rev)
+
+ return
+}
+
+// RemoveAddr removes the given address string from the Service.
+func (s *Service) RemoveAddr(addr string) (srv *Service, err error) {
+ _, ok := s.Addrs[addr]
+ if ok {
+ delete(s.Addrs, addr)
+ }

I'd move this to after the s.Del(), to avoid inconsistencies.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
service.go
((57 lines not shown))
+ return
+ }
+
+ srv = s.FastForward(rev)
+
+ return
+}
+
+// Unregister removes the Service form the global process state.
+func (s *Service) Unregister() error {
+ return s.Del("/")
+}
+
+// AddAddr adds the given address string to the Service.
+func (s *Service) AddAddr(addr string) (srv *Service, err error) {
+ s.Addrs[addr] = true

I'd move this to after the s.Set(), to avoid inconsistencies.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
service.go
((98 lines not shown))
+ return
+}
+
+func (s *Service) String() string {
+ return fmt.Sprintf("Service<%s>", s.Name)
+}
+
+func (s *Service) Inspect() string {
+ return fmt.Sprintf("%#v", s)
+}
+
+func (s *Service) getAddrs() (addrs []string, err error) {
+ addrs, err = s.Getdir(s.Path.Prefix(ADDRS_PATH))
+ if err != nil && err.Error() == "NOENT" {
+ return addrs, nil
+ }

This is patched in current visor, you can use IsErrNoEnt

@xla
xla added a note

On the error object directly?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
service_test.go
((94 lines not shown))
+ addrs := []string{"1.2.3.4", "4.3.2.1"}
+ srv := serviceSetup(name)
+
+ srv, err := srv.Register()
+ if err != nil {
+ t.Error(t)
+ }
+
+ for _, addr := range addrs {
+ srv, err = srv.AddAddr(addr)
+ if err != nil {
+ t.Error(t)
+ }
+ }
+
+ srv2, err := GetService(srv.FastForward(srv.Snapshot.Rev).Snapshot, name)

same as:

GetService(srv.Snapshot, name)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
service_test.go
((129 lines not shown))
+ t.Error(err)
+ }
+
+ for _, addr := range addrs {
+ srv, err = srv.AddAddr(addr)
+ if err != nil {
+ t.Error(t)
+ }
+ }
+
+ srv, err = srv.RemoveAddr("5.6.7.8")
+ if err != nil {
+ t.Error(err)
+ }
+
+ srv2, err := GetService(srv.FastForward(srv.Snapshot.Rev).Snapshot, name)

see above comment :hamster:

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@cloudhead cloudhead merged commit 4c75ed0 into master
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Aug 2, 2012
  1. Implement & test service interface

    Alexander Simmerl authored
  2. Merge branch 'master' into feature/services

    Alexander Simmerl authored
  3. Use check for NoEnt error

    Alexander Simmerl authored
  4. Clean FastForwards and reuse snapshots

    Alexander Simmerl authored
  5. Avoid inconsistencies of in-memory structures

    Alexander Simmerl authored
This page is out of date. Refresh to see the latest.
Showing with 346 additions and 0 deletions.
  1. +163 −0 service.go
  2. +183 −0 service_test.go
View
163 service.go
@@ -0,0 +1,163 @@
+// Copyright (c) 2012, SoundCloud Ltd.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+// Source code and contact info at http://github.com/soundcloud/visor
+
+package visor
+
+import (
+ "fmt"
+ "path"
+ "time"
+)
+
+const (
+ ADDRS_PATH = "addrs"
+ SERVICES_PATH = "services"
+)
+
+type Service struct {
+ Path
+ Name string
+ Addrs map[string]bool
+}
+
+// NewService returns a new Service given a name.
+func NewService(name string, snapshot Snapshot) (srv *Service) {
+ srv = &Service{Name: name, Addrs: map[string]bool{}}
+ srv.Path = Path{snapshot, path.Join(SERVICES_PATH, srv.Name)}
+
+ return
+}
+
+func (s *Service) createSnapshot(rev int64) Snapshotable {
+ tmp := *s
+ tmp.Snapshot = Snapshot{rev, s.conn}
+ return &tmp
+}
+
+// FastForward advances the service in time. It returns
+// a new instance of Service with the supplied revision.
+func (s *Service) FastForward(rev int64) *Service {
+ return s.Snapshot.fastForward(s, rev).(*Service)
+}
+
+// Register adds the Service to the global process state.
+func (s *Service) Register() (srv *Service, err error) {
+ exists, _, err := s.conn.Exists(s.Path.Dir)
+ if err != nil {
+ return
+ }
+ if exists {
+ return nil, ErrKeyConflict
+ }
+
+ rev, err := s.Set("registered", time.Now().UTC().String())
+ if err != nil {
+ return
+ }
+
+ srv = s.FastForward(rev)
+
+ return
+}
+
+// Unregister removes the Service form the global process state.
+func (s *Service) Unregister() error {
+ return s.Del("/")
+}
+
+// AddAddr adds the given address string to the Service.
+func (s *Service) AddAddr(addr string) (srv *Service, err error) {
+ rev, err := s.Set(path.Join(ADDRS_PATH, addr), time.Now().UTC().String())
+ if err != nil {
+ return
+ }
+
+ srv = s.FastForward(rev)
+
+ s.Addrs[addr] = true
+
+ return
+}
+
+// RemoveAddr removes the given address string from the Service.
+func (s *Service) RemoveAddr(addr string) (srv *Service, err error) {
+ err = s.Del(path.Join(ADDRS_PATH, addr))
+ if err != nil {
+ return
+ }
+
+ srv = s.FastForward(s.Rev + 1)
+
+ _, ok := s.Addrs[addr]
+ if ok {
+ delete(s.Addrs, addr)
+ }
+
+ return
+}
+
+func (s *Service) String() string {
+ return fmt.Sprintf("Service<%s>", s.Name)
+}
+
+func (s *Service) Inspect() string {
+ return fmt.Sprintf("%#v", s)
+}
+
+func (s *Service) getAddrs() (addrs []string, err error) {
+ addrs, err = s.Getdir(s.Path.Prefix(ADDRS_PATH))
+ if err != nil && IsErrNoEnt(err) {
+ return addrs, nil
+ }
+
+ return
+}
+
+// GetService fetches a service with the given name.
+func GetService(s Snapshot, name string) (srv *Service, err error) {
+ srv = NewService(name, s)
+
+ exists, _, err := s.conn.Exists(srv.Path.Dir)
+ if err != nil && !exists {
+ return
+ }
+
+ addrs, err := srv.getAddrs()
+ if err != nil {
+ return
+ }
+
+ for _, addr := range addrs {
+ srv.Addrs[addr] = true
+ }
+
+ return
+}
+
+// Services returns the list of all registered Services.
+func Services(s Snapshot) (srvs []*Service, err error) {
+ exists, _, err := s.conn.Exists(SERVICES_PATH)
+ if err != nil || !exists {
+ return
+ }
+
+ names, err := s.Getdir(SERVICES_PATH)
+ if err != nil {
+ return
+ }
+
+ for _, name := range names {
+ var srv *Service
+
+ srv, err = GetService(s, name)
+ if err != nil {
+ return
+ }
+
+ srvs = append(srvs, srv)
+ }
+
+ return
+}
View
183 service_test.go
@@ -0,0 +1,183 @@
+// Copyright (c) 2012, SoundCloud Ltd.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+// Source code and contact info at http://github.com/soundcloud/visor
+
+package visor
+
+import (
+ "testing"
+)
+
+func serviceSetup(name string) (srv *Service) {
+ s, err := Dial(DEFAULT_ADDR, "/service-test")
+ if err != nil {
+ panic(err)
+ }
+
+ r, _ := s.conn.Rev()
+ err = s.conn.Del(SERVICES_PATH, r)
+ rev, err := Init(s)
+ if err != nil {
+ panic(err)
+ }
+
+ srv = NewService(name, s)
+ srv = srv.FastForward(rev)
+
+ return
+}
+
+func TestServiceRegistration(t *testing.T) {
+ srv := serviceSetup("fancydb")
+
+ check, _, err := srv.conn.Exists(srv.Path.Dir)
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ if check {
+ t.Error("Service already registered")
+ return
+ }
+
+ srv2, err := srv.Register()
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ check, _, err = srv2.conn.Exists(srv2.Path.Dir)
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ if !check {
+ t.Error("Service registration failed")
+ return
+ }
+ _, err = srv.Register()
+ if err == nil {
+ t.Error("Service allowed to be registered twice")
+ }
+ _, err = srv2.Register()
+ if err == nil {
+ t.Error("Service allowed to be registered twice")
+ }
+}
+
+func TestServiceUnregistration(t *testing.T) {
+ srv := serviceSetup("broker")
+
+ srv, err := srv.Register()
+ if err != nil {
+ t.Error(err)
+ return
+ }
+
+ err = srv.Unregister()
+ if err != nil {
+ t.Error(err)
+ return
+ }
+
+ check, _, err := srv.conn.Exists(srv.Path.Dir)
+ if err != nil {
+ t.Error(err)
+ }
+ if check {
+ t.Error("srv still registered")
+ }
+}
+
+func TestServiceAddandGetAddr(t *testing.T) {
+ name := "cloudstorage"
+ addrs := []string{"1.2.3.4", "4.3.2.1"}
+ srv := serviceSetup(name)
+
+ srv, err := srv.Register()
+ if err != nil {
+ t.Error(t)
+ }
+
+ for _, addr := range addrs {
+ srv, err = srv.AddAddr(addr)
+ if err != nil {
+ t.Error(t)
+ }
+ }
+
+ srv2, err := GetService(srv.Snapshot, name)
+ if err != nil {
+ t.Error(t)
+ }
+
+ for _, addr := range addrs {
+ _, ok := srv2.Addrs[addr]
+ if !ok {
+ t.Errorf("expected %s to be present", addr)
+ }
+ }
+}
+
+func TestServiceRemoveAddr(t *testing.T) {
+ name := "owlsomedb"
+ addrs := []string{"5.6.7.8", "8.7.6.5"}
+ srv := serviceSetup(name)
+
+ srv, err := srv.Register()
+ if err != nil {
+ t.Error(err)
+ }
+
+ for _, addr := range addrs {
+ srv, err = srv.AddAddr(addr)
+ if err != nil {
+ t.Error(t)
+ }
+ }
+
+ srv, err = srv.RemoveAddr("5.6.7.8")
+ if err != nil {
+ t.Error(err)
+ }
+
+ srv2, err := GetService(srv.Snapshot, name)
+ if err != nil {
+ t.Error(t)
+ }
+
+ _, ok := srv2.Addrs[addrs[0]]
+ if ok {
+ t.Errorf("expected %s to be deleted", addrs[0])
+ }
+}
+
+func TestServices(t *testing.T) {
+ var err error
+
+ srv := serviceSetup("memstore")
+ names := []string{"boombroker", "comastorage", "lulzdb"}
+
+ for _, name := range names {
+ srv = NewService(name, srv.Snapshot)
+ srv, err = srv.Register()
+ if err != nil {
+ t.Error(err)
+ }
+ }
+
+ srvs, err := Services(srv.Snapshot)
+ if err != nil {
+ t.Error(err)
+ }
+
+ if len(srvs) != len(names) {
+ t.Errorf("expected length %d returned %d", len(names), len(srvs))
+ } else {
+ for i, name := range names {
+ if srvs[i].Name != name {
+ t.Errorf("expected %s got %s", name, srvs[i].Name)
+ }
+ }
+ }
+}
Something went wrong with that request. Please try again.