-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer.go
99 lines (90 loc) · 2.43 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package consumer
import (
"context"
"errors"
"log"
"time"
"github.com/xuhaidong1/offlinepush/internal/interceptor"
"github.com/xuhaidong1/offlinepush/cmd/ioc"
"github.com/xuhaidong1/offlinepush/internal/consumer/repository"
"github.com/xuhaidong1/offlinepush/internal/domain"
"go.uber.org/zap"
)
// Sentinel errors used by the consumer loops in this file.
var (
// NoMessage aliases repository.NoMessage. ConsumeOld treats it as the
// normal "nothing left to read" signal and returns nil when it sees it.
NoMessage = repository.NoMessage
// Paused is returned by ConsumeOld and Consume when the interceptor's
// Permit call reports that the business may not continue.
Paused = errors.New("paused")
)
// Consumer fetches messages for a business from a repository and hands each
// one to Push, asking the interceptor for permission before every fetch.
type Consumer struct {
// repo is the message source read by ConsumeOld and Consume.
repo repository.ConsumerRepository
// interceptor is queried via Permit(biz) before each fetch.
interceptor *interceptor.Interceptor
// pushLogger receives one "push ..." line per message from Push.
pushLogger *log.Logger
// logger records storage fetch errors in ConsumeOld.
logger *zap.Logger
}
// NewConsumer builds a Consumer around the given repository and interceptor.
// Both loggers are not parameters: they are read from the package-level
// ioc.PushLogger and ioc.Logger values at construction time.
func NewConsumer(repo repository.ConsumerRepository, interceptor *interceptor.Interceptor) *Consumer {
	consumer := new(Consumer)
	consumer.repo = repo
	consumer.interceptor = interceptor
	consumer.pushLogger = ioc.PushLogger
	consumer.logger = ioc.Logger
	return consumer
}
// ConsumeOld drains messages for bizName from storage one at a time and
// pushes each of them.
//
// It returns:
//   - ctx.Err() once ctx is done (checked before every fetch),
//   - Paused when the interceptor does not permit bizName,
//   - nil when storage reports NoMessage, and also nil when the fetch fails
//     with any other error (that error is logged, not returned).
func (c *Consumer) ConsumeOld(ctx context.Context, bizName string) error {
	for {
		// Non-blocking cancellation check at the top of every iteration.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		if permitted := c.interceptor.Permit(bizName); !permitted {
			return Paused
		}

		// TODO: rework this to fetch data in shards: pull one shard
		// (biz:devicetype) at a time into a local cache and move on to the
		// next shard once the local cache has been consumed.
		// TODO: if paused/cancelled mid-shard, the remaining data of that
		// shard must be written back to redis.
		msg, fetchErr := c.repo.GetMessageFromStorage(ctx, bizName)
		switch {
		case errors.Is(fetchErr, NoMessage):
			// Storage is empty: normal termination.
			return nil
		case fetchErr != nil:
			// Any other failure is logged and ends the loop without
			// surfacing the error to the caller.
			c.logger.Error("Consumer", zap.String("ConsumeOld", "GetMessage"), zap.Error(fetchErr))
			return nil
		default:
			// The push is mocked here.
			c.Push(msg)
		}
	}
}
// Consume waits for queueReady, then repeatedly dequeues messages for biz
// and pushes them until it is told to stop.
//
// Parameters:
//   - ctx: cancellation for the consumer itself; checked while waiting for
//     queueReady and before every dequeue.
//   - dequeueCtx: the context handed to repo.GetMessage.
//   - biz: business name passed to the interceptor and the repository.
//   - finished: closed by the consumer that dequeues the EOF marker; every
//     consumer returns nil once it observes the close.
//   - queueReady: Consume blocks until a receive on it succeeds.
//
// It returns ctx.Err() when ctx is done, nil when finished is closed, and
// Paused when the interceptor does not permit biz.
func (c *Consumer) Consume(ctx, dequeueCtx context.Context, biz string, finished, queueReady chan struct{}) error {
	// Block until the queue is ready or the consumer is cancelled first.
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-queueReady:
	}

	for {
		// Non-blocking check of the two stop signals before each dequeue.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-finished:
			return nil
		default:
		}

		if permitted := c.interceptor.Permit(biz); !permitted {
			return Paused
		}

		msg, dequeueErr := c.repo.GetMessage(dequeueCtx, biz)
		switch {
		case dequeueErr != nil:
			// We cannot tell here why the dequeue was cancelled, so loop back
			// and let the select above report the cause: an upstream cancel
			// (service shutdown, pause) is caught by <-ctx.Done(), and running
			// out of messages is caught by <-finished.
			// NOTE(review): nothing here sleeps or blocks, so if GetMessage
			// keeps failing while neither signal fires this loop spins —
			// confirm dequeueCtx is only cancelled together with one of them.
		case domain.IsEOF(msg):
			// No more messages follow; closing finished broadcasts to the
			// other consumer goroutines that they should exit.
			// NOTE(review): close panics on an already-closed channel;
			// presumably exactly one EOF marker is enqueued — confirm.
			close(finished)
		default:
			c.Push(msg)
		}
	}
}
// Push is a mock delivery: it sleeps for 50ms to stand in for the send and
// then writes the message to the push logger.
func (c *Consumer) Push(msg domain.Message) {
	const simulatedLatency = 50 * time.Millisecond
	time.Sleep(simulatedLatency)
	c.pushLogger.Printf("push %v\n", msg)
}