-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
db.go
223 lines (191 loc) · 5.4 KB
/
db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
package migration_01_to_11
import (
"bytes"
"encoding/binary"
"fmt"
"os"
"path/filepath"
"time"
"github.com/lightningnetwork/lnd/channeldb/kvdb"
)
const (
dbName = "channel.db"
dbFilePermission = 0600
)
// migration is a function which takes a prior outdated version of the database
// instances and mutates the key/bucket structure to arrive at a more
// up-to-date version of the database.
type migration func(tx kvdb.RwTx) error
var (
// Big endian is the preferred byte order, due to cursor scans over
// integer keys iterating in order.
byteOrder = binary.BigEndian
)
// DB is the primary datastore for the lnd daemon. The database stores
// information related to nodes, routing data, open/closed channels, fee
// schedules, and reputation data.
type DB struct {
kvdb.Backend
dbPath string
graph *ChannelGraph
now func() time.Time
}
// Open opens an existing channeldb. Any necessary schemas migrations due to
// updates will take place as necessary.
func Open(dbPath string, modifiers ...OptionModifier) (*DB, error) {
path := filepath.Join(dbPath, dbName)
if !fileExists(path) {
if err := createChannelDB(dbPath); err != nil {
return nil, err
}
}
opts := DefaultOptions()
for _, modifier := range modifiers {
modifier(&opts)
}
// Specify bbolt freelist options to reduce heap pressure in case the
// freelist grows to be very large.
bdb, err := kvdb.Open(
kvdb.BoltBackendName, path,
opts.NoFreelistSync, opts.DBTimeout,
)
if err != nil {
return nil, err
}
chanDB := &DB{
Backend: bdb,
dbPath: dbPath,
now: time.Now,
}
chanDB.graph = newChannelGraph(
chanDB, opts.RejectCacheSize, opts.ChannelCacheSize,
)
return chanDB, nil
}
// createChannelDB creates and initializes a fresh version of channeldb. In
// the case that the target path has not yet been created or doesn't yet exist,
// then the path is created. Additionally, all required top-level buckets used
// within the database are created.
func createChannelDB(dbPath string) error {
if !fileExists(dbPath) {
if err := os.MkdirAll(dbPath, 0700); err != nil {
return err
}
}
path := filepath.Join(dbPath, dbName)
bdb, err := kvdb.Create(
kvdb.BoltBackendName, path, false, kvdb.DefaultDBTimeout,
)
if err != nil {
return err
}
err = kvdb.Update(bdb, func(tx kvdb.RwTx) error {
if _, err := tx.CreateTopLevelBucket(openChannelBucket); err != nil {
return err
}
if _, err := tx.CreateTopLevelBucket(closedChannelBucket); err != nil {
return err
}
if _, err := tx.CreateTopLevelBucket(invoiceBucket); err != nil {
return err
}
if _, err := tx.CreateTopLevelBucket(paymentBucket); err != nil {
return err
}
nodes, err := tx.CreateTopLevelBucket(nodeBucket)
if err != nil {
return err
}
_, err = nodes.CreateBucket(aliasIndexBucket)
if err != nil {
return err
}
_, err = nodes.CreateBucket(nodeUpdateIndexBucket)
if err != nil {
return err
}
edges, err := tx.CreateTopLevelBucket(edgeBucket)
if err != nil {
return err
}
if _, err := edges.CreateBucket(edgeIndexBucket); err != nil {
return err
}
if _, err := edges.CreateBucket(edgeUpdateIndexBucket); err != nil {
return err
}
if _, err := edges.CreateBucket(channelPointBucket); err != nil {
return err
}
if _, err := edges.CreateBucket(zombieBucket); err != nil {
return err
}
graphMeta, err := tx.CreateTopLevelBucket(graphMetaBucket)
if err != nil {
return err
}
_, err = graphMeta.CreateBucket(pruneLogBucket)
if err != nil {
return err
}
if _, err := tx.CreateTopLevelBucket(metaBucket); err != nil {
return err
}
meta := &Meta{
DbVersionNumber: 0,
}
return putMeta(meta, tx)
}, func() {})
if err != nil {
return fmt.Errorf("unable to create new channeldb")
}
return bdb.Close()
}
// fileExists returns true if the file exists, and false otherwise.
func fileExists(path string) bool {
if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
return false
}
}
return true
}
// FetchClosedChannels attempts to fetch all closed channels from the database.
// The pendingOnly bool toggles if channels that aren't yet fully closed should
// be returned in the response or not. When a channel was cooperatively closed,
// it becomes fully closed after a single confirmation. When a channel was
// forcibly closed, it will become fully closed after _all_ the pending funds
// (if any) have been swept.
func (d *DB) FetchClosedChannels(pendingOnly bool) ([]*ChannelCloseSummary, error) {
var chanSummaries []*ChannelCloseSummary
if err := kvdb.View(d, func(tx kvdb.RTx) error {
closeBucket := tx.ReadBucket(closedChannelBucket)
if closeBucket == nil {
return ErrNoClosedChannels
}
return closeBucket.ForEach(func(chanID []byte, summaryBytes []byte) error {
summaryReader := bytes.NewReader(summaryBytes)
chanSummary, err := deserializeCloseChannelSummary(summaryReader)
if err != nil {
return err
}
// If the query specified to only include pending
// channels, then we'll skip any channels which aren't
// currently pending.
if !chanSummary.IsPending && pendingOnly {
return nil
}
chanSummaries = append(chanSummaries, chanSummary)
return nil
})
}, func() {
chanSummaries = nil
}); err != nil {
return nil, err
}
return chanSummaries, nil
}
// ChannelGraph returns a new instance of the directed channel graph.
func (d *DB) ChannelGraph() *ChannelGraph {
return d.graph
}