Skip to content

Commit

Permalink
more
Browse files Browse the repository at this point in the history
  • Loading branch information
jrick committed Oct 9, 2023
1 parent feeffb8 commit 04b6105
Showing 1 changed file with 108 additions and 50 deletions.
158 changes: 108 additions & 50 deletions mixing/mixclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,9 @@ func NewSession(w Wallet, rand io.Reader, coinjoin *CoinJoin) (*Session, error)
coinjoin: coinjoin,
expires: coinjoin.prExpiry,
mcount: coinjoin.mcount,
freshGen: false,
freshGen: true,
}

return ses, nil
}

Expand Down Expand Up @@ -267,37 +268,55 @@ func (s *Session) Dicemix(ctx context.Context) error {
}
}

sort.Slice(prs, func(i, j int) bool {
a := prs[i].Hash()
b := prs[j].Hash()
return bytes.Compare(a[:], b[:]) == -1
})

prHashes := make([]chainhash.Hash, len(prs))
for i := range prs {
prHashes[i] = prs[i].Hash()
}
sid := mixing.DeriveSessionID(prHashes)
s.sid = sid

// Session expires with the earliest PR expiry
newSession := true
var expiry int64
for _, pr := range prs {
if expiry == 0 || pr.Expiry < expiry {
expiry = pr.Expiry
}
}

s.deadlines.reset(epoch)
for i := uint32(0); ; i++ {
// Calculate new deadlines for reruns and repaired sessions.
if i != 0 {
s.deadlines.rerun()
}
if newSession {
sort.Slice(prs, func(i, j int) bool {
a := prs[i].Hash()
b := prs[j].Hash()
return bytes.Compare(a[:], b[:]) == -1
})

prHashes := make([]chainhash.Hash, len(prs))
for i := range prs {
prHashes[i] = prs[i].Hash()
}
sid := mixing.DeriveSessionID(prHashes)
s.sid = sid

// Session expires with the earliest PR expiry
for _, pr := range prs {
if expiry == 0 || pr.Expiry < expiry {
expiry = pr.Expiry
}
}

i = 0
newSession = false
}

err := s.run(ctx, i, expiry, prs)
var rr rerunError
if errors.As(err, &rr) {
prs = rr.Rerun()
var recreatedSessionErr *recreatedSessionError
if errors.As(err, &recreatedSessionErr) {
prs = recreatedSessionErr.prs

if len(prs) < MinPeers {
return ErrTooFewPeers
}

newSession = true
continue
}

var excludePeersErr excludePeersError
if errors.As(err, &excludePeersErr) {
prs = excludePeersErr.Rerun()
continue
}
if err != nil {
Expand All @@ -313,7 +332,13 @@ var (
errRerun = errors.New("rerun")
)

type rerunError interface {
type recreatedSessionError struct {
prs []*wire.MsgMixPR
}

func (e *recreatedSessionError) Error() string { return "recreated session" }

type excludePeersError interface {
error
Exclude() []identity
Rerun() []*wire.MsgMixPR
Expand Down Expand Up @@ -417,15 +442,17 @@ func (s *Session) run(ctx context.Context, run uint32, expiry int64, prs []*wire
myStart: myStart,
}

s.prngSeed = new([32]byte)
_, err = io.ReadFull(s.rand, s.prngSeed[:])
if err != nil {
return err
}
s.prng = chacha20prng.New(s.prngSeed[:], 0)
if run == 0 || s.freshGen {
if s.freshGen {
s.freshGen = false

// Generate a new PRNG seed
s.prngSeed = new([32]byte)
_, err = io.ReadFull(s.rand, s.prngSeed[:])
if err != nil {
return err
}
s.prng = chacha20prng.New(s.prngSeed[:], run)

// Generate fresh keys from this run's PRNG
var err error
s.kx, err = mixing.NewKX(s.prng)
Expand Down Expand Up @@ -457,6 +484,9 @@ func (s *Session) run(ctx context.Context, run uint32, expiry int64, prs []*wire
return err
}
}
} else {
// Generate a new PRNG from existing seed and this run number.
s.prng = chacha20prng.New(s.prngSeed[:], run)
}

seenPRs := make([]chainhash.Hash, len(prs))
Expand Down Expand Up @@ -490,10 +520,10 @@ func (s *Session) run(ctx context.Context, run uint32, expiry int64, prs []*wire
//
// If this is not the case, we must find a different session that
// other peers are able to participate in. This must be a subset of
// the original PRs that peers have seen, and should optimize for
// including as many mixed outputs as possible. This is done using
// mp.ReceiveKEs(), which returns all KEs matching a PR pairing, even
// if they began in different sessions.
// the original PRs that peers have seen, and optimizes for including
// the most peers. This is done using mp.ReceiveKEs(), which returns
// all KEs matching a PR pairing, even if they began in different
// sessions.
rcv := new(mixpool.Received)
rcv.Run = run
rcv.Sid = s.sid
Expand All @@ -504,18 +534,50 @@ func (s *Session) run(ctx context.Context, run uint32, expiry int64, prs []*wire
rcvCtxCancel()
kes := rcv.KEs
if len(kes) != len(vk) || errors.Is(err, errRunStageTimeout) {
if run == 0 {
// Based on the pairing data available, begin a new
// session.
if err := s.sleepUntil(ctx, s.deadlines.recvKE); err != nil {
return err
if run != 0 {
// Remove unresponsive peers and rerun
s.logf("received %d KEs for %d peers; rerunning", len(kes), len(vk))
return errRerun
}

// Based on the seen pairing data available, find an
// alternative session.
if err := s.sleepUntil(ctx, s.deadlines.recvKE); err != nil {
return err
}
prs := mp.CompatiblePRs(pairingID) // XXX: shadowed
kes := mp.ReceiveKEs(pairingID)

counts := make(map[chainhash.Hash]int)
maxCount := 0
for _, ke := range kes {
for _, prHash := range ke.SeenPRs {
counts[prHash]++
count := counts[prHash]
if maxCount < count {
maxCount = count
}
}
kes = mp.ReceiveKEs(pairingID)
// XXX
}
// Blame peers
s.logf("received %d KEs for %d peers; rerunning", len(kes), len(vk))
return errRerun
kept := 0
keptIdentities := make(map[identity]struct{})
for _, pr := range prs {
prHash := pr.Hash()
if counts[prHash] == maxCount {
prs[kept] = pr
keptIdentities[pr.Identity] = struct{}{}
}
}
prs = prs[:kept]

if len(prs) != maxCount {
// We have not seen the PR messages required to
// participate in the new session.
return errors.New("aborted session")
}

// Signal caller to begin a new session with these PRs.
return &recreatedSessionError{prs: prs}
}
if err != nil {
return err
Expand Down Expand Up @@ -730,12 +792,8 @@ func (s *Session) run(ctx context.Context, run uint32, expiry int64, prs []*wire
// Add outputs for each mixed message
for i := uint32(0); i < res.N; i++ {
mixedMsg := res.M(i)
s.logf("peer %d mixed message: %x", r.myVk, mixedMsg)
s.coinjoin.addMixedMessage(mixedMsg)
}
for i := range s.dcMsg {
s.logf("dcmsg %v: %x", i, s.dcMsg[i])
}
s.coinjoin.sort()

// Confirm that our messages and change are present
Expand Down

0 comments on commit 04b6105

Please sign in to comment.