forked from lightningnetwork/lnd
/
utxonursery.go
305 lines (262 loc) · 9.83 KB
/
utxonursery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
package main
import (
"sync"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/roasbeef/btcd/txscript"
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
)
// utxoNursery is a system dedicated to incubating time-locked outputs created
// by the broadcast of a commitment transaction either by us, or the remote
// peer. The nursery accepts outputs and "incubates" them until they've reached
// maturity, then sweep the outputs into the source wallet. An output is
// considered mature after the relative time-lock within the pkScript has
// passed. As outputs reach their maturity age, they're sweeped in batches into
// the source wallet, returning the outputs so they can be used within future
// channels, or regular Bitcoin transactions.
type utxoNursery struct {
sync.RWMutex
notifier chainntnfs.ChainNotifier
wallet *lnwallet.LightningWallet
db channeldb.DB
requests chan *incubationRequest
// TODO(roasbeef): persist to disk afterwards
unstagedOutputs map[wire.OutPoint]*immatureOutput
stagedOutputs map[uint32][]*immatureOutput
started uint32
stopped uint32
quit chan struct{}
wg sync.WaitGroup
}
// newUtxoNursery creates a new instance of the utxoNursery from a
// ChainNotifier and LightningWallet instance.
func newUtxoNursery(notifier chainntnfs.ChainNotifier,
wallet *lnwallet.LightningWallet) *utxoNursery {
return &utxoNursery{
notifier: notifier,
wallet: wallet,
requests: make(chan *incubationRequest),
unstagedOutputs: make(map[wire.OutPoint]*immatureOutput),
stagedOutputs: make(map[uint32][]*immatureOutput),
quit: make(chan struct{}),
}
}
// Start launches all goroutines the utxoNursery needs to properly carry out
// its duties.
func (u *utxoNursery) Start() error {
u.wg.Add(1)
go u.incubator()
return nil
}
// Stop gracefully shutsdown any lingering goroutines launched during normal
// operation of the utxoNursery.
func (u *utxoNursery) Stop() error {
close(u.quit)
u.wg.Wait()
return nil
}
// incubator is tasked with watching over all immature outputs until they've
// reached "maturity", after which they'll be sweeped into the underlying
// wallet in batches within a single transaction. Immature outputs can be
// divided into three stages: early stage, mid stage, and final stage. During
// the early stage, the transaction containing the output has not yet been
// confirmed. Once the txn creating the output is confirmed, then output moves
// to the mid stage wherein a dedicated goroutine waits until it has reached
// "maturity". Once an output is mature, it will be sweeped into the wallet at
// the earlier possible height.
func (u *utxoNursery) incubator() {
// Register with the notifier to receive notifications for each newly
// connected block.
newBlocks, err := u.notifier.RegisterBlockEpochNtfn()
if err != nil {
utxnLog.Errorf("unable to register for block epoch "+
"notifications: %v", err)
}
// Outputs that are transitioning from early to mid-stage are sent over
// this channel by each output's dedicated watcher goroutine.
midStageOutputs := make(chan *immatureOutput)
out:
for {
select {
case earlyStagers := <-u.requests:
utxnLog.Infof("Incubating %v new outputs",
len(earlyStagers.outputs))
for _, immatureUtxo := range earlyStagers.outputs {
outpoint := immatureUtxo.outPoint
sourceTXID := outpoint.Hash
// Register for a confirmation once the
// generating txn has been confirmed.
confChan, err := u.notifier.RegisterConfirmationsNtfn(&sourceTXID, 1)
if err != nil {
utxnLog.Errorf("unable to register for confirmations "+
"for txid: %v", sourceTXID)
continue
}
// TODO(roasbeef): should be an on-disk
// at-least-once task queue
u.unstagedOutputs[outpoint] = immatureUtxo
// Launch a dedicated goroutine which will send
// the output back to the incubator once the
// source txn has been confirmed.
go func() {
confHeight, ok := <-confChan.Confirmed
if !ok {
utxnLog.Errorf("notification chan "+
"closed, can't advance output %v", outpoint)
}
utxnLog.Infof("Outpoint %v confirmed in "+
"block %v moving to mid-stage",
outpoint, confHeight)
immatureUtxo.confHeight = uint32(confHeight)
midStageOutputs <- immatureUtxo
}()
}
case midUtxo := <-midStageOutputs:
// The transaction creating the output has been
// created, so we move it from early stage to
// mid-stage.
delete(u.unstagedOutputs, midUtxo.outPoint)
// TODO(roasbeef): your off-by-one sense are tingling...
maturityHeight := midUtxo.confHeight + midUtxo.blocksToMaturity
u.stagedOutputs[maturityHeight] = append(u.stagedOutputs[maturityHeight], midUtxo)
utxnLog.Infof("Outpoint %v now mid-stage, will mature "+
"at height %v (delay of %v)", midUtxo.outPoint,
maturityHeight, midUtxo.blocksToMaturity)
case epoch := <-newBlocks.Epochs:
// A new block has just been connected, check to see if
// we have any new outputs that can be swept into the
// wallet.
newHeight := uint32(epoch.Height)
matureOutputs, ok := u.stagedOutputs[newHeight]
if !ok {
continue
}
utxnLog.Infof("New block: height=%v hash=%v, "+
"sweeping %v mature outputs", newHeight,
epoch.Hash, len(matureOutputs))
// Create a transation which sweeps all the newly
// mature outputs into a output controlled by the
// wallet.
// TODO(roasbeef): can be more intelligent about
// buffering outputs to be more efficient on-chain.
sweepTx, err := u.createSweepTx(matureOutputs)
if err != nil {
// TODO(roasbeef): retry logic?
utxnLog.Errorf("unable to create sweep tx: %v", err)
continue
}
utxnLog.Infof("Sweeping %v time-locked outputs "+
"with sweep tx: %v", len(matureOutputs),
newLogClosure(func() string {
return spew.Sdump(sweepTx)
}))
// With the sweep transaction fully signed, broadcast
// the transaction to the network. Additionally, we can
// stop tracking these outputs as they've just been
// sweeped.
err = u.wallet.PublishTransaction(sweepTx)
if err != nil {
utxnLog.Errorf("unable to broadcast sweep tx: %v, %v",
err, spew.Sdump(sweepTx))
continue
}
delete(u.stagedOutputs, newHeight)
case <-u.quit:
break out
}
}
u.wg.Done()
}
// createSweepTx creates a final sweeping transaction with all witnesses
// inplace for all inputs. The created transaction has a single output sending
// all the funds back to the source wallet.
func (u *utxoNursery) createSweepTx(matureOutputs []*immatureOutput) (*wire.MsgTx, error) {
sweepAddr, err := u.wallet.NewAddress(lnwallet.WitnessPubKey, false)
if err != nil {
return nil, err
}
pkScript, err := txscript.PayToAddrScript(sweepAddr)
if err != nil {
return nil, err
}
var totalSum btcutil.Amount
for _, o := range matureOutputs {
totalSum += o.amt
}
sweepTx := wire.NewMsgTx()
sweepTx.Version = 2
sweepTx.AddTxOut(&wire.TxOut{
PkScript: pkScript,
Value: int64(totalSum - 1000),
})
for _, utxo := range matureOutputs {
sweepTx.AddTxIn(&wire.TxIn{
PreviousOutPoint: utxo.outPoint,
// TODO(roasbeef): assumes pure block delays
Sequence: utxo.blocksToMaturity,
})
}
// TODO(roasbeef): insert fee calculation
// * remove hardcoded fee above
// With all the inputs in place, use each output's unique witness
// function to generate the final witness required for spending.
hashCache := txscript.NewTxSigHashes(sweepTx)
for i, txIn := range sweepTx.TxIn {
witness, err := matureOutputs[i].witnessFunc(sweepTx, hashCache, i)
if err != nil {
return nil, err
}
txIn.Witness = witness
}
return sweepTx, nil
}
// witnessFunc represents a function which is able to generate the final
// witness for a particular public key script. This function acts as an
// abstraction layer, hidiing the details of the underlying script from the
// utxoNursery.
type witnessGenerator func(tx *wire.MsgTx, hc *txscript.TxSigHashes, inputIndex int) ([][]byte, error)
// immatureOutput encapsulates an immature output. The struct includes a
// witnessGenerator closure which will be used to generate the witness required
// to sweep the output once it's mature.
// TODO(roasbeef): make into interface? can't gob functions
type immatureOutput struct {
amt btcutil.Amount
outPoint wire.OutPoint
witnessFunc witnessGenerator
// TODO(roasbeef): using block timeouts everywhere currently, will need
// to modify logic later to account for MTP based timeouts.
blocksToMaturity uint32
confHeight uint32
}
// incubationRequest is a request to the utxoNursery to incubate a set of
// outputs until their mature, finally sweeping them into the wallet once
// available.
type incubationRequest struct {
outputs []*immatureOutput
}
// incubateOutputs sends a request to utxoNursery to incubate the outputs
// defined within the summary of a closed channel. Induvidually, as all outputs
// reach maturity they'll be sweeped back into the wallet.
func (u *utxoNursery) incubateOutputs(closeSummary *lnwallet.ForceCloseSummary) {
// TODO(roasbeef): should use factory func here based on an interface
// * spend here also assumes delay is blocked bsaed, and in range
witnessFunc := func(tx *wire.MsgTx, hc *txscript.TxSigHashes, inputIndex int) ([][]byte, error) {
desc := closeSummary.SelfOutputSignDesc
desc.SigHashes = hc
desc.InputIndex = inputIndex
return lnwallet.CommitSpendTimeout(u.wallet.Signer, desc, tx)
}
outputAmt := btcutil.Amount(closeSummary.SelfOutputSignDesc.Output.Value)
selfOutput := &immatureOutput{
amt: outputAmt,
outPoint: closeSummary.SelfOutpoint,
witnessFunc: witnessFunc,
blocksToMaturity: closeSummary.SelfOutputMaturity,
}
u.requests <- &incubationRequest{
outputs: []*immatureOutput{selfOutput},
}
}