Skip to content

Commit

Permalink
Cleanup Watches and reattach existing listeners after leave and rejoin
Browse files Browse the repository at this point in the history
Signed-off-by: Madhu Venugopal <madhu@socketplane.io>
  • Loading branch information
mavenugo committed Feb 14, 2015
1 parent 005eeec commit c703ba4
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 39 deletions.
123 changes: 84 additions & 39 deletions ecc.go
Expand Up @@ -30,10 +30,8 @@ import (
var started bool
var OfflineSupport bool = true
var listener eccListener
var errCh chan int

func Start(serverMode bool, bootstrap bool, bindInterface string, dataDir string) error {
watches = make(map[WatchType]watchData)
bindAddress := ""
if bindInterface != "" {
intf, err := net.InterfaceByName(bindInterface)
Expand All @@ -52,7 +50,8 @@ func Start(serverMode bool, bootstrap bool, bindInterface string, dataDir string
}
}
}
errCh = make(chan int)
errCh := make(chan int)
watchForExistingRegisteredUpdates()
go RegisterForNodeUpdates(listener)
go startConsul(serverMode, bootstrap, bindAddress, dataDir, errCh)

Expand Down Expand Up @@ -102,15 +101,12 @@ func Join(address string) error {
}

func Leave() error {
stopWatches()
ret := Execute("leave")
if ret != 0 {
log.Println("Error Leaving Consul membership")
return errors.New("Error leaving Consul cluster")
}
code := <-errCh
close(errCh)
log.Println("Consul Agent Exited with Status ", code)
time.Sleep(3 * time.Second)
return nil
}

Expand Down Expand Up @@ -374,10 +370,11 @@ const (
type WatchType int

type watchData struct {
listeners map[string][]Listener
listeners map[string][]Listener
watchPlans []*watch.WatchPlan
}

var watches map[WatchType]watchData
var watches map[WatchType]watchData = make(map[WatchType]watchData)

type Listener interface {
NotifyNodeUpdate(NotifyUpdateType, string)
Expand Down Expand Up @@ -426,7 +423,7 @@ func addListener(wtype WatchType, key string, listener Listener) watchconsul {
if !contains(WATCH_TYPE_NODE, key, listener) {
ws, ok := watches[wtype]
if !ok {
watches[wtype] = watchData{make(map[string][]Listener)}
watches[wtype] = watchData{make(map[string][]Listener), make([]*watch.WatchPlan, 0)}
ws = watches[wtype]
}

Expand All @@ -453,13 +450,33 @@ func getListeners(wtype WatchType, key string) []Listener {
return nil
}

func register(params map[string]interface{}, handler watch.HandlerFunc) {
func addWatchPlan(wtype WatchType, wp *watch.WatchPlan) {
ws, ok := watches[wtype]
if !ok {
return
}

ws.watchPlans = append(ws.watchPlans, wp)
watches[wtype] = ws
}

func stopWatches() {
for _, ws := range watches {
for _, wp := range ws.watchPlans {
wp.Stop()
}
ws.watchPlans = ws.watchPlans[:0]
}
}

func register(wtype WatchType, params map[string]interface{}, handler watch.HandlerFunc) {
// Create the watch
wp, err := watch.Parse(params)
if err != nil {
fmt.Printf("Register error : %s", err)
return
}
addWatchPlan(wtype, wp)
wp.Handler = handler
cmdFlags := flag.NewFlagSet("watch", flag.ContinueOnError)
httpAddr := command.HTTPAddrFlag(cmdFlags)
Expand Down Expand Up @@ -510,19 +527,6 @@ func updateNodeListeners(clusterNodes []*api.Node) {
}
}

func RegisterForNodeUpdates(listener Listener) {
wc := addListener(WATCH_TYPE_NODE, "", listener)
if wc {
// Compile the watch parameters
params := make(map[string]interface{})
params["type"] = "nodes"
handler := func(idx uint64, data interface{}) {
updateNodeListeners(data.([]*api.Node))
}
register(params, handler)
}
}

func updateKeyListeners(idx uint64, key string, data interface{}) {
listeners := getListeners(WATCH_TYPE_KEY, key)
if listeners == nil {
Expand Down Expand Up @@ -552,31 +556,72 @@ func updateKeyListeners(idx uint64, key string, data interface{}) {
}
}

func registerForNodeUpdates() {
// Compile the watch parameters
params := make(map[string]interface{})
params["type"] = "nodes"
handler := func(idx uint64, data interface{}) {
updateNodeListeners(data.([]*api.Node))
}
register(WATCH_TYPE_NODE, params, handler)
}

func RegisterForNodeUpdates(listener Listener) {
wc := addListener(WATCH_TYPE_NODE, "", listener)
if wc {
registerForNodeUpdates()
}
}

func registerForKeyUpdates(absKey string) {
params := make(map[string]interface{})
params["type"] = "key"
params["key"] = absKey
handler := func(idx uint64, data interface{}) {
updateKeyListeners(idx, absKey, data)
}
register(WATCH_TYPE_KEY, params, handler)
}

func RegisterForKeyUpdates(store string, key string, listener Listener) {
absKey := store + "/" + key
wc := addListener(WATCH_TYPE_KEY, absKey, listener)
if wc {
// Compile the watch parameters
params := make(map[string]interface{})
params["type"] = "key"
params["key"] = absKey
handler := func(idx uint64, data interface{}) {
updateKeyListeners(idx, absKey, data)
}
register(params, handler)
registerForKeyUpdates(absKey)
}
}

func registerForStoreUpdates(store string) {
// Compile the watch parameters
params := make(map[string]interface{})
params["type"] = "keyprefix"
params["prefix"] = store + "/"
handler := func(idx uint64, data interface{}) {
fmt.Println("NOT IMPLEMENTED Store Update :", idx, data)
}
register(WATCH_TYPE_STORE, params, handler)
}

func RegisterForStoreUpdates(store string, listener Listener) {
wc := addListener(WATCH_TYPE_STORE, store, listener)
if wc {
// Compile the watch parameters
params := make(map[string]interface{})
params["type"] = "keyprefix"
params["prefix"] = store + "/"
handler := func(idx uint64, data interface{}) {
fmt.Println("NOT IMPLEMENTED Store Update :", idx, data)
registerForStoreUpdates(store)
}
}

func watchForExistingRegisteredUpdates() {
for wType, ws := range watches {
log.Println("watchForExistingRegisteredUpdates : ", wType)
for key, _ := range ws.listeners {
log.Println("key : ", key)
switch wType {
case WATCH_TYPE_NODE:
go registerForNodeUpdates()
case WATCH_TYPE_KEY:
go registerForKeyUpdates(key)
case WATCH_TYPE_STORE:
go registerForStoreUpdates(key)
}
}
register(params, handler)
}
}
4 changes: 4 additions & 0 deletions example/ecc_demo.go
Expand Up @@ -33,6 +33,10 @@ func main() {
keyUpdates("db")
keyUpdates("web")
// Ctrl+C handling
ecc.Leave()
time.Sleep(time.Second * 15)
ecc.Start(true, true, "eth1", dataDir)

handler := make(chan os.Signal, 1)
signal.Notify(handler, os.Interrupt)
for sig := range handler {
Expand Down

0 comments on commit c703ba4

Please sign in to comment.