-
Notifications
You must be signed in to change notification settings - Fork 8
/
op_filter.go
74 lines (62 loc) · 1.4 KB
/
op_filter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package mono
import (
"context"
"github.com/jjeffcaii/reactor-go"
"github.com/jjeffcaii/reactor-go/internal"
"github.com/pkg/errors"
)
// filterSubscriber sits between an upstream publisher and the downstream
// subscriber held in actual. OnNext forwards a value only when predicate
// returns true for it; OnComplete, OnError and OnSubscribe are passed through.
type filterSubscriber struct {
// ctx is the context received in OnSubscribe; OnNext passes it to
// internal.TryDiscard for values the predicate rejects.
ctx context.Context
// actual is the downstream subscriber every forwarded signal is delivered to.
actual reactor.Subscriber
// predicate is called once per value in OnNext to decide whether to forward it.
predicate reactor.Predicate
}
// monoFilter is a publisher that wraps another publisher and applies a
// predicate to the values it emits (see SubscribeWith).
type monoFilter struct {
// s is the wrapped source publisher; it is what Parent returns and what
// SubscribeWith subscribes to.
s reactor.RawPublisher
// f is the predicate handed to each filterSubscriber created in SubscribeWith.
f reactor.Predicate
}
// Parent returns the source publisher wrapped by this filter.
func (m monoFilter) Parent() reactor.RawPublisher {
	source := m.s
	return source
}
// newFilterSubscriber builds a filterSubscriber that forwards to actual the
// values accepted by predicate. The context field is left unset here; it is
// filled in later by OnSubscribe.
func newFilterSubscriber(actual reactor.Subscriber, predicate reactor.Predicate) *filterSubscriber {
	sub := new(filterSubscriber)
	sub.actual = actual
	sub.predicate = predicate
	return sub
}
// newMonoFilter wraps the source publisher s in a monoFilter that applies the
// predicate f to emitted values.
func newMonoFilter(s reactor.RawPublisher, f reactor.Predicate) monoFilter {
	var filter monoFilter
	filter.s, filter.f = s, f
	return filter
}
// OnComplete relays the completion signal to the downstream subscriber
// unchanged.
func (f *filterSubscriber) OnComplete() {
	downstream := f.actual
	downstream.OnComplete()
}
// OnError relays err to the downstream subscriber unchanged.
func (f *filterSubscriber) OnError(err error) {
	downstream := f.actual
	downstream.OnError(err)
}
// OnNext runs the predicate against v. An accepted value is forwarded to the
// downstream subscriber; a rejected value is handed to internal.TryDiscard
// along with the context stored by OnSubscribe.
//
// Any panic raised while handling v (by the predicate, the downstream OnNext,
// or the discard call) is recovered and reported through OnError: a panic
// value that is an error is wrapped with a stack trace, anything else is
// formatted with %v into a new error.
func (f *filterSubscriber) OnNext(v Any) {
	defer func() {
		switch cause := recover().(type) {
		case nil:
			// Nothing panicked; no error signal to emit.
			return
		case error:
			f.OnError(errors.WithStack(cause))
		default:
			f.OnError(errors.Errorf("%v", cause))
		}
	}()

	if !f.predicate(v) {
		internal.TryDiscard(f.ctx, v)
		return
	}
	f.actual.OnNext(v)
}
// OnSubscribe remembers ctx for later use by OnNext and then passes the
// subscription on to the downstream subscriber.
func (f *filterSubscriber) OnSubscribe(ctx context.Context, s reactor.Subscription) {
	// Store the context before forwarding, so it is already set should the
	// downstream cause values to arrive while handling its own OnSubscribe.
	f.ctx = ctx
	downstream := f.actual
	downstream.OnSubscribe(ctx, s)
}
// SubscribeWith subscribes to the wrapped source publisher using a new
// filterSubscriber that wraps s, so s only receives values accepted by the
// predicate.
func (m monoFilter) SubscribeWith(ctx context.Context, s reactor.Subscriber) {
	filtered := newFilterSubscriber(s, m.f)
	m.s.SubscribeWith(ctx, filtered)
}