/
migrations.go
371 lines (313 loc) · 9.26 KB
/
migrations.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
package data
import (
"database/sql"
"fmt"
"path/filepath"
"time"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/utils"
log "github.com/sirupsen/logrus"
"github.com/teris-io/shortid"
)
func migrateDatabaseSchema(db *sql.DB, from, to int) error {
log.Printf("Migrating database from version %d to %d", from, to)
dbBackupFile := filepath.Join(config.BackupDirectory, fmt.Sprintf("owncast-v%d.bak", from))
utils.Backup(db, dbBackupFile)
for v := from; v < to; v++ {
log.Tracef("Migration step from %d to %d\n", v, v+1)
switch v {
case 0:
migrateToSchema1(db)
case 1:
migrateToSchema2(db)
case 2:
migrateToSchema3(db)
case 3:
migrateToSchema4(db)
case 4:
migrateToSchema5(db)
case 5:
migrateToSchema6(db)
case 6:
migrateToSchema7(db)
default:
log.Fatalln("missing database migration step")
}
}
_, err := db.Exec("UPDATE config SET value = ? WHERE key = ?", to, "version")
if err != nil {
return err
}
return nil
}
func migrateToSchema7(db *sql.DB) {
log.Println("Migrating users. This may take time if you have lots of users...")
var ids []string
rows, err := db.Query(`SELECT id FROM users`)
if err != nil {
log.Errorln("error migrating access tokens to schema v5", err)
return
}
if rows.Err() != nil {
log.Errorln("error migrating users to schema v7", rows.Err())
return
}
for rows.Next() {
var id string
if err := rows.Scan(&id); err != nil {
log.Error("There is a problem reading the database when migrating users.", err)
return
}
ids = append(ids, id)
}
defer rows.Close()
tx, _ := db.Begin()
stmt, _ := tx.Prepare("update users set display_color=? WHERE id=?")
defer stmt.Close()
for _, id := range ids {
displayColor := utils.GenerateRandomDisplayColor(config.MaxUserColor)
if _, err := stmt.Exec(displayColor, id); err != nil {
log.Panic(err)
return
}
}
if err := tx.Commit(); err != nil {
log.Panicln(err)
}
}
func migrateToSchema6(db *sql.DB) {
// Fix chat messages table schema. Since chat is ephemeral we can drop
// the table and recreate it.
// Drop the old messages table
MustExec(`DROP TABLE messages`, db)
// Recreate it
CreateMessagesTable(db)
}
// nolint:cyclop
func migrateToSchema5(db *sql.DB) {
// Create the access tokens table.
createAccessTokenTable(db)
// 1. Authenticated bool added to the users table.
// 2. Access tokens are now stored in their own table.
//
// Long story short, the access_token used to be the primary key of the users
// table. However, now it's going to live in its own table. However, you
// cannot change the primary key. So we need to create a copy table, then
// migrate the access tokens, and then move the copy into place.
createTempTable := `CREATE TABLE IF NOT EXISTS users_copy (
"id" TEXT,
"display_name" TEXT NOT NULL,
"display_color" NUMBER NOT NULL,
"created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
"disabled_at" TIMESTAMP,
"previous_names" TEXT DEFAULT '',
"namechanged_at" TIMESTAMP,
"authenticated_at" TIMESTAMP,
"scopes" TEXT,
"type" TEXT DEFAULT 'STANDARD',
"last_used" DATETIME DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (id)
);CREATE INDEX user_id_disabled_at_index ON users (id, disabled_at);
CREATE INDEX user_id_index ON users (id);
CREATE INDEX user_id_disabled_index ON users (id, disabled_at);
CREATE INDEX user_disabled_at_index ON USERS (disabled_at);`
_, err := db.Exec(createTempTable)
if err != nil {
log.Errorln("error running migration, you may experience issues: ", err)
}
// Start insert transaction
tx, err := db.Begin()
if err != nil {
log.Errorln(err)
return
}
// Migrate the users table to the new users_copy table.
rows, err := tx.Query(`SELECT id, access_token, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, scopes, type, last_used FROM users`)
if err != nil {
log.Errorln("error migrating access tokens to schema v5", err)
return
}
if rows.Err() != nil {
log.Errorln("error migrating access tokens to schema v5", rows.Err())
return
}
defer rows.Close()
defer tx.Rollback() //nolint:errcheck
log.Println("Migrating users. This may take time if you have lots of users...")
for rows.Next() {
var id string
var accessToken string
var displayName string
var displayColor int
var createdAt time.Time
var disabledAt *time.Time
var previousNames string
var namechangedAt *time.Time
var scopes *string
var userType string
var lastUsed *time.Time
if err := rows.Scan(&id, &accessToken, &displayName, &displayColor, &createdAt, &disabledAt, &previousNames, &namechangedAt, &scopes, &userType, &lastUsed); err != nil {
log.Error("There is a problem reading the database when migrating users.", err)
return
}
stmt, err := tx.Prepare(`INSERT INTO users_copy (id, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, scopes, type, last_used) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`)
if err != nil {
log.Errorln(err)
return
}
defer stmt.Close()
if _, err := stmt.Exec(id, displayName, displayColor, createdAt, disabledAt, previousNames, namechangedAt, scopes, userType, lastUsed); err != nil {
log.Errorln(err)
return
}
stmt, err = tx.Prepare(`INSERT INTO user_access_tokens(token, user_id, timestamp) VALUES (?, ?, ?) ON CONFLICT DO NOTHING`)
if err != nil {
log.Errorln(err)
return
}
defer stmt.Close()
if _, err := stmt.Exec(accessToken, id, createdAt); err != nil {
log.Errorln(err)
return
}
}
if err := tx.Commit(); err != nil {
log.Errorln(err)
}
_, err = db.Exec(`PRAGMA foreign_keys = OFF;DROP TABLE "users";ALTER TABLE "users_copy" RENAME TO users;PRAGMA foreign_keys = ON;`)
if err != nil {
log.Errorln("error running migration, you may experience issues: ", err)
}
}
func migrateToSchema4(db *sql.DB) {
// We now save the follow request object.
stmt, err := db.Prepare("ALTER TABLE ap_followers ADD COLUMN request_object BLOB")
if err != nil {
log.Errorln("Error running migration. This may be because you have already been running a dev version.", err)
return
}
defer stmt.Close()
_, err = stmt.Exec()
if err != nil {
log.Warnln(err)
}
}
func migrateToSchema3(db *sql.DB) {
// Since it's just a backlog of chat messages let's wipe the old messages
// and recreate the table.
// Drop the old messages table
stmt, err := db.Prepare("DROP TABLE messages")
if err != nil {
log.Fatal(err)
}
defer stmt.Close()
_, err = stmt.Exec()
if err != nil {
log.Warnln(err)
}
// Recreate it
CreateMessagesTable(db)
}
func migrateToSchema2(db *sql.DB) {
// Since it's just a backlog of chat messages let's wipe the old messages
// and recreate the table.
// Drop the old messages table
stmt, err := db.Prepare("DROP TABLE messages")
if err != nil {
log.Fatal(err)
}
defer stmt.Close()
_, err = stmt.Exec()
if err != nil {
log.Warnln(err)
}
// Recreate it
CreateMessagesTable(db)
}
func migrateToSchema1(db *sql.DB) {
// Since it's just a backlog of chat messages let's wipe the old messages
// and recreate the table.
// Drop the old messages table
stmt, err := db.Prepare("DROP TABLE messages")
if err != nil {
log.Fatal(err)
}
defer stmt.Close()
_, err = stmt.Exec()
if err != nil {
log.Warnln(err)
}
// Recreate it
CreateMessagesTable(db)
// Migrate access tokens to become chat users
type oldAccessToken struct {
createdAt time.Time
lastUsedAt *time.Time
accessToken string
displayName string
scopes string
}
oldAccessTokens := make([]oldAccessToken, 0)
query := `SELECT * FROM access_tokens`
rows, err := db.Query(query)
if err != nil || rows.Err() != nil {
log.Errorln("error migrating access tokens to schema v1", err, rows.Err())
return
}
defer rows.Close()
for rows.Next() {
var token string
var name string
var scopes string
var timestampString string
var lastUsedString *string
if err := rows.Scan(&token, &name, &scopes, ×tampString, &lastUsedString); err != nil {
log.Error("There is a problem reading the database.", err)
return
}
timestamp, err := time.Parse(time.RFC3339, timestampString)
if err != nil {
return
}
var lastUsed *time.Time
if lastUsedString != nil {
lastUsedTime, _ := time.Parse(time.RFC3339, *lastUsedString)
lastUsed = &lastUsedTime
}
oldToken := oldAccessToken{
accessToken: token,
displayName: name,
scopes: scopes,
createdAt: timestamp,
lastUsedAt: lastUsed,
}
oldAccessTokens = append(oldAccessTokens, oldToken)
}
// Recreate them as users
for _, token := range oldAccessTokens {
color := utils.GenerateRandomDisplayColor(config.MaxUserColor)
if err := insertAPIToken(db, token.accessToken, token.displayName, color, token.scopes); err != nil {
log.Errorln("Error migrating access token", err)
}
}
}
func insertAPIToken(db *sql.DB, token string, name string, color int, scopes string) error {
log.Debugln("Adding new access token:", name)
id := shortid.MustGenerate()
tx, err := db.Begin()
if err != nil {
return err
}
stmt, err := tx.Prepare("INSERT INTO users(id, access_token, display_name, display_color, scopes, type) values(?, ?, ?, ?, ?, ?)")
if err != nil {
return err
}
defer stmt.Close()
if _, err = stmt.Exec(id, token, name, color, scopes, "API"); err != nil {
return err
}
if err = tx.Commit(); err != nil {
return err
}
return nil
}