/
queue.go
122 lines (104 loc) · 2.56 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package connector
import (
"sync"
"time"
log "github.com/Sirupsen/logrus"
)
// Queue is an interface modeling a task-queue (it is started and more Requests can be pushed to it, and finally it is stopped after all requests are handled).
type Queue interface {
ResponseHandlerSetter
SenderSetter
Start() error
Push(request Request) error
Stop() error
}
type queue struct {
sender Sender
responseHandler ResponseHandler
requestsC chan Request
nWorkers int
metrics bool
wg sync.WaitGroup
}
// NewQueue returns a new Queue (not started).
func NewQueue(sender Sender, nWorkers int) Queue {
q := &queue{
sender: sender,
nWorkers: nWorkers,
metrics: true,
}
return q
}
func (q *queue) SetResponseHandler(rh ResponseHandler) {
q.responseHandler = rh
}
func (q *queue) ResponseHandler() ResponseHandler {
return q.responseHandler
}
func (q *queue) Sender() Sender {
return q.sender
}
func (q *queue) SetSender(s Sender) {
q.sender = s
}
// Start a fixed number of goroutines to handle requests and responses w.r.t. external push-notification services.
func (q *queue) Start() error {
q.requestsC = make(chan Request)
for i := 1; i <= q.nWorkers; i++ {
go q.worker(i)
}
return nil
}
func (q *queue) worker(i int) {
logger.WithField("worker", i).Info("starting queue worker")
for request := range q.requestsC {
q.handle(request)
}
}
func (q *queue) handle(request Request) {
q.wg.Add(1)
defer q.wg.Done()
var beforeSend time.Time
if q.metrics {
beforeSend = time.Now()
}
response, err := q.sender.Send(request)
if q.responseHandler != nil {
var metadata *Metadata
if q.metrics {
metadata = &Metadata{time.Since(beforeSend)}
}
err = q.responseHandler.HandleResponse(request, response, metadata, err)
if err != nil {
logger.WithFields(log.Fields{
"error": err.Error(),
"subscriber": request.Subscriber(),
"message": request.Message(),
}).Error("error handling connector response")
}
} else if err == nil {
logger.WithField("response", response).Info("no response handler was set")
} else {
logger.WithField("error", err.Error()).Error("error while sending, and no response handler was set")
}
}
func (q *queue) Push(request Request) error {
// recover if the channel been closed
defer func() {
if r := recover(); r != nil {
switch x := r.(type) {
case error:
logger.WithError(x).Error("recovered from error")
default:
panic(r)
}
}
}()
q.requestsC <- request
return nil
}
func (q *queue) Stop() error {
close(q.requestsC)
q.wg.Wait()
return nil
}