From 102e9d3bb8593c0a4d1cd3210c87cc4e6a22c62e Mon Sep 17 00:00:00 2001 From: Josh Rickmar Date: Thu, 6 Jun 2024 18:12:30 +0000 Subject: [PATCH] add in some message jitter --- mixing/internal/uniform/rand.go | 202 ++++++++++++++++++++++++++++++++ mixing/mixclient/blame.go | 8 +- mixing/mixclient/client.go | 189 ++++++++++++++++++++++++++++-- 3 files changed, 383 insertions(+), 16 deletions(-) create mode 100644 mixing/internal/uniform/rand.go diff --git a/mixing/internal/uniform/rand.go b/mixing/internal/uniform/rand.go new file mode 100644 index 0000000000..4eacef1f3a --- /dev/null +++ b/mixing/internal/uniform/rand.go @@ -0,0 +1,202 @@ +// Copyright (c) 2024 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. +// +// Uniform random algorithms modified from the Go math/rand/v2 package with +// the following license: +// +// Copyright (c) 2009 The Go Authors. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above +// copyright notice, this list of conditions and the following disclaimer +// in the documentation and/or other materials provided with the +// distribution. +// * Neither the name of Google Inc. nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +// Package uniform provides uniformly distributed, cryptographically secure +// random numbers with randomness obtained from a crypto/rand.Reader or other +// CSPRNG reader. +// +// Random sources are required to never error; any errors reading the random +// source will result in a panic. +package uniform + +import ( + "encoding/binary" + "fmt" + "io" + "math/bits" + "time" +) + +func read(rand io.Reader, buf []byte) { + _, err := io.ReadFull(rand, buf) + if err != nil { + panic(fmt.Errorf("uniform: read of random source errored: %w", err)) + } +} + +// Uint32 returns a uniform random uint32. +func Uint32(rand io.Reader) uint32 { + b := make([]byte, 4) + read(rand, b) + return binary.LittleEndian.Uint32(b) +} + +// Uint64 returns a uniform random uint64. +func Uint64(rand io.Reader) uint64 { + b := make([]byte, 8) + read(rand, b) + return binary.LittleEndian.Uint64(b) +} + +// Uint32n returns a random uint32 in range [0,n) without modulo bias. +func Uint32n(rand io.Reader, n uint32) uint32 { + if n&(n-1) == 0 { // n is power of two, can mask + return uint32(Uint64(rand)) & (n - 1) + } + // On 64-bit systems we still use the uint64 code below because + // the probability of a random uint64 lo being < a uint32 n is near zero, + // meaning the unbiasing loop almost never runs. + // On 32-bit systems, here we need to implement that same logic in 32-bit math, + // both to preserve the exact output sequence observed on 64-bit machines + // and to preserve the optimization that the unbiasing loop almost never runs. + // + // We want to compute + // hi, lo := bits.Mul64(r.Uint64(), n) + // In terms of 32-bit halves, this is: + // x1:x0 := r.Uint64() + // 0:hi, lo1:lo0 := bits.Mul64(x1:x0, 0:n) + // Writing out the multiplication in terms of bits.Mul32 allows + // using direct hardware instructions and avoiding + // the computations involving these zeros. + x := Uint64(rand) + lo1a, lo0 := bits.Mul32(uint32(x), n) + hi, lo1b := bits.Mul32(uint32(x>>32), n) + lo1, c := bits.Add32(lo1a, lo1b, 0) + hi += c + if lo1 == 0 && lo0 < uint32(n) { + n64 := uint64(n) + thresh := uint32(-n64 % n64) + for lo1 == 0 && lo0 < thresh { + x := Uint64(rand) + lo1a, lo0 = bits.Mul32(uint32(x), n) + hi, lo1b = bits.Mul32(uint32(x>>32), n) + lo1, c = bits.Add32(lo1a, lo1b, 0) + hi += c + } + } + return hi +} + +const is32bit = ^uint(0)>>32 == 0 + +// Uint64n returns a random uint32 in range [0,n) without modulo bias. +func Uint64n(rand io.Reader, n uint64) uint64 { + if is32bit && uint64(uint32(n)) == n { + return uint64(Uint32n(rand, uint32(n))) + } + if n&(n-1) == 0 { // n is power of two, can mask + return Uint64(rand) & (n - 1) + } + + // Suppose we have a uint64 x uniform in the range [0,2⁶⁴) + // and want to reduce it to the range [0,n) preserving exact uniformity. + // We can simulate a scaling arbitrary precision x * (n/2⁶⁴) by + // the high bits of a double-width multiply of x*n, meaning (x*n)/2⁶⁴. + // Since there are 2⁶⁴ possible inputs x and only n possible outputs, + // the output is necessarily biased if n does not divide 2⁶⁴. + // In general (x*n)/2⁶⁴ = k for x*n in [k*2⁶⁴,(k+1)*2⁶⁴). + // There are either floor(2⁶⁴/n) or ceil(2⁶⁴/n) possible products + // in that range, depending on k. + // But suppose we reject the sample and try again when + // x*n is in [k*2⁶⁴, k*2⁶⁴+(2⁶⁴%n)), meaning rejecting fewer than n possible + // outcomes out of the 2⁶⁴. + // Now there are exactly floor(2⁶⁴/n) possible ways to produce + // each output value k, so we've restored uniformity. + // To get valid uint64 math, 2⁶⁴ % n = (2⁶⁴ - n) % n = -n % n, + // so the direct implementation of this algorithm would be: + // + // hi, lo := bits.Mul64(r.Uint64(), n) + // thresh := -n % n + // for lo < thresh { + // hi, lo = bits.Mul64(r.Uint64(), n) + // } + // + // That still leaves an expensive 64-bit division that we would rather avoid. + // We know that thresh < n, and n is usually much less than 2⁶⁴, so we can + // avoid the last four lines unless lo < n. + // + // See also: + // https://lemire.me/blog/2016/06/27/a-fast-alternative-to-the-modulo-reduction + // https://lemire.me/blog/2016/06/30/fast-random-shuffling + hi, lo := bits.Mul64(Uint64(rand), n) + if lo < n { + thresh := -n % n + for lo < thresh { + hi, lo = bits.Mul64(Uint64(rand), n) + } + } + return hi +} + +// Int32 returns a random 31-bit non-negative integer as an int32 without +// modulo bias. +func Int32(rand io.Reader) int32 { + return int32(Uint32(rand) & 0x7FFFFFFF) +} + +// Int32n returns, as an int32, a random 31-bit non-negative integer in [0,n) +// without modulo bias. +// Panics if n <= 0. +func Int32n(rand io.Reader, n int32) int32 { + if n <= 0 { + panic("uniform: invalid argument to Int32n") + } + return int32(Uint32n(rand, uint32(n))) +} + +// Int64 returns a random 63-bit non-negative integer as an int64 without +// modulo bias. +func Int64(rand io.Reader) int64 { + return int64(Uint64(rand) & 0x7FFFFFFF_FFFFFFFF) +} + +// Int64n returns, as an int64, a random 63-bit non-negative integer in [0,n) +// without modulo bias. +// Panics if n <= 0. +func Int64n(rand io.Reader, n int64) int64 { + if n <= 0 { + panic("uniform: invalid argument to Int64n") + } + return int64(Uint64n(rand, uint64(n))) +} + +// Duration returns a random duration in [0,n) without modulo bias. +// Panics if n <= 0. +func Duration(rand io.Reader, n time.Duration) time.Duration { + if n <= 0 { + panic("uniform: invalid argument to Duration") + } + return time.Duration(Uint64n(rand, uint64(n))) +} diff --git a/mixing/mixclient/blame.go b/mixing/mixclient/blame.go index 248d633ff6..bb613a01a3 100644 --- a/mixing/mixclient/blame.go +++ b/mixing/mixclient/blame.go @@ -65,13 +65,13 @@ func (c *Client) blame(ctx context.Context, sesRun *sessionRun) (err error) { } }() - err = c.forLocalPeers(ctx, sesRun, func(p *peer) error { + err = c.sendLocalPeerMsgs(ctx, sesRun, func(p *peer) mixing.Message { // Send initial secrets messages from any peers who detected // misbehavior. if !p.triggeredBlame { return nil } - return p.signAndSubmit(p.rs) + return p.rs }) if err != nil { return err @@ -88,13 +88,13 @@ func (c *Client) blame(ctx context.Context, sesRun *sessionRun) (err error) { } // Send remaining secrets messages. - err = c.forLocalPeers(ctx, sesRun, func(p *peer) error { + err = c.sendLocalPeerMsgs(ctx, sesRun, func(p *peer) mixing.Message { if p.triggeredBlame { p.triggeredBlame = false return nil } p.rs.SeenSecrets = rsHashes - return p.signAndSubmit(p.rs) + return p.rs }) if err != nil { return err diff --git a/mixing/mixclient/client.go b/mixing/mixclient/client.go index 9794bbe1f7..7504ebffbd 100644 --- a/mixing/mixclient/client.go +++ b/mixing/mixclient/client.go @@ -26,6 +26,7 @@ import ( "github.com/decred/dcrd/dcrec/secp256k1/v4" "github.com/decred/dcrd/mixing" "github.com/decred/dcrd/mixing/internal/chacha20prng" + "github.com/decred/dcrd/mixing/internal/uniform" "github.com/decred/dcrd/mixing/mixpool" "github.com/decred/dcrd/txscript/v4" "github.com/decred/dcrd/txscript/v4/stdaddr" @@ -39,6 +40,10 @@ const MinPeers = 4 const pairingFlags byte = 0 +const timeoutDuration = 30 * time.Second + +const maxJitter = timeoutDuration / 10 + // expiredPRErr indicates that a dicemix session failed to complete due to the // submitted pair request expiring. func expiredPRErr(pr *wire.MsgMixPairReq) error { @@ -126,8 +131,6 @@ type deadlines struct { recvCM time.Time } -const timeoutDuration = 30 * time.Second - func (d *deadlines) start(begin time.Time) { t := begin add := func() time.Time { @@ -180,6 +183,7 @@ type peer struct { ke *wire.MsgMixKeyExchange ct *wire.MsgMixCiphertexts sr *wire.MsgMixSlotReserve + fp *wire.MsgMixFactoredPoly dc *wire.MsgMixDCNet cm *wire.MsgMixConfirm @@ -431,6 +435,63 @@ func (c *Client) forLocalPeers(ctx context.Context, s *sessionRun, f func(p *pee return errors.Join(errs...) } +type delayedMsg struct { + t time.Time + m mixing.Message + p *peer +} + +func (c *Client) sendLocalPeerMsgs(ctx context.Context, s *sessionRun, m func(p *peer) mixing.Message) error { + msgs := make([]delayedMsg, 0, len(s.peers)) + + now := time.Now() + for _, p := range s.peers { + if p.remote { + continue + } + msg := m(p) + if p.triggeredBlame { + msg = p.rs + } + if msg == nil { + continue + } + msgs = append(msgs, delayedMsg{ + t: now.Add(uniform.Duration(cryptorand.Reader, maxJitter)), + m: msg, + p: p, + }) + } + sort.Slice(msgs, func(i, j int) bool { + return msgs[i].t.Before(msgs[j].t) + }) + + resChans := make([]chan error, 0, len(s.peers)) + for i := range msgs { + res := make(chan error, 1) + resChans = append(resChans, res) + m := msgs[i] + time.Sleep(time.Until(m.t)) + qsend := &queueWork{ + p: m.p, + f: func(p *peer) error { + return p.signAndSubmit(m.m) + }, + res: res, + } + select { + case <-ctx.Done(): + res <- ctx.Err() + case c.workQueue <- qsend: + } + } + var errs = make([]error, len(resChans)) + for i := range errs { + errs[i] = <-resChans[i] + } + return errors.Join(errs...) +} + // waitForEpoch blocks until the next epoch, or errors when the context is // cancelled early. Returns the calculated epoch for stage timeout // calculations. @@ -445,10 +506,44 @@ func (c *Client) waitForEpoch(ctx context.Context) (time.Time, error) { <-timer.C } return epoch, ctx.Err() + case <-c.testTickC: + if !timer.Stop() { + <-timer.C + } + return epoch, nil case <-timer.C: return epoch, nil + } +} + +// prDelay waits until an appropriate time before the PR should be authored +// and published. PRs will not be sent within +/-30s of the epoch, and a +// small amount of jitter is added to help avoid timing deanonymization +// attacks. +func (c *Client) prDelay(ctx context.Context) error { + now := time.Now().UTC() + epoch := now.Truncate(c.epoch).Add(c.epoch) + sendBefore := epoch.Add(-timeoutDuration - maxJitter) + sendAfter := epoch.Add(timeoutDuration) + var wait time.Duration + if now.After(sendBefore) { + wait = sendAfter.Sub(now) + } + wait += uniform.Duration(cryptorand.Reader, maxJitter) + timer := time.NewTimer(wait) + select { + case <-ctx.Done(): + if !timer.Stop() { + <-timer.C + } + return ctx.Err() case <-c.testTickC: - return epoch, nil + if !timer.Stop() { + <-timer.C + } + return nil + case <-timer.C: + return nil } } @@ -494,6 +589,9 @@ func (p *peer) submit(m mixing.Message) error { } func (p *peer) signAndSubmit(m mixing.Message) error { + if m == nil { + return nil + } err := p.signAndHash(m) if err != nil { return err @@ -634,6 +732,11 @@ func (c *Client) Dicemix(ctx context.Context, rand io.Reader, cj *CoinJoin) erro return ctx.Err() } + err := c.prDelay(ctx) + if err != nil { + return err + } + pub, priv, err := generateSecp256k1(rand) if err != nil { return err @@ -775,6 +878,7 @@ func (c *Client) pairSession(ctx context.Context, ps *pairedSessions, prs []*wir p.ke = nil p.ct = nil p.sr = nil + p.fp = nil p.dc = nil p.cm = nil p.rs = nil @@ -1047,7 +1151,16 @@ func (c *Client) run(ctx context.Context, ps *pairedSessions, madePairing *bool) p.ke = ke p.rs = rs - return p.signAndSubmit(ke) + return nil + }) + if err != nil { + c.logf("%v", err) + } + err = c.sendLocalPeerMsgs(ctx, sesRun, func(p *peer) mixing.Message { + if p.ke == nil { + return nil + } + return p.ke }) if err != nil { c.logf("%v", err) @@ -1255,8 +1368,20 @@ func (c *Client) run(ctx context.Context, ps *pairedSessions, madePairing *bool) ct := wire.NewMsgMixCiphertexts(*p.id, sesRun.sid, 0, pqct, seenKEs) p.ct = ct c.testHook(hookBeforePeerCTPublish, sesRun, p) - return p.signAndSubmit(ct) + return nil + }) + if err != nil { + c.logf("%v", err) + } + err = c.sendLocalPeerMsgs(ctx, sesRun, func(p *peer) mixing.Message { + if p.ct == nil { + return nil + } + return p.ct }) + if err != nil { + c.logf("%v", err) + } // Receive all ciphertext messages rcv := new(mixpool.Received) @@ -1333,8 +1458,11 @@ func (c *Client) run(ctx context.Context, ps *pairedSessions, madePairing *bool) sr := wire.NewMsgMixSlotReserve(*p.id, sesRun.sid, 0, srMixBytes, seenCTs) p.sr = sr c.testHook(hookBeforePeerSRPublish, sesRun, p) - return p.signAndSubmit(sr) + return nil }) + if err != nil { + c.logf("%v", err) + } if len(blamedMap) > 0 { for id := range blamedMap { blamed = append(blamed, id) @@ -1345,6 +1473,15 @@ func (c *Client) run(ctx context.Context, ps *pairedSessions, madePairing *bool) if err != nil { return err } + err = c.sendLocalPeerMsgs(ctx, sesRun, func(p *peer) mixing.Message { + if p.sr == nil { + return nil + } + return p.sr + }) + if err != nil { + c.logf("%v", err) + } // Receive all slot reservation messages rcv.CTs = nil @@ -1415,7 +1552,7 @@ func (c *Client) run(ctx context.Context, ps *pairedSessions, madePairing *bool) dc := wire.NewMsgMixDCNet(*p.id, sesRun.sid, 0, p.dcNet, seenSRs) p.dc = dc c.testHook(hookBeforePeerDCPublish, sesRun, p) - return p.signAndSubmit(dc) + return nil }) if errors.Is(err, errTriggeredBlame) { return err @@ -1423,6 +1560,15 @@ func (c *Client) run(ctx context.Context, ps *pairedSessions, madePairing *bool) if err != nil { sesLog.logf("DC-net error: %v", err) } + err = c.sendLocalPeerMsgs(ctx, sesRun, func(p *peer) mixing.Message { + if p.dc == nil { + return nil + } + return p.dc + }) + if err != nil { + c.logf("%v", err) + } // Receive all DC messages rcv.SRs = nil @@ -1495,7 +1641,7 @@ func (c *Client) run(ctx context.Context, ps *pairedSessions, madePairing *bool) cm := wire.NewMsgMixConfirm(*p.id, sesRun.sid, 0, p.coinjoin.Tx().Copy(), seenDCs) p.cm = cm - return p.signAndSubmit(cm) + return nil }) if errors.Is(err, errTriggeredBlame) { return err @@ -1503,6 +1649,15 @@ func (c *Client) run(ctx context.Context, ps *pairedSessions, madePairing *bool) if err != nil { sesLog.logf("Confirm error: %v", err) } + err = c.sendLocalPeerMsgs(ctx, sesRun, func(p *peer) mixing.Message { + if p.cm == nil { + return nil + } + return p.cm + }) + if err != nil { + c.logf("%v", err) + } // Receive all CM messages rcv.DCs = nil @@ -1558,6 +1713,7 @@ func (c *Client) run(ctx context.Context, ps *pairedSessions, madePairing *bool) return err } + time.Sleep(uniform.Duration(cryptorand.Reader, maxJitter)) err = c.wallet.PublishTransaction(context.Background(), cj.tx) if err != nil { return err @@ -1600,12 +1756,21 @@ func (c *Client) roots(ctx context.Context, seenSRs []chainhash.Hash, for i, root := range roots { rootBytes[i] = root.Bytes() } - err = c.forLocalPeers(ctx, sesRun, func(p *peer) error { - fp := wire.NewMsgMixFactoredPoly(*p.id, sesRun.sid, + c.forLocalPeers(ctx, sesRun, func(p *peer) error { + p.fp = wire.NewMsgMixFactoredPoly(*p.id, sesRun.sid, 0, rootBytes, seenSRs) - return p.signAndSubmit(fp) + return nil }) - return roots, err + // Don't wait for these messages to send. + go func() { + err := c.sendLocalPeerMsgs(ctx, sesRun, func(p *peer) mixing.Message { + return p.fp + }) + if err != nil { + c.logf("%v", err) + } + }() + return roots, nil } // Clients unable to solve their own roots must wait for solutions.