-
Notifications
You must be signed in to change notification settings - Fork 0
/
queue.go
99 lines (86 loc) · 2.13 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package redistorage
import (
"time"
"github.com/go-redis/redis"
log "github.com/sirupsen/logrus"
)
// Queue is a Redis-backed work queue reader. Items are moved from a work list
// into a processing list when they are handed out, and stay in the processing
// list until Done is called for them.
type Queue struct {
	// config supplies BlockingDuration (used by the blocking pop in Start)
	// and ProcessingTimeout (the interval between stuck-item checks).
	config QueueConfig
	// storage is the Redis wrapper every list operation goes through.
	storage *Storage

	// queueKey names the list items are popped from; processingKey names the
	// list they are pushed to while being worked on; lastItem remembers the
	// processing-list tail seen by the previous stuck-item check.
	queueKey, processingKey, lastItem string

	// items carries popped items to the consumer (see Items).
	items chan string
	// errors carries errors from the blocking pop to the consumer (see Errors).
	errors chan error
}
// Start runs the blocking Redis queue reader. It never returns, so callers
// normally run it in its own goroutine.
//
// Before entering the read loop it spawns a background goroutine that calls
// check every config.ProcessingTimeout. The loop then repeatedly performs a
// blocking pop-and-push from the work list to the processing list: a popped
// item is sent on the items channel, and any error from the pop is sent on
// the errors channel. NewQueue creates both channels unbuffered, so each send
// waits for a receiver on Items() or Errors().
func (q *Queue) Start() {
	// Periodic stuck-item checker.
	go doEvery(q.config.ProcessingTimeout, q.check)

	for {
		item, err := q.storage.ListBlockingPopAndPush(q.queueKey, q.processingKey, q.config.BlockingDuration)
		if err == nil {
			// Got something to work on.
			q.items <- item
			continue
		}
		// The pop failed; the original comment attributes this to the
		// blocking timeout being reached. Report it and try again.
		q.errors <- err
	}
}
// Done acknowledges item i by removing it from the processing list. It
// returns whatever error the storage layer reports, or nil on success.
func (q *Queue) Done(i string) error {
	// NOTE(review): the count argument 0 presumably maps to Redis LREM's
	// "remove all occurrences" — confirm against Storage.ListRemove.
	if _, err := q.storage.ListRemove(q.processingKey, 0, i); err != nil {
		return err
	}
	return nil
}
// Items returns the receive-only channel on which Start delivers every item
// it pops from the work list. Each received item should eventually be passed
// to Done so that it is removed from the processing list.
func (q *Queue) Items() <-chan string {
	return q.items
}
// Errors returns the receive-only channel on which Start reports every error
// returned by the blocking pop.
// NOTE(review): the inline comment in Start suggests blocking-pop timeouts are
// reported here as well — confirm against Storage.ListBlockingPopAndPush.
func (q *Queue) Errors() <-chan error {
	return q.errors
}
// check looks for an item stuck in the processing list and puts it back on
// the work list so it gets retried.
//
// Start schedules it every config.ProcessingTimeout. An item is treated as
// stuck when the last element of the processing list is the same value on two
// consecutive checks. A stuck item is first pushed back onto the work list
// and only then removed from the processing list, so a failure in between
// duplicates the item rather than losing it.
//
// All failures are logged; check never returns an error.
func (q *Queue) check() {
	log.Debug("checking for stuck items")

	i, err := q.storage.ListGet(q.processingKey, -1)
	switch {
	case err == redis.Nil:
		// Nothing in the processing list. Forget the previous observation:
		// otherwise a later item with the same payload would match the stale
		// value and be requeued after one interval instead of two.
		q.lastItem = ""
		return
	case err != nil:
		log.Errorf("check list get %s", err)
		return
	}

	if q.lastItem != i {
		// First time this item is seen at the tail; remember it for the
		// next check.
		q.lastItem = i
		return
	}

	log.Infof("%s stuck in queue, retrying", i)
	if err := q.storage.ListUnshift(q.queueKey, i); err != nil {
		log.Errorf("check list unshift %s", err)
		return
	}

	// Remove from the processing list. On failure lastItem is left untouched
	// so the next check sees the item again and retries the cleanup.
	if err := q.Done(i); err != nil {
		log.Errorf("check list remove %s", err)
		return
	}

	// Track whatever is now last in the processing list. On any error the
	// returned value is meaningless, so start from a clean slate instead.
	next, err := q.storage.ListGet(q.processingKey, -1)
	if err != nil {
		if err != redis.Nil {
			log.Errorf("check list get %s", err)
		}
		next = ""
	}
	q.lastItem = next
}
// NewQueue builds a reliable Redis queue reader on top of storage s.
//
// qk is the key of the list items are popped from, pk is the key of the list
// that holds items while they are being processed, and c carries the timing
// configuration. The returned queue does nothing until Start is called.
func NewQueue(s *Storage, qk string, pk string, c QueueConfig) *Queue {
	q := new(Queue) // lastItem starts out as the empty string
	q.config = c
	q.storage = s
	q.queueKey = qk
	q.processingKey = pk
	// Both channels are unbuffered: a send blocks until the consumer receives.
	q.items = make(chan string)
	q.errors = make(chan error)
	return q
}
// doEvery calls f once per tick of period d, forever. It never returns, so it
// is meant to be run in its own goroutine. f runs synchronously on that
// goroutine, one call at a time.
//
// time.Tick returns a nil channel for a non-positive d, in which case this
// blocks forever without ever calling f.
func doEvery(d time.Duration, f func()) {
	ticks := time.Tick(d)
	for {
		<-ticks
		f()
	}
}