forked from keybase/client
-
Notifications
You must be signed in to change notification settings - Fork 0
/
leveldb.go
237 lines (221 loc) · 7.47 KB
/
leveldb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
// Copyright 2016 Keybase Inc. All rights reserved.
// Use of this source code is governed by a BSD
// license that can be found in the LICENSE file.
package libkbfs
import (
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"github.com/keybase/client/go/kbfs/ioutil"
"github.com/keybase/client/go/logger"
"github.com/pkg/errors"
"github.com/syndtr/goleveldb/leveldb"
ldberrors "github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/filter"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/storage"
)
// diskCacheVersionFilename is the name of the file, located directly inside
// a disk cache directory, that records the cache's on-disk version number.
const diskCacheVersionFilename string = "version"

// metered and unmetered are readable names for a boolean flag.
// NOTE(review): presumably passed by callers elsewhere in the package to
// choose between the metered and plain DB accessors; they are not
// referenced in this part of the file -- confirm against the call sites.
const (
	metered   = true
	unmetered = false
)
// leveldbOptions is the base set of options used when opening a leveldb
// instance in this file. Functions that need to tweak an option copy the
// struct first rather than mutating this shared value.
var leveldbOptions = &opt.Options{
	// Default max open file descriptors (ulimit -n) is 256 on OS
	// X, and >=1024 on (most?) Linux machines. So set to a low
	// number since we have multiple leveldb instances.
	OpenFilesCacheCapacity: 10,

	// Store blocks uncompressed.
	Compression: opt.NoCompression,

	// 64 KiB blocks and a 10 MiB write buffer.
	BlockSize:   64 * opt.KiB,
	WriteBuffer: 10 * opt.MiB,
}
// LevelDb is a libkbfs wrapper for leveldb.DB. It embeds the DB, so all of
// its methods are available directly, and additionally owns a closer that is
// closed together with the DB (see Close).
//
// NOTE: the field order matters; openLevelDBWithOptions builds this struct
// with a positional composite literal.
type LevelDb struct {
// The wrapped database handle.
*leveldb.DB
// closer is closed after the DB in Close. openLevelDBWithOptions sets it to
// the storage.Storage backing the DB.
closer io.Closer
}
// Close shuts down the wrapped leveldb.DB and then the associated closer.
// Only the error from closing the DB is reported; a failure to close the
// closer is intentionally swallowed.
func (ldb *LevelDb) Close() (err error) {
	dbErr := ldb.DB.Close()
	// The closer is always closed, even if the DB reported an error, and
	// its own error is deliberately hidden from the caller.
	_ = ldb.closer.Close()
	return dbErr
}
// Get looks up key in the underlying leveldb.DB using the read options ro.
// Any error coming back from the DB is annotated with a stack trace via
// errors.WithStack before being returned.
func (ldb *LevelDb) Get(key []byte, ro *opt.ReadOptions) (
	value []byte, err error) {
	value, err = ldb.DB.Get(key, ro)
	if err != nil {
		err = errors.WithStack(err)
	}
	return value, err
}
// GetWithMeter fetches key via Get (with nil read options) and records the
// outcome: hitMeter is marked once when the lookup succeeds, missMeter is
// marked once when it returns any error. Either meter may be nil, in which
// case that outcome is simply not recorded.
func (ldb *LevelDb) GetWithMeter(key []byte, hitMeter, missMeter *CountMeter) (
	value []byte, err error) {
	defer func() {
		// Pick the meter matching the outcome, then mark it if present.
		meter := hitMeter
		if err != nil {
			meter = missMeter
		}
		if meter != nil {
			meter.Mark(1)
		}
	}()
	return ldb.Get(key, nil)
}
// Put stores value under key in the underlying leveldb.DB using the write
// options wo. Any error coming back from the DB is annotated with a stack
// trace via errors.WithStack before being returned.
func (ldb *LevelDb) Put(key, value []byte, wo *opt.WriteOptions) (err error) {
	if putErr := ldb.DB.Put(key, value, wo); putErr != nil {
		return errors.WithStack(putErr)
	}
	return nil
}
// PutWithMeter stores value under key via Put (with nil write options) and,
// when the write succeeds, marks putMeter once. A nil putMeter disables the
// accounting; a failed write is never counted.
func (ldb *LevelDb) PutWithMeter(key, value []byte, putMeter *CountMeter) (
	err error) {
	if putMeter != nil {
		defer func() {
			if err == nil {
				putMeter.Mark(1)
			}
		}()
	}
	return ldb.Put(key, value, nil)
}
// openLevelDBWithOptions opens a leveldb.DB on top of the given
// storage.Storage using the supplied options, falling back to a recovery
// pass when the open reports corruption.
//
// On success the returned LevelDb takes over stor as its closer. On failure
// stor is closed here (its close error is ignored) and the open/recover
// error is returned.
func openLevelDBWithOptions(stor storage.Storage, options *opt.Options) (
	*LevelDb, error) {
	db, err := leveldb.Open(stor, options)
	if ldberrors.IsCorrupted(err) {
		// There's a possibility that if the leveldb wasn't closed properly
		// last time while it was being written, then the manifest is corrupt.
		// This means leveldb must rebuild its manifest, which takes longer
		// than a simple `Open`.
		// TODO: log here
		db, err = leveldb.Recover(stor, options)
	}
	if err == nil {
		return &LevelDb{DB: db, closer: stor}, nil
	}
	// Neither open nor recover worked; release the storage before bailing.
	_ = stor.Close()
	return nil, err
}
// openLevelDB opens or recovers a leveldb.DB backed by stor, using a private
// copy of leveldbOptions extended with a bloom filter (16 bits per key).
// Ownership of stor follows openLevelDBWithOptions.
func openLevelDB(stor storage.Storage) (*LevelDb, error) {
	// Copy so the shared package-level options are never mutated.
	withBloom := *leveldbOptions
	withBloom.Filter = filter.NewBloomFilter(16)
	return openLevelDBWithOptions(stor, &withBloom)
}
// versionPathFromVersion returns the subdirectory of dirPath dedicated to
// the given version: dirPath joined with "v<version>" (e.g. "v1").
func versionPathFromVersion(dirPath string, version uint64) string {
	versionDir := fmt.Sprintf("v%d", version)
	return filepath.Join(dirPath, versionDir)
}
// getVersionedPathForDiskCache returns the version-specific subdirectory of
// dirPath (see versionPathFromVersion) that the disk cache called cacheName
// should use, given that this client reads and writes
// currentDiskCacheVersion.
//
// The version is tracked in a file named diskCacheVersionFilename directly
// under dirPath. If that file already records currentDiskCacheVersion, the
// path is returned right away. In every other case -- the file is missing,
// unreadable, unparseable, or records an older or newer version -- the
// situation is logged, dirPath is created if needed, and the version file is
// (re)written with currentDiskCacheVersion.
//
// Only a failure to create dirPath or to write the version file is returned
// as an error; problems reading or parsing an existing version file are not
// fatal. cacheName is used only in log messages.
func getVersionedPathForDiskCache(
	log logger.Logger, dirPath string, cacheName string,
	currentDiskCacheVersion uint64) (versionedDirPath string, err error) {
	// Read the version file. We expect the file to open successfully or
	// not exist. Anything else is a problem, though not a fatal one.
	versionFilepath := filepath.Join(dirPath, diskCacheVersionFilename)
	versionBytes, err := ioutil.ReadFile(versionFilepath)
	switch {
	case ioutil.IsNotExist(err):
		// Nothing to do here; the version file is created below.
		log.CDebugf(
			nil, "Creating new version file for the disk %s cache.", cacheName)
	case err != nil:
		log.CDebugf(
			nil, "An error occurred while reading the disk %s cache "+
				"version file. Using %d as the version and creating a new "+
				"file to record it.", cacheName, currentDiskCacheVersion)
		// TODO: when we increase the version of the disk cache, we'll have
		// to make sure we wipe all previous versions of the disk cache.
	default:
		// We expect a successfully opened version file to parse as a single
		// unsigned integer representing the version. Anything else is a
		// corrupted version file. However, this we can solve by deleting
		// everything in the cache. TODO: Eventually delete the whole disk
		// cache if we have an out of date version.
		//
		// The version is a uint64, so parse with an explicit 64-bit size
		// instead of the platform's int size; otherwise a large version
		// would be misreported as a parse error on 32-bit platforms.
		version, parseErr := strconv.ParseUint(string(versionBytes), 10, 64)
		switch {
		case parseErr != nil:
			log.CDebugf(
				nil, "An error occurred while parsing the disk %s cache "+
					"version file. Using %d as the version.",
				cacheName, currentDiskCacheVersion)
			// TODO: when we increase the version of the disk cache, we'll
			// have to make sure we wipe all previous versions of the disk
			// cache.
		case version == currentDiskCacheVersion:
			// Success case, no need to write the version file again.
			log.CDebugf(
				nil, "Loaded the disk %s cache version file successfully."+
					" Version: %d", cacheName, version)
			return versionPathFromVersion(dirPath, version), nil
		case version < currentDiskCacheVersion:
			log.CDebugf(
				nil, "The disk %s cache version file contained an old "+
					"version: %d. Updating to the new version: %d.",
				cacheName, version, currentDiskCacheVersion)
			// TODO: when we increase the version of the disk cache, we'll
			// have to make sure we wipe all previous versions of the disk
			// cache.
		default:
			// version > currentDiskCacheVersion
			log.CDebugf(
				nil, "The disk %s cache version file contained a newer "+
					"version (%d) than this client knows how to read. "+
					"Switching to this client's newest known version: %d.",
				cacheName, version, currentDiskCacheVersion)
		}
	}

	// Every path that reaches this point settles on the client's current
	// version, so that is what gets recorded.

	// Ensure the disk cache directory exists.
	err = os.MkdirAll(dirPath, 0700)
	if err != nil {
		// This does actually need to be fatal.
		return "", err
	}
	versionString := strconv.FormatUint(currentDiskCacheVersion, 10)
	err = ioutil.WriteFile(versionFilepath, []byte(versionString), 0600)
	if err != nil {
		// This also needs to be fatal.
		return "", err
	}
	return versionPathFromVersion(dirPath, currentDiskCacheVersion), nil
}
// openVersionedLevelDB opens a LevelDb under a versioned path on the local
// filesystem beneath storageRoot. The path includes dbFolderName, the
// version directory chosen by getVersionedPathForDiskCache for
// currentDiskCacheVersion, and dbFilename. Note that dbFilename is actually
// created as a folder; it's just where the raw LevelDb lives.
//
// dbFolderName doubles as the cache name used in log messages. The DB is
// opened with a copy of leveldbOptions. Any error from resolving the
// versioned path, opening the file storage, or opening the DB is returned
// with a nil *LevelDb.
func openVersionedLevelDB(log logger.Logger, storageRoot string,
	dbFolderName string, currentDiskCacheVersion uint64, dbFilename string) (
	db *LevelDb, err error) {
	dbPath := filepath.Join(storageRoot, dbFolderName)
	versionPath, err := getVersionedPathForDiskCache(
		log, dbPath, dbFolderName, currentDiskCacheVersion)
	if err != nil {
		return nil, err
	}
	p := filepath.Join(versionPath, dbFilename)
	log.Debug("opening LevelDB: %s", p)
	// Named stor (not storage) so the imported storage package is not
	// shadowed for the rest of the function.
	stor, err := storage.OpenFile(p, false)
	if err != nil {
		return nil, err
	}
	// openLevelDBWithOptions owns stor from here on: it closes it when the
	// open fails and hands it to the returned LevelDb when it succeeds, so
	// it must not be closed again here.
	options := *leveldbOptions
	return openLevelDBWithOptions(stor, &options)
}