Skip to content

Commit

Permalink
Merge pull request #2 from mavenugo/master
Browse files Browse the repository at this point in the history
Key watch support
  • Loading branch information
mavenugo committed Jan 6, 2015
2 parents d35cbbe + 8d2aa36 commit 5990c02
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 5 deletions.
38 changes: 34 additions & 4 deletions ecc.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,15 +503,45 @@ func RegisterForNodeUpdates(listener Listener) {
}
}

func RegisterForKeyUpdates(key string, listener Listener) {
wc := addListener(WATCH_TYPE_KEY, key, listener)
func updateKeyListeners(idx uint64, key string, data interface{}) {
listeners := getListeners(WATCH_TYPE_KEY, key)
if listeners == nil {
return
}

var kv *consulapi.KVPair = nil
var val []byte = nil
updateType := NOTIFY_UPDATE_MODIFY

if data != nil {
kv = data.(*consulapi.KVPair)
}

if kv == nil {
updateType = NOTIFY_UPDATE_DELETE
} else {
updateType = NOTIFY_UPDATE_MODIFY
if idx == kv.CreateIndex {
updateType = NOTIFY_UPDATE_ADD
}
val = kv.Value
}

for _, listener := range listeners {
listener.NotifyKeyUpdate(NotifyUpdateType(updateType), key, val)
}
}

func RegisterForKeyUpdates(store string, key string, listener Listener) {
absKey := store + "/" + key
wc := addListener(WATCH_TYPE_KEY, absKey, listener)
if wc {
// Compile the watch parameters
params := make(map[string]interface{})
params["type"] = "key"
params["key"] = key
params["key"] = absKey
handler := func(idx uint64, data interface{}) {
fmt.Println("NOT IMPLEMENTED Key Update :", idx, data)
updateKeyListeners(idx, absKey, data)
}
register(params, handler)
}
Expand Down
23 changes: 22 additions & 1 deletion example/ecc_demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,20 @@ func (e eccListener) NotifyNodeUpdate(nType ecc.NotifyUpdateType, nodeName strin
fmt.Println("CLIENT UPDATE :", nType, nodeName)
}
func (e eccListener) NotifyKeyUpdate(nType ecc.NotifyUpdateType, key string, data []byte) {
fmt.Println("KEY UPDATE :", nType, key, data)
}
func (e eccListener) NotifyStoreUpdate(nType ecc.NotifyUpdateType, store string, data map[string][]byte) {
}
func main() {
ecc.Start(true, true, "eth1", dataDir)
go ecc.RegisterForNodeUpdates(eccListener{})
listener := eccListener{}
go ecc.RegisterForNodeUpdates(listener)
go ecc.RegisterForKeyUpdates("network", "web", listener)
keyUpdates("web")
ecc.Delete("network", "web")
go ecc.RegisterForKeyUpdates("network", "db", listener)
keyUpdates("db")
keyUpdates("web")
// Ctrl+C handling
handler := make(chan os.Signal, 1)
signal.Notify(handler, os.Interrupt)
Expand All @@ -34,3 +42,16 @@ func main() {
}
}
}

//Random Key updates
func keyUpdates(key string) {
valArray, _, _ := ecc.Get("network", key)
currVal := make([]byte, len(valArray))
copy(currVal, valArray)
valArray = []byte("value1")
ecc.Put("network", key, valArray, currVal)
time.Sleep(time.Second * 2)
updArray := []byte("value2")
ecc.Put("network", key, updArray, valArray)
time.Sleep(time.Second * 2)
}

0 comments on commit 5990c02

Please sign in to comment.