forked from agtorre/go-cookbook
-
Notifications
You must be signed in to change notification settings - Fork 0
/
exec.go
33 lines (29 loc) · 767 Bytes
/
exec.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package reactive
import (
"github.com/reactivex/rxgo/iterable"
"github.com/reactivex/rxgo/observable"
"github.com/reactivex/rxgo/observer"
"github.com/reactivex/rxgo/subscription"
)
// Exec connects rxgo and returns
// our results side-effect + a subscription
// channel to block on at the end
func Exec() (Results, <-chan subscription.Subscription) {
results := make(Results)
watcher := observer.Observer{
NextHandler: func(item interface{}) {
wine, ok := item.(Wine)
if ok {
result := results[wine.Age]
result.SumRating += wine.Rating
result.NumSamples++
results[wine.Age] = result
}
},
}
wine := GetWine()
it, _ := iterable.New(wine)
source := observable.From(it)
sub := source.Subscribe(watcher)
return results, sub
}