-
Notifications
You must be signed in to change notification settings - Fork 51
/
channel.go
52 lines (44 loc) · 1.17 KB
/
channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package signalflow
import (
"context"
"sync"
"github.com/signalfx/signalfx-go/signalflow/messages"
)
// Channel is a queue of messages that all pertain to the same computation.
// Messages are pushed in with AcceptMessage and read from the Go chan
// returned by Messages.
type Channel struct {
// Embedded mutex; its Lock and Unlock methods are promoted onto Channel.
sync.Mutex
// name is the channel name supplied to newChannel.
name string
// messages is the unbuffered Go chan that AcceptMessage sends on, Messages
// exposes for receiving, and Close closes.
messages chan messages.Message
// ctx is derived (via context.WithCancel) from the context passed to
// newChannel; AcceptMessage stops waiting for a reader once it is done.
ctx context.Context
// cancel cancels ctx; it is invoked by Close.
cancel context.CancelFunc
}
// newChannel builds a Channel identified by name. The Channel gets its own
// cancellable context derived from ctx, so it is cancelled either when ctx is
// cancelled or when the Channel's cancel function is called. The underlying
// message chan is unbuffered.
func newChannel(ctx context.Context, name string) *Channel {
	derived, stop := context.WithCancel(ctx)

	return &Channel{
		name:     name,
		messages: make(chan messages.Message),
		ctx:      derived,
		cancel:   stop,
	}
}
// AcceptMessage from a websocket. This might block if nothing is reading from
// the channel but generally a computation should always be doing so.
//
// The message is dropped if the channel's context is already done, or becomes
// done while waiting for a reader. The Channel's mutex is held for the whole
// call so that Close cannot close the underlying Go chan while a send is
// being attempted; sending on a closed chan panics.
//
// NOTE(review): this assumes no caller holds the Channel's embedded mutex
// while calling AcceptMessage or Close -- confirm against the rest of the
// package.
func (c *Channel) AcceptMessage(msg messages.Message) {
	c.Lock()
	defer c.Unlock()

	// Once the context is done, Close may already have closed c.messages.
	// A select whose send case targets a closed chan can panic even when the
	// Done case is ready too (cases are chosen at random), so bail out before
	// reaching the select.
	if c.ctx.Err() != nil {
		return
	}

	select {
	case c.messages <- msg:
	case <-c.ctx.Done():
		// Cancelled while waiting for a reader; drop the message.
	}
}

// Messages returns a Go chan that will be pushed all of the deserialized
// SignalFlow messages from the websocket. The returned chan is closed by
// Close.
func (c *Channel) Messages() <-chan messages.Message {
	return c.messages
}

// Close the channel. This does not actually stop a job in SignalFlow, for
// that use Computation.Stop().
//
// Close cancels the channel's context and then closes the chan returned by
// Messages. It may be called while AcceptMessage is running or before a later
// AcceptMessage call; such calls drop their message instead of sending on the
// closed chan. Close must be called at most once: a second call panics
// because the chan is already closed.
func (c *Channel) Close() {
	// Cancel before taking the lock: this wakes an AcceptMessage that is
	// blocked waiting for a reader so that it releases the mutex.
	c.cancel()

	c.Lock()
	defer c.Unlock()
	close(c.messages)
}