diff --git a/insonmnia/npp/rendezvous/server.go b/insonmnia/npp/rendezvous/server.go index 3850a2be2..d42b14503 100644 --- a/insonmnia/npp/rendezvous/server.go +++ b/insonmnia/npp/rendezvous/server.go @@ -63,19 +63,27 @@ type peerCandidate struct { C chan<- Peer } +// Meeting describes the meeting room for servers and clients for a specific +// NPP identifier. type meeting struct { mu sync.Mutex + // Here we have a map where all clients are placed when there are no + // servers available. // We allow multiple clients to be waited for servers. - clients map[PeerID]peerCandidate - // Also we allow the opposite: multiple servers can be registered for - // fault tolerance, but it is unlikely. - servers map[PeerID]peerCandidate + clients map[PeerID]*peerCandidate + // We allow multiple servers can be registered for fault tolerance, but + // this is unlikely. + servers map[PeerID]*peerCandidate + // The timestamp shows when the last server for this ID has been seen. + // It is updated each time the server de-announces itself and can be + // safely ignored when the "servers" map has at least one element in it. + serversLastSeenTime time.Time } func newMeeting() *meeting { return &meeting{ - servers: map[PeerID]peerCandidate{}, - clients: map[PeerID]peerCandidate{}, + servers: map[PeerID]*peerCandidate{}, + clients: map[PeerID]*peerCandidate{}, } } @@ -83,7 +91,7 @@ func (m *meeting) AddServer(peer Peer, c chan<- Peer) { m.mu.Lock() defer m.mu.Unlock() - m.servers[peer.ID] = peerCandidate{Peer: peer, C: c} + m.servers[peer.ID] = &peerCandidate{Peer: peer, C: c} } func (m *meeting) RemoveServer(id PeerID) { @@ -91,13 +99,17 @@ func (m *meeting) RemoveServer(id PeerID) { defer m.mu.Unlock() delete(m.servers, id) + + if len(m.servers) == 0 { + m.serversLastSeenTime = time.Now() + } } func (m *meeting) AddClient(peer Peer, c chan<- Peer) { m.mu.Lock() defer m.mu.Unlock() - m.clients[peer.ID] = peerCandidate{Peer: peer, C: c} + m.clients[peer.ID] = &peerCandidate{Peer: peer, C: c} } func (m *meeting) RemoveClient(id PeerID) { @@ -108,21 +120,29 @@ func (m *meeting) RemoveClient(id PeerID) { } func (m *meeting) PopRandomServer() *peerCandidate { - return m.popRandomPeerCandidate(m.servers) -} + m.mu.Lock() + defer m.mu.Unlock() -func (m *meeting) PopRandomClient() *peerCandidate { - return m.popRandomPeerCandidate(m.clients) + candidate := m.popRandomPeerCandidate(m.servers) + if len(m.servers) == 0 { + m.serversLastSeenTime = time.Now() + } + + return candidate } -func (m *meeting) popRandomPeerCandidate(candidates map[PeerID]peerCandidate) *peerCandidate { +func (m *meeting) PopRandomClient() *peerCandidate { m.mu.Lock() defer m.mu.Unlock() + return m.popRandomPeerCandidate(m.clients) +} + +func (m *meeting) popRandomPeerCandidate(candidates map[PeerID]*peerCandidate) *peerCandidate { if len(candidates) == 0 { return nil } - var keys []PeerID + keys := make([]PeerID, 0, len(candidates)) for key := range candidates { keys = append(keys, key) } @@ -130,7 +150,13 @@ func (m *meeting) popRandomPeerCandidate(candidates map[PeerID]peerCandidate) *p k := keys[rand.Intn(len(keys))] v := candidates[k] delete(candidates, k) - return &v + return v +} + +func (m *meeting) IsServerInactive() bool { + const expireDuration = 30 * time.Second + + return time.Now().After(m.serversLastSeenTime.Add(expireDuration)) } // Server represents a rendezvous server. @@ -214,7 +240,10 @@ func (m *Server) Resolve(ctx context.Context, request *sonm.ConnectRequest) (*so peerHandle := NewPeer(*peerInfo, request.PrivateAddrs) - c := m.addServerWatch(id, peerHandle) + c, err := m.addServerWatch(id, peerHandle) + if err != nil { + return nil, err + } defer m.removeServerWatch(id, peerHandle) select { @@ -284,7 +313,7 @@ func (m *Server) Publish(ctx context.Context, request *sonm.PublishRequest) (*so } } -func (m *Server) addServerWatch(id nppc.ResourceID, peer Peer) <-chan Peer { +func (m *Server) addServerWatch(id nppc.ResourceID, peer Peer) (<-chan Peer, error) { c := make(chan Peer, 2) m.mu.Lock() @@ -297,15 +326,17 @@ func (m *Server) addServerWatch(id nppc.ResourceID, peer Peer) <-chan Peer { c <- server.Peer server.C <- peer } else { + if meeting.IsServerInactive() { + return nil, errPeerNotFound() + } + meeting.AddClient(peer, c) } } else { - meeting := newMeeting() - meeting.AddClient(peer, c) - m.rv[id] = meeting + return nil, errPeerNotFound() } - return c + return c, nil } func (m *Server) addClientWatch(id nppc.ResourceID, peer Peer) <-chan Peer { @@ -338,9 +369,8 @@ func (m *Server) removeClientWatch(id nppc.ResourceID, peer Peer) { candidates, ok := m.rv[id] if ok { candidates.RemoveServer(peer.ID) + m.maybeCleanMeeting(id, candidates) } - - m.maybeCleanMeeting(id, candidates) } func (m *Server) removeServerWatch(id nppc.ResourceID, peer Peer) { @@ -350,17 +380,12 @@ func (m *Server) removeServerWatch(id nppc.ResourceID, peer Peer) { candidates, ok := m.rv[id] if ok { candidates.RemoveClient(peer.ID) + m.maybeCleanMeeting(id, candidates) } - - m.maybeCleanMeeting(id, candidates) } func (m *Server) maybeCleanMeeting(id nppc.ResourceID, candidates *meeting) { - if candidates == nil { - return - } - - if len(candidates.clients) == 0 && len(candidates.servers) == 0 { + if len(candidates.clients) == 0 && len(candidates.servers) == 0 && candidates.IsServerInactive() { delete(m.rv, id) } }