-
Notifications
You must be signed in to change notification settings - Fork 402
/
migrator.go
224 lines (193 loc) · 6.37 KB
/
migrator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package sqliteutil
import (
"context"
"database/sql/driver"
"fmt"
"github.com/mattn/go-sqlite3"
"github.com/zeebo/errs"
"storj.io/storj/private/dbutil/txutil"
"storj.io/storj/private/tagsql"
)
var (
// ErrMigrateTables is the error class for MigrateTablesToDatabase and the
// backup helpers it relies on.
ErrMigrateTables = errs.Class("migrate tables")
// ErrKeepTables is the error class for KeepTables and dropTables.
ErrKeepTables = errs.Class("keep tables")
)
// getSqlite3Conn digs the underlying *sqlite3.SQLiteConn out of conn.
//
// conn is either the sqlite3 connection itself or a wrapper exposing
// Unwrap() driver.Conn; wrappers are peeled off one layer at a time. If a
// layer is neither, an ErrMigrateTables error is returned.
func getSqlite3Conn(conn interface{}) (*sqlite3.SQLiteConn, error) {
	type unwrapper interface{ Unwrap() driver.Conn }

	current := conn
	for {
		// The raw connection takes priority over any Unwrap method.
		if raw, ok := current.(*sqlite3.SQLiteConn); ok {
			return raw, nil
		}

		wrapped, ok := current.(unwrapper)
		if !ok {
			return nil, ErrMigrateTables.New("unable to get raw database connection")
		}
		current = wrapped.Unwrap()
	}
}
// MigrateTablesToDatabase clones srcDB into destDB and then drops every table
// in destDB that is not listed in tablesToKeep.
//
// Errors from either step are returned wrapped in ErrMigrateTables.
func MigrateTablesToDatabase(ctx context.Context, srcDB, destDB tagsql.DB, tablesToKeep ...string) error {
	if backupErr := backupDBs(ctx, srcDB, destDB); backupErr != nil {
		return ErrMigrateTables.Wrap(backupErr)
	}

	// The destination is now a full clone; prune the tables we were not asked
	// to keep.
	keepErr := KeepTables(ctx, destDB, tablesToKeep...)
	return ErrMigrateTables.Wrap(keepErr)
}
// backupDBs clones srcDB into destDB using the sqlite3 backup API.
//
// It takes a dedicated connection from each database, unwraps both down to
// their raw sqlite3 driver connections and runs backupConns on them. Both
// connections are closed before returning. Any failure, including a failure
// to close either connection, is returned wrapped in ErrMigrateTables.
//
// The result is named so that the deferred Close calls below can add their
// errors to the value that is actually returned; with an unnamed result those
// errors would be silently discarded.
func backupDBs(ctx context.Context, srcDB, destDB tagsql.DB) (err error) {
	// Retrieve the raw Sqlite3 driver connections for the src and dest so that
	// we can execute the backup API for a corruption safe clone.
	srcConn, err := srcDB.Conn(ctx)
	if err != nil {
		return ErrMigrateTables.Wrap(err)
	}
	defer func() {
		err = errs.Combine(err, ErrMigrateTables.Wrap(srcConn.Close()))
	}()

	destConn, err := destDB.Conn(ctx)
	if err != nil {
		return ErrMigrateTables.Wrap(err)
	}
	defer func() {
		err = errs.Combine(err, ErrMigrateTables.Wrap(destConn.Close()))
	}()

	// The references to the driver connections are only guaranteed to be valid
	// for the life of the callback so we must do the work within both callbacks.
	err = srcConn.Raw(ctx, func(srcDriverConn interface{}) error {
		srcSqliteConn, srcErr := getSqlite3Conn(srcDriverConn)
		if srcErr != nil {
			return srcErr
		}

		destErr := destConn.Raw(ctx, func(destDriverConn interface{}) error {
			destSqliteConn, unwrapErr := getSqlite3Conn(destDriverConn)
			if unwrapErr != nil {
				return unwrapErr
			}
			return ErrMigrateTables.Wrap(backupConns(ctx, srcSqliteConn, destSqliteConn))
		})
		if destErr != nil {
			return ErrMigrateTables.Wrap(destErr)
		}
		return nil
	})
	return ErrMigrateTables.Wrap(err)
}
// backupConns executes the sqlite3 backup process that copies the "main"
// database of sourceDB into the "main" database of destDB.
//
// The page counters reported by the backup are checked before and after the
// copy, and the whole database is copied in a single step. The backup handle
// is finished on every return path, including the error paths, so it is never
// left open once this function returns. All errors are returned wrapped in
// ErrMigrateTables.
func backupConns(ctx context.Context, sourceDB *sqlite3.SQLiteConn, destDB *sqlite3.SQLiteConn) (err error) {
	// "main" represents the main (ie not "temp") database in sqlite3, which is
	// the database we want to backup, and the appropriate dest in the destDB
	backup, err := destDB.Backup("main", sourceDB, "main")
	if err != nil {
		return ErrMigrateTables.Wrap(err)
	}
	defer func() {
		// Always finish the backup so the handle is released. Per the SQLite
		// backup API, finish reports errors from earlier steps again, so its
		// result is only surfaced when nothing has failed so far.
		finishErr := backup.Finish()
		if err == nil {
			err = ErrMigrateTables.Wrap(finishErr)
		}
	}()

	// A zero-page step is taken first; the counters are read after it and the
	// backup must not already be complete.
	isDone, err := backup.Step(0)
	if err != nil {
		return ErrMigrateTables.Wrap(err)
	}
	if isDone {
		return ErrMigrateTables.New("Backup is done")
	}

	// Check that the page count and remaining values are reasonable.
	initialPageCount := backup.PageCount()
	if initialPageCount <= 0 {
		return ErrMigrateTables.New("initialPageCount invalid")
	}
	initialRemaining := backup.Remaining()
	if initialRemaining <= 0 {
		return ErrMigrateTables.New("initialRemaining invalid")
	}
	if initialRemaining != initialPageCount {
		return ErrMigrateTables.New("initialRemaining != initialPageCount")
	}

	// Step -1 is used to copy the entire source database to the destination.
	isDone, err = backup.Step(-1)
	if err != nil {
		return ErrMigrateTables.Wrap(err)
	}
	if !isDone {
		return ErrMigrateTables.New("Backup not done")
	}

	// Check that the page count and remaining values are reasonable.
	finalPageCount := backup.PageCount()
	if finalPageCount != initialPageCount {
		return ErrMigrateTables.New("finalPageCount != initialPageCount")
	}
	finalRemaining := backup.Remaining()
	if finalRemaining != 0 {
		return ErrMigrateTables.New("finalRemaining invalid")
	}

	// The deferred function above finishes the backup and reports its error.
	return nil
}
// KeepTables removes every table from db except those named in tablesToKeep
// and then runs VACUUM on db.
//
// Errors from either step are returned wrapped in ErrKeepTables.
func KeepTables(ctx context.Context, db tagsql.DB, tablesToKeep ...string) (err error) {
	if dropErr := dropTables(ctx, db, tablesToKeep...); dropErr != nil {
		return ErrKeepTables.Wrap(dropErr)
	}

	// VACUUM reclaims the space used by the dropped tables, although the data
	// will not actually be reclaimed until the db has been closed. It is run
	// directly on db rather than inside the transaction used for the drops
	// because SQLite3 does not allow VACUUM within a transaction.
	if _, vacuumErr := db.ExecContext(ctx, "VACUUM;"); vacuumErr != nil {
		return ErrKeepTables.Wrap(vacuumErr)
	}
	return nil
}
// dropTables performs the table drops in a single transaction: every table
// listed in sqlite_master, other than sqlite3 system tables, whose name is not
// in tablesToKeep is dropped.
//
// Table names are quoted before being placed in the DROP TABLE statement so
// that names containing spaces, quotes or reserved words do not produce a
// malformed statement. Any failure is returned wrapped in ErrKeepTables.
func dropTables(ctx context.Context, db tagsql.DB, tablesToKeep ...string) (err error) {
	err = txutil.WithTx(ctx, db, nil, func(ctx context.Context, tx tagsql.Tx) error {
		// Get a list of tables excluding sqlite3 system tables.
		rows, err := tx.QueryContext(ctx, "SELECT name FROM sqlite_master WHERE type ='table' AND name NOT LIKE 'sqlite_%';")
		if err != nil {
			return err
		}

		// Collect a list of the tables. We must do this because we can't do DDL
		// statements like drop tables while a query result is open.
		var tables []string
		for rows.Next() {
			var tableName string
			err = rows.Scan(&tableName)
			if err != nil {
				return errs.Combine(err, rows.Close())
			}
			tables = append(tables, tableName)
		}
		err = errs.Combine(rows.Err(), rows.Close())
		if err != nil {
			return err
		}

		// Loop over the list of tables and decide which ones to keep and which to drop.
		for _, tableName := range tables {
			if tableToKeep(tableName, tablesToKeep) {
				continue
			}
			// Drop tables we aren't told to keep in the destination database.
			_, err = tx.ExecContext(ctx, fmt.Sprintf("DROP TABLE %s;", quoteTableName(tableName)))
			if err != nil {
				return err
			}
		}
		return nil
	})
	return ErrKeepTables.Wrap(err)
}

// quoteTableName returns name as a double-quoted SQL identifier, doubling any
// embedded double quote so the result is always parsed as a single name.
func quoteTableName(name string) string {
	quoted := make([]byte, 0, len(name)+2)
	quoted = append(quoted, '"')
	for i := 0; i < len(name); i++ {
		if name[i] == '"' {
			quoted = append(quoted, '"')
		}
		quoted = append(quoted, name[i])
	}
	quoted = append(quoted, '"')
	return string(quoted)
}
// tableToKeep reports whether table appears in tables. The comparison is an
// exact, case-sensitive string match.
func tableToKeep(table string, tables []string) bool {
	found := false
	for idx := 0; idx < len(tables) && !found; idx++ {
		found = tables[idx] == table
	}
	return found
}