-
Notifications
You must be signed in to change notification settings - Fork 7
/
database.go
244 lines (201 loc) · 4.94 KB
/
database.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
package keydb
import (
"bytes"
"github.com/nightlyone/lockfile"
"io/ioutil"
"os"
"path/filepath"
"regexp"
"sync"
"sync/atomic"
)
// Database is the handle for an opened database directory. A reference is obtained via Open()
// and is released with Close() or CloseWithMerge().
type Database struct {
// embedded mutex; the close methods hold it while setting the closing flag
sync.Mutex
// presumably keyed by table name — TODO confirm against the code that populates it
tables map[string]*internalTable
// true from a successful open() until Close/CloseWithMerge has finished
open bool
// set (under the embedded mutex) when a close begins, before waiting on wg
closing bool
// Close/CloseWithMerge refuse to proceed while this map is non-empty
transactions map[uint64]*Transaction
// database directory, after filepath.Clean
path string
// open() adds one to it before starting the mergeDiskSegments goroutine; the close methods wait on it
wg sync.WaitGroup
// advanced atomically by nextSegmentID
// NOTE(review): 64-bit atomic access needs 64-bit alignment on 32-bit platforms — confirm this field's offset is safe there
nextSegID uint64
// lock on the "lockfile" entry inside path; taken in open() and released by the close methods
lockfile lockfile.Lockfile
// if non-nil an asynchronous error has occurred, and the database cannot be used
err error
}
// internalTable is the per-table state stored in Database.tables.
type internalTable struct {
sync.Mutex
// segments belonging to this table; each one is closed when the database is closed
segments []segment
// NOTE(review): presumably the number of transactions currently open on this table — confirm against the transaction code
transactions int
// presumably the table name — TODO confirm
name string
}
// LookupIterator is the iterator interface for table scanning. All iterators should be read until completion.
type LookupIterator interface {
// Next returns the next key/value pair. It returns EndOfIterator as err when complete; if err is nil, then key and value are valid.
Next() (key []byte, value []byte, err error)
// peekKey returns the next non-deleted key in the index
// NOTE(review): presumably does not advance the iterator, as the name suggests — confirm against the implementations
peekKey() ([]byte, error)
}
// global_lock is held exclusively by Open, Remove, Close and CloseWithMerge for their whole
// duration, so those operations never overlap within this process.
var global_lock sync.RWMutex
// Open opens the database stored in the directory named by path. A database can
// only be opened by a single process at a time, but the returned *Database may be
// shared across goroutines.
//
// If createIfNeeded is true and no database exists at path, it is created.
// Additional tables can be added on subsequent opens, but there is currently no
// way to delete a table other than deleting the table's files from the directory.
func Open(path string, createIfNeeded bool) (*Database, error) {
	global_lock.Lock()
	defer global_lock.Unlock()

	existing, openErr := open(path)
	if openErr != NoDatabaseFound || !createIfNeeded {
		// either the open succeeded, it failed for some other reason,
		// or the caller did not ask for the database to be created
		return existing, openErr
	}
	return create(path)
}
// open validates the directory at path, takes its lock file and returns a
// Database whose background mergeDiskSegments goroutine has been started.
// It returns DatabaseInUse when the lock file cannot be acquired.
func open(path string) (*Database, error) {
	path = filepath.Clean(path)
	if invalid := IsValidDatabase(path); invalid != nil {
		return nil, invalid
	}

	lockPath, err := filepath.Abs(path + "/lockfile")
	if err != nil {
		return nil, err
	}
	lock, err := lockfile.New(lockPath)
	if err != nil {
		return nil, err
	}
	if lock.TryLock() != nil {
		return nil, DatabaseInUse
	}

	db := &Database{
		path:         path,
		open:         true,
		lockfile:     lock,
		transactions: make(map[uint64]*Transaction),
		tables:       make(map[string]*internalTable),
	}

	// the close methods wait on wg, so register the merger before launching it
	db.wg.Add(1)
	go mergeDiskSegments(db)

	return db, nil
}
// create makes the directory at path (including any missing parents) and then
// opens it as a database.
func create(path string) (*Database, error) {
	dir := filepath.Clean(path)
	if mkErr := os.MkdirAll(dir, os.ModePerm); mkErr != nil {
		return nil, mkErr
	}
	return open(dir)
}
// Remove deletes the database at path, including all of its files. The caller
// must be able to gain exclusive access to the database: DatabaseInUse is
// returned when the lock file cannot be acquired.
//
// Any error from IsValidDatabase, from resolving or creating the lock file, or
// from deleting the directory is returned unchanged.
func Remove(path string) error {
	global_lock.Lock()
	defer global_lock.Unlock()

	path = filepath.Clean(path)
	if err := IsValidDatabase(path); err != nil {
		return err
	}

	abs, err := filepath.Abs(filepath.Join(path, "lockfile"))
	if err != nil {
		return err
	}
	lf, err := lockfile.New(abs)
	if err != nil {
		return err
	}
	if err := lf.TryLock(); err != nil {
		return DatabaseInUse
	}

	if err := os.RemoveAll(path); err != nil {
		// The directory (and possibly the lock file) is still there. Release
		// the lock on a best-effort basis so it is not held for the rest of
		// the process lifetime; the removal error is the one worth reporting,
		// so a failure to unlock is deliberately ignored.
		_ = lf.Unlock()
		return err
	}
	// On success the lock file was deleted together with the directory.
	return nil
}
// validDBFileName matches the names of files that may live in a database
// directory: any name containing a ".keys." or ".data." component.
// It is compiled once here rather than for every directory entry.
var validDBFileName = regexp.MustCompile(".*\\.(keys|data)\\..*")

