/
stream_consume_redis.go
88 lines (77 loc) · 1.81 KB
/
stream_consume_redis.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package processing
import (
	"context"
	"encoding/json"
	"errors"
	"time"

	"github.com/go-redis/redis/v8"
	"github.com/markuskont/datamodels"
	"github.com/sirupsen/logrus"
)
// ConfigConsumeRedis bundles everything ConsumeRedis needs: worker-pool
// settings and lifecycle context (ConfigStreamWorkers), Redis connection
// parameters (ConfigStreamRedis), and the per-message callback.
type ConfigConsumeRedis struct {
	ConfigStreamWorkers
	ConfigStreamRedis
	// Handler is invoked once per successfully decoded message.
	// It must be non-nil; ConsumeRedis rejects a nil handler up front.
	Handler MapHandlerFunc
}
// ConsumeRedis validates the supplied config, waits for Redis to become
// reachable, then launches c.Workers pool goroutines that each repeatedly
// LPop raw payloads from the Redis list c.Key, JSON-decode them into a
// datamodels.Map, and hand the result to c.Handler.
//
// Workers exit when c.Ctx is cancelled. Per-message errors (decode
// failures, transient Redis errors) are logged and the message is skipped;
// only configuration / connectivity problems are returned from this
// function. The workers' own errors surface through c.Pool.
func ConsumeRedis(c ConfigConsumeRedis) error {
	if err := c.ConfigStreamWorkers.Validate(); err != nil {
		return err
	}
	if err := c.ConfigStreamRedis.Validate(); err != nil {
		return err
	}
	if c.Handler == nil {
		return errors.New("missing data map handler")
	}
	// Block until Redis answers (or ctx is cancelled) before spawning workers.
	if err := waitOnRedis(c.Ctx, c.Client, c.Logger); err != nil {
		return err
	}
	// TODO: a single redis consumer that uses pipelines is much better
	// still need load-balancing for JSON decode though
	// we should make one worker that consumes messages and sends
	// to decode workers via channel or shard assignment
	for i := 0; i < c.Workers; i++ {
		worker := i // capture loop variable for the closure (pre-Go 1.22 semantics)
		c.Pool.Go(func() error {
			lctx := c.Logger.
				WithField("worker", worker).
				WithField("queue", c.Key)
			// Periodic progress report so a stalled consumer is visible in debug logs.
			tick := time.NewTicker(30 * time.Second)
			defer tick.Stop()
			var count int
			lctx.Info("worker setting up")
		loop:
			for {
				select {
				case <-tick.C:
					lctx.WithFields(logrus.Fields{
						"worker": worker,
						"queue":  c.Key,
						"count":  count,
					}).Debug("redis consumer")
				case <-c.Ctx.Done():
					lctx.Info("caught exit")
					break loop
				default:
					// NOTE(review): uses context.TODO() rather than c.Ctx.
					// LPop is non-blocking, so shutdown is still handled by
					// the ctx.Done case above — but consider threading c.Ctx
					// through here. (Switching now would orphan the context
					// import unless it gains another use.)
					raw, err := c.Client.LPop(context.TODO(), c.Key).Bytes()
					if err != nil {
						// redis.Nil means the list is empty: back off briefly
						// instead of busy-spinning. errors.Is also matches a
						// wrapped sentinel, unlike the previous == comparison.
						if errors.Is(err, redis.Nil) {
							time.Sleep(50 * time.Millisecond)
						} else {
							lctx.Error(err)
						}
						continue loop
					}
					var e datamodels.Map
					if err := json.Unmarshal(raw, &e); err != nil {
						lctx.Error(err)
						continue loop
					}
					c.Handler(datamodels.NewSafeMap(e))
					count++
				}
			}
			return nil
		})
	}
	return nil
}