/
block_subscriber.go
70 lines (62 loc) · 1.25 KB
/
block_subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package mono
import (
"context"
"github.com/jjeffcaii/reactor-go"
"github.com/rsocket/rsocket-go/internal/common"
"github.com/rsocket/rsocket-go/payload"
)
// blockSubscriber forwards the signals of a subscription onto channels
// supplied by its creator: items go to vchan, an error goes to echan, and
// done is the channel every callback checks before doing any work.
type blockSubscriber struct {
	// done is read by every callback as an "already finished" check and is
	// passed to common.SafeCloseDoneChan by OnComplete and OnError.
	done chan struct{}
	// vchan is the send-only channel that OnNext writes each item to.
	vchan chan<- payload.Payload
	// echan is the send-only channel that OnError writes the error to.
	echan chan<- error
}
// newBlockSubscriber wraps the three given channels in a blockSubscriber and
// returns it as a reactor.Subscriber. The channels are stored as-is; none of
// them is created, read, or written here.
func newBlockSubscriber(
	done chan struct{},
	vchan chan<- payload.Payload,
	echan chan<- error,
) reactor.Subscriber {
	var s blockSubscriber
	s.done, s.vchan, s.echan = done, vchan, echan
	return s
}
// OnComplete handles the completion signal by passing b.done to
// common.SafeCloseDoneChan, unless b.done is already readable, in which case
// the call is a no-op.
func (b blockSubscriber) OnComplete() {
	select {
	case <-b.done:
		// Already finished; nothing left to signal.
		return
	default:
	}
	// Completion has no value to deliver, so the helper's result is not
	// needed and is discarded on purpose.
	_ = common.SafeCloseDoneChan(b.done)
}
// OnError handles the error signal. It is a no-op when b.done is already
// readable. Otherwise it passes b.done to common.SafeCloseDoneChan and, only
// if that call reports true, sends err on b.echan.
//
// NOTE(review): the send on b.echan happens after the SafeCloseDoneChan call
// (which presumably closes b.done — confirm). A reader that wakes on b.done
// may therefore run before err is available, and the send blocks unless
// b.echan has buffer space or a receiver — verify against the caller.
func (b blockSubscriber) OnError(err error) {
	select {
	case <-b.done:
		// Already finished; drop the error.
		return
	default:
	}
	if !common.SafeCloseDoneChan(b.done) {
		// The helper reported false, so this call does not publish err.
		return
	}
	b.echan <- err
}
// OnNext handles an emitted item. It is a no-op when b.done is already
// readable. Otherwise, if the item implements common.Releasable, its IncRef
// method is called first (presumably to keep the item alive for the receiver
// — confirm), and the item is then sent on b.vchan as a payload.Payload.
//
// The conversion to payload.Payload is an unchecked type assertion, so an
// item of any other type causes a panic.
func (b blockSubscriber) OnNext(any reactor.Any) {
	select {
	case <-b.done:
		// Already finished; ignore late items.
		return
	default:
	}
	if releasable, ok := any.(common.Releasable); ok {
		releasable.IncRef()
	}
	item := any.(payload.Payload)
	b.vchan <- item
}
// OnSubscribe is called when the subscription is established. It starts a
// watcher goroutine for ctx when ctx can be cancelled, then calls
// subscription.Request with reactor.RequestInfinite.
//
// The watcher waits for whichever happens first:
//   - ctx is done: reactor.ErrSubscribeCancelled is reported through OnError;
//   - b.done becomes readable: the watcher simply exits.
//
// Either way the goroutine ends once the subscriber has finished.
func (b blockSubscriber) OnSubscribe(ctx context.Context, subscription reactor.Subscription) {
	// workaround: watch context
	//
	// Context.Done returns nil for a context that can never be cancelled.
	// That covers context.Background and context.TODO as well as contexts
	// derived from them without a cancel function or deadline (for example
	// via context.WithValue), so no watcher goroutine is spawned for any of
	// those.
	if cancelled := ctx.Done(); cancelled != nil {
		go func() {
			select {
			case <-cancelled:
				b.OnError(reactor.ErrSubscribeCancelled)
			case <-b.done:
			}
		}()
	}
	subscription.Request(reactor.RequestInfinite)
}