Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: TTL for servers in Rendezvous #1773

Merged
merged 1 commit into from Dec 4, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
85 changes: 55 additions & 30 deletions insonmnia/npp/rendezvous/server.go
Expand Up @@ -63,41 +63,53 @@ type peerCandidate struct {
C chan<- Peer
}

// Meeting describes the meeting room for servers and clients for a specific
// NPP identifier.
type meeting struct {
mu sync.Mutex
// Here we have a map where all clients are placed when there are no
// servers available.
// We allow multiple clients to be waited for servers.
clients map[PeerID]peerCandidate
// Also we allow the opposite: multiple servers can be registered for
// fault tolerance, but it is unlikely.
servers map[PeerID]peerCandidate
clients map[PeerID]*peerCandidate
// We allow multiple servers can be registered for fault tolerance, but
// this is unlikely.
servers map[PeerID]*peerCandidate
// The timestamp shows when the last server for this ID has been seen.
// It is updated each time the server de-announces itself and can be
// safely ignored when the "servers" map has at least one element in it.
serversLastSeenTime time.Time
}

func newMeeting() *meeting {
return &meeting{
servers: map[PeerID]peerCandidate{},
clients: map[PeerID]peerCandidate{},
servers: map[PeerID]*peerCandidate{},
clients: map[PeerID]*peerCandidate{},
}
}

func (m *meeting) AddServer(peer Peer, c chan<- Peer) {
m.mu.Lock()
defer m.mu.Unlock()

m.servers[peer.ID] = peerCandidate{Peer: peer, C: c}
m.servers[peer.ID] = &peerCandidate{Peer: peer, C: c}
}

func (m *meeting) RemoveServer(id PeerID) {
m.mu.Lock()
defer m.mu.Unlock()

delete(m.servers, id)

if len(m.servers) == 0 {
m.serversLastSeenTime = time.Now()
}
}

func (m *meeting) AddClient(peer Peer, c chan<- Peer) {
m.mu.Lock()
defer m.mu.Unlock()

m.clients[peer.ID] = peerCandidate{Peer: peer, C: c}
m.clients[peer.ID] = &peerCandidate{Peer: peer, C: c}
}

func (m *meeting) RemoveClient(id PeerID) {
Expand All @@ -108,29 +120,43 @@ func (m *meeting) RemoveClient(id PeerID) {
}

func (m *meeting) PopRandomServer() *peerCandidate {
return m.popRandomPeerCandidate(m.servers)
}
m.mu.Lock()
defer m.mu.Unlock()

func (m *meeting) PopRandomClient() *peerCandidate {
return m.popRandomPeerCandidate(m.clients)
candidate := m.popRandomPeerCandidate(m.servers)
if len(m.servers) == 0 {
m.serversLastSeenTime = time.Now()
}

return candidate
}

func (m *meeting) popRandomPeerCandidate(candidates map[PeerID]peerCandidate) *peerCandidate {
func (m *meeting) PopRandomClient() *peerCandidate {
m.mu.Lock()
defer m.mu.Unlock()

return m.popRandomPeerCandidate(m.clients)
}

func (m *meeting) popRandomPeerCandidate(candidates map[PeerID]*peerCandidate) *peerCandidate {
if len(candidates) == 0 {
return nil
}
var keys []PeerID
keys := make([]PeerID, 0, len(candidates))
for key := range candidates {
keys = append(keys, key)
}

k := keys[rand.Intn(len(keys))]
v := candidates[k]
delete(candidates, k)
return &v
return v
}

func (m *meeting) IsServerInactive() bool {
const expireDuration = 30 * time.Second

return time.Now().After(m.serversLastSeenTime.Add(expireDuration))
}

// Server represents a rendezvous server.
Expand Down Expand Up @@ -214,7 +240,10 @@ func (m *Server) Resolve(ctx context.Context, request *sonm.ConnectRequest) (*so

peerHandle := NewPeer(*peerInfo, request.PrivateAddrs)

c := m.addServerWatch(id, peerHandle)
c, err := m.addServerWatch(id, peerHandle)
if err != nil {
return nil, err
}
defer m.removeServerWatch(id, peerHandle)

select {
Expand Down Expand Up @@ -284,7 +313,7 @@ func (m *Server) Publish(ctx context.Context, request *sonm.PublishRequest) (*so
}
}

func (m *Server) addServerWatch(id nppc.ResourceID, peer Peer) <-chan Peer {
func (m *Server) addServerWatch(id nppc.ResourceID, peer Peer) (<-chan Peer, error) {
c := make(chan Peer, 2)

m.mu.Lock()
Expand All @@ -297,15 +326,17 @@ func (m *Server) addServerWatch(id nppc.ResourceID, peer Peer) <-chan Peer {
c <- server.Peer
server.C <- peer
} else {
if meeting.IsServerInactive() {
return nil, errPeerNotFound()
}

meeting.AddClient(peer, c)
}
} else {
meeting := newMeeting()
meeting.AddClient(peer, c)
m.rv[id] = meeting
return nil, errPeerNotFound()
}

return c
return c, nil
}

func (m *Server) addClientWatch(id nppc.ResourceID, peer Peer) <-chan Peer {
Expand Down Expand Up @@ -338,9 +369,8 @@ func (m *Server) removeClientWatch(id nppc.ResourceID, peer Peer) {
candidates, ok := m.rv[id]
if ok {
candidates.RemoveServer(peer.ID)
m.maybeCleanMeeting(id, candidates)
}

m.maybeCleanMeeting(id, candidates)
}

func (m *Server) removeServerWatch(id nppc.ResourceID, peer Peer) {
Expand All @@ -350,17 +380,12 @@ func (m *Server) removeServerWatch(id nppc.ResourceID, peer Peer) {
candidates, ok := m.rv[id]
if ok {
candidates.RemoveClient(peer.ID)
m.maybeCleanMeeting(id, candidates)
}

m.maybeCleanMeeting(id, candidates)
}

func (m *Server) maybeCleanMeeting(id nppc.ResourceID, candidates *meeting) {
if candidates == nil {
return
}

if len(candidates.clients) == 0 && len(candidates.servers) == 0 {
if len(candidates.clients) == 0 && len(candidates.servers) == 0 && candidates.IsServerInactive() {
delete(m.rv, id)
}
}
Expand Down