
Merge pull request #37 from DanielKrawisz/addr_race
Fixed bug that causes the server to run out of peers eventually.
ishbir committed Jun 20, 2015
2 parents 9cf17f8 + 491dc5d commit e33ed8e
Showing 8 changed files with 75 additions and 151 deletions.
12 changes: 9 additions & 3 deletions addrmgr/addrmanager.go
@@ -172,6 +172,7 @@ func (a *AddrManager) updateAddress(netAddr, srcAddr *wire.NetAddress) {
}

addr := NetAddressKey(netAddr)
+ // Is the address already known to the address manager?
ka := a.find(netAddr)
if ka != nil {
// TODO(oga) only update addresses periodically.
@@ -217,9 +218,14 @@ func (a *AddrManager) updateAddress(netAddr, srcAddr *wire.NetAddress) {
// XXX time penalty?
}

- bucket := a.getNewBucket(netAddr, srcAddr)
+ // If we get to this part of the function, then netAddr is not in a tried
+ // bucket and might be in one or more new buckets. If this is the first time
+ // we've seen the address, then it will not be in any bucket. If it is already
+ // in a new bucket, it might be added to another new bucket.

- // Already exists?
+ // This is a random number, but it might happen to be the same as it was
+ // before.
+ bucket := a.getNewBucket(netAddr, srcAddr)
if _, ok := a.addrNew[bucket][addr]; ok {
return
}
@@ -235,7 +241,7 @@ func (a *AddrManager) updateAddress(netAddr, srcAddr *wire.NetAddress) {
a.addrNew[bucket][addr] = ka

log.Tracef("Added new address %s for a total of %d addresses", addr,
- a.nTried+a.nNew)
+ a.nNew+a.nTried)
}

// expireNew makes space in the new buckets by expiring the really bad entries.
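To make the invariant in the new comment concrete, here is a small, self-contained Go sketch of an address that already occupies several new buckets; the types and the fixed bucket choice are simplifications standing in for bmd's real AddrManager and getNewBucket:

package main

import "fmt"

// Simplified illustration (assumed types, not bmd's actual AddrManager) of
// the new-bucket invariant described above: one address can sit in several
// "new" buckets at once, so an insert only checks the bucket chosen this time.
func main() {
	const newBucketCount = 4
	addrNew := make([]map[string]bool, newBucketCount)
	for i := range addrNew {
		addrNew[i] = make(map[string]bool)
	}

	addr := "10.0.0.1:8444"
	addrNew[0][addr] = true // learned earlier from one source peer
	addrNew[2][addr] = true // learned again from a different source

	bucket := 2 // stand-in for whatever getNewBucket returns this time
	if addrNew[bucket][addr] {
		fmt.Println("already in bucket", bucket, "- nothing to do")
		return
	}
	addrNew[bucket][addr] = true
	fmt.Println("added to bucket", bucket)
}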
4 changes: 2 additions & 2 deletions config.go
@@ -36,8 +36,8 @@ const (
defaultDbType = "memdb"
defaultPort = 8444
defaultRPCPort = 8442
- defaultMaxUpPerPeer = 1024 * 1024 // 1MBps
- defaultMaxDownPerPeer = 1024 * 1024
+ defaultMaxUpPerPeer = 2 * 1024 * 1024 // 2MBps
+ defaultMaxDownPerPeer = 2 * 1024 * 1024 // 2MBps
defaultMaxOutbound = 10
defaultRequestTimeout = time.Minute * 3
)
45 changes: 23 additions & 22 deletions objectmanager.go
@@ -30,9 +30,9 @@ const (
// name.
objectDbNamePrefix = "objects"

- // MaxPeerRequests is the maximum number of outstanding object requests a
+ // maxPeerRequests is the maximum number of outstanding object requests a
// peer may have at any given time.
- MaxPeerRequests = 120
+ maxPeerRequests = 120
)

// newPeerMsg signifies a newly connected peer to the object manager.
@@ -151,9 +151,12 @@ func (om *ObjectManager) handleObjectMsg(omsg *objectMsg) {
// thus disconnecting legitimate peers. We want to prevent against such
// an attack by checking that objects came from peers that we requested
// from.
+ // NOTE: PyBitmessage does not do this, so if this actually happens it
+ // should be considered more likely to be an indication of a bug in bmd
+ // itself rather than a malicious peer.
objmgrLog.Error(omsg.peer.addr.String(),
" Disconnecting because of unrequested object ", invVect.Hash.String()[:8], " received.")
- omsg.peer.disconnect()
+ om.server.disconPeers <- omsg.peer
return
}
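This hunk carries the core of the fix: instead of tearing the peer down on the object manager's goroutine, the peer is handed to the server over the disconPeers channel. The receiving side of that channel is not part of this diff; the following is an assumed reduction of the pattern, not bmd's actual server loop:

package main

import (
	"fmt"
	"time"
)

type bmpeer struct{ addr string }

func (p *bmpeer) disconnect() { fmt.Println("disconnecting", p.addr) }

// serverLoop drains disconPeers so that all teardown happens on a single
// goroutine. The channel and loop shape are assumptions; the diff above
// only shows the sending side.
func serverLoop(disconPeers <-chan *bmpeer, quit <-chan struct{}) {
	for {
		select {
		case p := <-disconPeers:
			p.disconnect()
		case <-quit:
			return
		}
	}
}

func main() {
	disconPeers := make(chan *bmpeer)
	quit := make(chan struct{})
	go serverLoop(disconPeers, quit)

	// What handleObjectMsg now does on an unrequested object:
	disconPeers <- &bmpeer{addr: "127.0.0.1:8444"}

	time.Sleep(50 * time.Millisecond) // let the drain run (demo only)
	close(quit)
}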

@@ -245,12 +248,14 @@ func (om *ObjectManager) handleInvMsg(imsg *invMsg) {

// If the request can fit, we just request it from the peer that told us
// about it in the first place.
- if numInvs <= MaxPeerRequests-imsg.peer.inventory.NumRequests() {
+ if numInvs <= maxPeerRequests-imsg.peer.inventory.NumRequests() {

for _, iv := range requestList[:numInvs] {
+ now := time.Now()
+ objmgrLog.Trace(imsg.peer.addr.String(), " requests ", iv.Hash.String()[:8], " at ", now)
om.requested[*iv] = &peerRequest{
peer: imsg.peer,
- timestamp: time.Now(),
+ timestamp: now,
}
}
objmgrLog.Trace("All are assigned to peer ", imsg.peer.addr.String(),
@@ -323,7 +328,7 @@ func (om *ObjectManager) handleReadyPeer(peer *bmpeer) {
objmgrLog.Trace("Assigning objects to peer ", peer.addr.String())

// If the object is already downloading too much, return.
- max := MaxPeerRequests - peer.inventory.NumRequests()
+ max := maxPeerRequests - peer.inventory.NumRequests()
if max <= 0 {
return
}
@@ -357,9 +362,11 @@ func (om *ObjectManager) handleReadyPeer(peer *bmpeer) {
requestList[assigned] = &newiv
delete(om.unknown, iv)

+ now := time.Now()
+ objmgrLog.Trace(peer.addr.String(), " requests ", iv.Hash.String()[:8], " at ", now)
om.requested[iv] = &peerRequest{
peer: peer,
- timestamp: time.Now(),
+ timestamp: now,
}

assigned++
@@ -402,8 +409,9 @@ loop:
// request and set it to timeout within the set duration.
func (om *ObjectManager) clearRequests(d time.Duration) {
now := time.Now()
- // Collect all peers to be disconnected in one map or we lock up the
- // objectHandler with too many requests at once.
+ // Collect all peers to be disconnected in one map so that we don't try
+ // to disconnect a peer more than once. Otherwise a channel could be locked
+ // up if many requests from the same peer timed out at once.
peerDisconnect := make(map[*bmpeer]struct{})
for hash, p := range om.requested {
// if request has expired
@@ -423,20 +431,13 @@
p.timestamp.Add(d*time.Duration(peerQueueSize)).Before(now),
"; cond 2 = ", p.peer.lastReceipt.Add(d).Before(now))))

- om.server.disconPeers <- p.peer // we're done with this malicious peer

peerDisconnect[p.peer] = struct{}{}
}
}
}
}

- // Disconnect the peers gradually in a separate go routine so as not
- // to deluge the object manager with requests.
- go func() {
- for peer := range peerDisconnect {
- peer.disconnect() // we're done with this malicious peer
- time.Sleep(time.Second)
- }
- }()
}

// handleRelayInvMsg deals with relaying inventory to peers that are not already
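The drain of peerDisconnect into disconPeers is collapsed in this view, but the reason for the map is stated in the new comment: many expired requests can point at one peer, and that peer should reach the channel only once. A runnable reduction of that dedupe step, with the channel capacity and types as assumptions:

package main

import "fmt"

type bmpeer struct{ addr string }

func main() {
	p := &bmpeer{addr: "127.0.0.1:8444"}

	// Ten expired requests, all from the same peer.
	expired := []*bmpeer{p, p, p, p, p, p, p, p, p, p}

	// Collect in a map first, as clearRequests now does, so each peer is
	// disconnected at most once.
	peerDisconnect := make(map[*bmpeer]struct{})
	for _, peer := range expired {
		peerDisconnect[peer] = struct{}{}
	}

	// Drain once per unique peer (this part is collapsed in the diff above).
	// Without the map, ten sends could fill the buffered channel and block
	// the object manager.
	disconPeers := make(chan *bmpeer, 5)
	for peer := range peerDisconnect {
		disconPeers <- peer
	}
	fmt.Println("queued disconnects:", len(disconPeers)) // 1, not 10
}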
@@ -496,6 +497,9 @@ func (om *ObjectManager) objectHandler() {
case m := <-om.msgChan:
switch msg := m.(type) {

+ case *readyPeerMsg:
+ om.handleReadyPeer((*bmpeer)(msg))

case *newPeerMsg:
om.handleNewPeer(msg.peer)

@@ -507,9 +511,6 @@

case *donePeerMsg:
om.handleDonePeer(msg.peer)

- case *readyPeerMsg:
- om.handleReadyPeer((*bmpeer)(msg))
}
}
}
@@ -532,7 +533,7 @@ func (om *ObjectManager) ReadyPeer(p *bmpeer) {
return
}

- om.msgChan <- &p
+ om.msgChan <- (*readyPeerMsg)(p)
}

// QueueObject adds the passed object message and peer to the object handling
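The one-line change in ReadyPeer deserves a note: msgChan carries empty-interface values and objectHandler dispatches on concrete type, so the old "om.msgChan <- &p" sent a **bmpeer that no case could match, and the ready signal would have been silently dropped. A minimal reduction, with types assumed to mirror this file:

package main

import "fmt"

type bmpeer struct{ addr string }

type readyPeerMsg bmpeer

// handle mimics objectHandler's type switch: only a value of the named
// message type matches the readyPeerMsg case.
func handle(m interface{}) {
	switch msg := m.(type) {
	case *readyPeerMsg:
		fmt.Println("ready peer:", msg.addr)
	default:
		fmt.Printf("dropped: %T\n", msg) // the old &p (a **bmpeer) lands here
	}
}

func main() {
	p := &bmpeer{addr: "127.0.0.1:8444"}
	handle((*readyPeerMsg)(p)) // new code: matches the readyPeerMsg case
	handle(&p)                 // old code: matched nothing, signal was lost
}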
32 changes: 12 additions & 20 deletions peer.go
@@ -58,6 +58,7 @@ type bmpeer struct {
inbound bool
lastReceipt time.Time
invReceived bool
+ signalReady int
knownAddresses map[string]struct{}
StatsMtx sync.RWMutex // protects all statistics below here.
versionKnown bool
@@ -169,21 +170,21 @@ func (p *bmpeer) PushVerAckMsg() {
peerLog.Debug(p.peer.PrependAddr("Ver ack message sent."))
}

- func max(x, y int) int {
- if x > y {
- return x
- }
- return y
- }

// PushGetDataMsg creates a GetData message and sends it to the remote peer.
func (p *bmpeer) PushGetDataMsg(ivl []*wire.InvVect) {
if len(ivl) == 0 {
return
}

p.inventory.AddRequest(len(ivl))
- peerLog.Debug(p.peer.PrependAddr(fmt.Sprint(len(ivl), " requests assigned for a total of ", p.inventory.NumRequests(), ".")))
+ if p.inventory.NumRequests() > maxPeerRequests/2 {
+ p.signalReady = p.inventory.NumRequests() / 2
+ } else {
+ p.signalReady = 0
+ }
+ peerLog.Debug(p.peer.PrependAddr(fmt.Sprint(len(ivl),
+ " requests assigned for a total of ", p.inventory.NumRequests(),
+ "; signal ready at ", p.signalReady)))

x := 0
for len(ivl)-x > wire.MaxInvPerMsg {
@@ -196,15 +197,6 @@ func (p *bmpeer) PushGetDataMsg(ivl []*wire.InvVect) {
p.QueueMessage(&wire.MsgGetData{InvList: ivl[x:]})
peerLog.Debug(p.peer.PrependAddr(fmt.Sprint("Get data message sent with ", len(ivl)-x, " hashes.")))
}

- // TODO the peer should signal the object manager when it is ready to download more.
- // This can be done once the object manager downloads more intelligently.
- /*if p.inventory.NumRequests() > cfg.MaxPeerRequests/2 {
- p.signalReady = p.inventory.NumRequests() / 2
- } else {
- p.signalReady = 0
- p.server.objectManager.ReadyPeer(p)
- }*/
}

// PushInvMsg creates and sends an Inv message and sends it to the remote peer.
@@ -431,10 +423,10 @@ func (p *bmpeer) HandleObjectMsg(msg *wire.MsgObject) error {

p.server.objectManager.QueueObject(msg, p)

- // TODO signal object manager that we are ready to download more.
- /*if p.signalReady == p.inventory.NumRequests() {
+ // signal object manager that we are ready to download more.
+ if p.signalReady == p.inventory.NumRequests() || p.inventory.NumRequests() == 0 {
p.server.objectManager.ReadyPeer(p)
- }*/
+ }
p.server.addrManager.Connected(p.na)

return nil
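Taken together, the PushGetDataMsg and HandleObjectMsg hunks revive the back-pressure scheme that was previously commented out: a peer assigned a large batch records the outstanding count at which it should ask for more, then calls ReadyPeer once downloads drain to that point. A small illustrative simulation of the arithmetic, assuming each received object decrements NumRequests:

package main

import "fmt"

// Illustrative simulation of the signalReady scheme above, using the
// maxPeerRequests = 120 constant from objectmanager.go.
func main() {
	const maxPeerRequests = 120

	outstanding := 100 // requests just assigned by PushGetDataMsg

	// Mirrors PushGetDataMsg: with more than half the budget outstanding,
	// signal readiness once half of those have been fulfilled.
	signalReady := 0
	if outstanding > maxPeerRequests/2 {
		signalReady = outstanding / 2
	}
	fmt.Println("signal ready at", signalReady) // 50

	// Mirrors HandleObjectMsg: every received object shrinks the count,
	// and hitting signalReady (or zero) notifies the object manager.
	for outstanding > 0 {
		outstanding--
		if outstanding == signalReady || outstanding == 0 {
			fmt.Println("ReadyPeer called at", outstanding, "outstanding")
			break
		}
	}
}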
5 changes: 2 additions & 3 deletions peer/connection.go
@@ -68,6 +68,7 @@ func (pc *connection) WriteMessage(msg wire.Message) error {
pc.bytesSent += uint64(n)
pc.lastWrite = time.Now()
pc.maxUp.Transfer(float64(n))
+ pc.idleTimer.Reset(pc.idleTimeout)
pc.sentMtx.Unlock()

if err != nil {
@@ -97,6 +98,7 @@ func (pc *connection) ReadMessage() (wire.Message, error) {
pc.bytesReceived += uint64(n)
pc.lastRead = time.Now()
pc.maxDown.Transfer(float64(n))
+ pc.idleTimer.Reset(pc.idleTimeout)
pc.receivedMtx.Unlock()

if err != nil {
@@ -107,8 +109,6 @@ func (pc *connection) ReadMessage() (wire.Message, error) {
return nil, err
}

- pc.idleTimer.Reset(pc.idleTimeout)

return msg, nil
}

@@ -203,7 +203,6 @@ func NewConnection(addr net.Addr, maxDown, maxUp int64) Connection {

pc.idleTimer = time.AfterFunc(pc.idleTimeout, func() {
pc.WriteMessage(&wire.MsgPong{})
- pc.idleTimer.Reset(idleTimeout)
})

pc.idleTimer.Stop()
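Both hunks in this file move idleTimer.Reset into the locked read and write paths, so traffic in either direction pushes the deadline back, and the AfterFunc callback no longer re-arms itself: the pong it sends goes through WriteMessage, which now performs the reset. A compact sketch of that one-shot pattern, with simplified types rather than bmd's real connection:

package main

import (
	"fmt"
	"time"
)

// conn is a stand-in for the real connection type: every write resets the
// idle timer, so the AfterFunc callback needs no explicit re-arm of its own.
type conn struct {
	idleTimer   *time.Timer
	idleTimeout time.Duration
}

func (c *conn) writeMessage(msg string) {
	fmt.Println("send:", msg)
	c.idleTimer.Reset(c.idleTimeout) // moved inside the write path
}

func main() {
	c := &conn{idleTimeout: 40 * time.Millisecond}
	c.idleTimer = time.AfterFunc(c.idleTimeout, func() {
		c.writeMessage("pong") // keep-alive; resets the timer via writeMessage
	})

	c.writeMessage("version") // any outbound traffic pushes the deadline back
	time.Sleep(100 * time.Millisecond)
}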
10 changes: 5 additions & 5 deletions peer/send.go
@@ -21,9 +21,9 @@ import (
)

const (
- // queueSize is the size of data and message queues. TODO arbitrarily set as
- // 20.
- queueSize = 20
+ // sendQueueSize is the size of data and message queues. The number should be
+ // small if possible, and ideally we would use an unbuffered channel eventually.
+ sendQueueSize = 5
)

// Send handles everything that is to be sent to the remote peer eventually.
@@ -309,8 +309,8 @@ func (send *send) PrependAddr(str string) string {
func NewSend(inventory *Inventory, db database.Db) Send {
return &send{
trickleTime: time.Second * 10,
- msgQueue: make(chan wire.Message, queueSize),
- dataQueue: make(chan wire.Message, queueSize),
+ msgQueue: make(chan wire.Message, sendQueueSize),
+ dataQueue: make(chan wire.Message, sendQueueSize),
outputInvChan: make(chan []*wire.InvVect, outputBufferSize),
requestQueue: make(chan []*wire.InvVect, outputBufferSize),
quit: make(chan struct{}),
8 changes: 7 additions & 1 deletion peer_test.go
@@ -976,7 +976,11 @@ func TestOutboundPeerHandshake(t *testing.T) {
InteractionComplete: true,
DisconnectExpected: true,
},
- // TODO send more improperly timed messages here. GetData, inv, and object all need to be tested for disconnection.
+ &PeerAction{
+ Messages: []wire.Message{testObj[0]},
+ InteractionComplete: true,
+ DisconnectExpected: true,
+ },
}

localAddr := &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8333}
@@ -994,6 +998,7 @@ func TestOutboundPeerHandshake(t *testing.T) {

for testCase, response := range responses {
defer resetCfg(cfg)()

NewConn = handshakePeerBuilder(response)

// Create server and start it.
@@ -1089,6 +1094,7 @@ func TestInboundPeerHandshake(t *testing.T) {

for testCase, open := range openingMsg {
defer resetCfg(cfg)()

// Create server and start it.
listeners := []string{net.JoinHostPort("", "8445")}
var err error
