Skip to content

Commit

Permalink
A simple observers mechanism for names
Browse files Browse the repository at this point in the history
WebSocket interface for observing names
  • Loading branch information
inercia committed Nov 5, 2015
1 parent 27d915d commit 6e75f20
Show file tree
Hide file tree
Showing 3 changed files with 261 additions and 1 deletion.
136 changes: 136 additions & 0 deletions nameserver/http.go
Expand Up @@ -3,15 +3,37 @@ package nameserver
import (
"encoding/json"
"fmt"
"io"
"net/http"
"time"

"github.com/gorilla/mux"
"github.com/gorilla/websocket"
"github.com/miekg/dns"

"github.com/weaveworks/weave/common/docker"
"github.com/weaveworks/weave/net/address"
)

const (
// Maximum message size allowed from peer.
maxMessageSize = 512

// Time allowed to write a message to the peer.
writeWait = 10 * time.Second

// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second

// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
)

var websocketsUpgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}

func (n *Nameserver) badRequest(w http.ResponseWriter, err error) {
http.Error(w, err.Error(), http.StatusBadRequest)
n.infof("%v", err)
Expand Down Expand Up @@ -55,6 +77,120 @@ func (n *Nameserver) HandleHTTP(router *mux.Router, dockerCli *docker.Client) {
w.WriteHeader(204)
})

router.Path("/name/ws").HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// observation request
type ObserveRequest struct {
Name string
}

// updates message
type ObserveUpdate struct {
Addresses []address.Address
}

n.debugf("[websocket][%s] trying to upgrade connection", r.RemoteAddr)
ws, err := websocketsUpgrader.Upgrade(w, r, nil)
if err != nil {
n.errorf("[websocket][%s] upgrade failed", r.RemoteAddr)
return
}
defer ws.Close()

// write writes a message with the given message type and payload.
send := func(mt int, payload []byte) error {
ws.SetWriteDeadline(time.Now().Add(writeWait))
return ws.WriteMessage(mt, payload)
}

sendAddresses := func(as []address.Address) error {
update := ObserveUpdate{Addresses: as}
updateJSON, err := json.Marshal(update)
if err != nil {
n.errorf("[websocket][%s] encoding update mesage: %s", r.RemoteAddr, err)
return err
}
n.debugf("[websocket][%s] sending '%s'", r.RemoteAddr, updateJSON)
if err := send(websocket.TextMessage, updateJSON); err != nil {
n.errorf("[websocket][%s] sending update mesage: %s", r.RemoteAddr, err)
return err
}
return nil
}

// wait (for a reasonable time) for an observation request
ws.SetReadLimit(maxMessageSize)
ws.SetReadDeadline(time.Now().Add(pongWait))
ws.SetPongHandler(func(string) error {
n.debugf("[websocket][%s] PONG", r.RemoteAddr)
ws.SetReadDeadline(time.Now().Add(pongWait))
return nil
})

for {
_, message, err := ws.ReadMessage()
if err != nil {
n.errorf("[websocket][%s] reading mesage: %s", r.RemoteAddr, err)
return
}

// decode the request
var m ObserveRequest
if err := json.Unmarshal(message, &m); err == io.EOF {
n.debugf("[websocket][%s] EOF", r.RemoteAddr)
send(websocket.CloseMessage, []byte{})
return
} else if err != nil {
n.errorf("[websocket][%s] could not decode watch request: %s", r.RemoteAddr, err)
}

// we do not let observe names that do not currently exist
fullName := fqdnWithDomain(m.Name, n.domain)
addrs := n.Lookup(fullName)
if len(addrs) == 0 {
n.errorf("[websocket][%s] cannot observe '%s': it does not exist", r.RemoteAddr, fullName)
send(websocket.CloseMessage, []byte{})
return
}
if err := sendAddresses(addrs); err != nil {
return
}

// create an observer
n.debugf("[websocket][%s] installing observer for %s", r.RemoteAddr, m.Name)
updates, err := n.Observe(m.Name, r.RemoteAddr)
if err != nil {
n.errorf("[websocket][%s] could not install observer: %s", r.RemoteAddr, err)
send(websocket.CloseMessage, []byte{})
return
}
defer n.Forget(m.Name, r.RemoteAddr)

// loop waiting for updates for than name, and forwarding those updates to the client
n.debugf("[websocket][%s] waiting for %s (%d secs ping interval)", r.RemoteAddr, m.Name, pingPeriod/time.Second)
ticker := time.NewTicker(pingPeriod)
defer ticker.Stop()
for {
select {
case addresses, ok := <-updates:
if !ok {
n.debugf("[websocket][%s] closing connection: name disappeared", r.RemoteAddr)
send(websocket.CloseMessage, []byte{})
return
}
if err := sendAddresses(addresses); err != nil {
return
}
case <-ticker.C:
n.debugf("[websocket][%s] PING", r.RemoteAddr)
if err := send(websocket.PingMessage, []byte{}); err != nil {
n.errorf("[websocket][%s] when sending PING: %s", r.RemoteAddr, err)
return
}
}
}
}
})

deleteHandler := func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)

