forked from digitalocean/atc
/
80_add_pipeline_build_events_tables.go
186 lines (156 loc) · 3.89 KB
/
80_add_pipeline_build_events_tables.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
package migrations
import (
"fmt"
"github.com/concourse/atc/dbng/migration"
)
// AddPipelineBuildEventsTables is a migration that moves build events into
// one table per pipeline.
//
// It performs the following steps on tx, in order:
//
//  1. Drops the build_events_build_id_fkey constraint from build_events.
//  2. Reads every id from pipelines and, for each one, creates a
//     pipeline_build_events_<id> table inheriting from build_events and
//     copies that pipeline's events into it.
//  3. Creates a set of single-column indexes on build_events, build_outputs,
//     build_inputs, image_resource_versions, pipelines, resources, jobs,
//     jobs_serial_groups, builds and versioned_resources.
//  4. Deletes from the parent build_events table only (DELETE FROM ONLY) the
//     events whose build has a non-NULL job_id, i.e. the rows that step 2
//     selects for copying.
//
// It returns the first error encountered, wrapped with a short description
// of the step that failed, or nil when every statement succeeded.
func AddPipelineBuildEventsTables(tx migration.LimitedTx) error {
	_, err := tx.Exec(`
		ALTER TABLE build_events
		DROP CONSTRAINT build_events_build_id_fkey
	`)
	if err != nil {
		return fmt.Errorf("failed to update build events foreign key: %s", err)
	}

	rows, err := tx.Query(`SELECT id FROM pipelines`)
	if err != nil {
		return fmt.Errorf("failed to query pipelines: %s", err)
	}
	defer rows.Close()

	// All IDs are collected before any further statement is issued on tx, so
	// the result set is fully consumed by the time the per-pipeline work runs.
	var pipelineIDs []int
	for rows.Next() {
		var pipelineID int
		if err = rows.Scan(&pipelineID); err != nil {
			return fmt.Errorf("failed to scan pipeline ID: %s", err)
		}
		pipelineIDs = append(pipelineIDs, pipelineID)
	}
	// rows.Next returning false can mean either "no more rows" or "iteration
	// failed"; without this check a failure would silently skip pipelines.
	if err = rows.Err(); err != nil {
		return fmt.Errorf("failed to iterate pipeline IDs: %s", err)
	}

	for _, pipelineID := range pipelineIDs {
		if err = createBuildEventsTable(tx, pipelineID); err != nil {
			return fmt.Errorf("failed to create build events table: %s", err)
		}
		if err = populateBuildEventsTable(tx, pipelineID); err != nil {
			return fmt.Errorf("failed to populate build events: %s", err)
		}
	}

	// Executed in this order; the first failure aborts the migration.
	indexStatements := []string{
		`CREATE INDEX build_events_build_id_idx ON build_events (build_id)`,
		`CREATE INDEX build_outputs_build_id_idx ON build_outputs (build_id)`,
		`CREATE INDEX build_inputs_build_id_idx ON build_inputs (build_id)`,
		`CREATE INDEX build_outputs_versioned_resource_id_idx ON build_outputs (versioned_resource_id)`,
		`CREATE INDEX build_inputs_versioned_resource_id_idx ON build_inputs (versioned_resource_id)`,
		`CREATE INDEX image_resource_versions_build_id_idx ON image_resource_versions (build_id)`,
		`CREATE INDEX pipelines_team_id_idx ON pipelines (team_id)`,
		`CREATE INDEX resources_pipeline_id_idx ON resources (pipeline_id)`,
		`CREATE INDEX jobs_pipeline_id_idx ON jobs (pipeline_id)`,
		`CREATE INDEX jobs_serial_groups_job_id_idx ON jobs_serial_groups (job_id)`,
		`CREATE INDEX builds_job_id_idx ON builds (job_id)`,
		`CREATE INDEX versioned_resources_resource_id_idx ON versioned_resources (resource_id)`,
	}
	for _, stmt := range indexStatements {
		if _, err = tx.Exec(stmt); err != nil {
			return fmt.Errorf("failed to create indexes: %s", err)
		}
	}

	// ONLY restricts the delete to the parent table, leaving the rows that
	// were just copied into the inheriting per-pipeline tables untouched.
	_, err = tx.Exec(`
		DELETE FROM ONLY build_events
		WHERE build_id IN (SELECT id FROM builds WHERE job_id IS NOT NULL)
	`)
	if err != nil {
		return fmt.Errorf("failed to clean up build events: %s", err)
	}

	return nil
}
// createBuildEventsTable creates the events table for a single pipeline.
//
// Three statements are executed on tx, in order, with pipelineID formatted
// into each one:
//
//   - CREATE TABLE pipeline_build_events_<id> inheriting from build_events
//   - CREATE INDEX on its build_id column
//   - CREATE UNIQUE INDEX on (build_id, event_id)
//
// The first failing statement's error is returned unchanged; nil is returned
// when all three succeed.
func createBuildEventsTable(tx migration.LimitedTx, pipelineID int) error {
	// %[1]d refers to the single format argument, so a template may mention
	// the pipeline ID more than once.
	templates := [...]string{
		`
CREATE TABLE pipeline_build_events_%[1]d ()
INHERITS (build_events)
`,
		`
CREATE INDEX pipelines_build_events_%[1]d_build_id ON pipeline_build_events_%[1]d (build_id)
`,
		`
CREATE UNIQUE INDEX pipeline_build_events_%[1]d_build_id_event_id ON pipeline_build_events_%[1]d USING btree (build_id, event_id)
`,
	}

	for _, tmpl := range templates {
		if _, execErr := tx.Exec(fmt.Sprintf(tmpl, pipelineID)); execErr != nil {
			return execErr
		}
	}

	return nil
}
// populateBuildEventsTable copies one pipeline's build events into its
// pipeline_build_events_<id> table.
//
// The rows are selected by joining build_events to builds and jobs, keeping
// those whose job belongs to pipelineID. The ID is used twice: formatted into
// the target table name, and bound as the $1 query parameter for the
// pipeline filter.
//
// It returns nil on success, or the Exec error wrapped as
// "failed to insert: ...".
func populateBuildEventsTable(tx migration.LimitedTx, pipelineID int) error {
	insertStmt := fmt.Sprintf(`
INSERT INTO pipeline_build_events_%[1]d (
build_id, type, payload, event_id, version
)
SELECT build_id, type, payload, event_id, version
FROM build_events AS e, builds AS b, jobs AS j
WHERE j.pipeline_id = $1
AND b.job_id = j.id
AND b.id = e.build_id
`, pipelineID)

	if _, insertErr := tx.Exec(insertStmt, pipelineID); insertErr != nil {
		return fmt.Errorf("failed to insert: %s", insertErr)
	}

	return nil
}