forked from vmware-archive/atc
-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
141_add_worker_resource_cache_to_containers.go
143 lines (121 loc) · 3.62 KB
/
141_add_worker_resource_cache_to_containers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package migrations
import (
"database/sql"
"fmt"
"github.com/concourse/atc/db/migration"
)
// AddWorkerResourceCacheToContainers is a schema and data migration that
// replaces containers.resource_cache_id with a reference to a row in the new
// worker_resource_caches table.
//
// It runs the following steps on tx, in order:
//
//  1. creates the worker_resource_caches table, which pairs a
//     worker_base_resource_types row with a resource_caches row;
//  2. adds the nullable containers.worker_resource_cache_id column;
//  3. for every container that has a resource_cache_id, finds or inserts the
//     matching worker_resource_caches row and points the container at it;
//  4. drops containers.resource_cache_id.
//
// Containers whose base resource type cannot be resolved (see
// findBaseResourceTypeID returning 0) keep a NULL worker_resource_cache_id.
// The first database error aborts the migration and is returned to the caller.
func AddWorkerResourceCacheToContainers(tx migration.LimitedTx) error {
	_, err := tx.Exec(`
CREATE TABLE worker_resource_caches (
id serial PRIMARY KEY,
worker_base_resource_type_id int REFERENCES worker_base_resource_types (id) ON DELETE CASCADE,
resource_cache_id int REFERENCES resource_caches (id) ON DELETE CASCADE
)
`)
	if err != nil {
		return err
	}

	_, err = tx.Exec(`
ALTER TABLE containers
ADD COLUMN worker_resource_cache_id INTEGER
REFERENCES worker_resource_caches (id) ON DELETE SET NULL
`)
	if err != nil {
		return err
	}

	rows, err := tx.Query(`SELECT id, resource_cache_id, worker_name FROM containers WHERE resource_cache_id IS NOT NULL`)
	if err != nil {
		return err
	}
	defer rows.Close()

	// Read every affected container into memory first; the follow-up queries
	// below are issued only after the result set has been fully consumed.
	var containerWorkerResourceCaches []containerWorkerResourceCache
	for rows.Next() {
		var cwrc containerWorkerResourceCache

		err = rows.Scan(&cwrc.ID, &cwrc.ResourceCacheID, &cwrc.WorkerName)
		if err != nil {
			return fmt.Errorf("failed to scan container id, resource_cache_id and worker_name: %s", err)
		}

		containerWorkerResourceCaches = append(containerWorkerResourceCaches, cwrc)
	}

	// rows.Next also returns false when iteration fails part-way through.
	// Without this check a partial result would be migrated and the old
	// column dropped, silently losing the remaining associations.
	if err = rows.Err(); err != nil {
		return fmt.Errorf("failed to iterate over containers with a resource cache: %s", err)
	}

	for _, cwrc := range containerWorkerResourceCaches {
		baseResourceTypeID, err := findBaseResourceTypeID(tx, cwrc.ResourceCacheID)
		if err != nil {
			return err
		}

		if baseResourceTypeID == 0 {
			// No base resource type could be resolved; most likely the
			// resource cache was garbage collected. Leave the container's
			// worker_resource_cache_id NULL so that gc can remove this container.
			continue
		}

		var workerBaseResourceTypeID int
		err = tx.QueryRow(`
SELECT id FROM worker_base_resource_types WHERE base_resource_type_id=$1 AND worker_name=$2
`, baseResourceTypeID, cwrc.WorkerName).
			Scan(&workerBaseResourceTypeID)
		if err != nil {
			return err
		}

		// Find-or-create: several containers may share the same
		// (worker base resource type, resource cache) pair, so reuse an
		// existing row and only insert when none is found.
		var workerResourceCacheID int
		err = tx.QueryRow(`
SELECT id FROM worker_resource_caches WHERE worker_base_resource_type_id = $1 AND resource_cache_id = $2
`, workerBaseResourceTypeID, cwrc.ResourceCacheID).
			Scan(&workerResourceCacheID)
		if err == sql.ErrNoRows {
			err = tx.QueryRow(`
INSERT INTO worker_resource_caches (worker_base_resource_type_id, resource_cache_id)
VALUES ($1, $2)
RETURNING id
`, workerBaseResourceTypeID, cwrc.ResourceCacheID).
				Scan(&workerResourceCacheID)
		}
		if err != nil {
			return err
		}

		_, err = tx.Exec(`
UPDATE containers SET worker_resource_cache_id=$1 WHERE id=$2
`, workerResourceCacheID, cwrc.ID)
		if err != nil {
			return err
		}
	}

	// The old column is dropped only after every container has been processed.
	_, err = tx.Exec(`
ALTER TABLE containers
DROP COLUMN resource_cache_id
`)
	if err != nil {
		return err
	}

	return nil
}
// containerWorkerResourceCache holds the values read from one containers row
// during the migration: the container's id, its resource_cache_id and its
// worker_name.
type containerWorkerResourceCache struct {
	ID, ResourceCacheID int
	WorkerName          string
}
// findBaseResourceTypeID resolves the base resource type id behind the
// resource cache identified by resourceCacheID.
//
// Each step looks up the resource cache LEFT JOINed with its resource config
// and reads the config's resource_cache_id and base_resource_type_id:
//
//   - a non-NULL base_resource_type_id is returned as the result;
//   - otherwise a non-NULL resource_cache_id is followed and the lookup is
//     repeated for that cache;
//   - if both are NULL the function returns 0 with a nil error.
//
// Any error from the query or scan is returned together with 0.
func findBaseResourceTypeID(tx migration.LimitedTx, resourceCacheID int) (int, error) {
	const lookup = `
SELECT resource_cache_id, base_resource_type_id FROM resource_caches rca LEFT JOIN resource_configs rcf ON rca.resource_config_id = rcf.id WHERE rca.id=$1
`

	cacheID := resourceCacheID
	for {
		var parentCacheID, baseTypeID sql.NullInt64

		if err := tx.QueryRow(lookup, cacheID).Scan(&parentCacheID, &baseTypeID); err != nil {
			return 0, err
		}

		switch {
		case baseTypeID.Valid:
			return int(baseTypeID.Int64), nil
		case parentCacheID.Valid:
			// Walk one level up the chain and look again.
			cacheID = int(parentCacheID.Int64)
		default:
			return 0, nil
		}
	}
}