/
worker.go
129 lines (113 loc) · 2.71 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package queue
import (
"context"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws/session"
"go.uber.org/zap"
"github.com/unanet/go/pkg/log"
)
// HandlerFunc is an adapter that lets an ordinary function be used as a
// Handler: its HandleMessage method simply calls the function itself.
type HandlerFunc func(ctx context.Context, msg *M) error
// HandleMessage calls f(ctx, msg) and returns its error unchanged. This
// method is what makes a HandlerFunc satisfy the Handler interface.
func (f HandlerFunc) HandleMessage(ctx context.Context, msg *M) error {
return f(ctx, msg)
}
// Handler processes a single message received by a Worker.
type Handler interface {
// HandleMessage handles msg. When invoked from Worker.run, ctx carries the
// worker's per-message timeout and a non-nil error is logged, not returned.
HandleMessage(ctx context.Context, msg *M) error
}
// Worker repeatedly receives batches of messages from a queue and hands each
// message to a Handler (see Start). Create one with NewWorker.
type Worker struct {
// q is the queue Start receives from and DeleteMessage deletes from.
q *Q
// log is the worker's logger; NewWorker tags it with the queue id and worker name.
log *zap.Logger
// name is the worker name given to NewWorker.
name string
// timeout is the deadline applied to each HandleMessage call made by run.
timeout time.Duration
// ctx and cancel control the Start loop: Stop calls cancel and Start exits
// once ctx is done.
ctx context.Context
cancel context.CancelFunc
// done is closed by Start when it stops; Stop waits on it.
done chan bool
// wqs caches the queues created by getQueue, keyed by queue URL.
wqs map[string]*Q
// mutex guards wqs.
mutex sync.Mutex
// sess is the AWS session handed to NewQ when getQueue creates a queue.
sess *session.Session
}
// NewWorker builds a Worker called name that receives from q and allows each
// handler invocation at most timeout to run.
//
// The worker gets its own cancellable context (cancelled by Stop), a logger
// tagged with the queue id and worker name, an empty queue cache, and reuses
// q's session for any additional queues it opens. q must not be nil.
func NewWorker(name string, q *Q, timeout time.Duration) *Worker {
	ctx, cancel := context.WithCancel(context.Background())

	logger := log.Logger.With(
		zap.Uint64("internal_queue_id", q.id),
		zap.String("worker", name),
	)

	return &Worker{
		name:    name,
		q:       q,
		log:     logger,
		timeout: timeout,
		ctx:     ctx,
		cancel:  cancel,
		sess:    q.sess,
		done:    make(chan bool),
		wqs:     make(map[string]*Q),
	}
}
// Start runs the worker's receive loop on the calling goroutine and does not
// return until the worker's context is cancelled (see Stop).
//
// Each iteration receives a batch from the worker's queue and, if the batch
// is non-empty, passes it to run, which invokes h.HandleMessage for every
// message and waits for all of them. An error from Receive is logged at
// Panic level, which panics.
//
// The done channel is closed on every exit path, including a panic, so a
// goroutine blocked in Stop is always released. Start must be called at most
// once per Worker: a second call would close done again.
func (worker *Worker) Start(h Handler) {
	// Deferred so that Stop cannot hang if the loop exits by panic and the
	// panic is recovered further up the stack.
	defer close(worker.done)

	worker.log.Info("Queue worker started")
	for {
		// Non-blocking cancellation check between batches.
		select {
		case <-worker.ctx.Done():
			worker.log.Info("queue worker stopped")
			return
		default:
		}

		// NOTE(review): Receive gets a fresh background context rather than
		// worker.ctx, so cancellation is only observed after Receive returns.
		// Kept as-is because switching contexts would surface cancellation as
		// a Receive error (which panics below) — confirm intended behaviour.
		m, err := worker.q.Receive(context.Background())
		if err != nil {
			worker.log.Panic("error receiving message from queue", zap.Error(err))
		}
		if len(m) == 0 {
			continue
		}
		worker.run(h, m)
	}
}
// Stop cancels the worker's context and then blocks until Start has closed
// the done channel, i.e. until the receive loop has actually exited.
//
// NOTE(review): in this file done is only closed from Start, so calling Stop
// when Start is not running appears to block forever — confirm callers always
// pair Stop with a running Start.
func (worker *Worker) Stop() {
// Signal first, then wait: Start only closes done after it sees the cancellation.
worker.cancel()
<-worker.done
}
// DeleteMessage deletes m from the worker's own queue (the one passed to
// NewWorker) by delegating to Q.Delete, and returns its error unchanged.
func (worker *Worker) DeleteMessage(ctx context.Context, m *M) error {
return worker.q.Delete(ctx, m)
}
// getQueue returns the queue for qUrl, creating it with the worker's session
// and caching it on first use. The cache is protected by the worker's mutex,
// so concurrent callers asking for the same URL get the same *Q.
func (worker *Worker) getQueue(qUrl string) *Q {
	worker.mutex.Lock()
	defer worker.mutex.Unlock()

	queue, cached := worker.wqs[qUrl]
	if !cached {
		queue = NewQ(worker.sess, Config{QueueURL: qUrl})
		worker.wqs[qUrl] = queue
	}
	return queue
}
// Message looks up (or lazily creates) the queue for qUrl and forwards m to
// that queue's Message method, returning its error unchanged.
// NOTE(review): presumably Q.Message enqueues m — confirm against Q.
func (worker *Worker) Message(ctx context.Context, qUrl string, m *M) error {
	return worker.getQueue(qUrl).Message(ctx, m)
}
// run hands every message in mCtx to h concurrently, one goroutine per
// message, and blocks until all handlers have returned.
//
// Each handler call receives a context derived from the message's own
// context with the worker's timeout applied. Handler errors are logged and
// otherwise ignored; nothing is returned to the caller.
func (worker *Worker) run(h Handler, mCtx []*mContext) {
	var inflight sync.WaitGroup

	for i := range mCtx {
		inflight.Add(1)
		go func(item *mContext) {
			handlerCtx, release := context.WithTimeout(item.ctx, worker.timeout)
			// Deferred calls run in reverse order: the WaitGroup is
			// signalled first, then the timeout context is released.
			defer release()
			defer inflight.Done()

			err := h.HandleMessage(handlerCtx, &item.M)
			if err != nil {
				worker.log.Error("error handling message", zap.Error(err))
			}
		}(mCtx[i])
	}

	inflight.Wait()
}
// GetLogger returns the package logger, tagged with a "req_id" field when
// log.GetReqID finds a non-empty request ID in ctx. When there is none, the
// shared package logger is returned as-is.
func GetLogger(ctx context.Context) *zap.Logger {
	reqID := log.GetReqID(ctx)
	if len(reqID) == 0 {
		return log.Logger
	}
	return log.Logger.With(zap.String("req_id", reqID))
}