-
Notifications
You must be signed in to change notification settings - Fork 0
/
producer.go
39 lines (32 loc) · 930 Bytes
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package delay_queue
import (
"context"
)
// Producer adds messages to, and deletes messages from, per-task queues
// through the Redis client supplied via RegisterRedis.
type Producer struct {
// redis is the client every queue operation goes through; it is left unset
// by NewProducer and assigned by RegisterRedis.
redis Redis
// luaSha1DelMessage holds sha1Script(DelMessageScript), computed once in
// NewProducer and handed to execLuaScript by DeleteMessage.
// NOTE(review): presumably the SHA-1 digest used to invoke the cached
// script — confirm against sha1Script/execLuaScript.
luaSha1DelMessage string
}
// NewProducer returns a Producer whose luaSha1DelMessage is already set to
// sha1Script(DelMessageScript). No Redis client is attached here; one is
// supplied afterwards through RegisterRedis.
func NewProducer() *Producer {
	producer := new(Producer)
	producer.luaSha1DelMessage = sha1Script(DelMessageScript)
	return producer
}
// RegisterRedis stores redis as the client used by the producer's queue
// methods and returns the same producer, so the call can be chained directly
// onto NewProducer().
func (p *Producer) RegisterRedis(redis Redis) *Producer {
	p.redis = redis

	return p
}
// PushMessage adds message to the waiting queue of taskName by calling ZAdd
// on the registered Redis client, and returns that call's error unchanged.
// NOTE(review): no score/time is passed here; how ZAdd chooses one is decided
// by the Redis implementation — confirm there.
func (p *Producer) PushMessage(ctx context.Context, taskName string, message string) error {
	queueKey := waitingQueueKey(taskName)
	return p.redis.ZAdd(ctx, queueKey, message)
}
// PushMessageT adds message to the waiting queue of taskName by calling ZAddT
// on the registered Redis client, forwarding time untouched, and returns that
// call's error unchanged.
// NOTE(review): time is presumably the timestamp used as the sorted-set score
// (unit not visible from this file) — confirm against the Redis implementation.
func (p *Producer) PushMessageT(ctx context.Context, taskName string, time int64, message string) error {
	queueKey := waitingQueueKey(taskName)
	return p.redis.ZAddT(ctx, queueKey, time, message)
}
// DeleteMessage runs DelMessageScript through execLuaScript, passing the
// waiting-queue key and doing-queue key of taskName together with message.
// The script's result is discarded; only the error is returned.
// NOTE(review): the script presumably removes message from both queues —
// confirm against DelMessageScript.
func (p *Producer) DeleteMessage(ctx context.Context, taskName string, message string) error {
	// The leading 2 is presumably the key count (Redis EVAL numkeys
	// convention): the two queue keys follow, then message as the script
	// argument — confirm against execLuaScript.
	scriptArgs := []interface{}{
		2,
		waitingQueueKey(taskName),
		doingQueueKey(taskName),
		message,
	}

	_, err := execLuaScript(ctx, p.redis, p.luaSha1DelMessage, DelMessageScript, scriptArgs)
	return err
}