// IsValidDatabase checks if the path points to a valid database or empty directory (which is also valid).
//
// It returns NoDatabaseFound when path cannot be stat'ed, NotADirectory when it
// is not a directory, the underlying error when the directory cannot be listed,
// and NotValidDatabase when the directory contains an entry that is neither the
// lock file, an entry named like the directory itself, nor a keys/data file.
// A nil return means the directory is acceptable.
func IsValidDatabase(path string) error {
	fi, err := os.Stat(path)
	if err != nil {
		return NoDatabaseFound
	}
	if !fi.IsDir() {
		return NotADirectory
	}

	infos, err := ioutil.ReadDir(path)
	if err != nil {
		return err
	}

	base := filepath.Base(path) // loop-invariant, so computed once
	for _, f := range infos {
		name := f.Name()
		if name == "lockfile" || name == base {
			continue
		}
		if !validDBFileName.MatchString(name) {
			return NotValidDatabase
		}
	}
	return nil
}
// Close shuts the database down. It marks the database as closing, waits for
// the background work registered on db.wg, merges the disk segments via
// mergeDiskSegments0 using the default maxSegments, closes every segment of
// every table and finally releases the lock file.
//
// It returns DatabaseClosed if the database is not open,
// DatabaseHasOpenTransactions if any transaction is still registered, and
// otherwise the result of the merge.
func (db *Database) Close() error {
	global_lock.Lock()
	defer global_lock.Unlock()

	switch {
	case !db.open:
		return DatabaseClosed
	case len(db.transactions) > 0:
		return DatabaseHasOpenTransactions
	}

	// flag the shutdown under the database lock, then wait for background work
	db.Lock()
	db.closing = true
	db.Unlock()
	db.wg.Wait()

	mergeErr := mergeDiskSegments0(db, maxSegments)

	for _, tbl := range db.tables {
		for _, seg := range tbl.segments {
			seg.Close()
		}
	}

	db.lockfile.Unlock()
	db.open = false

	return mergeErr
}
// CloseWithMerge closes the database with control of the segment count. If
// segmentCount is 0 (or negative), the merge process is skipped.
//
// It returns DatabaseClosed if the database is not open, the stored
// asynchronous error if one has occurred, and DatabaseHasOpenTransactions if
// any transaction is still registered; in those cases nothing is closed.
// Otherwise the segments are closed and the lock file released, and the error
// from the merge (if a merge ran and failed) is returned.
func (db *Database) CloseWithMerge(segmentCount int) error {
	global_lock.Lock()
	defer global_lock.Unlock()

	if !db.open {
		return DatabaseClosed
	}
	if db.err != nil {
		return db.err
	}
	if len(db.transactions) > 0 {
		return DatabaseHasOpenTransactions
	}

	// flag the shutdown under the database lock, then wait for background work
	db.Lock()
	db.closing = true
	db.Unlock()
	db.wg.Wait()

	// Keep the merge result instead of discarding it, so a failed merge is
	// reported to the caller just as Close does. Cleanup still runs either way.
	var mergeErr error
	if segmentCount > 0 {
		mergeErr = mergeDiskSegments0(db, segmentCount)
	}

	for _, table := range db.tables {
		for _, seg := range table.segments {
			seg.Close()
		}
	}

	db.lockfile.Unlock()
	db.open = false

	return mergeErr
}
// nextSegmentID atomically advances the database's segment counter by one and
// returns the new value.
func (db *Database) nextSegmentID() uint64 {
	const step = 1
	id := atomic.AddUint64(&db.nextSegID, step)
	return id
}
// less reports whether a sorts strictly before b in lexicographic byte order.
func less(a []byte, b []byte) bool {
	// a < b exactly when b > a
	return bytes.Compare(b, a) > 0
}
// equal reports whether a and b hold the same bytes; a nil slice and an empty
// slice are considered equal.
func equal(a []byte, b []byte) bool {
	return string(a) == string(b)
}