-
Notifications
You must be signed in to change notification settings - Fork 11
/
async_subject.go
77 lines (64 loc) · 1.5 KB
/
async_subject.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package co
import (
syncx "go.tempura.ink/co/internal/syncx"
)
// AsyncSubject is a push-style source of values of type R. Producers feed it
// through Next and Error, and signal the end of the stream with Complete.
// It embeds *asyncSequence[R], so the methods of that type are promoted.
type AsyncSubject[R any] struct {
*asyncSequence[R]
// latestDataCh carries the value/error pairs published by Next and Error.
// It is created unbuffered in NewAsyncSubject, closed by Complete, and
// received from by the iterator's next method.
latestDataCh chan *data[R]
// sourceEnded is set to true by Complete and checked by the iterator's
// next method before it receives from latestDataCh.
// NOTE(review): it is read and written without synchronization in this
// file; confirm whether callers guarantee ordering.
sourceEnded bool
}
// NewAsyncSubject builds an AsyncSubject with an unbuffered data channel and
// attaches the asyncSequence returned by NewAsyncSequence for that subject.
func NewAsyncSubject[R any]() *AsyncSubject[R] {
	subject := new(AsyncSubject[R])
	subject.latestDataCh = make(chan *data[R])
	// The sequence receives the subject itself, so the channel must already
	// be in place before this call.
	subject.asyncSequence = NewAsyncSequence[R](subject)
	return subject
}
// Next publishes val on the subject and returns the receiver so calls can be
// chained. The publication itself is handed to syncx.SafeGo, so Next does not
// perform the channel send inline.
func (c *AsyncSubject[R]) Next(val R) *AsyncSubject[R] {
	publish := func() {
		// NOTE(review): SafeNRead presumably takes a pending item off the
		// channel without blocking so only the newest one remains — confirm
		// against the syncx package.
		syncx.SafeNRead(c.latestDataCh)
		c.latestDataCh <- NewDataWith(val, nil)
	}
	syncx.SafeGo(publish)
	return c
}
// Error publishes err on the subject, paired with the zero value of R, and
// returns the receiver so calls can be chained. As with Next, the send is
// handed to syncx.SafeGo rather than performed inline.
func (c *AsyncSubject[R]) Error(err error) *AsyncSubject[R] {
	syncx.SafeGo(func() {
		// NOTE(review): SafeNRead presumably takes a pending item off the
		// channel without blocking — confirm against the syncx package.
		syncx.SafeNRead(c.latestDataCh)

		// An error item carries no meaningful value; use R's zero value.
		var zero R
		c.latestDataCh <- NewDataWith(zero, err)
	})
	return c
}
// Complete marks the subject as ended and closes its data channel via
// syncx.SafeClose, then returns the receiver so calls can be chained.
func (c *AsyncSubject[R]) Complete() *AsyncSubject[R] {
// Set the flag before closing so the iterator's next method can return
// early without touching the channel.
c.sourceEnded = true
// NOTE(review): SafeClose presumably tolerates an already-closed channel —
// confirm against the syncx package.
syncx.SafeClose(c.latestDataCh)
return c
}
// iterator returns an Iterator over this subject. The concrete iterator
// embeds the subject and an asyncSequenceIterator built around itself.
func (c *AsyncSubject[R]) iterator() Iterator[R] {
	iter := new(asyncSubjectIterator[R])
	iter.AsyncSubject = c
	// The sequence iterator is constructed from the subject iterator, so the
	// subject must be attached first.
	iter.asyncSequenceIterator = NewAsyncSequenceIterator[R](iter)
	return iter
}
// asyncSubjectIterator is the iterator created by AsyncSubject.iterator. It
// embeds both the generic asyncSequenceIterator and the AsyncSubject it reads
// from, so the fields and methods of both are promoted onto the iterator.
type asyncSubjectIterator[R any] struct {
*asyncSequenceIterator[R]
*AsyncSubject[R]
}
// next blocks for the next item published on the subject and returns it
// wrapped in an Optional. It returns an empty Optional when the subject has
// already been completed, when the data channel is closed, or when an error
// item arrives and the error mode says to stop.
func (it *asyncSubjectIterator[R]) next() *Optional[R] {
	if it.sourceEnded {
		return NewOptionalEmpty[R]()
	}

	for {
		item, open := <-it.latestDataCh
		if !open {
			// Channel closed: nothing more will be published.
			return NewOptionalEmpty[R]()
		}

		if item.err == nil {
			return OptionalOf(item.GetValue())
		}

		// Error item: report it, then let the error mode decide what to do.
		it.handleError(item.GetError())
		switch {
		case it.errorMode.shouldSkip():
			// Drop this item and wait for the next one.
			continue
		case it.errorMode.shouldStop():
			return NewOptionalEmpty[R]()
		}

		// Neither skip nor stop: hand back the value carried by the item.
		return OptionalOf(item.GetValue())
	}
}