Skip to content

Commit

Permalink
Merge pull request #3951 from joostjager/exclusive-group-sweeper
Browse files Browse the repository at this point in the history
sweep: add exclusive groups
  • Loading branch information
joostjager committed Feb 14, 2020
2 parents 8904b68 + 9dc3494 commit b75259a
Show file tree
Hide file tree
Showing 3 changed files with 173 additions and 19 deletions.
45 changes: 45 additions & 0 deletions sweep/bucket_list.go
@@ -0,0 +1,45 @@
package sweep

// bucket contains a set of inputs that are not mutually exclusive.
type bucket pendingInputs

// tryAdd tries to add a new input to this bucket.
func (b bucket) tryAdd(input *pendingInput) bool {
exclusiveGroup := input.params.ExclusiveGroup
if exclusiveGroup != nil {
for _, input := range b {
existingGroup := input.params.ExclusiveGroup
if existingGroup != nil &&
*existingGroup == *exclusiveGroup {

return false
}
}
}

b[*input.OutPoint()] = input

return true
}

// bucketList is a list of buckets that contain non-mutually exclusive inputs.
type bucketList struct {
buckets []bucket
}

// add adds a new input. If the input is not accepted by any of the existing
// buckets, a new bucket will be created.
func (b *bucketList) add(input *pendingInput) {
for _, existingBucket := range b.buckets {
if existingBucket.tryAdd(input) {
return
}
}

// Create a new bucket and add the input. It is not necessary to check
// the return value of tryAdd because it will always succeed on an empty
// bucket.
newBucket := make(bucket)
newBucket.tryAdd(input)
b.buckets = append(b.buckets, newBucket)
}
87 changes: 68 additions & 19 deletions sweep/sweeper.go
Expand Up @@ -50,6 +50,11 @@ var (
// request from a client whom did not specify a fee preference.
ErrNoFeePreference = errors.New("no fee preference specified")

// ErrExclusiveGroupSpend is returned in case a different input of the
// same exclusive group was spent.
ErrExclusiveGroupSpend = errors.New("other member of exclusive group " +
"was spent")

// ErrSweeperShuttingDown is an error returned when a client attempts to
// make a request to the UtxoSweeper, but it is unable to handle it as
// it is/has already been stoppepd.
Expand All @@ -71,11 +76,16 @@ type Params struct {
// Force indicates whether the input should be swept regardless of
// whether it is economical to do so.
Force bool

// ExclusiveGroup is an identifier that, if set, prevents other inputs
// with the same identifier from being batched together.
ExclusiveGroup *uint64
}

// String returns a human readable interpretation of the sweep parameters.
func (p Params) String() string {
return fmt.Sprintf("fee=%v, force=%v", p.Fee, p.Force)
return fmt.Sprintf("fee=%v, force=%v, exclusive_group=%v",
p.Fee, p.Force, p.ExclusiveGroup)
}

// pendingInput is created when an input reaches the main loop for the first
Expand Down Expand Up @@ -402,10 +412,9 @@ func (s *UtxoSweeper) SweepInput(input input.Input,
}

log.Infof("Sweep request received: out_point=%v, witness_type=%v, "+
"time_lock=%v, amount=%v, fee_preference=%v, force=%v",
"time_lock=%v, amount=%v, params=(%v)",
input.OutPoint(), input.WitnessType(), input.BlocksToMaturity(),
btcutil.Amount(input.SignDesc().Output.Value),
params.Fee, params.Force)
btcutil.Amount(input.SignDesc().Output.Value), params)

sweeperInput := &sweepInputMessage{
input: input,
Expand Down Expand Up @@ -553,7 +562,7 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
// registration, deleted from pendingInputs but
// the ntfn was in-flight already. Or this could
// be not one of our inputs.
_, ok := s.pendingInputs[outpoint]
input, ok := s.pendingInputs[outpoint]
if !ok {
continue
}
Expand All @@ -569,6 +578,14 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
Tx: spend.SpendingTx,
Err: err,
})

// Remove all other inputs in this exclusive
// group.
if input.params.ExclusiveGroup != nil {
s.removeExclusiveGroup(
*input.params.ExclusiveGroup,
)
}
}

// Now that an input of ours is spent, we can try to
Expand Down Expand Up @@ -640,6 +657,31 @@ func (s *UtxoSweeper) collector(blockEpochs <-chan *chainntnfs.BlockEpoch) {
}
}

// removeExclusiveGroup removes all inputs in the given exclusive group. This
// function is called when one of the exclusive group inputs has been spent. The
// other inputs won't ever be spendable and can be removed. This also prevents
// them from being part of future sweep transactions that would fail.
func (s *UtxoSweeper) removeExclusiveGroup(group uint64) {
for outpoint, input := range s.pendingInputs {
outpoint := outpoint

// Skip inputs that aren't exclusive.
if input.params.ExclusiveGroup == nil {
continue
}

// Skip inputs from other exclusive groups.
if *input.params.ExclusiveGroup != group {
continue
}

// Signal result channels.
s.signalAndRemove(&outpoint, Result{
Err: ErrExclusiveGroupSpend,
})
}
}

