diff --git a/mixing/mixclient/client.go b/mixing/mixclient/client.go index 5805b5e222..d07cd3e64b 100644 --- a/mixing/mixclient/client.go +++ b/mixing/mixclient/client.go @@ -194,8 +194,9 @@ func NewSession(w Wallet, rand io.Reader, coinjoin *CoinJoin) (*Session, error) coinjoin: coinjoin, expires: coinjoin.prExpiry, mcount: coinjoin.mcount, - freshGen: false, + freshGen: true, } + return ses, nil } @@ -267,37 +268,55 @@ func (s *Session) Dicemix(ctx context.Context) error { } } - sort.Slice(prs, func(i, j int) bool { - a := prs[i].Hash() - b := prs[j].Hash() - return bytes.Compare(a[:], b[:]) == -1 - }) - - prHashes := make([]chainhash.Hash, len(prs)) - for i := range prs { - prHashes[i] = prs[i].Hash() - } - sid := mixing.DeriveSessionID(prHashes) - s.sid = sid - - // Session expires with the earliest PR expiry + newSession := true var expiry int64 - for _, pr := range prs { - if expiry == 0 || pr.Expiry < expiry { - expiry = pr.Expiry - } - } - s.deadlines.reset(epoch) for i := uint32(0); ; i++ { + // Calculate new deadlines for reruns and repaired sessions. if i != 0 { s.deadlines.rerun() } + if newSession { + sort.Slice(prs, func(i, j int) bool { + a := prs[i].Hash() + b := prs[j].Hash() + return bytes.Compare(a[:], b[:]) == -1 + }) + + prHashes := make([]chainhash.Hash, len(prs)) + for i := range prs { + prHashes[i] = prs[i].Hash() + } + sid := mixing.DeriveSessionID(prHashes) + s.sid = sid + + // Session expires with the earliest PR expiry + for _, pr := range prs { + if expiry == 0 || pr.Expiry < expiry { + expiry = pr.Expiry + } + } + + i = 0 + newSession = false + } err := s.run(ctx, i, expiry, prs) - var rr rerunError - if errors.As(err, &rr) { - prs = rr.Rerun() + var recreatedSessionErr *recreatedSessionError + if errors.As(err, &recreatedSessionErr) { + prs = recreatedSessionErr.prs + + if len(prs) < MinPeers { + return ErrTooFewPeers + } + + newSession = true + continue + } + + var excludePeersErr excludePeersError + if errors.As(err, &excludePeersErr) { + prs = excludePeersErr.Rerun() continue } if err != nil { @@ -313,7 +332,13 @@ var ( errRerun = errors.New("rerun") ) -type rerunError interface { +type recreatedSessionError struct { + prs []*wire.MsgMixPR +} + +func (e *recreatedSessionError) Error() string { return "recreated session" } + +type excludePeersError interface { error Exclude() []identity Rerun() []*wire.MsgMixPR @@ -417,15 +442,17 @@ func (s *Session) run(ctx context.Context, run uint32, expiry int64, prs []*wire myStart: myStart, } - s.prngSeed = new([32]byte) - _, err = io.ReadFull(s.rand, s.prngSeed[:]) - if err != nil { - return err - } - s.prng = chacha20prng.New(s.prngSeed[:], 0) - if run == 0 || s.freshGen { + if s.freshGen { s.freshGen = false + // Generate a new PRNG seed + s.prngSeed = new([32]byte) + _, err = io.ReadFull(s.rand, s.prngSeed[:]) + if err != nil { + return err + } + s.prng = chacha20prng.New(s.prngSeed[:], run) + // Generate fresh keys from this run's PRNG var err error s.kx, err = mixing.NewKX(s.prng) @@ -457,6 +484,9 @@ func (s *Session) run(ctx context.Context, run uint32, expiry int64, prs []*wire return err } } + } else { + // Generate a new PRNG from existing seed and this run number. + s.prng = chacha20prng.New(s.prngSeed[:], run) } seenPRs := make([]chainhash.Hash, len(prs)) @@ -490,10 +520,10 @@ func (s *Session) run(ctx context.Context, run uint32, expiry int64, prs []*wire // // If this is not the case, we must find a different session that // other peers are able to participate in. This must be a subset of - // the original PRs that peers have seen, and should optimize for - // including as many mixed outputs as possible. This is done using - // mp.ReceiveKEs(), which returns all KEs matching a PR pairing, even - // if they began in different sessions. + // the original PRs that peers have seen, and optimizes for including + // the most peers. This is done using mp.ReceiveKEs(), which returns + // all KEs matching a PR pairing, even if they began in different + // sessions. rcv := new(mixpool.Received) rcv.Run = run rcv.Sid = s.sid @@ -504,18 +534,50 @@ func (s *Session) run(ctx context.Context, run uint32, expiry int64, prs []*wire rcvCtxCancel() kes := rcv.KEs if len(kes) != len(vk) || errors.Is(err, errRunStageTimeout) { - if run == 0 { - // Based on the pairing data available, begin a new - // session. - if err := s.sleepUntil(ctx, s.deadlines.recvKE); err != nil { - return err + if run != 0 { + // Remove unresponsive peers and rerun + s.logf("received %d KEs for %d peers; rerunning", len(kes), len(vk)) + return errRerun + } + + // Based on the seen pairing data available, find an + // alternative session. + if err := s.sleepUntil(ctx, s.deadlines.recvKE); err != nil { + return err + } + prs := mp.CompatiblePRs(pairingID) // XXX: shadowed + kes := mp.ReceiveKEs(pairingID) + + counts := make(map[chainhash.Hash]int) + maxCount := 0 + for _, ke := range kes { + for _, prHash := range ke.SeenPRs { + counts[prHash]++ + count := counts[prHash] + if maxCount < count { + maxCount = count + } } - kes = mp.ReceiveKEs(pairingID) - // XXX } - // Blame peers - s.logf("received %d KEs for %d peers; rerunning", len(kes), len(vk)) - return errRerun + kept := 0 + keptIdentities := make(map[identity]struct{}) + for _, pr := range prs { + prHash := pr.Hash() + if counts[prHash] == maxCount { + prs[kept] = pr + keptIdentities[pr.Identity] = struct{}{} + } + } + prs = prs[:kept] + + if len(prs) != maxCount { + // We have not seen the PR messages required to + // participate in the new session. + return errors.New("aborted session") + } + + // Signal caller to begin a new session with these PRs. + return &recreatedSessionError{prs: prs} } if err != nil { return err @@ -730,12 +792,8 @@ func (s *Session) run(ctx context.Context, run uint32, expiry int64, prs []*wire // Add outputs for each mixed message for i := uint32(0); i < res.N; i++ { mixedMsg := res.M(i) - s.logf("peer %d mixed message: %x", r.myVk, mixedMsg) s.coinjoin.addMixedMessage(mixedMsg) } - for i := range s.dcMsg { - s.logf("dcmsg %v: %x", i, s.dcMsg[i]) - } s.coinjoin.sort() // Confirm that our messages and change are present