-
Notifications
You must be signed in to change notification settings - Fork 8
/
wrapper_utils.go
53 lines (45 loc) · 1.05 KB
/
wrapper_utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package mono
import (
"context"
"github.com/jjeffcaii/reactor-go"
"github.com/jjeffcaii/reactor-go/internal/subscribers"
"github.com/jjeffcaii/reactor-go/scheduler"
)
// IsSubscribeAsync reports whether m wraps a monoScheduleOn publisher whose
// scheduler is recognised by scheduler.IsParallel, scheduler.IsElastic or
// scheduler.IsSingle.
//
// Only wrapper and *oneshotWrapper Monos are inspected; any other Mono, or a
// wrapped publisher that is not a monoScheduleOn (value or pointer), yields
// false.
//
// NOTE(review): presumably a true result means subscription is dispatched on
// a scheduler other than the caller's goroutine — confirm against the
// scheduler package.
func IsSubscribeAsync(m Mono) bool {
	// Step 1: peel off the Mono wrapper to reach the raw publisher.
	var raw reactor.RawPublisher
	if w, ok := m.(wrapper); ok {
		raw = w.RawPublisher
	} else if ow, ok := m.(*oneshotWrapper); ok {
		raw = ow.RawPublisher
	} else {
		return false
	}

	// Step 2: only a schedule-on publisher carries a scheduler to examine.
	var sched scheduler.Scheduler
	if v, ok := raw.(monoScheduleOn); ok {
		sched = v.sc
	} else if p, ok := raw.(*monoScheduleOn); ok {
		sched = p.sc
	} else {
		return false
	}

	// Step 3: check the scheduler kinds in the same order as before,
	// stopping at the first match.
	if scheduler.IsParallel(sched) {
		return true
	}
	if scheduler.IsElastic(sched) {
		return true
	}
	return scheduler.IsSingle(sched)
}
// block subscribes to publisher with a block subscriber obtained from the
// subscribers package, waits until that subscriber's Done channel yields,
// and then returns the subscriber's outcome.
//
// It returns (nil, E) when the subscriber recorded a non-nil E, otherwise
// (V, nil). The subscriber is handed back through ReturnBlockSubscriber when
// the function exits.
func block(ctx context.Context, publisher reactor.RawPublisher) (Any, error) {
	sub := subscribers.BorrowBlockSubscriber()
	// The return operands below are evaluated before this deferred call
	// runs, so E and V are read while the subscriber is still borrowed.
	defer subscribers.ReturnBlockSubscriber(sub)

	publisher.SubscribeWith(ctx, sub)
	<-sub.Done()

	if err := sub.E; err != nil {
		return nil, err
	}
	return sub.V, nil
}
// mustProcessor returns publisher asserted to rawProcessor.
// It panics with errNotProcessor when the assertion does not hold
// (including when publisher is nil).
func mustProcessor(publisher reactor.RawPublisher) rawProcessor {
	if proc, ok := publisher.(rawProcessor); ok {
		return proc
	}
	panic(errNotProcessor)
}