/
pubsub_redis.go
56 lines (47 loc) · 1.19 KB
/
pubsub_redis.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// Copyright 2019 The OpenPitrix Authors. All rights reserved.
// Use of this source code is governed by an Apache license
// that can be found in the LICENSE file.
package redis
import (
"github.com/go-redis/redis"
i "openpitrix.io/libqueue"
)
// RedisPubSub is a publish/subscribe endpoint bound to one Redis channel.
// Its methods publish to, and subscribe on, Channel using the Client that is
// promoted from the embedded RedisClient.
type RedisPubSub struct {
// RedisClient is embedded so that its Client field is reachable directly
// on RedisPubSub; it is assigned by SetClient.
RedisClient
// Channel is the name handed to Publish and PSubscribe; it is assigned by
// SetChannel.
Channel string
}
// SetClient stores the client that later Publish, ReceiveMessage and Close
// calls will use, and returns the receiver so calls can be chained.
//
// iClient must be a non-nil pointer whose interface value holds a
// RedisClient; dereferencing a nil pointer or a failed type assertion panics.
func (rPubSub *RedisPubSub) SetClient(iClient *i.IClient) i.IPubSub {
	generic := *iClient
	rPubSub.RedisClient = generic.(RedisClient)
	return rPubSub
}
// SetChannel records channel as the name used by subsequent Publish and
// ReceiveMessage calls, and returns the receiver so calls can be chained.
func (rPubSub *RedisPubSub) SetChannel(channel string) i.IPubSub {
rPubSub.Channel = channel
return rPubSub
}
// Publish sends msg to the configured Channel through the embedded client.
//
// It returns the error reported by the Redis PUBLISH command, or nil when the
// command succeeds. A failure is handed back to the caller rather than raised
// as a panic, so a transient Redis outage cannot take down the process.
func (rPubSub *RedisPubSub) Publish(msg string) error {
	return rPubSub.Client.Publish(rPubSub.Channel, msg).Err()
}
// ReceiveMessage subscribes to Channel with PSubscribe and returns a channel
// on which the payload of each incoming message is delivered.
//
// The returned channel is buffered (capacity 255). Forwarding is done by a
// background goroutine running getMessages, which is started before this
// method returns.
func (rPubSub *RedisPubSub) ReceiveMessage() chan string {
	subscription := rPubSub.Client.PSubscribe(rPubSub.Channel)
	payloads := make(chan string, 255)
	go rPubSub.getMessages(subscription, payloads)
	return payloads
}
// getMessages forwards the payload of every message delivered on ps to
// msgChan. It blocks until the channel returned by ps.Channel() is closed and
// then closes msgChan, so a receiver ranging over msgChan terminates instead
// of waiting forever. It is meant to run in its own goroutine.
func (rPubSub *RedisPubSub) getMessages(ps *redis.PubSub, msgChan chan string) {
	// As the producer for msgChan, this function is responsible for closing
	// it once no further messages can arrive.
	defer close(msgChan)

	for msg := range ps.Channel() {
		msgChan <- msg.Payload
	}
}
// Close closes the underlying client and returns whatever error that
// operation reports (nil on success).
func (rPubSub *RedisPubSub) Close() error {
	return rPubSub.Client.Close()
}