-
Notifications
You must be signed in to change notification settings - Fork 307
/
setup.go
59 lines (52 loc) · 2.28 KB
/
setup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package jobsdb
import (
"fmt"
"github.com/rudderlabs/rudder-server/config"
"github.com/rudderlabs/rudder-server/jobsdb/internal/lock"
migrator "github.com/rudderlabs/rudder-server/services/sql-migrator"
)
// SchemaMigrationTable returns the table name used for storing current schema version.
func (jd *HandleT) SchemaMigrationTable() string {
return fmt.Sprintf("%s_schema_migrations", jd.tablePrefix)
}
// setupDatabaseTables will initialize jobsdb tables using migration templates inside 'sql/migrations/jobsdb'.
// Dataset tables are not created via migration scripts, they can only be updated.
// The following data are passed to JobsDB migration templates:
// - Prefix: The table prefix used by this jobsdb instance.
// - Datasets: Array of existing dataset indices.
func (jd *HandleT) setupDatabaseTables(templateData map[string]interface{}) {
// setup migrator with appropriate schema migrations table
m := &migrator.Migrator{
Handle: jd.dbHandle,
MigrationsTable: jd.SchemaMigrationTable(),
ShouldForceSetLowerVersion: config.GetBool("SQLMigrator.forceSetLowerVersion", true),
}
// execute any necessary migrations
if err := m.MigrateFromTemplates("jobsdb", templateData); err != nil {
panic(fmt.Errorf("error while migrating '%v' jobsdb tables: %w", jd.tablePrefix, err))
}
}
func (jd *HandleT) runAlwaysChangesets(templateData map[string]interface{}) {
// setup migrator with appropriate schema migrations table
m := &migrator.Migrator{
Handle: jd.dbHandle,
MigrationsTable: fmt.Sprintf("%s_runalways_migrations", jd.tablePrefix),
RunAlways: true,
}
// execute any necessary migrations
if err := m.MigrateFromTemplates("jobsdb_always", templateData); err != nil {
panic(fmt.Errorf("error while running changesets that run always in '%s' jobsdb tables: %w", jd.tablePrefix, err))
}
}
func (jd *HandleT) dropDatabaseTables(l lock.LockToken) {
jd.logger.Infof("[JobsDB:%v] Dropping all database tables", jd.tablePrefix)
jd.dropSchemaMigrationTables()
jd.assertError(jd.dropAllDS(l))
jd.dropJournal()
jd.assertError(jd.dropAllBackupDS())
}
func (jd *HandleT) dropSchemaMigrationTables() {
sqlStatement := fmt.Sprintf(`DROP TABLE IF EXISTS %s`, jd.SchemaMigrationTable())
_, err := jd.dbHandle.Exec(sqlStatement)
jd.assertError(err)
}