-
Notifications
You must be signed in to change notification settings - Fork 38
/
db.go
243 lines (214 loc) · 7.43 KB
/
db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
// Copyright (c) 2016 Western Digital Corporation or its affiliates. All rights reserved.
// SPDX-License-Identifier: MIT
package watchblb
import (
"database/sql"
"fmt"
"time"
// Import sqlite3 driver so that we can create db backed by sqlite.
_ "github.com/mattn/go-sqlite3"
log "github.com/golang/glog"
"github.com/westerndigitalcorporation/blb/client/blb"
)
// errNoBlobs is the error Rand substitutes for sql.ErrNoRows when its query
// yields no row.
var errNoBlobs = fmt.Errorf("no previously written blobs")
// SqliteDB persists blob ids together with their creation times in a sqlite
// database.
type SqliteDB struct {
	// db is the handle to the underlying sqlite database.
	db *sql.DB

	// Prepared statements operating on the 'blobids' table, one per
	// supported operation.
	putStmt           *sql.Stmt
	getByIDStmt       *sql.Stmt
	delByCreationStmt *sql.Stmt
	getDeletedStmt    *sql.Stmt
	delByIDStmt       *sql.Stmt
	randStmt          *sql.Stmt
}
// NewSqliteDB opens the sqlite3 database at 'path', makes sure the 'blobids'
// table exists and prepares every statement the SqliteDB methods rely on.
//
// Any failure along the way is reported through log.Fatalf; once the db handle
// has been opened it is closed before the fatal log call.
func NewSqliteDB(path string) *SqliteDB {
	db, err := sql.Open("sqlite3", path)
	if err != nil {
		log.Fatalf("failed to open the db backed by %s: %s", path, err)
	}

	// Table of active blobs; created only when it does not exist yet.
	//
	// Because of a bug in early versions of sqlite, a non-integer primary
	// key may be null, hence the explicit NOT NULL on 'id'
	// (see https://www.sqlite.org/lang_createtable.html#rowid).
	const schema = "CREATE TABLE IF NOT EXISTS blobids (id TEXT NOT NULL PRIMARY KEY, creation INTEGER NOT NULL, deleted INTEGER)"
	_, err = db.Exec(schema)
	if err != nil {
		db.Close()
		log.Fatalf("failed to create blobids table: %s", err)
	}

	// mustPrepare prepares 'query' against db. On failure it closes db and
	// logs fatally using 'failFormat', which takes the error as its only
	// argument.
	mustPrepare := func(query, failFormat string) *sql.Stmt {
		stmt, perr := db.Prepare(query)
		if perr != nil {
			db.Close()
			log.Fatalf(failFormat, perr)
		}
		return stmt
	}

	s := &SqliteDB{db: db}

	// Insert a blob id along with its creation time; new rows are unflagged.
	s.putStmt = mustPrepare(
		"INSERT INTO blobids (id, creation, deleted) VALUES (?, ?, 0)",
		"failed to prepare put statement: %s")

	// Look up the creation time of one blob.
	s.getByIDStmt = mustPrepare(
		"SELECT creation FROM blobids WHERE id=?",
		"failed to prepare getById statement: %s")

	// Flag every blob created before the given bound for deletion.
	s.delByCreationStmt = mustPrepare(
		"UPDATE blobids SET deleted=1 WHERE creation<?",
		"failed to prepare delByCreation statement: %s")

	// List the blobs currently flagged for deletion.
	s.getDeletedStmt = mustPrepare(
		"SELECT id FROM blobids WHERE deleted=1",
		"failed to prepare getDeleted statement: %s")

	// Remove one blob, but only if it has been flagged.
	s.delByIDStmt = mustPrepare(
		"DELETE FROM blobids WHERE id=? AND deleted=1",
		"failed to prepare delByID statement: %s")

	// Pick one random blob among those not flagged for deletion.
	s.randStmt = mustPrepare(
		"SELECT id FROM blobids WHERE deleted=0 ORDER BY RANDOM() LIMIT 1",
		"failed to prepare rand statement: %s")

	return s
}
// Put records 'id' together with its creation time, stored as nanoseconds
// since the Unix epoch. A failed insert is logged and its error returned.
func (s *SqliteDB) Put(id blb.BlobID, creation time.Time) (err error) {
	key := id.String()
	nanos := creation.UnixNano()

	_, err = s.putStmt.Exec(key, nanos)
	if err == nil {
		return nil
	}
	log.Errorf("failed to insert (id=%s, creation=%s): %s", id, creation, err)
	return err
}
// Get looks up the creation time recorded for 'id'. When the lookup fails the
// error is logged and returned along with the zero time.
func (s *SqliteDB) Get(id blb.BlobID) (creation time.Time, err error) {
	var nanos int64

	row := s.getByIDStmt.QueryRow(id.String())
	err = row.Scan(&nanos)
	if err != nil {
		log.Errorf("failed to get record for id=%s: %s", id, err)
		return time.Time{}, err
	}

	// The stored value is nanoseconds since the Unix epoch.
	return time.Unix(0, nanos), nil
}
// DeleteIfExpires flags expired blobs for deletion. A blob expires if creation +
// lifetime < time.Now().
//
// The workflow is:
// (1) Flag expired blobs for deletion.
// (2) Return such blobs to watchblb.
// (3) After watchblb removes the blobs from the blb cluster, it acknowledges
// the success by calling 'ConfirmDeletion', which deletes the flagged blobs
// from the db.
// (4) If watchblb fails before completing step (3), it needs to replay the
// deletion operations upon restart.
// By doing this, we guarantee that the information stored in the db is
// consistent with that in the blb cluster, and thus there is no blob leakage
// during the process of removing expired blobs.
//
// Note however, there is no guarantee on no blob leakage during blob creation
// -- if watchblb crashes after creating the blob in the blb cluster but before
// logging it in the db, the blob is leaked.
func (s *SqliteDB) DeleteIfExpires(lifetime time.Duration) ([]blb.BlobID, error) {
	// Every blob created strictly before this instant counts as expired.
	cutoff := time.Now().Add(-lifetime)

	// Mark the expired blobs; nothing is removed from the table here.
	_, err := s.delByCreationStmt.Exec(cutoff.UnixNano())
	if err != nil {
		log.Errorf("failed to update record for creation<%s: %s", cutoff, err)
		return nil, err
	}

	// Hand back everything that is currently flagged.
	return s.GetDeleted()
}
// GetDeleted retrieves the ids of all blobs that are flagged for deletion.
//
// A failure to run the query or to iterate over its rows is logged and
// returned as an error with a nil slice. A row that cannot be scanned or whose
// id cannot be parsed is treated as fatal (log.Fatalf).
func (s *SqliteDB) GetDeleted() ([]blb.BlobID, error) {
	rows, err := s.getDeletedStmt.Query()
	if err != nil {
		log.Errorf("failed to select record flagged for deletion: %s", err)
		// rows is not usable when Query fails; bail out instead of
		// dereferencing it below.
		return nil, err
	}
	// Release the result set on every return path.
	defer rows.Close()

	var blobs []blb.BlobID
	for rows.Next() {
		var idStr string
		if err := rows.Scan(&idStr); err != nil {
			log.Fatalf("failed to get blob id: %s", err)
		}
		id, err := blb.ParseBlobID(idStr)
		if err != nil {
			log.Fatalf("failed to parse blob id: %s", err)
		}
		blobs = append(blobs, id)
	}

	// Next returning false may mean either exhaustion or an error; tell
	// them apart here.
	if err := rows.Err(); err != nil {
		log.Errorf("error in iterating through rows: %s", err)
		return nil, err
	}
	return blobs, nil
}
// Rand picks one random blob among those not flagged for deletion. Any query
// error is logged; sql.ErrNoRows is reported to the caller as errNoBlobs.
func (s *SqliteDB) Rand() (id blb.BlobID, err error) {
	var raw string

	err = s.randStmt.QueryRow().Scan(&raw)
	if err != nil {
		log.Errorf("failed to get a random record: %s", err)
		// An empty result is surfaced through the package's own error.
		if err == sql.ErrNoRows {
			err = errNoBlobs
		}
		return id, err
	}

	id, err = blb.ParseBlobID(raw)
	return id, err
}
// ConfirmDeletion removes the given blobs from the db. The underlying
// statement only matches rows that are flagged for deletion, so unflagged ids
// are left in place.
//
// Every id is attempted even if an earlier one fails. Each failure is logged
// and the last error encountered is returned (nil if all succeeded).
func (s *SqliteDB) ConfirmDeletion(blobs []blb.BlobID) (err error) {
	for _, id := range blobs {
		if _, derr := s.delByIDStmt.Exec(id.String()); derr != nil {
			// Log the error of this attempt, not the previously
			// remembered one.
			log.Errorf("failed to remove blob %s: %s", id, derr)
			err = derr
		}
	}
	return err
}
// Close releases every prepared statement and then the db handle itself. It
// keeps going after a failure: each error is logged and the most recent one is
// returned.
func (s *SqliteDB) Close() (err error) {
	// Listed in the order they are closed; the db handle goes last.
	steps := []struct {
		closeFn    func() error
		failFormat string
	}{
		{s.putStmt.Close, "failed to close put statement: %s"},
		{s.getByIDStmt.Close, "failed to close getByID statement: %s"},
		{s.delByCreationStmt.Close, "failed to close delByCreation statement: %s"},
		{s.getDeletedStmt.Close, "failed to close getDeleted statement: %s"},
		{s.delByIDStmt.Close, "failed to close delByID statement: %s"},
		{s.randStmt.Close, "failed to close rand statement: %s"},
		{s.db.Close, "failed to close db: %s"},
	}

	for _, step := range steps {
		cerr := step.closeFn()
		if cerr == nil {
			continue
		}
		err = cerr
		log.Errorf(step.failFormat, err)
	}
	return err
}