-
Notifications
You must be signed in to change notification settings - Fork 8
/
mono_timeout.go
86 lines (73 loc) · 1.79 KB
/
mono_timeout.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package mono
import (
"context"
"math"
"sync/atomic"
"time"
"github.com/jjeffcaii/reactor-go"
"github.com/jjeffcaii/reactor-go/hooks"
)
type monoTimeout struct {
source reactor.RawPublisher
timeout time.Duration
}
func newMonoTimeout(source reactor.RawPublisher, timeout time.Duration) *monoTimeout {
return &monoTimeout{
source: source,
timeout: timeout,
}
}
func (m *monoTimeout) SubscribeWith(ctx context.Context, s reactor.Subscriber) {
m.source.SubscribeWith(ctx, &timeoutSubscriber{
actual: s,
timeout: m.timeout,
done: make(chan struct{}),
})
}
func (m *monoTimeout) Parent() reactor.RawPublisher {
return m.source
}
type timeoutSubscriber struct {
actual reactor.Subscriber
timeout time.Duration
done chan struct{}
closed int32
}
func (t *timeoutSubscriber) OnComplete() {
if atomic.CompareAndSwapInt32(&t.closed, 0, math.MaxInt32) || atomic.CompareAndSwapInt32(&t.closed, 1, math.MaxInt32) {
close(t.done)
t.actual.OnComplete()
}
}
func (t *timeoutSubscriber) OnError(err error) {
if atomic.CompareAndSwapInt32(&t.closed, 0, -1) {
close(t.done)
t.actual.OnError(err)
return
}
// item is emitted before error reach, should be processed as completed.
if atomic.CompareAndSwapInt32(&t.closed, 1, -1) {
close(t.done)
t.actual.OnComplete()
}
hooks.Global().OnErrorDrop(err)
}
func (t *timeoutSubscriber) OnNext(any reactor.Any) {
if atomic.CompareAndSwapInt32(&t.closed, 0, 1) {
t.actual.OnNext(any)
} else {
hooks.Global().OnNextDrop(any)
}
}
func (t *timeoutSubscriber) OnSubscribe(ctx context.Context, subscription reactor.Subscription) {
timer := time.NewTimer(t.timeout)
go func() {
defer timer.Stop()
select {
case <-timer.C:
t.OnError(reactor.ErrSubscribeCancelled)
case <-t.done:
}
}()
t.actual.OnSubscribe(ctx, subscription)
}