-
Notifications
You must be signed in to change notification settings - Fork 239
/
007_replaces_skips.go
142 lines (129 loc) · 4.25 KB
/
007_replaces_skips.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package migrations
import (
"context"
"database/sql"
"fmt"
"strings"
)
// ReplacesSkipsMigrationKey is the key passed to registerMigration for this
// migration and the value used as the Id of replacesSkipsMigration.
const ReplacesSkipsMigrationKey = 7
// init registers this migration: it hands replacesSkipsMigration to
// registerMigration under ReplacesSkipsMigrationKey when the package is
// initialized.
func init() {
registerMigration(ReplacesSkipsMigrationKey, replacesSkipsMigration)
}
// replacesSkipsMigration adds the replaces and skips columns to the
// operatorbundle table and backfills them for every bundle returned by
// listBundles.
//
// Up also creates two triggers, api_provider_cleanup and api_requirer_cleanup,
// which fire after a delete on api_provider / api_requirer and remove the
// matching row from the api table when the trigger's WHEN clause finds no
// remaining reference to that group/version/kind.
//
// Down drops both triggers and rebuilds operatorbundle without the replaces
// and skips columns by copying the remaining columns through a backup table.
var replacesSkipsMigration = &Migration{
	Id: ReplacesSkipsMigrationKey,
	Up: func(ctx context.Context, tx *sql.Tx) error {
		// NOTE(review): both WHEN clauses join api_provider to api_requirer
		// without an ON clause, so the subquery returns no rows whenever
		// either table is empty, even if the other still holds a matching
		// row. The SQL is left unchanged because this migration may already
		// have been applied; confirm whether a follow-up migration should
		// replace the triggers.
		const addColumnsAndTriggers = `
		ALTER TABLE operatorbundle
		ADD COLUMN replaces TEXT;
		ALTER TABLE operatorbundle
		ADD COLUMN skips TEXT;
		CREATE TRIGGER api_provider_cleanup
		AFTER DELETE ON api_provider
		WHEN NOT EXISTS (SELECT 1 FROM api_provider JOIN api_requirer WHERE
			(api_provider.group_name = OLD.group_name AND api_provider.version = OLD.version AND api_provider.kind = OLD.kind) OR
			(api_requirer.group_name = OLD.group_name AND api_requirer.version = OLD.version AND api_requirer.kind = OLD.kind))
		BEGIN
			DELETE FROM api WHERE group_name = OLD.group_name AND version = OLD.version AND kind = OLD.kind;
		END;
		CREATE TRIGGER api_requirer_cleanup
		AFTER DELETE ON api_requirer
		WHEN NOT EXISTS (SELECT 1 FROM api_provider JOIN api_requirer WHERE
			(api_provider.group_name = OLD.group_name AND api_provider.version = OLD.version AND api_provider.kind = OLD.kind) OR
			(api_requirer.group_name = OLD.group_name AND api_requirer.version = OLD.version AND api_requirer.kind = OLD.kind))
		BEGIN
			DELETE FROM api WHERE group_name = OLD.group_name AND version = OLD.version AND kind = OLD.kind;
		END;
		`
		if _, err := tx.ExecContext(ctx, addColumnsAndTriggers); err != nil {
			return err
		}

		// Backfill the new columns for every bundle that already exists.
		bundles, err := listBundles(ctx, tx)
		if err != nil {
			return err
		}
		for _, bundle := range bundles {
			if err := extractReplaces(ctx, tx, bundle); err != nil {
				return fmt.Errorf("error backfilling replaces and skips: %w", err)
			}
		}
		return nil
	},
	Down: func(ctx context.Context, tx *sql.Tx) error {
		// Statements run in order; the first failure aborts the rollback.
		//
		// NOTE(review): SQLite documents PRAGMA foreign_keys as a no-op while
		// a transaction is open, so the two PRAGMA statements may have no
		// effect when run through tx — confirm against the driver in use.
		statements := []string{
			`PRAGMA foreign_keys = 0`,
			// Remove the triggers created by Up so that the migration can be
			// applied again after being rolled back.
			`DROP TRIGGER IF EXISTS api_provider_cleanup`,
			`DROP TRIGGER IF EXISTS api_requirer_cleanup`,
			// Rebuild operatorbundle with only the pre-migration columns.
			`CREATE TABLE operatorbundle_backup (name TEXT, csv TEXT, bundle TEXT, bundlepath TEXT, version TEXT, skiprange TEXT)`,
			`INSERT INTO operatorbundle_backup SELECT name, csv, bundle, bundlepath, version, skiprange FROM operatorbundle`,
			`DROP TABLE operatorbundle`,
			`ALTER TABLE operatorbundle_backup RENAME TO operatorbundle;`,
			`PRAGMA foreign_keys = 1`,
		}
		for _, stmt := range statements {
			if _, err := tx.ExecContext(ctx, stmt); err != nil {
				return err
			}
		}
		return nil
	},
}
// extractReplaces backfills the replaces and skips columns of the
// operatorbundle row identified by name. The values come from
// getReplacesAndSkips; the skips are stored as a single comma-separated string.
// Any error from the lookup or the update is returned unchanged.
func extractReplaces(ctx context.Context, tx *sql.Tx, name string) error {
	replacee, skipped, lookupErr := getReplacesAndSkips(ctx, tx, name)
	if lookupErr != nil {
		return lookupErr
	}

	const stmt = `update operatorbundle SET replaces = ?, skips = ? WHERE name = ?;`
	skipList := strings.Join(skipped, ",")
	if _, execErr := tx.ExecContext(ctx, stmt, replacee, skipList, name); execErr != nil {
		return execErr
	}
	return nil
}
// getReplacesAndSkips queries channel_entry for the bundles replaced by the
// channel entries of the bundle called name, ordered by ascending entry depth.
//
// The first row of the result becomes replaces (left empty when that row is
// NULL or when there are no rows). Every later non-NULL row is appended to
// skips, which is a non-nil, possibly empty slice on success.
//
// On any failure — running the query, scanning a row, or an error raised while
// iterating the result set — the zero values are returned together with the
// error.
func getReplacesAndSkips(ctx context.Context, tx *sql.Tx, name string) (replaces string, skips []string, err error) {
	const getReplacees = `
	SELECT DISTINCT replaces.operatorbundle_name
	FROM channel_entry
	LEFT OUTER JOIN channel_entry replaces ON channel_entry.replaces = replaces.entry_id
	WHERE channel_entry.operatorbundle_name = ?
	ORDER BY channel_entry.depth ASC
	`

	rows, err := tx.QueryContext(ctx, getReplacees, name)
	if err != nil {
		// Describe this layer's own step; the caller adds the backfill context.
		return "", nil, fmt.Errorf("querying replaced bundles for %q: %w", name, err)
	}
	defer rows.Close()

	// The first row, if any, names the bundle that is replaced.
	if rows.Next() {
		var replaceeName sql.NullString
		if err = rows.Scan(&replaceeName); err != nil {
			return "", nil, err
		}
		if replaceeName.Valid {
			replaces = replaceeName.String
		}
	}

	// Every remaining non-NULL row is recorded as a skipped bundle.
	skips = []string{}
	for rows.Next() {
		var skipName sql.NullString
		if err = rows.Scan(&skipName); err != nil {
			return "", nil, err
		}
		if !skipName.Valid {
			continue
		}
		skips = append(skips, skipName.String)
	}

	// rows.Next returns false both at end-of-data and on failure; only
	// rows.Err tells the two apart, so a truncated result is not reported as
	// success.
	if err = rows.Err(); err != nil {
		return "", nil, err
	}
	return replaces, skips, nil
}