// sweepCluster tries to sweep the given input cluster.
func (s *UtxoSweeper) sweepCluster(cluster inputCluster,
currentHeight int32) error {
Expand Down Expand Up @@ -680,7 +722,7 @@ func (s *UtxoSweeper) bucketForFeeRate(
// sweep fee rate, which is determined by calculating the average fee rate of
// all inputs within that cluster.
func (s *UtxoSweeper) clusterBySweepFeeRate() []inputCluster {
bucketInputs := make(map[int]pendingInputs)
bucketInputs := make(map[int]*bucketList)
inputFeeRates := make(map[wire.OutPoint]chainfee.SatPerKWeight)

// First, we'll group together all inputs with similar fee rates. This
Expand All @@ -693,30 +735,37 @@ func (s *UtxoSweeper) clusterBySweepFeeRate() []inputCluster {
}
feeGroup := s.bucketForFeeRate(feeRate)

inputs, ok := bucketInputs[feeGroup]
// Create a bucket list for this fee rate if there isn't one
// yet.
buckets, ok := bucketInputs[feeGroup]
if !ok {
inputs = make(pendingInputs)
bucketInputs[feeGroup] = inputs
buckets = &bucketList{}
bucketInputs[feeGroup] = buckets
}

// Request the bucket list to add this input. The bucket list
// will take into account exclusive group constraints.
buckets.add(input)

input.lastFeeRate = feeRate
inputs[op] = input
inputFeeRates[op] = feeRate
}

// We'll then determine the sweep fee rate for each set of inputs by
// calculating the average fee rate of the inputs within each set.
inputClusters := make([]inputCluster, 0, len(bucketInputs))
for _, inputs := range bucketInputs {
var sweepFeeRate chainfee.SatPerKWeight
for op := range inputs {
sweepFeeRate += inputFeeRates[op]
for _, buckets := range bucketInputs {
for _, inputs := range buckets.buckets {
var sweepFeeRate chainfee.SatPerKWeight
for op := range inputs {
sweepFeeRate += inputFeeRates[op]
}
sweepFeeRate /= chainfee.SatPerKWeight(len(inputs))
inputClusters = append(inputClusters, inputCluster{
sweepFeeRate: sweepFeeRate,
inputs: inputs,
})
}
sweepFeeRate /= chainfee.SatPerKWeight(len(inputs))
inputClusters = append(inputClusters, inputCluster{
sweepFeeRate: sweepFeeRate,
inputs: inputs,
})
}

return inputClusters
Expand Down
60 changes: 60 additions & 0 deletions sweep/sweeper_test.go
Expand Up @@ -1232,3 +1232,63 @@ func TestBumpFeeRBF(t *testing.T) {

ctx.finish(1)
}

// TestExclusiveGroup tests the sweeper exclusive group functionality.
func TestExclusiveGroup(t *testing.T) {
ctx := createSweeperTestContext(t)

// Sweep three inputs in the same exclusive group.
var results []chan Result
for i := 0; i < 3; i++ {
exclusiveGroup := uint64(1)
result, err := ctx.sweeper.SweepInput(
spendableInputs[i], Params{
Fee: FeePreference{ConfTarget: 6},
ExclusiveGroup: &exclusiveGroup,
},
)
if err != nil {
t.Fatal(err)
}
results = append(results, result)
}

// We expect all inputs to be published in separate transactions, even
// though they share the same fee preference.
ctx.tick()
for i := 0; i < 3; i++ {
sweepTx := ctx.receiveTx()
if len(sweepTx.TxOut) != 1 {
t.Fatal("expected a single tx out in the sweep tx")
}

// Remove all txes except for the one that sweeps the first
// input. This simulates the sweeps being conflicting.
if sweepTx.TxIn[0].PreviousOutPoint !=
*spendableInputs[0].OutPoint() {

ctx.backend.deleteUnconfirmed(sweepTx.TxHash())
}
}

// Mine the first sweep tx.
ctx.backend.mine()

// Expect the first input to be swept by the confirmed sweep tx.
result0 := <-results[0]
if result0.Err != nil {
t.Fatal("expected first input to be swept")
}

// Expect the other two inputs to return an error. They have no chance
// of confirming.
result1 := <-results[1]
if result1.Err != ErrExclusiveGroupSpend {
t.Fatal("expected second input to be canceled")
}

result2 := <-results[2]
if result2.Err != ErrExclusiveGroupSpend {
t.Fatal("expected third input to be canceled")
}
}

0 comments on commit b75259a

Please sign in to comment.