-
Notifications
You must be signed in to change notification settings - Fork 159
/
chan.go
48 lines (38 loc) · 1.08 KB
/
chan.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package extension
import (
"github.com/reugn/go-streams"
"github.com/reugn/go-streams/flow"
)
// ChanSource represents an inbound connector that creates a stream of
// elements from a channel.
type ChanSource struct {
in chan any
}
var _ streams.Source = (*ChanSource)(nil)
// NewChanSource returns a new ChanSource connector.
func NewChanSource(in chan any) *ChanSource {
return &ChanSource{in}
}
// Via streams data to a specified operator and returns it.
func (cs *ChanSource) Via(operator streams.Flow) streams.Flow {
flow.DoStream(cs, operator)
return operator
}
// Out returns the output channel of the ChanSource connector.
func (cs *ChanSource) Out() <-chan any {
return cs.in
}
// ChanSink represents an outbound connector that writes streaming data
// to a channel.
type ChanSink struct {
Out chan any
}
var _ streams.Sink = (*ChanSink)(nil)
// NewChanSink returns a new ChanSink connector.
func NewChanSink(out chan any) *ChanSink {
return &ChanSink{out}
}
// In returns the input channel of the ChanSink connector.
func (ch *ChanSink) In() chan<- any {
return ch.Out
}