package connmgr

import (
	"context"
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"github.com/libp2p/go-libp2p-core/connmgr"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"

	logging "github.com/ipfs/go-log"
	ma "github.com/multiformats/go-multiaddr"
)

// SilencePeriod is the minimum interval that must elapse between automatic trims.
var SilencePeriod = 10 * time.Second

var log = logging.Logger("connmgr")
// BasicConnMgr is a ConnManager that trims connections whenever the count exceeds the
// high watermark. New connections are given a grace period before they're subject
// to trimming. Trims are run automatically on demand, but only if the time since the
// previous trim exceeds the silence period (10 seconds by default). Furthermore, trims
// can be explicitly requested through the public interface of this struct (see
// TrimOpenConns).
//
// See configuration parameters in NewConnManager.
type BasicConnMgr struct {
	*decayer

	cfg      *BasicConnManagerConfig
	segments segments

	plk       sync.RWMutex
	protected map[peer.ID]map[string]struct{}

	// channel-based semaphore that enforces only a single trim is in progress
	trimRunningCh chan struct{}
	trimTrigger   chan chan<- struct{}
	connCount     int32

	lastTrimMu sync.RWMutex
	lastTrim   time.Time

	ctx    context.Context
	cancel func()
}
var (
	_ connmgr.ConnManager = (*BasicConnMgr)(nil)
	_ connmgr.Decayer     = (*BasicConnMgr)(nil)
)

type segment struct {
	sync.Mutex
	peers map[peer.ID]*peerInfo
}

// segments shards peer state into 256 independently-locked buckets, keyed by
// the last byte of the peer ID, to reduce lock contention.
type segments [256]*segment

func (ss *segments) get(p peer.ID) *segment {
	return ss[byte(p[len(p)-1])]
}

func (ss *segments) countPeers() (count int) {
	for _, seg := range ss {
		seg.Lock()
		count += len(seg.peers)
		seg.Unlock()
	}
	return count
}
func (s *segment) tagInfoFor(p peer.ID) *peerInfo {
	pi, ok := s.peers[p]
	if ok {
		return pi
	}
	// create a temporary entry to buffer early tags before the Connected notification arrives.
	pi = &peerInfo{
		id:        p,
		firstSeen: time.Now(), // this timestamp will be updated when the first Connected notification arrives.
		temp:      true,
		tags:      make(map[string]int),
		decaying:  make(map[*decayingTag]*connmgr.DecayingValue),
		conns:     make(map[network.Conn]time.Time),
	}
	s.peers[p] = pi
	return pi
}
// NewConnManager creates a new BasicConnMgr with the provided params:
// * low and hi are watermarks governing the number of connections that'll be maintained.
//   When the peer count exceeds the 'high watermark', peers are pruned (and their
//   connections terminated) until 'low watermark' peers remain.
// * grace is the amount of time a newly opened connection is given before it becomes
//   subject to pruning.
func NewConnManager(low, hi int, grace time.Duration, opts ...Option) *BasicConnMgr {
	ctx, cancel := context.WithCancel(context.Background())

	cfg := &BasicConnManagerConfig{
		highWater:     hi,
		lowWater:      low,
		gracePeriod:   grace,
		silencePeriod: SilencePeriod,
	}

	for _, o := range opts {
		// TODO we're ignoring errors from options because we have no way to
		// return them, or otherwise act on them.
		_ = o(cfg)
	}

	if cfg.decayer == nil {
		// Set the default decayer config.
		cfg.decayer = (&DecayerCfg{}).WithDefaults()
	}

	cm := &BasicConnMgr{
		cfg:           cfg,
		trimRunningCh: make(chan struct{}, 1),
		trimTrigger:   make(chan chan<- struct{}),
		protected:     make(map[peer.ID]map[string]struct{}, 16),
		ctx:           ctx,
		cancel:        cancel,
		segments: func() (ret segments) {
			for i := range ret {
				ret[i] = &segment{
					peers: make(map[peer.ID]*peerInfo),
				}
			}
			return ret
		}(),
	}

	decay, _ := NewDecayer(cfg.decayer, cm)
	cm.decayer = decay

	go cm.background()
	return cm
}
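
// illustrativeNew is an illustrative sketch (added here for documentation; it
// is not part of the original file) of typical construction: keep the peer
// count between 100 and 400 connections, giving new connections a one-minute
// grace period. The watermark and grace values are assumptions, not
// recommendations from this package.
func illustrativeNew() *BasicConnMgr {
	// Trims kick in above 400 connections and prune down to 100.
	return NewConnManager(100, 400, time.Minute)
}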
// Close shuts down the decayer and stops the background trim loop.
func (cm *BasicConnMgr) Close() error {
	if err := cm.decayer.Close(); err != nil {
		return err
	}
	cm.cancel()
	return nil
}
// Protect marks a peer as protected under the given tag; protected peers are
// never pruned by trims.
func (cm *BasicConnMgr) Protect(id peer.ID, tag string) {
	cm.plk.Lock()
	defer cm.plk.Unlock()

	tags, ok := cm.protected[id]
	if !ok {
		tags = make(map[string]struct{}, 2)
		cm.protected[id] = tags
	}
	tags[tag] = struct{}{}
}

// Unprotect removes the given protection tag from a peer, reporting whether
// the peer remains protected by other tags.
func (cm *BasicConnMgr) Unprotect(id peer.ID, tag string) (protected bool) {
	cm.plk.Lock()
	defer cm.plk.Unlock()

	tags, ok := cm.protected[id]
	if !ok {
		return false
	}
	if delete(tags, tag); len(tags) == 0 {
		delete(cm.protected, id)
		return false
	}
	return true
}

// IsProtected reports whether a peer is protected under the given tag; if tag
// is empty, it reports whether the peer is protected under any tag.
func (cm *BasicConnMgr) IsProtected(id peer.ID, tag string) (protected bool) {
	cm.plk.Lock()
	defer cm.plk.Unlock()

	tags, ok := cm.protected[id]
	if !ok {
		return false
	}
	if tag == "" {
		return true
	}
	_, protected = tags[tag]
	return protected
}
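
// illustrativeProtection is an illustrative sketch (not part of the original
// file; the tag names are hypothetical) of the Protect/Unprotect contract: a
// peer stays protected until every tag protecting it has been removed.
func illustrativeProtection(cm *BasicConnMgr, p peer.ID) {
	cm.Protect(p, "relay")
	cm.Protect(p, "dht")

	// Removing one tag leaves the peer protected by the remaining one.
	_ = cm.Unprotect(p, "relay") // returns true: "dht" still protects p

	// Removing the last tag lifts protection entirely.
	_ = cm.Unprotect(p, "dht") // returns false: no tags remain
}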
// peerInfo stores metadata for a given peer.
type peerInfo struct {
	id       peer.ID
	tags     map[string]int                          // value for each tag
	decaying map[*decayingTag]*connmgr.DecayingValue // decaying tags

	value int  // cached sum of all tag values
	temp  bool // this is a temporary entry holding early tags, and awaiting connections

	conns map[network.Conn]time.Time // start time of each connection

	firstSeen time.Time // timestamp when we began tracking this peer.
}
// TrimOpenConns closes the connections of as many peers as needed to make the peer count
// equal the low watermark. Peers are sorted in ascending order based on their total value,
// pruning those peers with the lowest scores first, as long as they are not within their
// grace period.
//
// This function blocks until a trim is completed. If a trim is already underway, a new
// one is not started; instead, the call waits for the in-flight trim to complete before
// returning.
func (cm *BasicConnMgr) TrimOpenConns(ctx context.Context) {
	// TODO: error return value so we can cleanly signal we are aborting because:
	// (a) there's another trim in progress, or (b) the silence period is in effect.

	// Trigger a trim.
	ch := make(chan struct{})
	select {
	case cm.trimTrigger <- ch:
	case <-cm.ctx.Done():
	case <-ctx.Done():
		// TODO: return an error?
	}

	// Wait for the trim.
	select {
	case <-ch:
	case <-cm.ctx.Done():
	case <-ctx.Done():
		// TODO: return an error?
	}
}
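
// illustrativeForceTrim is an illustrative sketch (not part of the original
// file; the timeout value is an assumption) showing how a caller might
// request an immediate trim with a bounded wait, since TrimOpenConns blocks
// until the trim (or the one already in flight) completes.
func illustrativeForceTrim(cm *BasicConnMgr) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	cm.TrimOpenConns(ctx)
}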
func (cm *BasicConnMgr) background() {
	ticker := time.NewTicker(time.Minute)
	defer ticker.Stop()

	for {
		var waiting chan<- struct{}
		select {
		case <-ticker.C:
			if atomic.LoadInt32(&cm.connCount) < int32(cm.cfg.highWater) {
				// Below high water, skip.
				continue
			}
		case waiting = <-cm.trimTrigger:
		case <-cm.ctx.Done():
			return
		}
		cm.trim()

		// Notify anyone waiting on this trim.
		if waiting != nil {
			close(waiting)
		}

		// Drain any other pending trim requests: the trim we just completed
		// satisfies them, so notify their waiters without trimming again.
		for {
			select {
			case waiting = <-cm.trimTrigger:
				if waiting != nil {
					close(waiting)
				}
				continue
			default:
			}
			break
		}
	}
}
// trim runs a single trim cycle, unless the silence period since the last
// trim has not yet elapsed.
func (cm *BasicConnMgr) trim() {
	cm.lastTrimMu.RLock()
	// read the last trim time under the lock
	lastTrim := cm.lastTrim
	cm.lastTrimMu.RUnlock()

	// skip this attempt to trim if the last one just took place.
	if time.Since(lastTrim) < cm.cfg.silencePeriod {
		return
	}

	// do the actual trim.
	defer log.EventBegin(cm.ctx, "connCleanup").Done()
	for _, c := range cm.getConnsToClose() {
		log.Info("closing conn: ", c.RemotePeer())
		log.Event(cm.ctx, "closeConn", c.RemotePeer())
		c.Close()
	}

	// finally, update the last trim time.
	cm.lastTrimMu.Lock()
	cm.lastTrim = time.Now()
	cm.lastTrimMu.Unlock()
}
// getConnsToClose runs the heuristics described in TrimOpenConns and returns the
// connections to close.
func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
	if cm.cfg.lowWater == 0 || cm.cfg.highWater == 0 {
		// disabled
		return nil
	}

	nconns := int(atomic.LoadInt32(&cm.connCount))
	if nconns <= cm.cfg.lowWater {
		log.Info("open connection count below limit")
		return nil
	}

	npeers := cm.segments.countPeers()
	candidates := make([]*peerInfo, 0, npeers)
	ncandidates := 0
	gracePeriodStart := time.Now().Add(-cm.cfg.gracePeriod)

	cm.plk.RLock()
	for _, s := range cm.segments {
		s.Lock()
		for id, inf := range s.peers {
			if _, ok := cm.protected[id]; ok {
				// skip over protected peers.
				continue
			}
			if inf.firstSeen.After(gracePeriodStart) {
				// skip peers in the grace period.
				continue
			}
			candidates = append(candidates, inf)
			ncandidates += len(inf.conns)
		}
		s.Unlock()
	}
	cm.plk.RUnlock()

	if ncandidates < cm.cfg.lowWater {
		log.Info("open connection count above limit but too many are in the grace period")
		// We have too many connections but fewer than lowWater
		// connections out of the grace period.
		//
		// If we trimmed now, we'd kill potentially useful connections.
		return nil
	}

	// Sort peers according to their value.
	sort.Slice(candidates, func(i, j int) bool {
		left, right := candidates[i], candidates[j]
		// temporary peers are preferred for pruning.
		if left.temp != right.temp {
			return left.temp
		}
		// otherwise, compare by value.
		return left.value < right.value
	})

	target := ncandidates - cm.cfg.lowWater

	// slightly overallocate because we may have more than one conn per peer
	selected := make([]network.Conn, 0, target+10)

	for _, inf := range candidates {
		if target <= 0 {
			break
		}

		// lock this to protect from concurrent modifications from connect/disconnect events
		s := cm.segments.get(inf.id)
		s.Lock()

		if len(inf.conns) == 0 && inf.temp {
			// handle temporary entries for early tags -- this entry has gone past the grace period
			// and still holds no connections, so prune it.
			delete(s.peers, inf.id)
		} else {
			for c := range inf.conns {
				selected = append(selected, c)
			}
		}
		target -= len(inf.conns)
		s.Unlock()
	}

	return selected
}
// GetTagInfo fetches the tag information associated with a given peer;
// nil is returned if p refers to an unknown peer.
func (cm *BasicConnMgr) GetTagInfo(p peer.ID) *connmgr.TagInfo {
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	pi, ok := s.peers[p]
	if !ok {
		return nil
	}

	out := &connmgr.TagInfo{
		FirstSeen: pi.firstSeen,
		Value:     pi.value,
		Tags:      make(map[string]int),
		Conns:     make(map[string]time.Time),
	}

	for t, v := range pi.tags {
		out.Tags[t] = v
	}
	for t, v := range pi.decaying {
		out.Tags[t.name] = v.Value
	}
	for c, t := range pi.conns {
		out.Conns[c.RemoteMultiaddr().String()] = t
	}

	return out
}
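
// illustrativeInspect is an illustrative sketch (not part of the original
// file) of reading back a peer's aggregate tag value via GetTagInfo, guarding
// against the nil return for unknown peers.
func illustrativeInspect(cm *BasicConnMgr, p peer.ID) int {
	if ti := cm.GetTagInfo(p); ti != nil {
		return ti.Value
	}
	return 0
}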
// TagPeer associates a string tag and an integer value with the given peer.
func (cm *BasicConnMgr) TagPeer(p peer.ID, tag string, val int) {
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	pi := s.tagInfoFor(p)

	// Update the total value of the peer.
	pi.value += val - pi.tags[tag]
	pi.tags[tag] = val
}

// UntagPeer disassociates a string tag and its value from the given peer.
func (cm *BasicConnMgr) UntagPeer(p peer.ID, tag string) {
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	pi, ok := s.peers[p]
	if !ok {
		log.Info("tried to remove tag from untracked peer: ", p)
		return
	}

	// Update the total value of the peer.
	pi.value -= pi.tags[tag]
	delete(pi.tags, tag)
}

// UpsertTag inserts or updates a peer tag by applying the upsert function to
// the tag's current value (zero if absent).
func (cm *BasicConnMgr) UpsertTag(p peer.ID, tag string, upsert func(int) int) {
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	pi := s.tagInfoFor(p)

	oldval := pi.tags[tag]
	newval := upsert(oldval)
	pi.value += newval - oldval
	pi.tags[tag] = newval
}
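
// illustrativeCounterTag is an illustrative sketch (not part of the original
// file; the tag name is hypothetical) of the read-modify-write pattern that
// UpsertTag enables: incrementing a tag under the peer's segment lock, safe
// against concurrent taggers.
func illustrativeCounterTag(cm *BasicConnMgr, p peer.ID) {
	cm.UpsertTag(p, "useful-responses", func(old int) int { return old + 1 })
}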
// CMInfo holds the configuration for BasicConnMgr, as well as status data.
type CMInfo struct {
	// The low watermark, as described in NewConnManager.
	LowWater int

	// The high watermark, as described in NewConnManager.
	HighWater int

	// The timestamp when the last trim was triggered.
	LastTrim time.Time

	// The configured grace period, as described in NewConnManager.
	GracePeriod time.Duration

	// The current connection count.
	ConnCount int
}

// GetInfo returns the configuration and status data for this connection manager.
func (cm *BasicConnMgr) GetInfo() CMInfo {
	cm.lastTrimMu.RLock()
	lastTrim := cm.lastTrim
	cm.lastTrimMu.RUnlock()

	return CMInfo{
		HighWater:   cm.cfg.highWater,
		LowWater:    cm.cfg.lowWater,
		LastTrim:    lastTrim,
		GracePeriod: cm.cfg.gracePeriod,
		ConnCount:   int(atomic.LoadInt32(&cm.connCount)),
	}
}
// Notifee returns a sink through which Notifiers can inform the BasicConnMgr when
// events occur. Currently, the notifee only reacts upon connection events
// {Connected, Disconnected}.
func (cm *BasicConnMgr) Notifee() network.Notifiee {
	return (*cmNotifee)(cm)
}

type cmNotifee BasicConnMgr

func (nn *cmNotifee) cm() *BasicConnMgr {
	return (*BasicConnMgr)(nn)
}
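
// illustrativeWiring is an illustrative sketch (not part of the original
// file) showing how a caller attaches the manager to a network so that it
// receives Connected/Disconnected events; it assumes the caller already
// holds a network.Network.
func illustrativeWiring(n network.Network, cm *BasicConnMgr) {
	n.Notify(cm.Notifee())
}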
// Connected is called by notifiers to inform that a new connection has been established.
// The notifee updates the BasicConnMgr to start tracking the connection. If the new connection
// count exceeds the high watermark, a trim may be triggered.
func (nn *cmNotifee) Connected(n network.Network, c network.Conn) {
	cm := nn.cm()

	p := c.RemotePeer()
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	pinfo, ok := s.peers[p]
	if !ok {
		pinfo = &peerInfo{
			id:        p,
			firstSeen: time.Now(),
			tags:      make(map[string]int),
			decaying:  make(map[*decayingTag]*connmgr.DecayingValue),
			conns:     make(map[network.Conn]time.Time),
		}
		s.peers[p] = pinfo
	} else if pinfo.temp {
		// we had created a temporary entry for this peer to buffer early tags before the
		// Connected notification arrived: flip the temporary flag, and update the firstSeen
		// timestamp to the real one.
		pinfo.temp = false
		pinfo.firstSeen = time.Now()
	}

	if _, ok := pinfo.conns[c]; ok {
		log.Error("received connected notification for conn we are already tracking: ", p)
		return
	}

	pinfo.conns[c] = time.Now()
	atomic.AddInt32(&cm.connCount, 1)
}
// Disconnected is called by notifiers to inform that an existing connection has been closed or terminated.
// The notifee updates the BasicConnMgr accordingly to stop tracking the connection, and performs housekeeping.
func (nn *cmNotifee) Disconnected(n network.Network, c network.Conn) {
	cm := nn.cm()

	p := c.RemotePeer()
	s := cm.segments.get(p)
	s.Lock()
	defer s.Unlock()

	cinf, ok := s.peers[p]
	if !ok {
		log.Error("received disconnected notification for peer we are not tracking: ", p)
		return
	}

	if _, ok := cinf.conns[c]; !ok {
		log.Error("received disconnected notification for conn we are not tracking: ", p)
		return
	}

	delete(cinf.conns, c)
	if len(cinf.conns) == 0 {
		delete(s.peers, p)
	}
	atomic.AddInt32(&cm.connCount, -1)
}
// Listen is a no-op in this implementation.
func (nn *cmNotifee) Listen(n network.Network, addr ma.Multiaddr) {}

// ListenClose is a no-op in this implementation.
func (nn *cmNotifee) ListenClose(n network.Network, addr ma.Multiaddr) {}

// OpenedStream is a no-op in this implementation.
func (nn *cmNotifee) OpenedStream(network.Network, network.Stream) {}

// ClosedStream is a no-op in this implementation.
func (nn *cmNotifee) ClosedStream(network.Network, network.Stream) {}