Skip to content

Commit

Permalink
mixclient: Remove submit queue channel
Browse files Browse the repository at this point in the history
The submit queue channel was not actually increasing any performance.
(*peer).submit() would synchronously wait for all error results, and the call
to (*Wallet).SubmitMixMessage was already synchronized by the mixpool mutex.

Furthermore, this also fixes a deadlock that was observed after a mixing
wallet with the RPC syncer mode reconnected to a restarted dcrd.  Pair request
messages were being submitted onto the channel with the client mutex held in
(*Client).Dicemix.  However, handleSubmitQueue had already exited and the
client had not yet been restarted after dcrd reconnect, and was unable to be
started due to the locked mutex.
  • Loading branch information
jrick authored and davecgh committed Jul 6, 2024
1 parent 02a286d commit ef183de
Showing 1 changed file with 3 additions and 39 deletions.
42 changes: 3 additions & 39 deletions mixing/mixclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,11 +256,6 @@ type sessionRun struct {
cj *CoinJoin
}

type queueMsg struct {
message mixing.Message
res chan error
}

type queueWork struct {
p *peer
f func(p *peer) error
Expand All @@ -278,9 +273,8 @@ type Client struct {
height uint32
mu sync.Mutex

warming chan struct{}
submitQueue chan *queueMsg
workQueue chan *queueWork
warming chan struct{}
workQueue chan *queueWork

pairingWG sync.WaitGroup

Expand Down Expand Up @@ -310,7 +304,6 @@ func NewClient(w Wallet) *Client {
mixpool: w.Mixpool(),
pairings: make(map[string]*pairedSessions),
warming: make(chan struct{}),
submitQueue: make(chan *queueMsg, 200),
workQueue: make(chan *queueWork, runtime.NumCPU()),
blake256Hasher: blake256.New(),
epoch: w.Mixpool().Epoch(),
Expand Down Expand Up @@ -379,9 +372,6 @@ func (c *Client) Run(ctx context.Context) error {
g.Go(func() error {
return c.epochTicker(ctx)
})
g.Go(func() error {
return c.handleSubmitQueue(ctx)
})
for i := 0; i < runtime.NumCPU(); i++ {
g.Go(func() error {
return c.peerWorker(ctx)
Expand Down Expand Up @@ -572,21 +562,7 @@ func (p *peer) signAndHash(m mixing.Message) error {
}

func (p *peer) submit(m mixing.Message) error {
qmsg := &queueMsg{
message: m,
res: make(chan error, 1),
}
select {
case <-p.ctx.Done():
return p.ctx.Err()
case p.client.submitQueue <- qmsg:
}
select {
case <-p.ctx.Done():
return p.ctx.Err()
case err := <-qmsg.res:
return err
}
return p.client.wallet.SubmitMixMessage(p.ctx, m)
}

func (p *peer) signAndSubmit(m mixing.Message) error {
Expand Down Expand Up @@ -713,18 +689,6 @@ func (c *Client) epochTicker(ctx context.Context) error {
}
}

func (c *Client) handleSubmitQueue(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case qmsg := <-c.submitQueue:
err := c.wallet.SubmitMixMessage(ctx, qmsg.message)
qmsg.res <- err
}
}
}

// Dicemix performs a new mixing session for a coinjoin mix transaction.
func (c *Client) Dicemix(ctx context.Context, cj *CoinJoin) error {
select {
Expand Down

0 comments on commit ef183de

Please sign in to comment.