forked from m-rots/bernard
/
sqlite.go
313 lines (256 loc) · 8.45 KB
/
sqlite.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
// Package sqlite provides a reference implementation of a Bernard datastore.
// Other SQL implementations should ideally borrow from this code as the SQL
// should be compatible with other drivers as well.
package sqlite
import (
"database/sql"
"embed"
"fmt"
"strings"
ds "github.com/l3uddz/bernard/datastore"
"github.com/l3uddz/bernard/migrate"
// database driver
_ "modernc.org/sqlite"
)
var (
// migrations is the embedded file system holding the contents of the
// migrations directory; New and FromDB pass it to the migrator.
//go:embed migrations
migrations embed.FS
)
// New opens the SQLite database at path, runs the embedded migrations against
// it and returns a Bernard Datastore with that SQLite3 backend.
//
// Every failure is reported as an error wrapping ds.ErrDatabase. When the
// migrations cannot be set up or applied, the database handle opened here is
// closed again before returning, so a failed New does not leak it.
func New(path string) (*Datastore, error) {
	db, err := sql.Open("sqlite", path)
	if err != nil {
		return nil, fmt.Errorf("open: %v: %w", err, ds.ErrDatabase)
	}

	// migrations
	mg, err := migrate.New(db, "migrations")
	if err != nil {
		// Best-effort close: the migration error is the one worth reporting.
		_ = db.Close()
		return nil, fmt.Errorf("%v: %w", err, ds.ErrDatabase)
	}

	if err := mg.Migrate(&migrations, "bernard"); err != nil {
		// Best-effort close: the migration error is the one worth reporting.
		_ = db.Close()
		return nil, fmt.Errorf("%v: %w", err, ds.ErrDatabase)
	}

	return &Datastore{DB: db}, nil
}
// FromDB wraps an already opened database handle in a Bernard Datastore after
// running the embedded migrations against it.
//
// A failure to set up or apply the migrations is returned as an error wrapping
// ds.ErrDatabase. FromDB never closes db itself.
func FromDB(db *sql.DB) (*Datastore, error) {
	migrator, setupErr := migrate.New(db, "migrations")
	if setupErr != nil {
		return nil, fmt.Errorf("%v: %w", setupErr, ds.ErrDatabase)
	}

	runErr := migrator.Migrate(&migrations, "bernard")
	if runErr != nil {
		return nil, fmt.Errorf("%v: %w", runErr, ds.ErrDatabase)
	}

	store := &Datastore{DB: db}
	return store, nil
}
// Datastore holds our SQLite3 database connection
// and implements the Bernard Datastore interface.
//
// Values are created by New (from a path) or FromDB (from an existing handle).
type Datastore struct {
// DB is the database handle on which FullSync, PartialSync and PageToken
// run their statements.
DB *sql.DB
}
var (
	// ErrTransaction indicates that beginning or committing a transaction
	// failed; the error wrapping it is prefixed with "begin" or "commit" to
	// tell the two apart. It wraps ds.ErrDatabase.
	ErrTransaction = fmt.Errorf("transaction: %w", ds.ErrDatabase)

	// ErrInvalidStatement occurs when a SQL statement cannot be prepared:
	// either it is not compatible with the underlying driver, or the database
	// has not been initialised with tables yet. It wraps ds.ErrDatabase.
	ErrInvalidStatement = fmt.Errorf("invalid statement: %w", ds.ErrDatabase)
)
// addParameters expands the first bind var of query into a comma separated
// list of items bind vars, so "IN (?)" becomes "IN (?, ?, ?)" for items == 3.
//
// Only the first '?' is expanded; any later bind vars are left untouched.
// The query is returned unchanged when it contains no bind var at all or when
// items is smaller than 2, as there is nothing to add in either case.
func addParameters(query string, items int) string {
	first := strings.IndexByte(query, '?')
	if first < 0 || items < 2 {
		return query
	}

	// The query is split right after the first bind var; the additional
	// bind vars are written in between the two halves.
	split := first + 1
	extra := items - 1

	var str strings.Builder
	str.Grow(len(query) + len(", ?")*extra)

	str.WriteString(query[:split])
	for n := 0; n < extra; n++ {
		str.WriteString(", ?")
	}
	str.WriteString(query[split:])

	return str.String()
}
// FullSync synchronises the provided Drive state to the datastore.
//
// All work happens inside a single transaction:
//
// 1. Store the pageToken of the drive.
//
// 2. Upsert the Shared Drive itself as the root folder.
//
// 3. Upsert all folders, then all files.
//
// If any step fails the transaction is rolled back, so nothing is persisted.
// The returned error wraps ErrTransaction (begin or commit failed),
// ErrInvalidStatement (a statement could not be prepared) or
// ds.ErrDataAnomaly (an upsert failed).
func (store *Datastore) FullSync(drive ds.Drive, folders []ds.Folder, files []ds.File) (err error) {
	// Start transaction so all statements can be rolled back.
	tx, err := store.DB.Begin()
	if err != nil {
		return fmt.Errorf("begin: %w", ErrTransaction)
	}

	// Roll back on every early return so a failed sync never leaves the
	// transaction (and its connection) open. After a successful Commit the
	// rollback only reports that the transaction is already done, which is
	// safe to ignore. Statements prepared on tx are closed together with it.
	defer func() { _ = tx.Rollback() }()

	// Prepare sql statement to upsert folders.
	upsertFolder, err := tx.Prepare(sqlUpsertFolder)
	if err != nil {
		return fmt.Errorf("%v: %w", sqlUpsertFolder, ErrInvalidStatement)
	}

	// Prepare sql statement to upsert files.
	upsertFile, err := tx.Prepare(sqlUpsertFile)
	if err != nil {
		return fmt.Errorf("%v: %w", sqlUpsertFile, ErrInvalidStatement)
	}

	// Prepare sql statement to upsert a variable (pageToken).
	upsertDrive, err := tx.Prepare(sqlUpsertDrive)
	if err != nil {
		return fmt.Errorf("%v: %w", sqlUpsertDrive, ErrInvalidStatement)
	}

	// Update the pageToken for future sync jobs.
	// TODO(m-rots) error should not be a data anomaly
	if _, err = upsertDrive.Exec(drive.ID, drive.PageToken); err != nil {
		return fmt.Errorf("pageToken: %w", ds.ErrDataAnomaly)
	}

	// Insert the Shared Drive as the root folder.
	if _, err = upsertFolder.Exec(drive.ID, drive.ID, drive.Name, nil, false); err != nil {
		return fmt.Errorf("%v: %w", drive.ID, ds.ErrDataAnomaly)
	}

	// Upsert all folders.
	// A data anomaly (such as a FOREIGN KEY constraint) aborts the sync.
	for _, f := range folders {
		if _, err = upsertFolder.Exec(f.ID, drive.ID, f.Name, f.Parent, f.Trashed); err != nil {
			return fmt.Errorf("%v: %w", f.ID, ds.ErrDataAnomaly)
		}
	}

	// Upsert all files.
	// A data anomaly (such as a FOREIGN KEY constraint) aborts the sync.
	for _, f := range files {
		if _, err = upsertFile.Exec(f.ID, drive.ID, f.Name, f.MD5, f.Parent, f.Size, f.Trashed); err != nil {
			return fmt.Errorf("%v: %w", f.ID, ds.ErrDataAnomaly)
		}
	}

	if err = tx.Commit(); err != nil {
		return fmt.Errorf("commit: %w", ErrTransaction)
	}

	return nil
}
// PartialSync synchronises the provided changes to the datastore.
//
// 1. Update the pageToken and (if applicable) the name of the Shared Drive.
//
// 2. Process changed folders with UPSERT.
//
// 3. Process changed files with UPSERT.
//
// 4. Remove any items of which the IDs match with the removedIDs slice.
//
// All steps run inside a single transaction which is rolled back as soon as
// one of them fails. The returned error wraps ErrTransaction (begin or commit
// failed), ErrInvalidStatement (a statement could not be prepared) or
// ds.ErrDataAnomaly (an upsert or delete failed).
func (store *Datastore) PartialSync(drive ds.Drive, changedFolders []ds.Folder, changedFiles []ds.File, removedIDs []string) error {
	tx, err := store.DB.Begin()
	if err != nil {
		return fmt.Errorf("begin: %w", ErrTransaction)
	}

	// Roll back on every early return so a failed sync never leaves the
	// transaction (and its connection) open. After a successful Commit the
	// rollback only reports that the transaction is already done, which is
	// safe to ignore. Statements prepared on tx are closed together with it.
	defer func() { _ = tx.Rollback() }()

	// Prepare sql statement to upsert folders.
	upsertFolder, err := tx.Prepare(sqlUpsertFolder)
	if err != nil {
		return fmt.Errorf("%v: %w", sqlUpsertFolder, ErrInvalidStatement)
	}

	// Prepare sql statement to upsert files.
	upsertFile, err := tx.Prepare(sqlUpsertFile)
	if err != nil {
		return fmt.Errorf("%v: %w", sqlUpsertFile, ErrInvalidStatement)
	}

	// Prepare sql statement to upsert a variable (pageToken).
	upsertDrive, err := tx.Prepare(sqlUpsertDrive)
	if err != nil {
		return fmt.Errorf("%v: %w", sqlUpsertDrive, ErrInvalidStatement)
	}

	// Update the pageToken for future sync jobs.
	if _, err = upsertDrive.Exec(drive.ID, drive.PageToken); err != nil {
		return fmt.Errorf("pageToken: %w", ds.ErrDataAnomaly)
	}

	// Drive name is empty if not changed, so when not empty we should update it.
	if drive.Name != "" {
		if _, err = upsertFolder.Exec(drive.ID, drive.ID, drive.Name, nil, false); err != nil {
			return fmt.Errorf("%v: %w", drive.ID, ds.ErrDataAnomaly)
		}
	}

	// upsert all changed folders
	for _, f := range changedFolders {
		if _, err = upsertFolder.Exec(f.ID, drive.ID, f.Name, f.Parent, f.Trashed); err != nil {
			return fmt.Errorf("%v: %w", f.ID, ds.ErrDataAnomaly)
		}
	}

	// upsert all changed files
	for _, f := range changedFiles {
		if _, err = upsertFile.Exec(f.ID, drive.ID, f.Name, f.MD5, f.Parent, f.Size, f.Trashed); err != nil {
			return fmt.Errorf("%v: %w", f.ID, ds.ErrDataAnomaly)
		}
	}

	if len(removedIDs) > 0 {
		// convert []string to []interface{} as Exec requires a []interface{} input
		args := make([]interface{}, len(removedIDs)+1)
		for i, id := range removedIDs {
			args[i] = id
		}

		// append DriveID for the WHERE clause
		args[len(removedIDs)] = drive.ID

		// first try to delete all files to prevent data anomalies
		deleteFiles := addParameters(sqlDeleteFiles, len(removedIDs))
		if _, err = tx.Exec(deleteFiles, args...); err != nil {
			return fmt.Errorf("deleting files: %w", ds.ErrDataAnomaly)
		}

		// then try to delete all folders, which should have no files as children now
		deleteFolders := addParameters(sqlDeleteFolders, len(removedIDs))
		if _, err = tx.Exec(deleteFolders, args...); err != nil {
			return fmt.Errorf("deleting folders: %w", ds.ErrDataAnomaly)
		}
	}

	if err = tx.Commit(); err != nil {
		return fmt.Errorf("commit: %w", ErrTransaction)
	}

	return nil
}
// PageToken looks up the pageToken stored for the drive with the given ID,
// i.e. the point the datastore is currently synchronised to.
//
// When the token cannot be read (for instance because the drive has no row
// yet) ds.ErrFullSync is returned together with an empty token.
func (store *Datastore) PageToken(driveID string) (string, error) {
	var token string

	scanErr := store.DB.QueryRow(sqlGetPageToken, driveID).Scan(&token)
	if scanErr != nil {
		return "", ds.ErrFullSync
	}

	return token, nil
}
// sqlUpsertDrive inserts a drive row with its pageToken, or updates the
// pageToken when a row with the same id already exists.
//
// Bind vars, in order: id, pageToken.
const sqlUpsertDrive = `
INSERT INTO drive (id, pageToken) VALUES (?, ?)
ON CONFLICT(id) DO UPDATE SET
pageToken=excluded.pageToken
`
// sqlUpsertFolder inserts a folder row, or updates name, parent and trashed
// when a row with the same (id, drive) pair already exists.
//
// Bind vars, in order: id, drive, name, parent, trashed. An empty parent is
// stored as NULL through NULLIF. The empty string is written with single
// quotes on purpose: double quotes denote identifiers in SQL and are only
// accepted as string literals by SQLite as a legacy quirk, which would break
// the statement on stricter builds and on other drivers.
const sqlUpsertFolder = `
INSERT INTO folder (id, drive, name, parent, trashed) VALUES (?, ?, ?, NULLIF(?, ''), ?)
ON CONFLICT(id, drive) DO UPDATE SET
name=excluded.name,
parent=excluded.parent,
trashed=excluded.trashed
`
// sqlUpsertFile inserts a file row, or updates name, md5, parent, size and
// trashed when a row with the same (id, drive) pair already exists.
//
// Bind vars, in order: id, drive, name, md5, parent, size, trashed.
const sqlUpsertFile = `
INSERT INTO file (id, drive, name, md5, parent, size, trashed) VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id, drive) DO UPDATE SET
name=excluded.name,
md5=excluded.md5,
parent=excluded.parent,
size=excluded.size,
trashed=excluded.trashed
`
// sqlDeleteFiles deletes the file rows of one drive whose id is in the given
// list. PartialSync runs it through addParameters first, which widens the
// single bind var inside IN (...) to one per ID; the drive ID is bound last.
const sqlDeleteFiles = `
DELETE FROM file WHERE id IN (?) AND drive=?
`
// sqlDeleteFolders deletes the folder rows of one drive whose id is in the
// given list. PartialSync runs it through addParameters first, which widens
// the single bind var inside IN (...) to one per ID; the drive ID is bound last.
const sqlDeleteFolders = `
DELETE FROM folder WHERE id IN (?) AND drive=?
`
// sqlGetPageToken selects the pageToken stored for the drive with the bound id.
const sqlGetPageToken = `
SELECT pageToken FROM drive WHERE id=?
`