-
Notifications
You must be signed in to change notification settings - Fork 106
/
pubsub.go
69 lines (56 loc) · 1.63 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package full
import (
"github.com/eapache/channels"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
tmtypes "github.com/tendermint/tendermint/types"
)
// Compile-time assertion that *tendermintPubsubBuffer implements
// tmtypes.Subscription.
var _ tmtypes.Subscription = (*tendermintPubsubBuffer)(nil)
// tendermintPubsubBuffer is a wrapper around tendermint subscriptions.
// Because unbuffered subscriptions are dangerous and can lead to deadlocks
// if they're not drained, this wrapper shunts all events into its own buffer.
type tendermintPubsubBuffer struct {
messageBuffer *channels.InfiniteChannel
tmSubscription tmtypes.Subscription
outCh chan tmpubsub.Message
cancelCh chan struct{}
}
// newTendermintPubsubBuffer wraps tmSubscription in a buffering subscription
// and starts the two goroutines that service it.
//
// The reader goroutine exits when the upstream subscription is cancelled or
// its Out channel is closed; the writer goroutine exits after the internal
// buffer has been closed and emptied. There is no separate stop method.
func newTendermintPubsubBuffer(tmSubscription tmtypes.Subscription) *tendermintPubsubBuffer {
	buf := new(tendermintPubsubBuffer)
	buf.messageBuffer = channels.NewInfiniteChannel()
	buf.tmSubscription = tmSubscription
	// Both exposed channels are unbuffered; the buffering happens in
	// messageBuffer.
	buf.outCh = make(chan tmpubsub.Message)
	buf.cancelCh = make(chan struct{})

	// reader: upstream subscription -> messageBuffer.
	go buf.reader()
	// writer: messageBuffer -> outCh.
	go buf.writer()

	return buf
}
// Out returns the receive-only channel on which buffered messages are
// delivered to the consumer. This wrapper never closes the channel, so
// consumers should also watch Cancelled() rather than rely on Out() closing.
func (ps *tendermintPubsubBuffer) Out() <-chan tmpubsub.Message {
	var out <-chan tmpubsub.Message = ps.outCh
	return out
}
// Cancelled returns a channel that is closed when the reader goroutine
// exits, i.e. once the wrapped subscription has been cancelled or its Out
// channel has been closed.
func (ps *tendermintPubsubBuffer) Cancelled() <-chan struct{} {
	var done <-chan struct{} = ps.cancelCh
	return done
}
// Err reports the error of the wrapped subscription; the wrapper keeps no
// error state of its own and simply delegates.
func (ps *tendermintPubsubBuffer) Err() error {
	upstream := ps.tmSubscription
	return upstream.Err()
}
// reader is the intake goroutine: it receives every message from the wrapped
// subscription and pushes it into the internal buffer, so the upstream
// channel is drained regardless of how quickly the consumer reads Out().
//
// It returns when the upstream Out channel is closed or the upstream
// subscription is cancelled.
func (ps *tendermintPubsubBuffer) reader() {
	// Deferred calls run last-in-first-out: the buffer's input is closed
	// first, and only then is cancellation signalled on cancelCh.
	defer close(ps.cancelCh)
	defer ps.messageBuffer.Close()

	for {
		select {
		case event, open := <-ps.tmSubscription.Out():
			if !open {
				// Upstream closed its output; nothing more will arrive.
				return
			}
			// event is a fresh variable each time this case runs, so the
			// pointer stored in the buffer is never shared between messages.
			in := ps.messageBuffer.In()
			in <- &event
		case <-ps.tmSubscription.Cancelled():
			return
		}
	}
}
// writer is the delivery goroutine: it takes messages out of the internal
// buffer and sends them on outCh one at a time, blocking until a consumer
// receives each one.
//
// After cancelCh has been closed, a send that no consumer picks up is
// abandoned and whatever is still buffered is discarded. Previously the send
// was unconditional, so a consumer that stopped reading Out() after
// cancellation left this goroutine blocked forever.
func (ps *tendermintPubsubBuffer) writer() {
	buffered := ps.messageBuffer.Out()
	for msg := range buffered {
		select {
		case ps.outCh <- *(msg.(*tmpubsub.Message)):
		case <-ps.cancelCh:
			// reader closes the buffer's input before it closes cancelCh,
			// so this drain is bounded. Emptying the buffer lets it close
			// its output instead of holding the leftover messages
			// indefinitely (assumes eapache/channels closes Out() only once
			// the buffer is empty -- see its documentation).
			for range buffered {
			}
			return
		}
	}
}