/
main.go
225 lines (189 loc) · 6 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
package schema
import (
"bytes"
"context"
"database/sql"
"fmt"
stdLog "log"
"text/tabwriter"
"time"
migrate "github.com/rubenv/sql-migrate"
"github.com/stellar/go/support/errors"
)
//go:generate go run github.com/kevinburke/go-bindata/go-bindata@v3.18.0+incompatible -nometadata -pkg schema -o bindata.go migrations/
// MigrateDir represents a direction in which to perform schema migrations.
type MigrateDir string
const (
// MigrateUp causes migrations to be run in the "up" direction.
MigrateUp MigrateDir = "up"
// MigrateDown causes migrations to be run in the "down" direction.
MigrateDown MigrateDir = "down"
// MigrateRedo causes migrations to be run down, then up
MigrateRedo MigrateDir = "redo"
)
// Migrations represents all of the schema migration for horizon
var Migrations migrate.MigrationSource = &migrate.AssetMigrationSource{
Asset: Asset,
AssetDir: AssetDir,
Dir: "migrations",
}
// Migrate performs schema migration. Migrations can occur in one of three
// ways:
//
// - up: migrations are performed from the currently installed version upwards.
// If count is 0, all unapplied migrations will be run.
//
// - down: migrations are performed from the current version downard. If count
// is 0, all applied migrations will be run in a downard direction.
//
// - redo: migrations are first ran downard `count` times, and then are rand
// upward back to the current version at the start of the process. If count is
// 0, a count of 1 will be assumed.
func Migrate(db *sql.DB, dir MigrateDir, count int) (int, error) {
if dir == MigrateUp {
// The code below locks ingestion to apply DB migrations. This works
// for MigrateUp migrations only because it's possible that MigrateDown
// can remove `key_value_store` table and it will deadlock the process.
txConn, err := db.Conn(context.Background())
if err != nil {
return 0, err
}
defer txConn.Close()
tx, err := txConn.BeginTx(context.Background(), nil)
if err != nil {
return 0, err
}
// Unlock ingestion when done. DB migrations run in a separate DB connection
// so no need to Commit().
defer tx.Rollback()
// Check if table exists
row := tx.QueryRow(`select exists (
select from information_schema.tables where table_schema = 'public' and table_name = 'key_value_store'
)`)
err = row.Err()
if err != nil {
return 0, err
}
var tableExists bool
err = row.Scan(&tableExists)
if err != nil {
return 0, err
}
if tableExists {
// Lock ingestion
row := tx.QueryRow("select value from key_value_store where key = 'exp_ingest_last_ledger' for update")
err = row.Err()
if err != nil {
return 0, err
}
}
}
switch dir {
case MigrateUp:
return migrate.ExecMax(db, "postgres", Migrations, migrate.Up, count)
case MigrateDown:
return migrate.ExecMax(db, "postgres", Migrations, migrate.Down, count)
case MigrateRedo:
if count == 0 {
count = 1
}
down, err := migrate.ExecMax(db, "postgres", Migrations, migrate.Down, count)
if err != nil {
return down, err
}
return migrate.ExecMax(db, "postgres", Migrations, migrate.Up, down)
default:
return 0, errors.New("Invalid migration direction")
}
}
// GetMigrationsUp returns a list of names of any migrations needed in the
// "up" direction (more recent schema versions).
func GetMigrationsUp(dbUrl string) (migrationIds []string) {
// Get a DB handle
db, dbErr := sql.Open("postgres", dbUrl)
if dbErr != nil {
stdLog.Fatal(dbErr)
}
defer db.Close()
// Get the possible migrations
possibleMigrations, _, migrateErr := migrate.PlanMigration(db, "postgres", Migrations, migrate.Up, 0)
if migrateErr != nil {
stdLog.Fatal(migrateErr)
}
// Extract a list of the possible migration names
for _, m := range possibleMigrations {
migrationIds = append(migrationIds, m.Id)
}
return migrationIds
}
// GetNumMigrationsDown returns the number of migrations to apply in the
// "down" direction to return to the older schema version expected by this
// version of Horizon. To keep the code simple, it does not provide a list of
// migration names.
func GetNumMigrationsDown(dbUrl string) (nMigrations int) {
// Get a DB handle
db, dbErr := sql.Open("postgres", dbUrl)
if dbErr != nil {
stdLog.Fatal(dbErr)
}
defer db.Close()
// Get the set of migrations recorded in the database
migrationRecords, recordErr := migrate.GetMigrationRecords(db, "postgres")
if recordErr != nil {
stdLog.Fatal(recordErr)
}
// Get the list of migrations needed by this version of Horizon
allNeededMigrations, _, migrateErr := migrate.PlanMigration(db, "postgres", Migrations, migrate.Down, 0)
if migrateErr != nil {
stdLog.Fatal(migrateErr)
}
// Return the size difference between the two sets of migrations
return len(migrationRecords) - len(allNeededMigrations)
}
// Status returns information about the current status of db migrations. Which
// ones are pending, and when past ones were applied.
//
// From: https://github.com/rubenv/sql-migrate/blob/master/sql-migrate/command_status.go
func Status(db *sql.DB) (string, error) {
buffer := &bytes.Buffer{}
migrations, err := Migrations.FindMigrations()
if err != nil {
return "", err
}
records, err := migrate.GetMigrationRecords(db, "postgres")
if err != nil {
return "", err
}
table := tabwriter.NewWriter(buffer, 60, 8, 0, '\t', 0)
fmt.Fprintln(table, "Migration\tApplied")
rows := make(map[string]*statusRow)
for _, m := range migrations {
rows[m.Id] = &statusRow{
Id: m.Id,
Migrated: false,
}
}
for _, r := range records {
if rows[r.Id] == nil {
return "", fmt.Errorf("Could not find migration file: %v", r.Id)
}
rows[r.Id].Migrated = true
rows[r.Id].AppliedAt = r.AppliedAt
}
for _, m := range migrations {
if rows[m.Id] != nil && rows[m.Id].Migrated {
fmt.Fprintf(table, "%s\t%s\n", m.Id, rows[m.Id].AppliedAt.String())
} else {
fmt.Fprintf(table, "%s\tno\n", m.Id)
}
}
if err := table.Flush(); err != nil {
return "", err
}
return buffer.String(), nil
}
type statusRow struct {
Id string
Migrated bool
AppliedAt time.Time
}