-
Notifications
You must be signed in to change notification settings - Fork 0
/
dequeue.go
132 lines (119 loc) · 3.69 KB
/
dequeue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package s3queue
import (
"database/sql"
taskqueue "github.com/touee/nyn/task-queue"
)
// DequeueForProcess 取出一个等待队列中处于等待状态的任务, 并将其标记为处理中状态
func (q *Queue) DequeueForProcess() (id int, taskType string, taskData []byte, err error) {
q.dbLock.Lock()
defer q.dbLock.Unlock()
return q.dequeueForProcess()
}
func (q *Queue) dequeueForProcess() (id int, taskType string, taskData []byte, err error) {
var tx *sql.Tx
if tx, err = q.db.Begin(); err != nil {
return 0, "", nil, err
}
defer func() {
if err != nil {
tx.Rollback()
} else {
tx.Commit()
}
}()
var taskTypeID int
if err = tx.QueryRow(`
SELECT task_id, task_type_id, task_data FROM tasks
WHERE task_id = (
SELECT task_id FROM pending_queue
WHERE item_priority > 0 -- 未被冻结
AND item_remaining_attempts > 0 -- 未在处理
ORDER BY item_priority DESC, task_id ASC
LIMIT 1
)`).Scan(&id, &taskTypeID, &taskData); err == sql.ErrNoRows {
return 0, "", nil, taskqueue.ErrNoTasksInPending
} else if err != nil {
return 0, "", nil, err
}
taskType = q.getTypeName(taskTypeID)
if _, err = tx.Exec(`
UPDATE pending_queue SET item_remaining_attempts = -item_remaining_attempts
WHERE task_id = ?`, id); err != nil {
return 0, "", nil, err
}
return id, taskType, taskData, nil
}
// ReportProcessResult 是在完成处理取出的任务后, 向队列报告结果的方法
func (q *Queue) ReportProcessResult(id int, result taskqueue.ProcessResult) (err error) {
q.dbLock.Lock()
defer q.dbLock.Unlock()
return q.reportProcessResult(id, result)
}
func (q *Queue) reportProcessResult(id int, result taskqueue.ProcessResult) (err error) {
var tx *sql.Tx
if tx, err = q.db.Begin(); err != nil {
return err
}
defer func() {
if err != nil {
tx.Rollback()
} else {
tx.Commit()
}
}()
switch result {
case taskqueue.ProcessResultFailed: //< 计本次尝试
var remains int
if err = tx.QueryRow(`SELECT item_remaining_attempts FROM pending_queue WHERE task_id = ?`, id).Scan(&remains); err != nil {
return err
}
remains = -remains // 根据规定, 任务处理时此值为原本值的相反数
remains--
if remains > 0 {
if _, err = tx.Exec(`UPDATE pending_queue SET item_remaining_attempts = ? WHERE task_id = ?`, remains, id); err != nil {
return err
}
break
} else if remains < 0 {
panic("?")
}
result = taskqueue.ProcessResultGivenUp
fallthrough
case // 以下三类都要将任务移出等待队列
taskqueue.ProcessResultSuccessful,
taskqueue.ProcessResultGivenUp,
taskqueue.ProcessResultShouldBeExcluded:
if _, err = tx.Exec(`DELETE FROM pending_queue WHERE task_id = ?`, id); err != nil {
return err
}
var status taskqueue.TaskStatus
switch result {
case taskqueue.ProcessResultSuccessful:
status = taskqueue.TaskStatusFinished
case taskqueue.ProcessResultGivenUp:
status = taskqueue.TaskStatusGivenUp
case taskqueue.ProcessResultShouldBeExcluded:
status = taskqueue.TaskStatusExcluded
default:
panic("?")
}
if _, err = tx.Exec(`UPDATE tasks SET task_status = ? WHERE task_id = ?`, status, id); err != nil {
return err
}
case taskqueue.ProcessResultRetry: //< 不计本次尝试
if _, err = tx.Exec(`UPDATE pending_queue SET item_remaining_attempts = -item_remaining_attempts WHERE task_id = ?`, id); err != nil {
return err
}
case taskqueue.ProcessResultShouldBeFrozen: //< 冻结任务
if _, err = tx.Exec(`
UPDATE pending_queue
SET item_priority = -item_priority, -- 其为负值时任务认为是被冻结了的
item_remaining_attempts = -item_remaining_attempts
WHERE task_id = ?`, id); err != nil {
return err
}
default:
panic("?")
}
return nil
}