Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

what I have. kinda buggy #96

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 121 additions & 25 deletions netsync/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1218,7 +1218,9 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
// request parent blocks of orphans if we receive one we already have.
// Finally, attempt to detect potential stalls due to long side chains
// we already have and request more blocks to prevent them.
for i, iv := range invVects {
for i := 0; i < len(invVects); i++ {
iv := invVects[i]

// Ignore unsupported inventory types.
switch iv.Type {
case wire.InvTypeBlock:
Expand All @@ -1229,6 +1231,12 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
case wire.InvTypeWitnessTx:
case wire.InvTypeUtreexoTx:
case wire.InvTypeWitnessUtreexoTx:
case wire.InvTypeUtreexoProofHash:
log.Infof("skipping proof hash of %v", iv.Hash)
// If the inv is a utreexo proof hash, then it means that
// we've already skipped/added the tx that it belongs to or
// we're in headers first mode.
continue
default:
continue
}
Expand Down Expand Up @@ -1269,6 +1277,28 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {

// Add it to the request queue.
state.requestQueue = append(state.requestQueue, iv)

// If the inv is for a utreexo tx, then also pop off the utreexo
// proof hash invs and add it to the request queue.
//if iv.Type&wire.InvTypeUtreexoTx == wire.InvTypeUtreexoTx {
if peer.IsUtreexoEnabled() {
switch iv.Type {
case wire.InvTypeTx:
case wire.InvTypeWitnessTx:
case wire.InvTypeUtreexoTx:
case wire.InvTypeWitnessUtreexoTx:
default:
continue
}

for j := i + 1; j < len(invVects); j++ {
if invVects[j].Type != wire.InvTypeUtreexoProofHash {
break
}
state.requestQueue = append(state.requestQueue, invVects[j])
}
}

continue
}

Expand Down Expand Up @@ -1339,14 +1369,16 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {

if peer.IsWitnessEnabled() {
iv.Type = wire.InvTypeWitnessBlock
}

if peer.IsUtreexoEnabled() {
iv.Type = wire.InvTypeWitnessUtreexoBlock
}
} else {
if peer.IsUtreexoEnabled() {
iv.Type = wire.InvTypeUtreexoBlock
}
amUtreexoNode := sm.chain.IsUtreexoViewActive()
//if !peer.IsUtreexoEnabled() && amUtreexoNode {
// // Return now as non-utreexo nodes cannot serve us the
// // required proofs.
// return
//}
if amUtreexoNode {
iv.Type |= wire.InvUtreexoFlag
}

gdmsg.AddInvVect(iv)
Expand All @@ -1360,29 +1392,93 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
case wire.InvTypeUtreexoTx:
fallthrough
case wire.InvTypeTx:
// Request the transaction if there is not already a
// pending request.
if _, exists := sm.requestedTxns[iv.Hash]; !exists {
limitAdd(sm.requestedTxns, iv.Hash, maxRequestedTxns)
limitAdd(state.requestedTxns, iv.Hash, maxRequestedTxns)

// If the peer is capable, request the txn
// including all witness data.
if peer.IsWitnessEnabled() {
iv.Type = wire.InvTypeWitnessTx
amUtreexoNode := sm.chain.IsUtreexoViewActive()
if amUtreexoNode {
//if !peer.IsUtreexoEnabled() {
// log.Debugf("skipping tx %v as peer %v isn't utreexo enabled", iv.Hash.String(), peer.Addr())
// // Return now as non-utreexo nodes cannot serve us the
// // required proofs.
// return
//}

// Request the transaction if there is not already a
// pending request.
if _, exists := sm.requestedTxns[iv.Hash]; !exists {
var packedProofs []chainhash.Hash
if len(requestQueue) == 0 {
log.Debugf("got inv for tx %v but no packed positions even "+
"though utreexo view is active", iv.Hash)
}

for len(requestQueue) > 0 && requestQueue[0].Type == wire.InvTypeUtreexoProofHash {
proofInv := requestQueue[0]
packedProofs = append(packedProofs, proofInv.Hash)

if peer.IsUtreexoEnabled() {
iv.Type = wire.InvTypeWitnessUtreexoTx
requestQueue[0] = nil
requestQueue = requestQueue[1:]
}
} else {
if peer.IsUtreexoEnabled() {
iv.Type = wire.InvTypeUtreexoTx

log.Debugf("for tx %s(%v), got %v packed positions, which are %v",
iv.Hash, iv.Type.String(), packedProofs, chainhash.PackedHashesToUint64(packedProofs))

// Check that the proof invs+the current tx inv and all
// other requested invs do not go over the max inv per
// message limit.
if len(packedProofs)+1+numRequested+1 >= wire.MaxInvPerMsg {
break
}

var packedPositions []chainhash.Hash
if len(packedProofs) > 0 {
packedPositions = sm.chain.GetNeededPositions(packedProofs)
}

log.Debugf("need %v to prove tx %v", chainhash.PackedHashesToUint64(packedPositions), iv.Hash)

limitAdd(sm.requestedTxns, iv.Hash, maxRequestedTxns)
limitAdd(state.requestedTxns, iv.Hash, maxRequestedTxns)

// If the peer is capable, request the txn
// including all witness data.
if peer.IsWitnessEnabled() {
iv.Type = wire.InvTypeWitnessTx
}

iv.Type |= wire.InvUtreexoFlag

log.Debugf("requesting inv type %v", iv.Type.String())

gdmsg.AddInvVect(iv)
numRequested++

for i := range packedPositions {
gdmsg.AddInvVect(wire.NewInvVect(
wire.InvTypeUtreexoProofHash,
&packedPositions[i]),
)
numRequested++
}
}
} else {
// Request the transaction if there is not already a
// pending request.
if _, exists := sm.requestedTxns[iv.Hash]; !exists {
limitAdd(sm.requestedTxns, iv.Hash, maxRequestedTxns)
limitAdd(state.requestedTxns, iv.Hash, maxRequestedTxns)

// If the peer is capable, request the txn
// including all witness data.
if peer.IsWitnessEnabled() {
iv.Type = wire.InvTypeWitnessTx
}

gdmsg.AddInvVect(iv)
numRequested++
gdmsg.AddInvVect(iv)
numRequested++
}
}
case wire.InvTypeUtreexoProofHash:
log.Debugf("skip here %v(%v)", iv.Type.String(), iv.Hash.String())
continue
}

if numRequested >= wire.MaxInvPerMsg {
Expand Down
127 changes: 111 additions & 16 deletions peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,8 @@ func (p *Peer) readMessage(encoding wire.MessageEncoding) (wire.Message, []byte,
p.cfg.Listeners.OnRead(p, n, msg, err)
}
if err != nil {
log.Infof("readMessage err %v. Peer %v, isutreexo?:%v",
err, p.Addr(), p.IsUtreexoEnabled())
return nil, nil, err
}

Expand All @@ -1052,6 +1054,19 @@ func (p *Peer) readMessage(encoding wire.MessageEncoding) (wire.Message, []byte,
return spew.Sdump(buf)
}))

if p.IsUtreexoEnabled() {
if msg.Command() == wire.CmdTx {
switch rmsg := msg.(type) {
case *wire.MsgTx:
log.Infof("readmessage got msg %v(%v) with encoding %v for peer %v",
msg.Command(), rmsg.TxHash().String(), encoding, p.Addr())
}
} else {
log.Infof("readmessage got msg %v with encoding %v for peer %v",
msg.Command(), encoding, p.Addr())
}
}

return msg, buf, nil
}

Expand Down Expand Up @@ -1089,6 +1104,18 @@ func (p *Peer) writeMessage(msg wire.Message, enc wire.MessageEncoding) error {
// Write the message to the peer.
n, err := wire.WriteMessageWithEncodingN(p.conn, msg,
p.ProtocolVersion(), p.cfg.ChainParams.Net, enc)
if p.IsUtreexoEnabled() {
if msg.Command() == wire.CmdTx {
switch rmsg := msg.(type) {
case *wire.MsgTx:
log.Infof("writeMessage queuing msg %v(%v) with encoding %v for peer %v, err %v",
msg.Command(), rmsg.TxHash().String(), enc, p.Addr(), err)
}
} else {
log.Infof("writeMessage queuing msg %v with encoding %v for peer %v",
msg.Command(), enc, p.Addr())
}
}
atomic.AddUint64(&p.bytesSent, uint64(n))
if p.cfg.Listeners.OnWrite != nil {
p.cfg.Listeners.OnWrite(p, n, msg, err)
Expand Down Expand Up @@ -1350,6 +1377,15 @@ out:
// is done. The timer is reset below for the next iteration if
// needed.
rmsg, buf, err := p.readMessage(p.wireEncoding)

if p.IsUtreexoEnabled() {
if rmsg != nil {
if rmsg.Command() == wire.CmdGetData {
log.Infof("Received msg %v(%v) from peer %v, err %v", rmsg, rmsg.Command(), p.Addr(), err)
}
}
}

idleTimer.Stop()
if err != nil {
// In order to allow regression tests with malformed messages, don't
Expand Down Expand Up @@ -1591,6 +1627,10 @@ out:
for {
select {
case msg := <-p.outputQueue:
if p.IsUtreexoEnabled() {
log.Infof("queuing msg %v with encoding %v for peer %v",
msg.msg.Command(), msg.encoding, p.Addr())
}
waiting = queuePacket(msg, pendingMsgs, waiting)

// This channel is notified when a message has been sent across
Expand All @@ -1610,25 +1650,77 @@ out:
p.sendQueue <- val.(outMsg)

case ivs := <-p.outputInvChan:
for i := range ivs {
iv := ivs[i]
// Only utreexo txs get more than one inv at a time. We send these
// out immediately.
if len(ivs) > 1 {
invTypes := make([]string, 0, len(ivs))
for _, iv := range ivs {
invTypes = append(invTypes, iv.Type.String())
}

log.Infof("iv types queued %v for peer %v", invTypes, p.Addr())

// Don't send anything if we're disconnecting or there
// is no queued inventory.
// version is known if send queue has any entries.
if atomic.LoadInt32(&p.disconnect) != 0 {
continue
}

// No handshake? They'll find out soon enough.
if p.VersionKnown() {
// If this is a new block, then we'll blast it
// out immediately, sipping the inv trickle
// queue.
if iv.Type == wire.InvTypeBlock ||
iv.Type == wire.InvTypeUtreexoBlock ||
iv.Type == wire.InvTypeWitnessBlock ||
iv.Type == wire.InvTypeWitnessUtreexoBlock {

invMsg := wire.NewMsgInvSizeHint(1)
invMsg := wire.NewMsgInvSizeHint(uint(len(ivs)))
for i := range ivs {
iv := ivs[i]
// Don't send inventory that became known after
// the initial check.
if p.knownInventory.Contains(iv) {
continue
}

if iv.Type == wire.InvTypeTx ||
iv.Type == wire.InvTypeWitnessTx ||
iv.Type == wire.InvTypeUtreexoTx ||
iv.Type == wire.InvTypeWitnessUtreexoTx {
// Add the inventory that is being relayed to
// the known inventory for the peer.
p.AddKnownInventory(ivs[i])
} else if iv.Type == wire.InvTypeUtreexoProofHash {
} else {
continue
}

log.Infof("adding type %v to the queue for peer %v",
iv.Type.String(), p.Addr())
invMsg.AddInvVect(iv)
waiting = queuePacket(outMsg{msg: invMsg},
pendingMsgs, waiting)
} else {
invSendQueue.PushBack(iv)
}

log.Infof("Immediately queuing invs (%d) for peer %v, waiting %v",
len(ivs), p.Addr(), waiting)
waiting = queuePacket(outMsg{msg: invMsg},
pendingMsgs, waiting)
}
} else {
for i := range ivs {
iv := ivs[i]

// No handshake? They'll find out soon enough.
if p.VersionKnown() {
// If this is a new block, then we'll blast it
// out immediately, skipping the inv trickle
// queue.
if iv.Type == wire.InvTypeBlock ||
iv.Type == wire.InvTypeUtreexoBlock ||
iv.Type == wire.InvTypeWitnessBlock ||
iv.Type == wire.InvTypeWitnessUtreexoBlock {

invMsg := wire.NewMsgInvSizeHint(1)
invMsg.AddInvVect(iv)
waiting = queuePacket(outMsg{msg: invMsg},
pendingMsgs, waiting)
} else {
invSendQueue.PushBack(iv)
}
}
}
}
Expand Down Expand Up @@ -1857,7 +1949,9 @@ func (p *Peer) QueueInventory(invVects []*wire.InvVect) {
for i := 0; i < len(invVects); i++ {
// Don't add the inventory to the send queue if the peer is already
// known to have it.
if p.knownInventory.Contains(invVects[i]) {
if invVects[i].Type != wire.InvTypeUtreexoProofHash &&
p.knownInventory.Contains(invVects[i]) {

invVects = append(invVects[:i], invVects[i+1:]...)
}
}
Expand Down Expand Up @@ -1960,6 +2054,7 @@ func (p *Peer) readRemoteVersionMsg() error {
// Determine if the peer would like to receive witness data with
// transactions, or not.
if p.services&wire.SFNodeUtreexo == wire.SFNodeUtreexo {
log.Infof("peer %v is a utreexo node")
p.utreexoEnabled = true
}
p.flagsMtx.Unlock()
Expand Down
Loading
Loading