Skip to content

Commit

Permalink
mixing: introduce module
Browse files Browse the repository at this point in the history
This introduces the mixing module and mixpool package which implements
a mixing message pool.  Similar to the transaction mempool, the
mixpool records recently observed mixing messages and allows these
messages to be temporarily stored in memory to be relayed through the
P2P network.  It handles message acceptance, expiry, UTXO ownership
proof checks, and that previously referenced messages have also been
accepted to the mixpool.

The mixpool is also designed with wallet usage in mind, providing most
of these same acceptance rules to mixing messages received by wallets,
and implements queries for messages matching compatible pairings and
ongoing sessions.
  • Loading branch information
jrick committed Aug 28, 2023
1 parent cefb02c commit 45a1a7e
Show file tree
Hide file tree
Showing 22 changed files with 2,524 additions and 30 deletions.
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1y
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY=
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7/go.mod h1:q4W45IWZaF22tdD+VEXcAWRA037jwmWEB5VWYORlTpc=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
Expand Down
5 changes: 5 additions & 0 deletions internal/blockchain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,11 @@ type BlockChain struct {
bulkImportMode bool
}

// ChainParams returns the chain parameters.
func (b *BlockChain) ChainParams() *chaincfg.Params {
return b.chainParams
}

const (
// stakeMajorityCacheKeySize is comprised of the stake version and the
// hash size. The stake version is a little endian uint32, hence we
Expand Down
5 changes: 5 additions & 0 deletions internal/netsync/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package netsync

import (
"github.com/decred/dcrd/dcrutil/v4"
"github.com/decred/dcrd/mixing"

Check failure on line 9 in internal/netsync/interface.go

View workflow job for this annotation

GitHub Actions / Go CI (1.19)

no required module provides package github.com/decred/dcrd/mixing; to add it:

Check failure on line 9 in internal/netsync/interface.go

View workflow job for this annotation

GitHub Actions / Go CI (1.20)

no required module provides package github.com/decred/dcrd/mixing; to add it:
)

// PeerNotifier provides an interface to notify peers of status changes related
Expand All @@ -14,4 +15,8 @@ type PeerNotifier interface {
// AnnounceNewTransactions generates and relays inventory vectors and
// notifies websocket clients of the passed transactions.
AnnounceNewTransactions(txns []*dcrutil.Tx)

// AnnounceMixMessage generates and relays inventory vectors of the
// passed messages.
AnnounceMixMessages(msgs []mixing.Message)
}
163 changes: 137 additions & 26 deletions internal/netsync/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/decred/dcrd/internal/mempool"
"github.com/decred/dcrd/internal/progresslog"
"github.com/decred/dcrd/math/uint256"
"github.com/decred/dcrd/mixing"
"github.com/decred/dcrd/mixing/mixpool"

Check failure on line 28 in internal/netsync/manager.go

View workflow job for this annotation

GitHub Actions / Go CI (1.19)

no required module provides package github.com/decred/dcrd/mixing/mixpool; to add it:

Check failure on line 28 in internal/netsync/manager.go

View workflow job for this annotation

GitHub Actions / Go CI (1.20)

no required module provides package github.com/decred/dcrd/mixing/mixpool; to add it:
peerpkg "github.com/decred/dcrd/peer/v3"
"github.com/decred/dcrd/wire"
)
Expand Down Expand Up @@ -65,6 +67,10 @@ const (
maxRejectedTxns = 62500
rejectedTxnsFPRate = 0.0000001

// XXX these numbers were copied from rejected txns
maxRejectedMixMsgs = 625000
rejectedMixMsgsFPRate = 0.0000001

// maxRequestedBlocks is the maximum number of requested block
// hashes to store in memory.
maxRequestedBlocks = wire.MaxInvPerMsg
Expand All @@ -73,6 +79,10 @@ const (
// hashes to store in memory.
maxRequestedTxns = wire.MaxInvPerMsg

// maxRequestedMixMsgs is the maximum number of hashes of in-flight
// mixing messages.
maxRequestedMixMsgs = wire.MaxInvPerMsg

// maxExpectedHeaderAnnouncementsPerMsg is the maximum number of headers in
// a single message that is expected when determining when the message
// appears to be announcing new blocks.
Expand Down Expand Up @@ -180,14 +190,22 @@ type processBlockMsg struct {
reply chan processBlockResponse
}

// mixMsg is a message type to be sent across the message channel for requesting
// a message's acceptence to the mixing pool.
type mixMsg struct {
msg mixing.Message
peer *peerpkg.Peer
}

// syncMgrPeer extends a peer to maintain additional state maintained by the
// sync manager.
type syncMgrPeer struct {
*peerpkg.Peer

syncCandidate bool
requestedTxns map[chainhash.Hash]struct{}
requestedBlocks map[chainhash.Hash]struct{}
syncCandidate bool
requestedTxns map[chainhash.Hash]struct{}
requestedBlocks map[chainhash.Hash]struct{}
requestedMixMsgs map[chainhash.Hash]struct{}

// initialStateRequested tracks whether or not the initial state data has
// been requested from the peer.
Expand Down Expand Up @@ -281,13 +299,15 @@ type SyncManager struct {
// time.
minKnownWork *uint256.Uint256

rejectedTxns *apbf.Filter
requestedTxns map[chainhash.Hash]struct{}
requestedBlocks map[chainhash.Hash]struct{}
progressLogger *progresslog.Logger
syncPeer *syncMgrPeer
msgChan chan interface{}
peers map[*peerpkg.Peer]*syncMgrPeer
rejectedTxns *apbf.Filter
rejectedMixMsgs *apbf.Filter
requestedTxns map[chainhash.Hash]struct{}
requestedBlocks map[chainhash.Hash]struct{}
requestedMixMsgs map[chainhash.Hash]struct{}
progressLogger *progresslog.Logger
syncPeer *syncMgrPeer
msgChan chan interface{}
peers map[*peerpkg.Peer]*syncMgrPeer

// hdrSyncState houses the state used to track the initial header sync
// process and related stall handling.
Expand Down Expand Up @@ -570,6 +590,9 @@ func maybeRequestInitialState(peer *syncMgrPeer) {
err := m.AddTypes(wire.InitStateHeadBlocks,
wire.InitStateHeadBlockVotes,
wire.InitStateTSpends)
if err == nil && peer.ProtocolVersion() >= wire.MixVersion {
err = m.AddType(wire.InitStateMixPRs)
}
if err != nil {
log.Errorf("Unexpected error building getinitstate msg: %v", err)
return
Expand Down Expand Up @@ -618,10 +641,11 @@ func (m *SyncManager) handleNewPeerMsg(ctx context.Context, peer *peerpkg.Peer)
// Initialize the peer state
isSyncCandidate := m.isSyncCandidate(peer)
m.peers[peer] = &syncMgrPeer{
Peer: peer,
syncCandidate: isSyncCandidate,
requestedTxns: make(map[chainhash.Hash]struct{}),
requestedBlocks: make(map[chainhash.Hash]struct{}),
Peer: peer,
syncCandidate: isSyncCandidate,
requestedTxns: make(map[chainhash.Hash]struct{}),
requestedBlocks: make(map[chainhash.Hash]struct{}),
requestedMixMsgs: make(map[chainhash.Hash]struct{}),
}

// Start syncing by choosing the best candidate if needed.
Expand Down Expand Up @@ -688,6 +712,22 @@ BlockHashes:
// No peers found that have announced this data.
delete(m.requestedBlocks, blockHash)
}
inv.Type = wire.InvTypeMix
MixHashes:
for mixHash := range peer.requestedMixMsgs {
inv.Hash = mixHash
for pp, mgrPeer := range m.peers {
if !pp.IsKnownInventory(&inv) {
continue
}
invs := append(requestQueues[pp], inv)
requestQueues[pp] = invs
mgrPeer.requestedMixMsgs[mixHash] = struct{}{}
continue MixHashes
}
// No peers found that have announced this data.
delete(m.requestedMixMsgs, mixHash)
}
for pp, requestQueue := range requestQueues {
var numRequested int32
gdmsg := wire.NewMsgGetData()
Expand Down Expand Up @@ -779,6 +819,25 @@ func (m *SyncManager) handleTxMsg(tmsg *txMsg) {
m.cfg.PeerNotifier.AnnounceNewTransactions(acceptedTxs)
}

// handleMixMsg handles mixing messages from all peers.
func (m *SyncManager) handleMixMsg(mmsg *mixMsg) {
peer := lookupPeer(mmsg.peer, m.peers)
if peer == nil {
return
}

accepted, err := m.cfg.MixPool.AcceptMessage(mmsg.msg)
if err != nil {
log.Errorf("Failed to process mixing message: %v", mmsg.msg)
return
}
if accepted == nil {
return
}

m.cfg.PeerNotifier.AnnounceMixMessages([]mixing.Message{accepted})
}

// maybeUpdateIsCurrent potentially updates the manager to signal it believes
// the chain is considered synced.
//
Expand Down Expand Up @@ -1309,6 +1368,19 @@ func (m *SyncManager) needTx(hash *chainhash.Hash) bool {
return true
}

// needMixMsg returns whether or not the mixing message needs to be downloaded.
func (m *SyncManager) needMixMsg(hash *chainhash.Hash) bool {
if m.rejectedMixMsgs.Contains(hash[:]) {
return false
}

if m.cfg.MixPool.HaveMessage(hash) {
return false
}

return true
}

// handleInvMsg handles inv messages from all peers. This entails examining the
// inventory advertised by the remote peer for block and transaction
// announcements and acting accordingly.
Expand Down Expand Up @@ -1369,6 +1441,29 @@ func (m *SyncManager) handleInvMsg(imsg *invMsg) {
limitAdd(peer.requestedTxns, iv.Hash, maxRequestedTxns)
requestQueue = append(requestQueue, iv)
}

case wire.InvTypeMix:
// Add the mix message to the cache of known inventory
// for the peer. This helps avoid sending mix messages
// to the peer that it is already known to have.
peer.AddKnownInventory(iv)

// Ignore mixing messages before the chain is current or
// if the messages are not needed. Pair request (PR)
// messages reference unspent outputs that must be
// checked to exist and be unspent before they are
// accepted, and all later messages must reference an
// existing PR recorded in the mixing pool.
if !isCurrent || !m.needMixMsg(&iv.Hash) {
continue
}

// Request the mixing message if it is not already pending.
if _, exists := m.requestedMixMsgs[iv.Hash]; !exists {
limitAdd(m.requestedMixMsgs, iv.Hash, maxRequestedMixMsgs)
limitAdd(peer.requestedMixMsgs, iv.Hash, maxRequestedMixMsgs)
requestQueue = append(requestQueue, iv)
}
}
}

Expand Down Expand Up @@ -1463,6 +1558,9 @@ out:
case *headersMsg:
m.handleHeadersMsg(msg)

case *mixMsg:
m.handleMixMsg(msg)

case *notFoundMsg:
m.handleNotFoundMsg(msg)

Expand Down Expand Up @@ -1576,6 +1674,13 @@ func (m *SyncManager) QueueHeaders(headers *wire.MsgHeaders, peer *peerpkg.Peer)
}
}

func (m *SyncManager) QueueMixMsg(msg mixing.Message, peer *peerpkg.Peer) {
select {
case m.msgChan <- &mixMsg{msg: msg, peer: peer}:
case <-m.quit:
}
}

// QueueNotFound adds the passed notfound message and peer to the event handling
// queue.
func (m *SyncManager) QueueNotFound(notFound *wire.MsgNotFound, peer *peerpkg.Peer) {
Expand Down Expand Up @@ -1847,6 +1952,10 @@ type Config struct {
// and querying the most recently confirmed transactions. It is useful for
// preventing duplicate requests.
RecentlyConfirmedTxns *apbf.Filter

// MixPool specifies the mixing pool to use for transient mixing
// messages broadcast across the network.
MixPool *mixpool.Pool
}

// New returns a new network chain synchronization manager. Use Run to begin
Expand All @@ -1863,17 +1972,19 @@ func New(config *Config) *SyncManager {
}

return &SyncManager{
cfg: *config,
rejectedTxns: apbf.NewFilter(maxRejectedTxns, rejectedTxnsFPRate),
requestedTxns: make(map[chainhash.Hash]struct{}),
requestedBlocks: make(map[chainhash.Hash]struct{}),
peers: make(map[*peerpkg.Peer]*syncMgrPeer),
minKnownWork: minKnownWork,
hdrSyncState: makeHeaderSyncState(),
progressLogger: progresslog.New("Processed", log),
msgChan: make(chan interface{}, config.MaxPeers*3),
quit: make(chan struct{}),
syncHeight: config.Chain.BestSnapshot().Height,
isCurrent: config.Chain.IsCurrent(),
cfg: *config,
rejectedTxns: apbf.NewFilter(maxRejectedTxns, rejectedTxnsFPRate),
rejectedMixMsgs: apbf.NewFilter(maxRejectedMixMsgs, rejectedMixMsgsFPRate),
requestedTxns: make(map[chainhash.Hash]struct{}),
requestedBlocks: make(map[chainhash.Hash]struct{}),
requestedMixMsgs: make(map[chainhash.Hash]struct{}),
peers: make(map[*peerpkg.Peer]*syncMgrPeer),
minKnownWork: minKnownWork,
hdrSyncState: makeHeaderSyncState(),
progressLogger: progresslog.New("Processed", log),
msgChan: make(chan interface{}, config.MaxPeers*3),
quit: make(chan struct{}),
syncHeight: config.Chain.BestSnapshot().Height,
isCurrent: config.Chain.IsCurrent(),
}
}
4 changes: 4 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/decred/dcrd/internal/mining/cpuminer"
"github.com/decred/dcrd/internal/netsync"
"github.com/decred/dcrd/internal/rpcserver"
"github.com/decred/dcrd/mixing/mixpool"
"github.com/decred/dcrd/peer/v3"
"github.com/decred/dcrd/txscript/v4"
"github.com/decred/slog"
Expand Down Expand Up @@ -68,6 +69,7 @@ var (
feesLog = backendLog.Logger("FEES")
indxLog = backendLog.Logger("INDX")
minrLog = backendLog.Logger("MINR")
mixpLog = backendLog.Logger("MIXP")
peerLog = backendLog.Logger("PEER")
rpcsLog = backendLog.Logger("RPCS")
scrpLog = backendLog.Logger("SCRP")
Expand All @@ -89,6 +91,7 @@ func init() {
indexers.UseLogger(indxLog)
mempool.UseLogger(txmpLog)
mining.UseLogger(minrLog)
mixpool.UseLogger(mixpLog)
cpuminer.UseLogger(minrLog)
peer.UseLogger(peerLog)
rpcserver.UseLogger(rpcsLog)
Expand All @@ -109,6 +112,7 @@ var subsystemLoggers = map[string]slog.Logger{
"FEES": feesLog,
"INDX": indxLog,
"MINR": minrLog,
"MIXP": mixpLog,
"PEER": peerLog,
"RPCS": rpcsLog,
"SCRP": scrpLog,
Expand Down
Loading

0 comments on commit 45a1a7e

Please sign in to comment.