From 6e75f200eb4064f2740c28287123ce3cddfe58c7 Mon Sep 17 00:00:00 2001 From: Alvaro Saurin Date: Tue, 3 Nov 2015 23:03:26 +0100 Subject: [PATCH] A simple observers mechanism for names WebSocket interface for observing names --- nameserver/http.go | 136 +++++++++++++++++++++++++++++++++++++++ nameserver/nameserver.go | 10 ++- nameserver/observer.go | 116 +++++++++++++++++++++++++++++++++ 3 files changed, 261 insertions(+), 1 deletion(-) create mode 100644 nameserver/observer.go diff --git a/nameserver/http.go b/nameserver/http.go index 5cd70f5ad3..fc5e72512a 100644 --- a/nameserver/http.go +++ b/nameserver/http.go @@ -3,15 +3,37 @@ package nameserver import ( "encoding/json" "fmt" + "io" "net/http" + "time" "github.com/gorilla/mux" + "github.com/gorilla/websocket" "github.com/miekg/dns" "github.com/weaveworks/weave/common/docker" "github.com/weaveworks/weave/net/address" ) +const ( + // Maximum message size allowed from peer. + maxMessageSize = 512 + + // Time allowed to write a message to the peer. + writeWait = 10 * time.Second + + // Time allowed to read the next pong message from the peer. + pongWait = 60 * time.Second + + // Send pings to peer with this period. Must be less than pongWait. + pingPeriod = (pongWait * 9) / 10 +) + +var websocketsUpgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, +} + func (n *Nameserver) badRequest(w http.ResponseWriter, err error) { http.Error(w, err.Error(), http.StatusBadRequest) n.infof("%v", err) @@ -55,6 +77,120 @@ func (n *Nameserver) HandleHTTP(router *mux.Router, dockerCli *docker.Client) { w.WriteHeader(204) }) + router.Path("/name/ws").HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // observation request + type ObserveRequest struct { + Name string + } + + // updates message + type ObserveUpdate struct { + Addresses []address.Address + } + + n.debugf("[websocket][%s] trying to upgrade connection", r.RemoteAddr) + ws, err := websocketsUpgrader.Upgrade(w, r, nil) + if err != nil { + n.errorf("[websocket][%s] upgrade failed", r.RemoteAddr) + return + } + defer ws.Close() + + // write writes a message with the given message type and payload. + send := func(mt int, payload []byte) error { + ws.SetWriteDeadline(time.Now().Add(writeWait)) + return ws.WriteMessage(mt, payload) + } + + sendAddresses := func(as []address.Address) error { + update := ObserveUpdate{Addresses: as} + updateJSON, err := json.Marshal(update) + if err != nil { + n.errorf("[websocket][%s] encoding update mesage: %s", r.RemoteAddr, err) + return err + } + n.debugf("[websocket][%s] sending '%s'", r.RemoteAddr, updateJSON) + if err := send(websocket.TextMessage, updateJSON); err != nil { + n.errorf("[websocket][%s] sending update mesage: %s", r.RemoteAddr, err) + return err + } + return nil + } + + // wait (for a reasonable time) for an observation request + ws.SetReadLimit(maxMessageSize) + ws.SetReadDeadline(time.Now().Add(pongWait)) + ws.SetPongHandler(func(string) error { + n.debugf("[websocket][%s] PONG", r.RemoteAddr) + ws.SetReadDeadline(time.Now().Add(pongWait)) + return nil + }) + + for { + _, message, err := ws.ReadMessage() + if err != nil { + n.errorf("[websocket][%s] reading mesage: %s", r.RemoteAddr, err) + return + } + + // decode the request + var m ObserveRequest + if err := json.Unmarshal(message, &m); err == io.EOF { + n.debugf("[websocket][%s] EOF", r.RemoteAddr) + send(websocket.CloseMessage, []byte{}) + return + } else if err != nil { + n.errorf("[websocket][%s] could not decode watch request: %s", r.RemoteAddr, err) + } + + // we do not let observe names that do not currently exist + fullName := fqdnWithDomain(m.Name, n.domain) + addrs := n.Lookup(fullName) + if len(addrs) == 0 { + n.errorf("[websocket][%s] cannot observe '%s': it does not exist", r.RemoteAddr, fullName) + send(websocket.CloseMessage, []byte{}) + return + } + if err := sendAddresses(addrs); err != nil { + return + } + + // create an observer + n.debugf("[websocket][%s] installing observer for %s", r.RemoteAddr, m.Name) + updates, err := n.Observe(m.Name, r.RemoteAddr) + if err != nil { + n.errorf("[websocket][%s] could not install observer: %s", r.RemoteAddr, err) + send(websocket.CloseMessage, []byte{}) + return + } + defer n.Forget(m.Name, r.RemoteAddr) + + // loop waiting for updates for than name, and forwarding those updates to the client + n.debugf("[websocket][%s] waiting for %s (%d secs ping interval)", r.RemoteAddr, m.Name, pingPeriod/time.Second) + ticker := time.NewTicker(pingPeriod) + defer ticker.Stop() + for { + select { + case addresses, ok := <-updates: + if !ok { + n.debugf("[websocket][%s] closing connection: name disappeared", r.RemoteAddr) + send(websocket.CloseMessage, []byte{}) + return + } + if err := sendAddresses(addresses); err != nil { + return + } + case <-ticker.C: + n.debugf("[websocket][%s] PING", r.RemoteAddr) + if err := send(websocket.PingMessage, []byte{}); err != nil { + n.errorf("[websocket][%s] when sending PING: %s", r.RemoteAddr, err) + return + } + } + } + } + }) + deleteHandler := func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) diff --git a/nameserver/nameserver.go b/nameserver/nameserver.go index 5a87fb0205..05917785fd 100644 --- a/nameserver/nameserver.go +++ b/nameserver/nameserver.go @@ -34,6 +34,7 @@ const ( // - Update is O(n) for now type Nameserver struct { sync.RWMutex + *Observers ourName router.PeerName domain string gossip router.Gossip @@ -43,12 +44,14 @@ type Nameserver struct { } func New(ourName router.PeerName, domain string, isKnownPeer func(router.PeerName) bool) *Nameserver { - return &Nameserver{ + ns := &Nameserver{ ourName: ourName, domain: dns.Fqdn(domain), isKnownPeer: isKnownPeer, quit: make(chan struct{}), } + ns.Observers = NewObservers(ns) + return ns } func (n *Nameserver) SetGossip(gossip router.Gossip) { @@ -88,6 +91,7 @@ func (n *Nameserver) AddEntry(hostname, containerid string, origin router.PeerNa n.Lock() entry := n.entries.add(hostname, containerid, origin, addr) n.Unlock() + n.Notify() return n.broadcastEntries(entry) } @@ -143,6 +147,7 @@ func (n *Nameserver) ContainerDied(ident string) { func (n *Nameserver) PeerGone(peer router.PeerName) { n.infof("peer %s gone", peer.String()) n.Lock() + defer n.Notify() defer n.Unlock() n.entries.filter(func(e *Entry) bool { return e.Origin != peer @@ -169,11 +174,13 @@ func (n *Nameserver) Delete(hostname, containerid, ipStr string, ip address.Addr return true }) n.Unlock() + n.Notify() return n.broadcastEntries(entries...) } func (n *Nameserver) deleteTombstones() { n.Lock() + defer n.Notify() defer n.Unlock() now := time.Now().Unix() n.entries.filter(func(e *Entry) bool { @@ -206,6 +213,7 @@ func (n *Nameserver) receiveGossip(msg []byte) (router.GossipData, router.Gossip } n.Lock() + defer n.Notify() defer n.Unlock() gossip.Entries.filter(func(e *Entry) bool { diff --git a/nameserver/observer.go b/nameserver/observer.go new file mode 100644 index 0000000000..827f2b0b05 --- /dev/null +++ b/nameserver/observer.go @@ -0,0 +1,116 @@ +package nameserver + +import ( + "errors" + "reflect" + "strings" + "sync" + + "github.com/miekg/dns" + + "github.com/weaveworks/weave/net/address" +) + +var ( + errUnknownName = errors.New("unknown name") +) + +func fqdnWithDomain(hostname string, domain string) string { + fqdnName := dns.Fqdn(hostname) + if !strings.HasSuffix(fqdnName, domain) { // "x." -> "x.weave.local.", "x.y." -> "x.y.weave.local." + fqdnName = fqdnName + domain + } + return fqdnName +} + +// Observer is a name observer +type Observer struct { + ID string + Last []address.Address + Addrs chan []address.Address +} + +// NewObserver is an Observer constructor +func NewObserver(id string) *Observer { + return &Observer{ID: id, Addrs: make(chan []address.Address)} +} + +// Notify is a observer notifier +func (obs *Observer) Notify(addrs []address.Address) { + if !reflect.DeepEqual(obs.Last, addrs) { + obs.Addrs <- addrs + obs.Last = make([]address.Address, len(addrs)) + copy(obs.Last, addrs) + } +} + +// Observers is the list of names that are being observed +type Observers struct { + sync.RWMutex + ns *Nameserver + obs map[string][]*Observer +} + +// NewObservers is a Observers constructor +func NewObservers(ns *Nameserver) *Observers { + return &Observers{ + ns: ns, + obs: make(map[string][]*Observer), + } +} + +// Observe starts observing a name +func (nobs *Observers) Observe(hostname, id string) (chan []address.Address, error) { + nobs.Lock() + defer nobs.Unlock() + + hostname = fqdnWithDomain(hostname, nobs.ns.domain) + + nobs.ns.debugf("%s is observing %s", id, hostname) + observer := NewObserver(id) + if _, found := nobs.obs[hostname]; found { + // TODO: check the (hostname, id) does not exist + nobs.obs[hostname] = append(nobs.obs[hostname], observer) + } else { + nobs.obs[hostname] = []*Observer{observer} + } + return observer.Addrs, nil +} + +// Forget disconnects an observer from a name +func (nobs *Observers) Forget(hostname, id string) { + nobs.Lock() + defer nobs.Unlock() + + hostname = fqdnWithDomain(hostname, nobs.ns.domain) + + observers, found := nobs.obs[hostname] + if !found { + return + } + + // filter out the `id` + var newObservers []*Observer + for _, v := range observers { + if v.ID != id { + newObservers = append(newObservers, v) + } + } + nobs.obs[hostname] = newObservers +} + +// Notify is a observers notifier +func (nobs *Observers) Notify() { + nobs.Lock() + defer nobs.Unlock() + + // we do not expect many observers (probably just a couple), so we just + // perform a lookups for all observed names... + // TODO: implement a proper observation mechanism + for hostname, observers := range nobs.obs { + for _, observer := range observers { + nobs.ns.debugf("Notifying observers of %s", hostname) + observer.Notify(nobs.ns.Lookup(hostname)) + } + } +}