-
Notifications
You must be signed in to change notification settings - Fork 2
/
redis.go
82 lines (70 loc) · 1.76 KB
/
redis.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package queue
import (
"context"
"github.com/mss-boot-io/mss-boot-admin/storage"
"github.com/mss-boot-io/redisqueue/v2"
"github.com/redis/go-redis/v9"
)
// NewRedis builds a Redis-mode queue.
//
// It creates the producer from producerOptions first and the consumer from
// consumerOptions second. If either construction fails, that error is
// returned unchanged together with a nil *Redis. Nil option pointers are
// accepted; newProducer/newConsumer substitute empty option structs.
func NewRedis(
	producerOptions *redisqueue.ProducerOptions,
	consumerOptions *redisqueue.ConsumerOptions,
) (*Redis, error) {
	q := new(Redis)

	producer, err := q.newProducer(producerOptions)
	if err != nil {
		return nil, err
	}
	q.producer = producer

	consumer, err := q.newConsumer(consumerOptions)
	if err != nil {
		return nil, err
	}
	q.consumer = consumer

	return q, nil
}
// Redis is the Redis-backed queue implementation. It pairs a redisqueue
// producer (used by Append) with a redisqueue consumer (used by Register,
// Run and Shutdown).
type Redis struct {
// client is not assigned anywhere in the visible code.
// NOTE(review): presumably reserved for direct Redis access — confirm or remove.
client *redis.Client
// consumer is created by NewRedis via newConsumer.
consumer *redisqueue.Consumer
// producer is created by NewRedis via newProducer.
producer *redisqueue.Producer
}
// String returns the fixed identifier of this queue implementation, "redis".
func (*Redis) String() string {
	const name = "redis"
	return name
}
// newConsumer creates a redisqueue consumer from options. A nil options
// pointer is replaced by an empty ConsumerOptions value before it is handed
// to redisqueue.NewConsumerWithOptions, whose result is returned as-is.
func (r *Redis) newConsumer(options *redisqueue.ConsumerOptions) (*redisqueue.Consumer, error) {
	consumerOpts := options
	if consumerOpts == nil {
		consumerOpts = new(redisqueue.ConsumerOptions)
	}
	return redisqueue.NewConsumerWithOptions(consumerOpts)
}
// newProducer creates a redisqueue producer from options. A nil options
// pointer is replaced by an empty ProducerOptions value before it is handed
// to redisqueue.NewProducerWithOptions, whose result is returned as-is.
func (r *Redis) newProducer(options *redisqueue.ProducerOptions) (*redisqueue.Producer, error) {
	producerOpts := options
	if producerOpts == nil {
		producerOpts = new(redisqueue.ProducerOptions)
	}
	return redisqueue.NewProducerWithOptions(producerOpts)
}
// Append enqueues the message carried in opts through the producer.
//
// The options are resolved with storage.SetOptions; the resulting message's
// ID, stream and values are copied into a redisqueue.Message. The error from
// Enqueue is returned unchanged.
//
// NOTE(review): o.Message is dereferenced without a check — callers
// presumably must always supply a message option; confirm against callers.
func (r *Redis) Append(opts ...storage.Option) error {
	o := storage.SetOptions(opts...)
	outgoing := &redisqueue.Message{
		ID:     o.Message.GetID(),
		Stream: o.Message.GetStream(),
		Values: o.Message.GetValues(),
	}
	return r.producer.Enqueue(outgoing)
}
// Register attaches a handler for the topic given in opts to the consumer.
//
// Each redisqueue message delivered to the handler is copied (values, stream,
// ID) into a fresh Message, which is then passed to the callback o.F; the
// callback's error is returned to the consumer unchanged.
func (r *Redis) Register(opts ...storage.Option) {
	o := storage.SetOptions(opts...)

	// Adapter from the redisqueue message type to this package's Message.
	handler := func(incoming *redisqueue.Message) error {
		msg := &Message{}
		msg.SetValues(incoming.Values)
		msg.SetStream(incoming.Stream)
		msg.SetID(incoming.ID)
		return o.F(msg)
	}

	r.consumer.Register(o.Topic, handler)
}
// Run starts the underlying consumer by calling its Run method.
//
// The context argument is accepted but ignored: cancelling it has no effect
// on the consumer. Use Shutdown to stop it.
// NOTE(review): presumably this call blocks until Shutdown — confirm against
// the redisqueue consumer documentation.
func (r *Redis) Run(_ context.Context) {
	consumer := r.consumer
	consumer.Run()
}
// Shutdown stops the queue by delegating to the consumer's Shutdown method.
// The producer is not touched.
func (r *Redis) Shutdown() {
	consumer := r.consumer
	consumer.Shutdown()
}