Expand Down
10 changes: 9 additions & 1 deletion nameserver/nameserver.go
Expand Up @@ -34,6 +34,7 @@ const (
// - Update is O(n) for now
type Nameserver struct {
sync.RWMutex
*Observers
ourName router.PeerName
domain string
gossip router.Gossip
Expand All @@ -43,12 +44,14 @@ type Nameserver struct {
}

func New(ourName router.PeerName, domain string, isKnownPeer func(router.PeerName) bool) *Nameserver {
return &Nameserver{
ns := &Nameserver{
ourName: ourName,
domain: dns.Fqdn(domain),
isKnownPeer: isKnownPeer,
quit: make(chan struct{}),
}
ns.Observers = NewObservers(ns)
return ns
}

func (n *Nameserver) SetGossip(gossip router.Gossip) {
Expand Down Expand Up @@ -88,6 +91,7 @@ func (n *Nameserver) AddEntry(hostname, containerid string, origin router.PeerNa
n.Lock()
entry := n.entries.add(hostname, containerid, origin, addr)
n.Unlock()
n.Notify()
return n.broadcastEntries(entry)
}

Expand Down Expand Up @@ -143,6 +147,7 @@ func (n *Nameserver) ContainerDied(ident string) {
func (n *Nameserver) PeerGone(peer router.PeerName) {
n.infof("peer %s gone", peer.String())
n.Lock()
defer n.Notify()
defer n.Unlock()
n.entries.filter(func(e *Entry) bool {
return e.Origin != peer
Expand All @@ -169,11 +174,13 @@ func (n *Nameserver) Delete(hostname, containerid, ipStr string, ip address.Addr
return true
})
n.Unlock()
n.Notify()
return n.broadcastEntries(entries...)
}

func (n *Nameserver) deleteTombstones() {
n.Lock()
defer n.Notify()
defer n.Unlock()
now := time.Now().Unix()
n.entries.filter(func(e *Entry) bool {
Expand Down Expand Up @@ -206,6 +213,7 @@ func (n *Nameserver) receiveGossip(msg []byte) (router.GossipData, router.Gossip
}

n.Lock()
defer n.Notify()
defer n.Unlock()

gossip.Entries.filter(func(e *Entry) bool {
Expand Down
116 changes: 116 additions & 0 deletions nameserver/observer.go
@@ -0,0 +1,116 @@
package nameserver

import (
"errors"
"reflect"
"strings"
"sync"

"github.com/miekg/dns"

"github.com/weaveworks/weave/net/address"
)

var (
errUnknownName = errors.New("unknown name")
)

func fqdnWithDomain(hostname string, domain string) string {
fqdnName := dns.Fqdn(hostname)
if !strings.HasSuffix(fqdnName, domain) { // "x." -> "x.weave.local.", "x.y." -> "x.y.weave.local."
fqdnName = fqdnName + domain
}
return fqdnName
}

// Observer is a name observer
type Observer struct {
ID string
Last []address.Address
Addrs chan []address.Address
}

// NewObserver is an Observer constructor
func NewObserver(id string) *Observer {
return &Observer{ID: id, Addrs: make(chan []address.Address)}
}

// Notify is a observer notifier
func (obs *Observer) Notify(addrs []address.Address) {
if !reflect.DeepEqual(obs.Last, addrs) {
obs.Addrs <- addrs
obs.Last = make([]address.Address, len(addrs))
copy(obs.Last, addrs)
}
}

// Observers is the list of names that are being observed
type Observers struct {
sync.RWMutex
ns *Nameserver
obs map[string][]*Observer
}

// NewObservers is a Observers constructor
func NewObservers(ns *Nameserver) *Observers {
return &Observers{
ns: ns,
obs: make(map[string][]*Observer),
}
}

// Observe starts observing a name
func (nobs *Observers) Observe(hostname, id string) (chan []address.Address, error) {
nobs.Lock()
defer nobs.Unlock()

hostname = fqdnWithDomain(hostname, nobs.ns.domain)

nobs.ns.debugf("%s is observing %s", id, hostname)
observer := NewObserver(id)
if _, found := nobs.obs[hostname]; found {
// TODO: check the (hostname, id) does not exist
nobs.obs[hostname] = append(nobs.obs[hostname], observer)
} else {
nobs.obs[hostname] = []*Observer{observer}
}
return observer.Addrs, nil
}

// Forget disconnects an observer from a name
func (nobs *Observers) Forget(hostname, id string) {
nobs.Lock()
defer nobs.Unlock()

hostname = fqdnWithDomain(hostname, nobs.ns.domain)

observers, found := nobs.obs[hostname]
if !found {
return
}

// filter out the `id`
var newObservers []*Observer
for _, v := range observers {
if v.ID != id {
newObservers = append(newObservers, v)
}
}
nobs.obs[hostname] = newObservers
}

// Notify is a observers notifier
func (nobs *Observers) Notify() {
nobs.Lock()
defer nobs.Unlock()

// we do not expect many observers (probably just a couple), so we just
// perform a lookups for all observed names...
// TODO: implement a proper observation mechanism
for hostname, observers := range nobs.obs {
for _, observer := range observers {
nobs.ns.debugf("Notifying observers of %s", hostname)
observer.Notify(nobs.ns.Lookup(hostname))
}
}
}

0 comments on commit 6e75f20

Please sign in to comment.