-
Notifications
You must be signed in to change notification settings - Fork 211
/
migrations.go
112 lines (99 loc) · 2.41 KB
/
migrations.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package sql
import (
"bufio"
"bytes"
"embed"
"fmt"
"io/fs"
"strconv"
"strings"
)
//go:embed migrations/**/*.sql
var embedded embed.FS
type sqlMigration struct {
order int
name string
content *bufio.Scanner
}
func (m *sqlMigration) Apply(db Executor) error {
current, err := version(db)
if err != nil {
return err
}
if m.order <= current {
return nil
}
for m.content.Scan() {
if _, err := db.Exec(m.content.Text(), nil, nil); err != nil {
return fmt.Errorf("exec %s: %w", m.content.Text(), err)
}
}
// binding values in pragma statement is not allowed
if _, err := db.Exec(fmt.Sprintf("PRAGMA user_version = %d;", m.order), nil, nil); err != nil {
return fmt.Errorf("update user_version to %d: %w", m.order, err)
}
return nil
}
func (m *sqlMigration) Name() string {
return m.name
}
func (m *sqlMigration) Order() int {
return m.order
}
func (sqlMigration) Rollback() error {
// handled by the DB itself
return nil
}
func version(db Executor) (int, error) {
var current int
if _, err := db.Exec("PRAGMA user_version;", nil, func(stmt *Statement) bool {
current = stmt.ColumnInt(0)
return true
}); err != nil {
return 0, fmt.Errorf("read user_version %w", err)
}
return current, nil
}
func StateMigrations() ([]Migration, error) {
return sqlMigrations("state")
}
func LocalMigrations() ([]Migration, error) {
return sqlMigrations("local")
}
func sqlMigrations(dbname string) ([]Migration, error) {
var migrations []Migration
err := fs.WalkDir(embedded, fmt.Sprintf("migrations/%s", dbname), func(path string, d fs.DirEntry, err error) error {
if err != nil {
return fmt.Errorf("walkdir %s: %w", path, err)
}
if d.IsDir() {
return nil
}
parts := strings.Split(d.Name(), "_")
if len(parts) < 1 {
return fmt.Errorf("invalid migration %s", d.Name())
}
order, err := strconv.Atoi(parts[0])
if err != nil {
return fmt.Errorf("invalid migration %s: %w", d.Name(), err)
}
f, err := embedded.Open(path)
if err != nil {
return fmt.Errorf("read file %s: %w", path, err)
}
scanner := bufio.NewScanner(f)
scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) {
if i := bytes.Index(data, []byte(";")); i >= 0 {
return i + 1, data[0 : i+1], nil
}
return 0, nil, nil
})
migrations = append(migrations, &sqlMigration{
order: order,
name: d.Name(),
content: scanner,
})
return nil
})
return migrations, err
}