From ef183de1d2536431fcb5cfd4df888ce6e3408e9c Mon Sep 17 00:00:00 2001 From: Josh Rickmar Date: Fri, 5 Jul 2024 16:12:10 +0000 Subject: [PATCH] mixclient: Remove submit queue channel The submit queue channel was not actually increasing any performance. (*peer).submit() would synchronously wait for all error results, and the call to (*Wallet).SubmitMixMessage was already synchronized by the mixpool mutex. Furthermore, this also fixes a deadlock that was observed after a mixing wallet with the RPC syncer mode reconnected to a restarted dcrd. Pair request messages were being submitted onto the channel with the client mutex held in (*Client).Dicemix. However, handleSubmitQueue had already exited and the client had not yet been restarted after dcrd reconnect, and was unable to be started due to the locked mutex. --- mixing/mixclient/client.go | 42 +++----------------------------------- 1 file changed, 3 insertions(+), 39 deletions(-) diff --git a/mixing/mixclient/client.go b/mixing/mixclient/client.go index f1e7516b9..8536d3e47 100644 --- a/mixing/mixclient/client.go +++ b/mixing/mixclient/client.go @@ -256,11 +256,6 @@ type sessionRun struct { cj *CoinJoin } -type queueMsg struct { - message mixing.Message - res chan error -} - type queueWork struct { p *peer f func(p *peer) error @@ -278,9 +273,8 @@ type Client struct { height uint32 mu sync.Mutex - warming chan struct{} - submitQueue chan *queueMsg - workQueue chan *queueWork + warming chan struct{} + workQueue chan *queueWork pairingWG sync.WaitGroup @@ -310,7 +304,6 @@ func NewClient(w Wallet) *Client { mixpool: w.Mixpool(), pairings: make(map[string]*pairedSessions), warming: make(chan struct{}), - submitQueue: make(chan *queueMsg, 200), workQueue: make(chan *queueWork, runtime.NumCPU()), blake256Hasher: blake256.New(), epoch: w.Mixpool().Epoch(), @@ -379,9 +372,6 @@ func (c *Client) Run(ctx context.Context) error { g.Go(func() error { return c.epochTicker(ctx) }) - g.Go(func() error { - return c.handleSubmitQueue(ctx) - }) for i := 0; i < runtime.NumCPU(); i++ { g.Go(func() error { return c.peerWorker(ctx) @@ -572,21 +562,7 @@ func (p *peer) signAndHash(m mixing.Message) error { } func (p *peer) submit(m mixing.Message) error { - qmsg := &queueMsg{ - message: m, - res: make(chan error, 1), - } - select { - case <-p.ctx.Done(): - return p.ctx.Err() - case p.client.submitQueue <- qmsg: - } - select { - case <-p.ctx.Done(): - return p.ctx.Err() - case err := <-qmsg.res: - return err - } + return p.client.wallet.SubmitMixMessage(p.ctx, m) } func (p *peer) signAndSubmit(m mixing.Message) error { @@ -713,18 +689,6 @@ func (c *Client) epochTicker(ctx context.Context) error { } } -func (c *Client) handleSubmitQueue(ctx context.Context) error { - for { - select { - case <-ctx.Done(): - return ctx.Err() - case qmsg := <-c.submitQueue: - err := c.wallet.SubmitMixMessage(ctx, qmsg.message) - qmsg.res <- err - } - } -} - // Dicemix performs a new mixing session for a coinjoin mix transaction. func (c *Client) Dicemix(ctx context.Context, cj *CoinJoin) error { select {