-
Notifications
You must be signed in to change notification settings - Fork 1
/
sub.go
73 lines (55 loc) · 1.46 KB
/
sub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package pubsub
import (
"context"
"fmt"
"github.com/redis/go-redis/v9"
"github.com/rumorsflow/rumors/v2/internal/common"
"github.com/rumorsflow/rumors/v2/pkg/errs"
"sync"
)
// Subscriber hands out Redis pub/sub subscriptions and remembers each one
// so that Close can shut them all down together with the client.
type Subscriber struct {
	// client is the Redis client every subscription is opened on.
	client redis.UniversalClient

	// mu guards subs; it is held while a subscription is opened and
	// recorded, and for the whole of Close.
	mu sync.Mutex
	// subs lists every subscription created through this Subscriber.
	subs []*redis.PubSub
}
// NewSubscriber builds a Subscriber around the Redis client produced by
// rdbMaker.Make. If Make fails, its error is returned unchanged and no
// Subscriber is created.
func NewSubscriber(rdbMaker common.RedisMaker) (*Subscriber, error) {
	rdb, err := rdbMaker.Make()
	if err != nil {
		return nil, err
	}

	s := new(Subscriber)
	s.client = rdb
	return s, nil
}
// All opens a pattern subscription on ChannelPrefix followed by the "*"
// wildcard. The returned subscription is also recorded on the Subscriber,
// so Close will close it.
func (s *Subscriber) All(ctx context.Context) *redis.PubSub {
	pattern := ChannelPrefix + "*"
	return s.pSubscribe(ctx, pattern)
}
// Telegram opens a subscription on the ChannelTg channel. The returned
// subscription is also recorded on the Subscriber, so Close will close it.
func (s *Subscriber) Telegram(ctx context.Context) *redis.PubSub {
	sub := s.subscribe(ctx, ChannelTg)
	return sub
}
// Articles opens a subscription on the ChannelArticles channel. The
// returned subscription is also recorded on the Subscriber, so Close will
// close it.
func (s *Subscriber) Articles(ctx context.Context) *redis.PubSub {
	sub := s.subscribe(ctx, ChannelArticles)
	return sub
}
// subscribe opens a subscription on the given channel names via the
// client's Subscribe call, records it in s.subs and returns it. The mutex
// is held for the whole operation, including the Subscribe call itself.
func (s *Subscriber) subscribe(ctx context.Context, channels ...string) *redis.PubSub {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.subs = append(s.subs, s.client.Subscribe(ctx, channels...))
	// The subscription just appended is the last element.
	return s.subs[len(s.subs)-1]
}
// pSubscribe opens a pattern subscription on the given patterns via the
// client's PSubscribe call, records it in s.subs and returns it. The mutex
// is held for the whole operation, including the PSubscribe call itself.
func (s *Subscriber) pSubscribe(ctx context.Context, channels ...string) *redis.PubSub {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.subs = append(s.subs, s.client.PSubscribe(ctx, channels...))
	// The subscription just appended is the last element.
	return s.subs[len(s.subs)-1]
}
// Close closes every recorded subscription and then the Redis client,
// holding the mutex throughout. Errors from the individual Close calls are
// accumulated with errs.Append; if the accumulated error is non-nil it is
// returned wrapped and prefixed with OpClose, otherwise Close returns nil.
func (s *Subscriber) Close() (err error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Keep going after a failure so one bad subscription does not prevent
	// the remaining ones, or the client, from being closed.
	for i := range s.subs {
		err = errs.Append(err, s.subs[i].Close())
	}
	err = errs.Append(err, s.client.Close())

	if err == nil {
		return nil
	}
	return fmt.Errorf("%s %w", OpClose, err)
}