forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
console.go
145 lines (120 loc) · 3.12 KB
/
console.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package console
import (
"bufio"
"fmt"
"os"
"runtime"
"time"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/codec"
"github.com/elastic/beats/libbeat/outputs/codec/json"
"github.com/elastic/beats/libbeat/publisher"
)
// console is the output client that serializes events with a codec and writes
// them, one per line, through a buffered writer.
type console struct {
// out is the destination file; newConsole sets it to os.Stdout.
out *os.File
// observer receives the batch, write and error statistics reported while publishing.
observer outputs.Observer
// writer buffers the writes going to out; Publish flushes it once per batch.
writer *bufio.Writer
// codec encodes each event into the bytes that get written.
codec codec.Codec
// index is handed to the codec with every event; makeConsole sets it to beat.Beat.
index string
}
// consoleEvent pairs an event timestamp with the event's fields.
// NOTE(review): nothing in the visible part of this file references this type —
// confirm whether it is still used elsewhere in the package.
type consoleEvent struct {
// Timestamp is exposed under the "@timestamp" key for both the json and struct tags.
Timestamp time.Time `json:"@timestamp" struct:"@timestamp"`
// Note: stdlib json doesn't support inlining :( -> use `codec: 2`, to generate proper event
// Fields carries the `inline` struct tag option, so only encoders honouring the
// `struct` tag are expected to flatten it next to the timestamp.
Fields interface{} `struct:",inline"`
}
// init registers makeConsole with the outputs package under the type name "console".
func init() {
outputs.RegisterType("console", makeConsole)
}
// makeConsole builds the console output from its configuration.
//
// It unpacks cfg on top of defaultConfig, picks the encoder (the configured
// codec when one is set, otherwise the JSON codec using the Pretty setting and
// the beat version), creates the console client with beat.Beat as index and,
// on non-Windows systems, verifies that stdout can be stat'ed. Any failure is
// returned through outputs.Fail; on success the client is wrapped with
// outputs.Success using the configured batch size and 0 retries.
func makeConsole(
	beat beat.Info,
	observer outputs.Observer,
	cfg *common.Config,
) (outputs.Group, error) {
	settings := defaultConfig
	if err := cfg.Unpack(&settings); err != nil {
		return outputs.Fail(err)
	}

	// Fall back to the JSON codec unless a codec was configured explicitly.
	var encoder codec.Codec
	if !settings.Codec.Namespace.IsSet() {
		encoder = json.New(settings.Pretty, beat.Version)
	} else {
		var err error
		encoder, err = codec.CreateEncoder(beat, settings.Codec)
		if err != nil {
			return outputs.Fail(err)
		}
	}

	client, err := newConsole(beat.Beat, observer, encoder)
	if err != nil {
		return outputs.Fail(fmt.Errorf("console output initialization failed with: %v", err))
	}

	// Make sure stdout is actually available; the check is skipped on Windows.
	if runtime.GOOS != "windows" {
		if _, statErr := client.out.Stat(); statErr != nil {
			return outputs.Fail(fmt.Errorf("console output initialization failed with: %v", statErr))
		}
	}

	return outputs.Success(settings.BatchSize, 0, client)
}
// newConsole creates a console client that writes to os.Stdout through an
// 8 KiB buffered writer. The returned error is always nil.
func newConsole(index string, observer outputs.Observer, codec codec.Codec) (*console, error) {
	stdout := os.Stdout
	return &console{
		out:      stdout,
		observer: observer,
		writer:   bufio.NewWriterSize(stdout, 8*1024),
		codec:    codec,
		index:    index,
	}, nil
}
// Close does nothing and always returns nil.
func (c *console) Close() error {
	return nil
}
// Publish writes every event of the batch to the console and acknowledges the
// batch.
//
// Each event is encoded and buffered by publishEvent; events that could not be
// encoded or buffered are counted as dropped, all others as acked. The buffered
// writer is flushed once after the whole batch has been processed. The batch is
// always ACKed and the returned error is always nil.
func (c *console) Publish(batch publisher.Batch) error {
	st := c.observer
	events := batch.Events()
	st.NewBatch(len(events))

	dropped := 0
	for i := range events {
		if !c.publishEvent(&events[i]) {
			dropped++
		}
	}

	// publishEvent only fills the buffered writer, so a broken stdout is often
	// first noticed here. Report the failure instead of discarding it; the
	// batch is still ACKed because the number of affected events is unknown.
	if err := c.writer.Flush(); err != nil {
		st.WriteError(err)
		logp.Critical("Unable to flush events to console: %v", err)
	}
	batch.ACK()

	st.Dropped(dropped)
	st.Acked(len(events) - dropped)
	return nil
}
// nl is the line terminator appended after every serialized event.
var nl = []byte{'\n'}
// publishEvent encodes a single event and buffers it, followed by a newline.
//
// It returns false when encoding fails or when either write fails, and true
// otherwise. Encoding failures are logged only for guaranteed events; write
// failures are always reported to the observer and logged. On success the
// observer is told about the event's size plus one byte for the newline.
func (c *console) publishEvent(event *publisher.Event) bool {
	encoded, err := c.codec.Encode(c.index, &event.Content)
	if err != nil {
		// Non-guaranteed events are dropped without logging.
		if event.Guaranteed() {
			logp.Critical("Unable to encode event: %v", err)
		}
		return false
	}

	err = c.writeBuffer(encoded)
	if err != nil {
		c.observer.WriteError(err)
		logp.Critical("Unable to publish events to console: %v", err)
		return false
	}

	err = c.writeBuffer(nl)
	if err != nil {
		c.observer.WriteError(err)
		logp.Critical("Error when appending newline to event: %v", err)
		return false
	}

	c.observer.WriteBytes(1 + len(encoded))
	return true
}
// writeBuffer pushes buf into the buffered writer, repeating the write with
// the unwritten remainder until everything has been accepted. The first write
// error is returned as is.
func (c *console) writeBuffer(buf []byte) error {
	for pending := buf; len(pending) > 0; {
		n, err := c.writer.Write(pending)
		if err != nil {
			return err
		}
		pending = pending[n:]
	}
	return nil
}