-
Notifications
You must be signed in to change notification settings - Fork 7
/
queue.go
119 lines (96 loc) · 2.9 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
// Package anyq is a queue library wrapper for widely used message queues. AnyQ provides one way to handle various queues.
//
// Supported queues:
// - RabbitMQ(https://www.rabbitmq.com)
// - Kafka(https://kafka.apache.org)
// - NSQ(http://nsq.io)
// - NATS(http://nats.io)
//
// Visit https://github.com/jaehue/anyq
package anyq
import (
"fmt"
"reflect"
)
// _MESSAGE_BUF_COUNT is an unexported package constant fixed at 100.
// NOTE(review): it is not referenced in the visible part of this file; the
// name suggests a buffer size for message channels — confirm against its users.
const _MESSAGE_BUF_COUNT = 100
// queues is the package-level registry of queue implementations, keyed by the
// name passed to Register and looked up by New.
// NOTE(review): no lock guards this map in this file; confirm whether
// Register and New are ever called concurrently.
var queues = make(map[string]Queuer)
// Message is the element type of the channel bound with Consumer.BindRecvChan.
type Message struct {
// Body holds the message content as bytes.
Body []byte
// Origin is an untyped companion value.
// NOTE(review): presumably the underlying client's native message object — confirm against the implementations.
Origin interface{}
}
// Queuer provides generic methods for handling a queue.
type Queuer interface {
// Conn returns the original connection object.
Conn() (interface{}, error)
// NewConsumer creates a new consumer.
// You MUST pass a valid argument such as RabbitmqConsumerArgs, KafkaConsumerArgs, NsqConsumerArgs, or NatsConsumerArgs.
NewConsumer(args interface{}) (Consumer, error)
// NewProducer creates a new producer.
// You MUST pass a valid argument such as RabbitmqProducerArgs, KafkaProducerArgs, NsqProducerArgs, or NatsProducerArgs.
NewProducer(args interface{}) (Producer, error)
// SetLogger assigns the logger to use as well as a level.
// The logger parameter is an interface that requires the following method to be implemented (such as the stdlib log.Logger):
//
// Output(calldepth int, s string) error
//
SetLogger(logger, LogLevel)
// Setup is called by New with the queue url; a non-nil error makes New fail.
Setup(string) error
// closer contributes Close() error.
closer
}
// Consumer processes messages from a queue.
type Consumer interface {
// Consumer returns the original consumer object.
Consumer() (interface{}, error)
// BindRecvChan binds a send-only channel of *Message for receive operations from the queue.
BindRecvChan(messages chan<- *Message) error
// closer contributes Close() error.
closer
}
// Producer publishes messages to a queue.
type Producer interface {
// Producer returns the original producer object.
Producer() (interface{}, error)
// BindSendChan binds a receive-only channel of []byte for send operations to the queue.
BindSendChan(messages <-chan []byte) error
// closer contributes Close() error.
closer
}
// LogLevel specifies the severity of a given log message.
// It is the second parameter of Queuer.SetLogger.
type LogLevel int
// Log levels, numbered from 0 upward via iota in the order listed.
const (
// LogLevelDebug is 0.
LogLevelDebug LogLevel = iota
// LogLevelInfo is 1.
LogLevelInfo
// LogLevelWarning is 2.
LogLevelWarning
// LogLevelError is 3.
LogLevelError
)
// closer is the unexported Close contract embedded by Queuer, Consumer, and Producer.
type closer interface {
// Close reports any failure through its error result.
Close() error
}
// logger is the unexported logging contract accepted by Queuer.SetLogger.
// The stdlib *log.Logger has a method with this signature.
type logger interface {
// Output takes a call depth and the text to log.
Output(calldepth int, s string) error
}
// Register adds queue to the package registry under name so that New can
// find it later.
//
// It panics when queue is nil, or when name has already been registered.
func Register(name string, queue Queuer) {
	_, exists := queues[name]

	switch {
	case queue == nil:
		panic("queue: Register queue is nil")
	case exists:
		panic("queue: Register called twice for queue " + name)
	}

	queues[name] = queue
}
// New returns the queue registered under qname after configuring it for url.
//
// qname must match a name previously passed to Register; otherwise an error is
// returned. The registered Queuer's Setup method is called with url, and any
// error it reports is returned unchanged.
//
// Each element of setupFn must be a non-nil function that can be called with
// the registered queue's concrete value as its only argument, for example
// func(q *T) where *T is the registered type, or func(q Queuer). The functions
// are invoked in order, via reflection, after Setup succeeds; their return
// values are discarded. Every setup function is validated before Setup runs,
// so an invalid one yields an error (rather than a reflect panic) and neither
// Setup nor any setup function is called.
//
// The returned Queuer is the registered instance itself, not a copy.
func New(qname, url string, setupFn ...interface{}) (Queuer, error) {
	q, ok := queues[qname]
	if !ok {
		return nil, fmt.Errorf("queue: unknown queue %q (forgotten import?)", qname)
	}

	arg := reflect.ValueOf(q)

	// Validate first: reflect.Value.Call panics on anything that is not a
	// callable function accepting exactly the arguments it is given.
	fns := make([]reflect.Value, 0, len(setupFn))
	for i, f := range setupFn {
		fn := reflect.ValueOf(f)
		if fn.Kind() != reflect.Func || fn.IsNil() {
			return nil, fmt.Errorf("queue: setup argument %d is not a non-nil function (got %T)", i, f)
		}

		ft := fn.Type()
		var param reflect.Type
		switch n := ft.NumIn(); {
		case n == 1 && ft.IsVariadic():
			// func(qs ...T): the queue becomes the single variadic element.
			param = ft.In(0).Elem()
		case n == 1, n == 2 && ft.IsVariadic():
			// func(q T) or func(q T, rest ...U) called with no variadic values.
			param = ft.In(0)
		default:
			return nil, fmt.Errorf("queue: setup function %d (%s) cannot be called with a single queue argument", i, ft)
		}
		if !arg.Type().AssignableTo(param) {
			return nil, fmt.Errorf("queue: setup function %d expects %s, but queue %q is %s", i, param, qname, arg.Type())
		}

		fns = append(fns, fn)
	}

	if err := q.Setup(url); err != nil {
		return nil, err
	}

	for _, fn := range fns {
		fn.Call([]reflect.Value{arg})
	}

	return q, nil
}