-
Notifications
You must be signed in to change notification settings - Fork 16
/
queue.go
218 lines (196 loc) · 5 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
package main
import (
"database/sql"
"fmt"
"log"
"strings"
"time"
"github.com/isucon/isucon6-final/portal/job"
"github.com/pkg/errors"
)
// errAlreadyQueued is the error reported when a team already has a job in the
// queue. Its underlying integer value is the ID of that team.
type errAlreadyQueued int

// Error implements the error interface and includes the team ID in the message.
func (e errAlreadyQueued) Error() string {
	teamID := int(e)
	return fmt.Sprintf("job already queued (teamID=%d)", teamID)
}
// enqueueJob adds a new row to the queues table for teamID.
//
// It first looks for an existing row of that team whose status is 'waiting' or
// 'running'. If one exists it returns errAlreadyQueued(teamID); if the lookup
// itself fails it returns the wrapped database error. Otherwise it inserts a
// new row containing only team_id and returns nil on success.
func enqueueJob(teamID int) error {
	var existingID int
	lookupErr := db.QueryRow(`
SELECT id FROM queues
WHERE team_id = ? AND status IN ('waiting', 'running')`, teamID).Scan(&existingID)
	if lookupErr == nil {
		// A waiting/running row was found, so the team is already queued.
		return errAlreadyQueued(teamID)
	}
	if lookupErr != sql.ErrNoRows {
		return errors.Wrap(lookupErr, "failed to enqueue job when selecting table")
	}
	// No matching row: fall through and INSERT one.

	// XXX: it may be better to also handle old jobs that are still marked as
	// running, in case a worker node died.
	// XXX: the check above and the INSERT below are not atomic, so two
	// concurrent calls can slip through and enqueue the same team twice.
	if _, insertErr := db.Exec(`
INSERT INTO queues (team_id) VALUES (?)`, teamID); insertErr != nil {
		return errors.Wrap(insertErr, "enqueue job failed")
	}
	return nil
}
// dequeueJob tries to hand the waiting job with the lowest id to benchNode.
//
// It selects the first row with status 'waiting' (ordered by id) and then,
// inside a transaction, updates that row to status 'running' with the given
// bench_node, but only if the row is still 'waiting'.
//
// It returns (nil, nil) when there is no waiting job or when the update
// touched no row, the claimed job when exactly one row was updated, and an
// error for any database failure or when more than one row was affected.
func dequeueJob(benchNode string) (*job.Job, error) {
	var candidate job.Job
	scanErr := db.QueryRow(`
SELECT id, team_id FROM queues
WHERE status = 'waiting' ORDER BY id LIMIT 1`).Scan(&candidate.ID, &candidate.TeamID)
	if scanErr == sql.ErrNoRows {
		// Nothing is waiting.
		return nil, nil
	}
	if scanErr != nil {
		return nil, errors.Wrap(scanErr, "dequeue job failed when scanning job")
	}

	tx, err := db.Begin()
	if err != nil {
		return nil, errors.Wrap(err, "failed to dequeue job when beginning tx")
	}

	// The "AND status = 'waiting'" guard makes the update a no-op if the row
	// left the waiting state after the SELECT above.
	result, err := tx.Exec(`
UPDATE queues SET status = 'running', bench_node = ?
WHERE id = ? AND status = 'waiting'`, benchNode, candidate.ID)
	if err != nil {
		tx.Rollback()
		return nil, errors.Wrap(err, "failed to dequeue job when locking")
	}

	claimed, err := result.RowsAffected()
	if err != nil {
		tx.Rollback()
		return nil, errors.Wrap(err, "failed to dequeue job when checking affected rows")
	}
	if claimed > 1 {
		tx.Rollback()
		return nil, fmt.Errorf("failed to dequeue job. invalid affected rows: %d", claimed)
	}

	if commitErr := tx.Commit(); commitErr != nil {
		return nil, errors.Wrap(commitErr, "failed to dequeue job when commiting tx")
	}

	// Zero rows updated: e.g. another worker took the job a moment earlier.
	if claimed < 1 {
		return nil, nil
	}
	return &candidate, nil
}
// doneJob records the outcome of a finished job.
//
// After logging the job and its output, it performs two statements in one
// transaction:
//  1. mark the queue row matching the job's id and team_id as 'done' and store
//     res.Stderr, provided the row is currently 'running';
//  2. insert a row into results with the pass flag (1 or 0), the score and the
//     output messages joined by newlines.
//
// The transaction is rolled back and an error returned if either statement
// fails or if the update does not affect exactly one row.
func doneJob(res *job.Result) error {
	log.Printf("doneJob: job=%#v output=%#v", res.Job, res.Output)

	tx, err := db.Begin()
	if err != nil {
		return errors.Wrap(err, "doneJob failed when beginning tx")
	}

	updated, err := tx.Exec(`
UPDATE queues
SET status = 'done', stderr = ?
WHERE id = ?
AND team_id = ?
AND status = 'running'
`, res.Stderr, res.Job.ID, res.Job.TeamID)
	if err != nil {
		tx.Rollback()
		return errors.Wrap(err, "doneJob failed when locking")
	}

	count, err := updated.RowsAffected()
	if err != nil {
		tx.Rollback()
		return errors.Wrap(err, "doneJob failed when checking affected rows")
	}
	// Exactly one running row must have been switched to done.
	if count != 1 {
		tx.Rollback()
		return fmt.Errorf("doneJob failed. invalid affected rows=%d", count)
	}

	// The pass column receives 1 for a passing run and 0 otherwise.
	var passFlag int
	if res.Output.Pass {
		passFlag = 1
	}
	messages := strings.Join(res.Output.Messages, "\n")

	if _, err := tx.Exec(`
INSERT INTO results (team_id, queue_id, pass, score, messages)
VALUES (?, ?, ?, ?, ?)
`, res.Job.TeamID, res.Job.ID, passFlag, res.Output.Score, messages); err != nil {
		tx.Rollback()
		return errors.Wrap(err, "INSERT INTO results")
	}

	if err := tx.Commit(); err != nil {
		return errors.Wrap(err, "doneJob failed when commiting tx")
	}
	return nil
}
// QueuedJob is the (team_id, status) pair of a queue row as returned by
// getQueuedJobs.
type QueuedJob struct {
	// TeamID is the value of the row's team_id column.
	TeamID int
	// Status is the value of the row's status column.
	Status string
}
// getQueuedJobs returns the queue entries that have not finished yet, i.e.
// rows whose status is 'waiting' or 'running', ordered by created_at ascending.
//
// The table is only queried while getContestStatus() reports
// contestStatusStarted; otherwise an empty, non-nil slice is returned.
// Rows with team_id 9999 are excluded.
// NOTE(review): 9999 is presumably a reserved/internal team ID; confirm.
func getQueuedJobs(db *sql.DB) ([]QueuedJob, error) {
	if getContestStatus() != contestStatusStarted {
		return []QueuedJob{}, nil
	}

	rows, err := db.Query(`
SELECT team_id, status
FROM queues
WHERE status IN ('waiting', 'running')
AND team_id <> 9999
ORDER BY created_at ASC
`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	pending := []QueuedJob{}
	for rows.Next() {
		var queued QueuedJob
		if scanErr := rows.Scan(&queued.TeamID, &queued.Status); scanErr != nil {
			return nil, scanErr
		}
		pending = append(pending, queued)
	}
	// Surface any error that ended the iteration early.
	if iterErr := rows.Err(); iterErr != nil {
		return nil, iterErr
	}
	return pending, nil
}
// QueueItem is one row of the queues table together with the name of the
// joined team, as selected by getQueueItems.
type QueueItem struct {
	ID        int
	TeamID    int
	TeamName  string // teams.name of the joined team
	Status    string
	BenchNode string // empty string when bench_node is NULL
	Stderr    string // empty string when stderr is NULL
	CreatedAt time.Time
	UpdatedAt time.Time
}

// getQueueItems returns at most limit queue entries, finished or not, ordered
// by queues.created_at descending.
//
// On success the returned slice is non-nil even when no rows match. Any error
// from running the query, scanning a row, or iterating the result set is
// returned as-is together with a nil slice.
//
// NOTE(review): name comes from a LEFT JOIN and is scanned into a plain string,
// so a queue row without a matching team would make Scan fail on NULL; confirm
// whether such rows can exist.
func getQueueItems(db *sql.DB, limit int) ([]QueueItem, error) {
	rows, err := db.Query(`
SELECT
queues.id,team_id,name,status,IFNULL(bench_node, ''),IFNULL(stderr, ''),created_at,updated_at
FROM queues
LEFT JOIN teams ON queues.team_id = teams.id
ORDER BY queues.created_at DESC
LIMIT ?
`, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	items := []QueueItem{}
	for rows.Next() {
		var item QueueItem
		err := rows.Scan(&item.ID, &item.TeamID, &item.TeamName, &item.Status, &item.BenchNode, &item.Stderr, &item.CreatedAt, &item.UpdatedAt)
		if err != nil {
			return nil, err
		}
		items = append(items, item)
	}
	// rows.Next also returns false when iteration fails part-way through;
	// without this check a truncated list would be reported as a success.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return items, nil
}