-
Notifications
You must be signed in to change notification settings - Fork 50
/
output_channel.go
61 lines (48 loc) · 1.21 KB
/
output_channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package stream
import (
"sync"
"github.com/justtrackio/gosoline/pkg/log"
)
type OutputChannel interface {
Read() ([]WritableMessage, bool)
Write(msg []WritableMessage)
Close()
}
type outputChannel struct {
logger log.Logger
ch chan []WritableMessage
closed bool
lck sync.RWMutex
}
func NewOutputChannel(logger log.Logger, bufferSize int) OutputChannel {
return &outputChannel{
logger: logger,
ch: make(chan []WritableMessage, bufferSize),
}
}
func (c *outputChannel) Read() ([]WritableMessage, bool) {
msg, ok := <-c.ch
return msg, ok
}
func (c *outputChannel) Write(msg []WritableMessage) {
c.lck.RLock()
defer c.lck.RUnlock()
if c.closed {
// this can happen if we still get some traffic while everything is already shutting down.
// this is okay as far as the producer daemon is concerned, if your data can't handle this,
// you can't use the producer daemon anyway
c.logger.Warn("dropped batch of %d messages: channel is already closed", len(msg))
return
}
c.ch <- msg
}
func (c *outputChannel) Close() {
c.lck.Lock()
defer c.lck.Unlock()
if !c.closed {
c.closed = true
close(c.ch)
} else {
c.logger.Warn("duplicate close to output channel: channel is already closed")
}
}