-
Notifications
You must be signed in to change notification settings - Fork 1
/
observable.go
101 lines (86 loc) · 1.86 KB
/
observable.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package utils
import (
"context"
"sync"
"sync/atomic"
)
// Observable is a thread-safe event helper
type Observable struct {
subscribers *sync.Map
idCount int64
}
// Event of the observale
type Event interface{}
// NewObservable creates a new observable
func NewObservable() *Observable {
return &Observable{
subscribers: &sync.Map{},
}
}
// Publish event to all subscribers, no internal goroutine is used,
// so the publish can block the goroutine. Use goroutine or buffer to prevent the blocking.
func (o *Observable) Publish(e Event) {
o.subscribers.Range(func(_, s interface{}) (goOn bool) {
goOn = true
defer func() { _ = recover() }()
s.(*Subscriber).C <- e
return
})
}
// Subscribe returns a subscriber to emit events
func (o *Observable) Subscribe() *Subscriber {
id := atomic.AddInt64(&o.idCount, 1)
subscriber := &Subscriber{
C: make(chan Event),
id: id,
}
o.subscribers.Store(id, subscriber)
return subscriber
}
// Unsubscribe from the observable
func (o *Observable) Unsubscribe(s *Subscriber) {
close(s.C)
o.subscribers.Delete(s.id)
}
// Count returns the number of subscribers
func (o *Observable) Count() int {
c := 0
o.subscribers.Range(func(key, value interface{}) bool {
c++
return true
})
return c
}
// Until check returns true keep waiting
func (o *Observable) Until(ctx context.Context, check func(Event) bool) (Event, error) {
s := o.Subscribe()
defer o.Unsubscribe(s)
for {
select {
case e := <-s.C:
if check(e) {
return e, nil
}
case <-ctx.Done():
return nil, ctx.Err()
}
}
}
// Subscriber of the observable
type Subscriber struct {
C chan Event
id int64
}
// Filter events
func (s *Subscriber) Filter(filter func(Event) bool) chan Event {
filtered := make(chan Event)
go func() {
for e := range s.C {
if filter(e) {
filtered <- e
}
}
close(filtered)
}()
return filtered
}