forked from HouzuoGuo/tiedot
-
Notifications
You must be signed in to change notification settings - Fork 0
/
db.go
341 lines (325 loc) · 9.44 KB
/
db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
/* Package db implements collection and database storage management. */
package db
import (
"encoding/json"
"fmt"
"github.com/HouzuoGuo/tiedot/tdlog"
"io"
"io/ioutil"
"math/rand"
"os"
"path"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
"time"
)
// PART_NUM_FILE is the name of the text file, kept in the database root
// directory, that records how many partitions the database uses.
const PART_NUM_FILE = "number_of_partitions"

// AUTO_SYNC_INTERVAL is the period, in milliseconds, between background syncs
// of all data files. Do not set it too small: every sync takes the schema lock
// exclusively.
const AUTO_SYNC_INTERVAL = 2000
// DB is an open database: a root directory holding the PART_NUM_FILE
// partition-count configuration plus one sub-directory per collection.
type DB struct {
	// path is the root directory of the database.
	path string
	// numParts is the total number of partitions.
	numParts int
	// cols holds every open collection, keyed by collection name.
	cols map[string]*Col
	// schemaLock controls access to the collection instances in cols.
	schemaLock *sync.RWMutex
	// autoSync ticks periodically to trigger a background sync of all data files.
	autoSync *time.Ticker
	// autoSyncStop is closed on DB shutdown to stop the auto-sync routine.
	autoSyncStop chan struct{}
}
// OpenDB opens the database rooted at dbPath, creating the directory if needed,
// and loads all of its collections and indexes. The returned *DB is non-nil
// even when an error is reported.
func OpenDB(dbPath string) (*DB, error) {
	// Document ID generation relies on math/rand, so seed it here.
	rand.Seed(time.Now().UnixNano())
	db := new(DB)
	db.path = dbPath
	db.schemaLock = new(sync.RWMutex)
	db.autoSyncStop = make(chan struct{})
	err := db.load()
	return db, err
}
// Load all collection schema.
func (db *DB) load() error {
// Create DB directory and PART_NUM_FILE if necessary
var numPartsAssumed = false
numPartsFilePath := path.Join(db.path, PART_NUM_FILE)
if err := os.MkdirAll(db.path, 0700); err != nil {
return err
}
if partNumFile, err := os.Stat(numPartsFilePath); err != nil {
// The new database has as many partitions as number of CPUs recognized by OS
if err := ioutil.WriteFile(numPartsFilePath, []byte(strconv.Itoa(runtime.NumCPU())), 0600); err != nil {
return err
}
numPartsAssumed = true
} else if partNumFile.IsDir() {
return fmt.Errorf("Database config file %s is actually a directory, is database path correct?", PART_NUM_FILE)
}
// Get number of partitions from the text file
if numParts, err := ioutil.ReadFile(numPartsFilePath); err != nil {
return err
} else if db.numParts, err = strconv.Atoi(strings.Trim(string(numParts), "\r\n ")); err != nil {
return err
}
// Look for collection directories and open the collections
db.cols = make(map[string]*Col)
dirContent, err := ioutil.ReadDir(db.path)
if err != nil {
return err
}
for _, maybeColDir := range dirContent {
if !maybeColDir.IsDir() {
continue
}
if numPartsAssumed {
return fmt.Errorf("Please manually repair database partition number config file %s", numPartsFilePath)
}
if db.cols[maybeColDir.Name()], err = OpenCol(db, maybeColDir.Name()); err != nil {
return err
}
}
// Synchronize data files at regular interval
if db.autoSync == nil {
db.autoSync = time.NewTicker(AUTO_SYNC_INTERVAL * time.Millisecond)
go func() {
for {
select {
case <-db.autoSync.C:
if err := db.Sync(); err != nil {
tdlog.Noticef("Background Auto-Sync on %s: Failed with error: %v", db.path, err)
}
case <-db.autoSyncStop:
db.autoSync.Stop()
return
}
}
}()
}
return err
}
func (db *DB) sync(placeSchemaLock bool) error {
if placeSchemaLock {
// File buffers are replaced, therefore it requires exclusive locks
db.schemaLock.Lock()
defer db.schemaLock.Unlock()
}
errs := make([]error, 0, 0)
for _, col := range db.cols {
if err := col.sync(); err != nil {
errs = append(errs, err)
}
}
if len(errs) == 0 {
return nil
}
return fmt.Errorf("%v", errs)
}
// Sync flushes all collections' data files to disk, holding the schema lock
// exclusively while it does so.
func (db *DB) Sync() error {
	const lockSchema = true
	return db.sync(lockSchema)
}
// Close all database files. Do not use the DB afterwards!
func (db *DB) Close() error {
close(db.autoSyncStop)
db.schemaLock.Lock()
defer db.schemaLock.Unlock()
errs := make([]error, 0, 0)
for _, col := range db.cols {
if err := col.close(); err != nil {
errs = append(errs, err)
}
}
if len(errs) == 0 {
return nil
}
return fmt.Errorf("%v", errs)
}
// Create makes a new, empty collection called name: its directory is created
// under the database root and the opened collection is registered in the DB.
// It fails if a collection with that name is already loaded.
func (db *DB) Create(name string) error {
	db.schemaLock.Lock()
	defer db.schemaLock.Unlock()
	if _, exists := db.cols[name]; exists {
		return fmt.Errorf("Collection %s already exists", name)
	}
	if err := os.MkdirAll(path.Join(db.path, name), 0700); err != nil {
		return err
	}
	// Register the collection only after it opened successfully. Assigning the
	// OpenCol result straight into the map would keep whatever it returned
	// alongside an error (presumably nil) and break later Use/Close calls.
	col, err := OpenCol(db, name)
	if err != nil {
		return err
	}
	db.cols[name] = col
	return nil
}
// AllCols returns the names of all loaded collections, in no particular order.
func (db *DB) AllCols() (ret []string) {
	db.schemaLock.RLock()
	defer db.schemaLock.RUnlock()
	names := make([]string, 0, len(db.cols))
	for colName := range db.cols {
		names = append(names, colName)
	}
	ret = names
	return ret
}
// Use returns the collection called name for further interaction, or nil when
// no such collection exists.
func (db *DB) Use(name string) *Col {
	db.schemaLock.RLock()
	defer db.schemaLock.RUnlock()
	// A missing key yields the zero value of *Col, which is nil.
	return db.cols[name]
}
// Rename gives collection oldName the new name newName, on disk and in memory.
//
// It fails if oldName does not exist, if the two names are equal, or if newName
// is already taken. The collection is closed, its directory is renamed, and it
// is reopened under the new name.
func (db *DB) Rename(oldName, newName string) error {
	db.schemaLock.Lock()
	defer db.schemaLock.Unlock()
	oldCol, exists := db.cols[oldName]
	if !exists {
		return fmt.Errorf("Collection %s does not exist", oldName)
	}
	// Must come before the "already exists" check: when the names are equal,
	// newName is always present in the map, so this check could never fire there.
	if newName == oldName {
		return fmt.Errorf("Old and new names are the same")
	}
	if _, taken := db.cols[newName]; taken {
		return fmt.Errorf("Collection %s already exists", newName)
	}
	if err := oldCol.close(); err != nil {
		return err
	}
	if err := os.Rename(path.Join(db.path, oldName), path.Join(db.path, newName)); err != nil {
		// The directory presumably stayed at its old path. Reopen it so the map
		// does not keep a closed collection; forget it if even that fails.
		if reopened, openErr := OpenCol(db, oldName); openErr == nil {
			db.cols[oldName] = reopened
		} else {
			delete(db.cols, oldName)
		}
		return err
	}
	// The old instance is closed and its directory has moved, so it leaves the
	// map whether or not reopening under the new name succeeds.
	delete(db.cols, oldName)
	newCol, err := OpenCol(db, newName)
	if err != nil {
		return err
	}
	db.cols[newName] = newCol
	return nil
}
// Truncate deletes every document in collection name and clears all of its
// indexes, while keeping the collection itself (and its index definitions)
// registered. The collection is synchronized to disk before clearing begins.
// For each partition it clears the document data, then every index hash
// table of that partition (col.hts).
func (db *DB) Truncate(name string) error {
	db.schemaLock.Lock()
	defer db.schemaLock.Unlock()
	col, exists := db.cols[name]
	if !exists {
		return fmt.Errorf("Collection %s does not exist", name)
	}
	if err := col.sync(); err != nil {
		return err
	}
	for part := 0; part < db.numParts; part++ {
		if err := col.parts[part].Clear(); err != nil {
			return err
		}
		for _, index := range col.hts[part] {
			if err := index.Clear(); err != nil {
				return err
			}
		}
	}
	return nil
}
// Scrub rebuilds collection name to fix corrupted documents and de-fragment
// free space.
//
// Each document that still decodes as a JSON object is re-inserted, under its
// original ID, into a temporary collection that mirrors the original's index
// paths. Documents that fail to decode, or fail to re-insert (logged), are
// dropped. The temporary collection's directory then replaces the original one,
// and the collection is reopened.
func (db *DB) Scrub(name string) error {
	db.schemaLock.Lock()
	defer db.schemaLock.Unlock()
	col, exists := db.cols[name]
	if !exists {
		return fmt.Errorf("Collection %s does not exist", name)
	}
	if err := col.sync(); err != nil {
		return err
	}
	// Prepare a temporary collection in the file system.
	tmpColName := fmt.Sprintf("scrub-%s-%d", name, time.Now().UnixNano())
	tmpColDir := path.Join(db.path, tmpColName)
	// buildScrubbedCopy fills tmpColDir with every recoverable document of col.
	buildScrubbedCopy := func() error {
		if err := os.MkdirAll(tmpColDir, 0700); err != nil {
			return err
		}
		// Mirror indexes from the original collection before opening the copy.
		for _, idxPath := range col.indexPaths {
			if err := os.MkdirAll(path.Join(tmpColDir, strings.Join(idxPath, INDEX_PATH_SEP)), 0700); err != nil {
				return err
			}
		}
		tmpCol, err := OpenCol(db, tmpColName)
		if err != nil {
			return err
		}
		col.forEachDoc(func(id int, doc []byte) bool {
			var docObj map[string]interface{}
			if err := json.Unmarshal(doc, &docObj); err != nil {
				return true // skip corrupted document
			}
			if err := tmpCol.InsertRecovery(id, docObj); err != nil {
				tdlog.Noticef("Scrub %s: failed to insert back document %v: %v", name, docObj, err)
			}
			return true
		}, false)
		return tmpCol.close()
	}
	if err := buildScrubbedCopy(); err != nil {
		// Best-effort cleanup: a leftover directory would be opened as a
		// collection by the next load().
		os.RemoveAll(tmpColDir)
		return err
	}
	// Replace the original collection with the scrubbed copy. Closing is
	// best-effort because the original directory is deleted regardless.
	if err := col.close(); err != nil {
		tdlog.Noticef("Scrub %s: failed to close original collection: %v", name, err)
	}
	if err := os.RemoveAll(path.Join(db.path, name)); err != nil {
		return err
	}
	if err := os.Rename(tmpColDir, path.Join(db.path, name)); err != nil {
		return err
	}
	newCol, err := OpenCol(db, name)
	if err != nil {
		// Do not keep the closed original (or a nil value) registered.
		delete(db.cols, name)
		return err
	}
	db.cols[name] = newCol
	return nil
}
// Drop permanently deletes collection name. The collection is closed, its
// directory (with all documents and indexes) is removed, and the DB forgets it.
func (db *DB) Drop(name string) error {
	db.schemaLock.Lock()
	defer db.schemaLock.Unlock()
	col, exists := db.cols[name]
	if !exists {
		return fmt.Errorf("Collection %s does not exist", name)
	}
	if err := col.close(); err != nil {
		return err
	}
	if err := os.RemoveAll(path.Join(db.path, name)); err != nil {
		return err
	}
	delete(db.cols, name)
	return nil
}
// Dump copies this database into destination directory dest (for backup).
//
// All collections are synchronized to disk first, and the schema lock is held
// exclusively for the whole copy. The directory tree under the database root is
// recreated under dest, and every file is copied. Dump refuses to overwrite a
// file that already exists in dest.
func (db *DB) Dump(dest string) error {
	db.schemaLock.Lock()
	defer db.schemaLock.Unlock()
	// Flush every collection so the files on disk are complete before copying.
	if err := db.sync(false); err != nil {
		return err
	}
	// copyFile copies one file and returns the number of bytes written.
	// Both files are always closed, and dest's close error is reported
	// because it can signal lost writes.
	copyFile := func(srcPath, destPath string) (int64, error) {
		src, err := os.Open(srcPath)
		if err != nil {
			return 0, err
		}
		defer src.Close()
		// O_EXCL makes "must not exist" and creation a single atomic step.
		destFile, err := os.OpenFile(destPath, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0666)
		if err != nil {
			if os.IsExist(err) {
				return 0, fmt.Errorf("Destination file %s already exists", destPath)
			}
			return 0, err
		}
		written, err := io.Copy(destFile, src)
		if closeErr := destFile.Close(); err == nil {
			err = closeErr
		}
		return written, err
	}
	cpFun := func(currPath string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		relPath, err := filepath.Rel(db.path, currPath)
		if err != nil {
			return err
		}
		destPath := filepath.Join(dest, relPath)
		if info.IsDir() {
			if err := os.MkdirAll(destPath, 0700); err != nil {
				return err
			}
			tdlog.Noticef("Dump: created directory %s", destPath)
			return nil
		}
		written, err := copyFile(currPath, destPath)
		if err != nil {
			return err
		}
		tdlog.Noticef("Dump: copied file %s, size is %d", destPath, written)
		return nil
	}
	return filepath.Walk(db.path, cpFun)
}