-
Notifications
You must be signed in to change notification settings - Fork 5
/
restore.go
72 lines (60 loc) · 1.58 KB
/
restore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// Package restore periodically runs through all the queues, finds events
// which are in the claimed queue but have been abandoned, and puts them back in
// the unclaimed queue.
package restore
import (
"time"
"github.com/mc0/okq/db"
"github.com/mc0/okq/log"
)
// init starts a background goroutine that calls validateClaimedEvents each
// time a 10-second ticker fires. The goroutine has no stop condition visible
// in this file, so it runs for as long as the ticker keeps delivering ticks.
func init() {
	go func() {
		t := time.NewTicker(10 * time.Second)
		defer t.Stop()

		for {
			// Block until the next tick, then run one validation pass.
			<-t.C
			validateClaimedEvents()
		}
	}()
}
// validateClaimedEvents walks every queue returned by db.AllQueueNames and,
// for each one, reads the last 50 entries of its claimed list. Every entry
// whose lock key comes back nil from MGET is handed to restoreEventToQueue.
//
// The pass stops at the first error (LRANGE, MGET, or a failed restore). Each
// of those errors is logged before returning, so an aborted pass is always
// visible in the logs.
func validateClaimedEvents() {
	log.L.Debug("validating claimed events")

	queueNames := db.AllQueueNames()
	for _, queueName := range queueNames {
		claimedKey := db.ClaimedKey(queueName)

		// get the presumably oldest 50 items
		eventIDs, err := db.Inst.Cmd("LRANGE", claimedKey, -50, -1).List()
		if err != nil {
			log.L.Printf("ERR lrange redis replied %q", err)
			return
		}
		if len(eventIDs) == 0 {
			continue
		}

		// Collect the lock key of every event so they can all be looked up
		// with a single MGET.
		locks := make([]interface{}, 0, len(eventIDs))
		for _, eventID := range eventIDs {
			locks = append(locks, db.ItemLockKey(queueName, eventID))
		}

		locksList, err := db.Inst.Cmd("MGET", locks...).ListBytes()
		if err != nil {
			log.L.Printf("ERR mget redis replied %q", err)
			return
		}

		// MGET replies positionally, so locksList[i] belongs to eventIDs[i].
		for i, lock := range locksList {
			if lock != nil {
				continue
			}
			// A nil reply means no value was found for this lock key;
			// presumably the claim was abandoned, so restore the event.
			if err := restoreEventToQueue(queueName, eventIDs[i]); err != nil {
				log.L.Printf("ERR lremrpush redis replied %q", err)
				return
			}
		}
	}
}
// restoreEventToQueue invokes the LREMRPUSH Lua script with the claimed and
// unclaimed keys of queueName (in that order) plus the arguments 0 and
// eventID, and returns the Err field of the reply.
//
// NOTE(review): judging by the script name, this presumably removes eventID
// from the claimed list and pushes it onto the unclaimed list; confirm against
// the script definition in the db package.
func restoreEventToQueue(queueName string, eventID string) error {
	dstKey := db.UnclaimedKey(queueName)
	srcKey := db.ClaimedKey(queueName)

	return db.Inst.Lua("LREMRPUSH", 2, srcKey, dstKey, 0, eventID).Err
}