-
Notifications
You must be signed in to change notification settings - Fork 5
/
queue.go
executable file
·130 lines (106 loc) · 2.75 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package queue
import (
"errors"
log "github.com/cihub/seelog"
"infini.sh/framework/core/config"
"infini.sh/framework/core/global"
"infini.sh/framework/core/pipeline"
"infini.sh/framework/core/queue"
. "infini.sh/framework/modules/queue/disk_queue"
"os"
"path"
"strings"
"sync"
"time"
)
// queues maps a queue name to its backend queue. Setup allocates the map and
// initQueue adds an entry the first time a name is used.
var queues map[string]*BackendQueue
// DiskQueue is the disk-backed queue module that Setup registers under the
// "disk" name. It embeds pipeline.Parameters, whose getters initQueue uses to
// read the queue tuning options.
type DiskQueue struct {
pipeline.Parameters
}
// Name reports the fixed identifier of this module, "Queue".
func (DiskQueue) Name() string {
	const moduleName = "Queue"
	return moduleName
}
// initLocker serializes lazy queue creation in initQueue so that a given name
// is initialized only once.
var initLocker sync.Mutex
// initQueue lazily creates the disk-backed queue registered under name.
//
// The queue's files are placed under <working dir>/queue/<lower-cased name>.
// Tuning values are read from the module parameters: read_chan_buffer,
// sync_timeout_in_seconds, sync_every_in_seconds, max_bytes_per_file,
// min_msg_size and max_msg_size.
//
// It does nothing when the queue already exists, and currently always
// returns nil.
func (module DiskQueue) initQueue(name string) error {
	// Every queue is opened on the same channel name.
	const channel = "default"

	// Take the lock before looking at the map: reading a Go map while another
	// goroutine inserts into it is a data race, so the existence check must
	// not happen outside the critical section.
	initLocker.Lock()
	defer initLocker.Unlock()

	if queues[name] != nil {
		return nil
	}

	// Tolerate being called before Setup; assigning into a nil map panics.
	if queues == nil {
		queues = make(map[string]*BackendQueue)
	}

	log.Debugf("init queue: %s", name)

	dataPath := path.Join(global.Env().GetWorkingDir(), "queue", strings.ToLower(name))
	if err := os.MkdirAll(dataPath, 0755); err != nil {
		// Report the failure instead of dropping it, but keep going so that
		// callers which ignore initQueue's result still find a queue entry.
		log.Errorf("create queue directory %s: %v", dataPath, err)
	}

	readBuffSize := module.GetIntOrDefault("read_chan_buffer", 0)
	syncTimeout := time.Duration(module.GetIntOrDefault("sync_timeout_in_seconds", 10)) * time.Second
	// NOTE(review): despite the "_in_seconds" suffix this value is passed on
	// unchanged, separately from the sync timeout — confirm its unit against
	// NewDiskQueue.
	syncEvery := module.GetInt64OrDefault("sync_every_in_seconds", 1000)
	maxPerFile := module.GetInt64OrDefault("max_bytes_per_file", 50*1024*1024*1024)
	minMsgSize := module.GetIntOrDefault("min_msg_size", 1)
	maxMsgSize := module.GetIntOrDefault("max_msg_size", 1<<25)

	q := NewDiskQueue(channel, dataPath, maxPerFile, int32(minMsgSize), int32(maxMsgSize), syncEvery, syncTimeout, readBuffSize)
	queues[name] = &q
	return nil
}
// Setup replaces the package-level queue registry with a fresh empty map and
// registers this module with the queue package under the "disk" name.
// cfg is not consulted.
func (module DiskQueue) Setup(cfg *config.Config) {
	queues = map[string]*BackendQueue{}
	queue.Register("disk", module)
}
// Push appends v to the queue named k, creating the queue on first use.
//
// It returns the initialization error if the queue could not be set up, an
// error if no queue is registered under k afterwards, or otherwise whatever
// the backend's Put returns.
func (module DiskQueue) Push(k string, v []byte) error {
	if err := module.initQueue(k); err != nil {
		return err
	}

	// Read the shared map under the same lock that guards insertions.
	initLocker.Lock()
	q := queues[k]
	initLocker.Unlock()

	if q == nil {
		return errors.New("queue not available: " + k)
	}
	return (*q).Put(v)
}
// ReadChan returns the read channel of the queue named k, creating the queue
// on first use.
//
// If the queue cannot be initialized or is missing afterwards, the problem is
// logged and a nil channel is returned; note that receiving from a nil channel
// blocks forever.
func (module DiskQueue) ReadChan(k string) chan []byte {
	if err := module.initQueue(k); err != nil {
		log.Errorf("init queue %s: %v", k, err)
		return nil
	}

	// Read the shared map under the same lock that guards insertions.
	initLocker.Lock()
	q := queues[k]
	initLocker.Unlock()

	if q == nil {
		log.Errorf("queue %s is not available", k)
		return nil
	}
	return (*q).ReadChan()
}
// Pop receives one message from the queue named k, creating the queue on
// first use.
//
// timeoutInSeconds is, despite its name, a time.Duration and is used as-is.
// When it is positive, Pop waits at most that long and then fails with a
// "time out" error; when it is zero or negative, Pop blocks until a message
// is available.
func (module DiskQueue) Pop(k string, timeoutInSeconds time.Duration) ([]byte, error) {
	if err := module.initQueue(k); err != nil {
		return nil, err
	}

	// Read the shared map under the lock, but release it before blocking on
	// the channel so other queue operations are not held up.
	initLocker.Lock()
	q := queues[k]
	initLocker.Unlock()

	if q == nil {
		return nil, errors.New("queue not available: " + k)
	}
	messages := (*q).ReadChan()

	if timeoutInSeconds <= 0 {
		return <-messages, nil
	}

	// A timer that is stopped on return replaces the former sleeping
	// goroutine, which lingered for the full timeout even when a message
	// arrived immediately.
	timer := time.NewTimer(timeoutInSeconds)
	defer timer.Stop()

	select {
	case b := <-messages:
		return b, nil
	case <-timer.C:
		return nil, errors.New("time out")
	}
}
// Close closes the queue named k and returns the backend's Close result.
//
// Unlike the other accessors it does not create the queue; when no queue is
// registered under k it returns an error instead of dereferencing a nil entry.
func (module DiskQueue) Close(k string) error {
	// Read the shared map under the same lock that guards insertions.
	initLocker.Lock()
	q := queues[k]
	initLocker.Unlock()

	if q == nil {
		return errors.New("queue not found: " + k)
	}
	return (*q).Close()
}
// Depth returns the backend's reported depth of the queue named k, creating
// the queue on first use.
//
// If the queue cannot be initialized or is missing afterwards, the problem is
// logged and 0 is returned.
func (module DiskQueue) Depth(k string) int64 {
	if err := module.initQueue(k); err != nil {
		log.Errorf("init queue %s: %v", k, err)
		return 0
	}

	// Read the shared map under the same lock that guards insertions.
	initLocker.Lock()
	q := queues[k]
	initLocker.Unlock()

	if q == nil {
		log.Errorf("queue %s is not available", k)
		return 0
	}
	return (*q).Depth()
}
// GetQueues returns the names of all queues created so far, in no particular
// order. The result is always non-nil, even when no queue exists.
func (module DiskQueue) GetQueues() []string {
	// Iterate under the lock that guards insertions; ranging over a map while
	// another goroutine writes to it is a data race.
	initLocker.Lock()
	defer initLocker.Unlock()

	names := make([]string, 0, len(queues))
	for name := range queues {
		names = append(names, name)
	}
	return names
}
// Start performs no work and always returns nil.
func (DiskQueue) Start() error {
	return nil
}
// Stop closes every queue created so far. A failure to close one queue is
// logged at debug level and does not prevent the remaining queues from being
// closed; Stop itself always returns nil.
func (module DiskQueue) Stop() error {
	// Snapshot the registry under the lock so the iteration cannot race with
	// a concurrent insertion, then close outside the lock so that closing does
	// not block other queue operations.
	initLocker.Lock()
	pending := make([]*BackendQueue, 0, len(queues))
	for _, q := range queues {
		pending = append(pending, q)
	}
	initLocker.Unlock()

	for _, q := range pending {
		if q == nil {
			continue
		}
		if err := (*q).Close(); err != nil {
			log.Debug(err)
		}
	}
	return nil
}