Skip to content

Commit

Permalink
Merge pull request #91 from kcalvinalvin/2023-12-19-add-ability-to-qu…
Browse files Browse the repository at this point in the history
…eue-multiple-invs

peer, main: allow for queuing multiple invvects
  • Loading branch information
kcalvinalvin committed Dec 21, 2023
2 parents 09a0c64 + 4480a0e commit 56725df
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 28 deletions.
59 changes: 35 additions & 24 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ type Peer struct {
outputQueue chan outMsg
sendQueue chan outMsg
sendDoneQueue chan struct{}
outputInvChan chan *wire.InvVect
outputInvChan chan []*wire.InvVect
inQuit chan struct{}
queueQuit chan struct{}
outQuit chan struct{}
Expand Down Expand Up @@ -1609,23 +1609,27 @@ out:
val := pendingMsgs.Remove(next)
p.sendQueue <- val.(outMsg)

case iv := <-p.outputInvChan:
// No handshake? They'll find out soon enough.
if p.VersionKnown() {
// If this is a new block, then we'll blast it
// out immediately, sipping the inv trickle
// queue.
if iv.Type == wire.InvTypeBlock ||
iv.Type == wire.InvTypeUtreexoBlock ||
iv.Type == wire.InvTypeWitnessBlock ||
iv.Type == wire.InvTypeWitnessUtreexoBlock {

invMsg := wire.NewMsgInvSizeHint(1)
invMsg.AddInvVect(iv)
waiting = queuePacket(outMsg{msg: invMsg},
pendingMsgs, waiting)
} else {
invSendQueue.PushBack(iv)
case ivs := <-p.outputInvChan:
for i := range ivs {
iv := ivs[i]

// No handshake? They'll find out soon enough.
if p.VersionKnown() {
// If this is a new block, then we'll blast it
// out immediately, sipping the inv trickle
// queue.
if iv.Type == wire.InvTypeBlock ||
iv.Type == wire.InvTypeUtreexoBlock ||
iv.Type == wire.InvTypeWitnessBlock ||
iv.Type == wire.InvTypeWitnessUtreexoBlock {

invMsg := wire.NewMsgInvSizeHint(1)
invMsg.AddInvVect(iv)
waiting = queuePacket(outMsg{msg: invMsg},
pendingMsgs, waiting)
} else {
invSendQueue.PushBack(iv)
}
}
}

Expand Down Expand Up @@ -1849,10 +1853,17 @@ func (p *Peer) QueueMessageWithEncoding(msg wire.Message, doneChan chan<- struct
// Inventory that the peer is already known to have is ignored.
//
// This function is safe for concurrent access.
func (p *Peer) QueueInventory(invVect *wire.InvVect) {
// Don't add the inventory to the send queue if the peer is already
// known to have it.
if p.knownInventory.Contains(invVect) {
func (p *Peer) QueueInventory(invVects []*wire.InvVect) {
for i := 0; i < len(invVects); i++ {
// Don't add the inventory to the send queue if the peer is already
// known to have it.
if p.knownInventory.Contains(invVects[i]) {
invVects = append(invVects[:i], invVects[i+1:]...)
}
}

// Return if we have nothing left.
if len(invVects) == 0 {
return
}

Expand All @@ -1863,7 +1874,7 @@ func (p *Peer) QueueInventory(invVect *wire.InvVect) {
return
}

p.outputInvChan <- invVect
p.outputInvChan <- invVects
}

// Connected returns whether or not the peer is currently connected.
Expand Down Expand Up @@ -2261,7 +2272,7 @@ func newPeerBase(origCfg *Config, inbound bool) *Peer {
outputQueue: make(chan outMsg, outputBufferSize),
sendQueue: make(chan outMsg, 1), // nonblocking sync
sendDoneQueue: make(chan struct{}, 1), // nonblocking sync
outputInvChan: make(chan *wire.InvVect, outputBufferSize),
outputInvChan: make(chan []*wire.InvVect, outputBufferSize),
inQuit: make(chan struct{}),
queueQuit: make(chan struct{}),
outQuit: make(chan struct{}),
Expand Down
6 changes: 3 additions & 3 deletions peer/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -664,9 +664,9 @@ func TestOutboundPeer(t *testing.T) {
fakeInv := wire.NewInvVect(wire.InvTypeBlock, fakeBlockHash)

// Should be noops as the peer could not connect.
p.QueueInventory(fakeInv)
p.QueueInventory([]*wire.InvVect{fakeInv})
p.AddKnownInventory(fakeInv)
p.QueueInventory(fakeInv)
p.QueueInventory([]*wire.InvVect{fakeInv})

fakeMsg := wire.NewMsgVerAck()
p.QueueMessage(fakeMsg, nil)
Expand Down Expand Up @@ -710,7 +710,7 @@ func TestOutboundPeer(t *testing.T) {
}

// Test Queue Inv after connection
p1.QueueInventory(fakeInv)
p1.QueueInventory([]*wire.InvVect{fakeInv})
p1.Disconnect()

// Test regression
Expand Down
2 changes: 1 addition & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2035,7 +2035,7 @@ func (s *server) handleRelayInvMsg(state *peerState, msg relayMsg) {
// Queue the inventory to be relayed with the next batch.
// It will be ignored if the peer is already known to
// have the inventory.
sp.QueueInventory(msg.invVect)
sp.QueueInventory([]*wire.InvVect{msg.invVect})
})
}

Expand Down

0 comments on commit 56725df

Please sign in to comment.