/
changelog.go
198 lines (178 loc) · 5.06 KB
/
changelog.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
package dbsync
import (
"context"
"fmt"
"github.com/target/goalert/util/sqlutil"
"github.com/abiosoft/ishell"
"github.com/pkg/errors"
"github.com/vbauerster/mpb/v4"
"github.com/vbauerster/mpb/v4/decor"
)
const (
changeLogTableDel = `DROP TABLE IF EXISTS change_log`
changeLogTableDef = `
CREATE TABLE change_log (
id BIGSERIAL PRIMARY KEY,
op TEXT NOT NULL,
table_name TEXT NOT NULL,
row_id TEXT NOT NULL,
tx_id BIGINT,
cmd_id cid,
row_data JSONB
)`
changeLogFuncDel = `DROP FUNCTION IF EXISTS fn_process_change_log()`
changeLogFuncDef = `
CREATE OR REPLACE FUNCTION fn_process_change_log() RETURNS TRIGGER AS $$
DECLARE
cur_state enum_switchover_state := 'idle';
BEGIN
SELECT INTO cur_state current_state
FROM switchover_state;
IF cur_state != 'in_progress' THEN
RETURN NEW;
END IF;
IF (TG_OP = 'DELETE') THEN
INSERT INTO change_log (op, table_name, row_id, tx_id, cmd_id)
VALUES (TG_OP, TG_TABLE_NAME, cast(OLD.id as TEXT), txid_current(), OLD.cmax);
RETURN OLD;
ELSE
INSERT INTO change_log (op, table_name, row_id, tx_id, cmd_id, row_data)
VALUES (TG_OP, TG_TABLE_NAME, cast(NEW.id as TEXT), txid_current(), NEW.cmin, to_jsonb(NEW));
RETURN NEW;
END IF;
RETURN NULL;
END;
$$ LANGUAGE 'plpgsql'`
)
func changeLogTrigName(tableName string) string {
return fmt.Sprintf("zz_99_change_log_%s", tableName)
}
func changeLogTrigDel(tableName string) string {
return fmt.Sprintf(`DROP TRIGGER IF EXISTS %s ON %s`, sqlutil.QuoteID(changeLogTrigName(tableName)), sqlutil.QuoteID(tableName))
}
func changeLogTrigDef(tableName string) string {
return fmt.Sprintf(`
CREATE TRIGGER %s
AFTER INSERT OR UPDATE OR DELETE ON %s
FOR EACH ROW EXECUTE PROCEDURE fn_process_change_log()`,
sqlutil.QuoteID(changeLogTrigName(tableName)), sqlutil.QuoteID(tableName))
}
// ChangeLogEnable will instrument the database for the sync operation.
func (s *Sync) ChangeLogEnable(ctx context.Context, sh *ishell.Context) error {
var stat string
err := s.oldDB.QueryRowContext(ctx, `select current_state from switchover_state`).Scan(&stat)
if err != nil {
return errors.Wrap(err, "lookup switchover state")
}
if stat != "idle" {
return errors.New("must be idle")
}
run := func(name, stmt string) {
if err != nil {
return
}
_, err = s.oldDB.ExecContext(ctx, stmt)
err = errors.Wrap(err, name)
}
runNew := func(name, stmt string) {
if err != nil {
return
}
_, err = s.newDB.ExecContext(ctx, stmt)
err = errors.Wrap(err, name)
}
sh.Println("Resetting change log...")
runNew("clear dest change_log", changeLogTableDel)
runNew("configure dest change_log", changeLogTableDef)
run("clear change_log", changeLogTableDel)
run("configure change_log", changeLogTableDef)
run("define change hook", changeLogFuncDef)
run("create initial entry", `insert into change_log (op, table_name, row_id) values ('INIT', '', '')`)
p := mpb.NewWithContext(ctx)
process := make([]Table, 0, len(s.tables))
for _, t := range s.tables {
if contains(ignoreTriggerTables, t.Name) {
continue
}
process = append(process, t)
}
bar := p.AddBar(int64(len(process)),
mpb.BarClearOnComplete(),
mpb.PrependDecorators(
decor.OnComplete(
decor.Name("Adding triggers..."),
"Instrumented all tables.")),
)
for _, t := range process {
run("clear prev. trigger for "+t.SafeName(), changeLogTrigDel(t.Name))
run("set trigger for "+t.SafeName(), changeLogTrigDef(t.Name))
bar.IncrBy(1)
}
p.Wait()
if err != nil {
return err
}
sh.Println("Activating change tracking...")
res, err := s.oldDB.ExecContext(ctx, `update switchover_state set current_state = 'in_progress' where current_state = 'idle'`)
if err != nil {
return err
}
r, err := res.RowsAffected()
if err != nil {
return err
}
if r != 1 {
return errors.New("not idle")
}
return nil
}
// ChangeLogDisable will remove all sync instrumentation.
func (s *Sync) ChangeLogDisable(ctx context.Context, sh *ishell.Context) error {
res, err := s.oldDB.ExecContext(ctx, `update switchover_state set current_state = 'idle' where current_state = 'in_progress'`)
if err != nil {
return err
}
r, err := res.RowsAffected()
if err != nil {
return err
}
if r != 1 {
return errors.New("not in_progress")
}
run := func(name, stmt string) {
if err != nil {
return
}
_, err = s.oldDB.ExecContext(ctx, stmt)
err = errors.Wrap(err, name)
}
runNew := func(name, stmt string) {
if err != nil {
return
}
_, err = s.newDB.ExecContext(ctx, stmt)
err = errors.Wrap(err, name)
}
p := mpb.NewWithContext(ctx)
bar := p.AddBar(int64(len(s.tables)),
mpb.BarClearOnComplete(),
mpb.PrependDecorators(
decor.OnComplete(
decor.Name("Removing triggers..."),
"Removed all triggers."),
),
)
for _, t := range s.tables {
run("clear trigger for "+t.SafeName(), changeLogTrigDel(t.Name))
bar.IncrBy(1)
}
p.Wait()
sh.Println("Resetting change log...")
run("remove change hook", changeLogFuncDel)
run("remove change_log", changeLogTableDel)
runNew("remove dest change_log", changeLogTableDel)
if err != nil {
return err
}
return nil
}