Skip to content

Commit

Permalink
Merge pull request #555 from rade/478_more_discriminate_connectivity
Browse files Browse the repository at this point in the history
more discriminate connectivity

Closes #478.
  • Loading branch information
rade committed Apr 15, 2015
2 parents 9bd2afc + 339f85c commit ab49eb4
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 40 deletions.
74 changes: 49 additions & 25 deletions router/connection_maker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ const (
)

type ConnectionMaker struct {
ourself *LocalPeer
peers *Peers
normalisePeerAddr func(string) string
targets map[string]*Target
cmdLinePeers map[string]string
actionChan chan<- ConnectionMakerAction
ourself *LocalPeer
peers *Peers
port int
targets map[string]*Target
cmdLinePeers map[string]*net.TCPAddr
actionChan chan<- ConnectionMakerAction
}

// Information about an address where we may find a peer
Expand All @@ -33,13 +33,13 @@ type Target struct {

type ConnectionMakerAction func() bool

func NewConnectionMaker(ourself *LocalPeer, peers *Peers, normalisePeerAddr func(string) string) *ConnectionMaker {
func NewConnectionMaker(ourself *LocalPeer, peers *Peers, port int) *ConnectionMaker {
return &ConnectionMaker{
ourself: ourself,
peers: peers,
normalisePeerAddr: normalisePeerAddr,
cmdLinePeers: make(map[string]string),
targets: make(map[string]*Target)}
ourself: ourself,
peers: peers,
port: port,
cmdLinePeers: make(map[string]*net.TCPAddr),
targets: make(map[string]*Target)}
}

func (cm *ConnectionMaker) Start() {
Expand All @@ -49,16 +49,19 @@ func (cm *ConnectionMaker) Start() {
}

func (cm *ConnectionMaker) InitiateConnection(peer string) error {
addr, err := net.ResolveTCPAddr("tcp4", cm.normalisePeerAddr(peer))
host, port, err := net.SplitHostPort(peer)
if err != nil {
host = peer
port = "0" // we use that as an indication that "no port was supplied"
}
addr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("%s:%s", host, port))
if err != nil {
return err
}
address := addr.String()

cm.actionChan <- func() bool {
cm.cmdLinePeers[peer] = address
cm.cmdLinePeers[peer] = addr
// curtail any existing reconnect interval
if target, found := cm.targets[address]; found {
if target, found := cm.targets[addr.String()]; found {
target.tryAfter, target.tryInterval = tryImmediately()
}
return true
Expand Down Expand Up @@ -138,7 +141,7 @@ func (cm *ConnectionMaker) checkStateAndAttemptConnections() time.Duration {
// Copy the set of things we are connected to, so we can access
// them without locking. Also clear out any entries in cm.targets
// for existing connections.
ourConnectedPeers, ourConnectedTargets := cm.ourConnections()
ourConnectedPeers, ourConnectedTargets, ourInboundIPs := cm.ourConnections()

addTarget := func(address string) {
if _, connected := ourConnectedTargets[address]; connected {
Expand All @@ -154,9 +157,23 @@ func (cm *ConnectionMaker) checkStateAndAttemptConnections() time.Duration {
}

// Add command-line targets that are not connected
for _, address := range cm.cmdLinePeers {
addTarget(address)
for _, addr := range cm.cmdLinePeers {
completeAddr := *addr
attempt := true
if completeAddr.Port == 0 {
completeAddr.Port = cm.port
// If a peer was specified w/o a port, then we do not
// attempt to connect to it if we have any inbound
// connections from that IP.
if _, connected := ourInboundIPs[completeAddr.IP.String()]; connected {
attempt = false
}
}
address := completeAddr.String()
cmdLineTarget[address] = void
if attempt {
addTarget(address)
}
}

// Add targets for peers that someone else is connected to, but we
Expand All @@ -166,18 +183,25 @@ func (cm *ConnectionMaker) checkStateAndAttemptConnections() time.Duration {
return cm.connectToTargets(validTarget, cmdLineTarget)
}

func (cm *ConnectionMaker) ourConnections() (PeerNameSet, map[string]struct{}) {
func (cm *ConnectionMaker) ourConnections() (PeerNameSet, map[string]struct{}, map[string]struct{}) {
var (
ourConnectedPeers = make(PeerNameSet)
ourConnectedTargets = make(map[string]struct{})
ourInboundIPs = make(map[string]struct{})
)
for conn := range cm.ourself.Connections() {
address := conn.RemoteTCPAddr()
delete(cm.targets, address)
ourConnectedPeers[conn.Remote().Name] = void
ourConnectedTargets[address] = void
delete(cm.targets, address)
if conn.Outbound() {
continue
}
if ip, _, err := net.SplitHostPort(address); err == nil { // should always succeed
ourInboundIPs[ip] = void
}
}
return ourConnectedPeers, ourConnectedTargets
return ourConnectedPeers, ourConnectedTargets, ourInboundIPs
}

func (cm *ConnectionMaker) addPeerTargets(ourConnectedPeers PeerNameSet, addTarget func(string)) {
Expand All @@ -196,8 +220,8 @@ func (cm *ConnectionMaker) addPeerTargets(ourConnectedPeers PeerNameSet, addTarg
if conn.Outbound() {
addTarget(address)
}
if host, _, err := net.SplitHostPort(address); err == nil {
addTarget(cm.normalisePeerAddr(host))
if ip, _, err := net.SplitHostPort(address); err == nil {
addTarget(fmt.Sprintf("%s:%d", ip, cm.port))
}
}
})
Expand Down
6 changes: 2 additions & 4 deletions router/local_peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,8 @@ func (peer *LocalPeer) CreateConnection(peerAddr string, acceptNewPeer bool) err
if err := peer.checkConnectionLimit(); err != nil {
return err
}
// We're dialing the remote so that means connections will come from random ports
addrStr := peer.router.NormalisePeerAddr(peerAddr)
tcpAddr, tcpErr := net.ResolveTCPAddr("tcp4", addrStr)
udpAddr, udpErr := net.ResolveUDPAddr("udp4", addrStr)
tcpAddr, tcpErr := net.ResolveTCPAddr("tcp4", peerAddr)
udpAddr, udpErr := net.ResolveUDPAddr("udp4", peerAddr)
if tcpErr != nil || udpErr != nil {
// they really should have the same value, but just in case...
if tcpErr == nil {
Expand Down
12 changes: 1 addition & 11 deletions router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func NewRouter(config RouterConfig, name PeerName, nickName string) *Router {
router.Peers = NewPeers(router.Ourself.Peer, onPeerGC)
router.Peers.FetchWithDefault(router.Ourself.Peer)
router.Routes = NewRoutes(router.Ourself.Peer, router.Peers)
router.ConnectionMaker = NewConnectionMaker(router.Ourself, router.Peers, router.NormalisePeerAddr)
router.ConnectionMaker = NewConnectionMaker(router.Ourself, router.Peers, router.Port)
router.TopologyGossip = router.NewGossip("topology", router)
return router
}
Expand Down Expand Up @@ -417,13 +417,3 @@ func (router *Router) applyTopologyUpdate(update []byte) (PeerNameSet, PeerNameS
}
return origUpdate, newUpdate, nil
}

// given an address like '1.2.3.4:567', return the address if it has a port,
// otherwise return the address with the default port number for the router
func (router *Router) NormalisePeerAddr(peerAddr string) string {
_, _, err := net.SplitHostPort(peerAddr)
if err == nil {
return peerAddr
}
return fmt.Sprintf("%s:%d", peerAddr, router.Port)
}

0 comments on commit ab49eb4

Please sign in to comment.