Skip to content

Commit

Permalink
Merge 12c97b4 into cec1efb
Browse files Browse the repository at this point in the history
  • Loading branch information
rade committed Apr 9, 2015
2 parents cec1efb + 12c97b4 commit 5b1d3aa
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 17 deletions.
8 changes: 7 additions & 1 deletion router/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type LocalConnection struct {
stackFrag bool
effectivePMTU int
SessionKey *[32]byte
heartbeatTCP *time.Ticker
heartbeatTimeout *time.Timer
heartbeatFrame *ForwardedFrame
heartbeat *time.Ticker
Expand Down Expand Up @@ -339,6 +340,7 @@ func (conn *LocalConnection) run(actionChan <-chan ConnectionAction, finished ch
// goroutine, at least not without some synchronisation.

func (conn *LocalConnection) initHeartbeats() error {
conn.heartbeatTCP = time.NewTicker(TCPHeartbeat)
conn.heartbeatTimeout = time.NewTimer(HeartbeatTimeout)
heartbeatFrameBytes := make([]byte, EthernetOverhead+8)
binary.BigEndian.PutUint64(heartbeatFrameBytes[EthernetOverhead:], conn.uid)
Expand All @@ -360,6 +362,8 @@ func (conn *LocalConnection) actorLoop(actionChan <-chan ConnectionAction) (err
break
}
err = action()
case <-conn.heartbeatTCP.C:
err = conn.sendSimpleProtocolMsg(ProtocolHeartbeat)
case <-conn.heartbeatTimeout.C:
err = fmt.Errorf("timed out waiting for UDP heartbeat")
case <-tickerChan(conn.heartbeat):
Expand All @@ -386,6 +390,7 @@ func (conn *LocalConnection) shutdown() {
conn.heartbeatTimeout.Stop()
}

stopTicker(conn.heartbeatTCP)
stopTicker(conn.heartbeat)
stopTicker(conn.fragTest)

Expand Down Expand Up @@ -439,6 +444,7 @@ func (conn *LocalConnection) receiveTCP(decoder *gob.Decoder) {

func (conn *LocalConnection) handleProtocolMsg(tag ProtocolTag, payload []byte) error {
switch tag {
case ProtocolHeartbeat:
case ProtocolConnectionEstablished:
// We sent fast heartbeats to the remote peer, which has now
// received at least one of them and told us via this message.
Expand Down Expand Up @@ -469,7 +475,7 @@ func (conn *LocalConnection) handleProtocolMsg(tag ProtocolTag, payload []byte)
}

func (conn *LocalConnection) extendReadDeadline() {
conn.TCPConn.SetReadDeadline(time.Now().Add(ReadTimeout))
conn.TCPConn.SetReadDeadline(time.Now().Add(TCPHeartbeat * 2))
}

func (conn *LocalConnection) sendFastHeartbeats() error {
Expand Down
2 changes: 1 addition & 1 deletion router/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ const (
UDPNonceSendAt = 8192
FragTestSize = 60001
PMTUDiscoverySize = 60000
TCPHeartbeat = 30 * time.Second
FastHeartbeat = 500 * time.Millisecond
SlowHeartbeat = 10 * time.Second
FragTestInterval = 5 * time.Minute
ReadTimeout = 1 * time.Minute
PMTUVerifyAttempts = 8
PMTUVerifyTimeout = 10 * time.Millisecond // gets doubled with every attempt
MaxDuration = time.Duration(math.MaxInt64)
Expand Down
50 changes: 36 additions & 14 deletions router/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (router *Router) NewGossip(channelName string, g Gossiper) Gossip {

func (router *Router) SendAllGossip() {
for _, channel := range router.GossipChannels {
channel.Send(channel.gossiper.Gossip())
channel.Send(router.Ourself.Name, channel.gossiper.Gossip())
}
}

Expand Down Expand Up @@ -167,34 +167,56 @@ func (c *GossipChannel) deliverBroadcast(srcName PeerName, _ []byte, dec *gob.De
return c.relayBroadcast(srcName, data)
}

func (c *GossipChannel) deliver(_ PeerName, _ []byte, dec *gob.Decoder) error {
func (c *GossipChannel) deliver(srcName PeerName, _ []byte, dec *gob.Decoder) error {
var payload []byte
if err := dec.Decode(&payload); err != nil {
return err
}
if data, err := c.gossiper.OnGossip(payload); err != nil {
return err
} else if data != nil {
c.Send(data)
c.Send(srcName, data)
}
return nil
}

func (c *GossipChannel) Send(data GossipData) {
connections := c.ourself.Connections() // do this outside the lock so they don't nest
retainedSenders := make(connectionSenders)
func (c *GossipChannel) Send(srcName PeerName, data GossipData) {
// do this outside the lock below so we avoid lock nesting
c.ourself.Router.Routes.EnsureRecalculated()
selectedConnections := make(ConnectionSet)
for name := range c.ourself.Router.Routes.RandomNeighbours(srcName) {
if conn, found := c.ourself.ConnectionTo(name); found {
selectedConnections[conn] = void
}
}
if len(selectedConnections) == 0 {
return
}
connections := c.ourself.Connections()
c.Lock()
defer c.Unlock()
for conn := range connections {
c.sendDown(conn, data)
retainedSenders[conn] = c.senders[conn]
delete(c.senders, conn)
// GC - randomly (courtesy of go's map iterator) pick some
// existing entries and stop&remove them if the associated
// connection is no longer active. We stop as soon as we
// encounter a valid entry; the idea being that when there is
// little or no garbage then this executes close to O(1)[1],
// whereas when there is lots of garbage we remove it quickly.
//
// [1] TODO Unfortunately, due to the desire to avoid nested
// locks, instead of simply invoking Peer.ConnectionTo(name)
// below, we have that Peer.Connections() invocation above. That
// is O(n_our_connections) at best.
for conn, sender := range c.senders {
if _, found := connections[conn]; !found {
delete(c.senders, conn)
sender.Stop()
} else {
break
}
}
// stop any senders for connections that are gone
for _, sender := range c.senders {
sender.Stop()
for conn := range selectedConnections {
c.sendDown(conn, data)
}
c.senders = retainedSenders
}

func (c *GossipChannel) SendDown(conn Connection, data GossipData) {
Expand Down
3 changes: 2 additions & 1 deletion router/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ const (
type ProtocolTag byte

const (
ProtocolConnectionEstablished ProtocolTag = iota
ProtocolHeartbeat ProtocolTag = iota
ProtocolConnectionEstablished
ProtocolFragmentationReceived
ProtocolStartFragmentationTest
ProtocolNonce
Expand Down
31 changes: 31 additions & 0 deletions router/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package router
import (
"bytes"
"fmt"
"math"
"sync"
)

Expand Down Expand Up @@ -77,6 +78,36 @@ func (routes *Routes) BroadcastAll(name PeerName) []PeerName {
return hops
}

// Choose min(log2(n_peers), n_neighbouring_peers) neighbours, with a
// random distribution that is topology-sensitive, favouring
// neighbours at the end of "bottleneck links". We determine the
// latter based on the unicast routing table. If a neighbour appears
// as the value more frequently than others - meaning that we reach a
// higher proportion of peers via that neighbour than other neighbours
// - then it is chosen with a higher probability.
//
// Note that we choose log2(n_peers) *neighbours*, not
// peers. Consequently, on sparsely connected peers this function
// returns a higher proportion of neighbours than elsewhere. In
// extremis, on peers with fewer than log2(n_peers) neighbours, all
// neighbours are returned.
func (routes *Routes) RandomNeighbours(except PeerName) PeerNameSet {
res := make(PeerNameSet)
routes.RLock()
defer routes.RUnlock()
count := int(math.Log2(float64(len(routes.unicastAll))))
// depends on go's random map iteration
for _, dst := range routes.unicastAll {
if dst != UnknownPeerName && dst != except {
res[dst] = void
if len(res) >= count {
break
}
}
}
return res
}

func (routes *Routes) String() string {
var buf bytes.Buffer
routes.RLock()
Expand Down

0 comments on commit 5b1d3aa

Please sign in to comment.