/
output_stdout.go
92 lines (75 loc) · 1.72 KB
/
output_stdout.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package gallon
import (
"context"
"encoding/json"
"fmt"
"github.com/go-logr/logr"
"gopkg.in/yaml.v3"
)
// OutputPluginStdout is an output plugin that emits every record it receives
// through its logger (see Load) rather than writing to an external system.
// NOTE(review): whether the output actually ends up on stdout depends on the
// logr sink installed through ReplaceLogger — confirm with the caller.
type OutputPluginStdout struct {
// logger receives the encoded records and the progress messages written by
// Load. It is only ever assigned by ReplaceLogger.
logger logr.Logger
// deserialize converts a single record into the bytes that are logged.
// NOTE(review): despite the name, this encodes a value into bytes (the
// config-based constructor wires it to json.Marshal or fmt's %v).
deserialize func(interface{}) ([]byte, error)
}
// NewOutputPluginStdout returns an OutputPluginStdout that encodes each record
// with the supplied deserialize function before logging it.
//
// The logger field is left at its zero value; callers install one with
// ReplaceLogger.
func NewOutputPluginStdout(
	deserialize func(interface{}) ([]byte, error),
) *OutputPluginStdout {
	plugin := new(OutputPluginStdout)
	plugin.deserialize = deserialize
	return plugin
}
// Compile-time assertion that *OutputPluginStdout implements OutputPlugin.
var _ OutputPlugin = (*OutputPluginStdout)(nil)
// ReplaceLogger sets the logger that Load uses for both the encoded records
// and its progress messages, overwriting any logger set previously.
func (p *OutputPluginStdout) ReplaceLogger(logger logr.Logger) {
p.logger = logger
}
// Load drains batches from messages and writes every record to the plugin's
// logger until messages is closed or ctx is cancelled.
//
// Each value received from messages is expected to be a []interface{} batch.
// Every record in a batch is encoded with the plugin's deserialize function
// and emitted through p.logger.Info. A record that fails to encode is reported
// on errs and skipped. A value that is not a []interface{} is reported on errs
// (rather than panicking on the type assertion) and skipped as a whole.
//
// Only records that were actually written count towards the running total,
// which is logged after each non-empty batch and once more on exit.
//
// Load always returns nil, including when it stops because ctx was cancelled;
// failures are delivered exclusively through errs.
func (p *OutputPluginStdout) Load(
	ctx context.Context,
	messages chan interface{},
	errs chan error,
) error {
	loadedTotal := 0

	// report forwards err on errs but gives up once ctx is cancelled, so Load
	// cannot block forever on a channel nobody is draining any more. It
	// returns false when the error could not be delivered.
	report := func(err error) bool {
		select {
		case errs <- err:
			return true
		case <-ctx.Done():
			return false
		}
	}

loop:
	for {
		select {
		case <-ctx.Done():
			// Routed through the logger instead of the builtin println so the
			// message follows the same sink as every other line.
			p.logger.Info("done in load")
			break loop
		case msgs, ok := <-messages:
			if !ok {
				break loop
			}

			msgSlice, isBatch := msgs.([]interface{})
			if !isBatch {
				if !report(fmt.Errorf("unexpected message batch type %T, want []interface{}", msgs)) {
					break loop
				}
				continue loop
			}

			for _, msg := range msgSlice {
				bs, err := p.deserialize(msg)
				if err != nil {
					if !report(fmt.Errorf("failed to deserialize message: %v (error: %w)", msg, err)) {
						break loop
					}
					continue
				}
				p.logger.Info(string(bs))
				// Count only after the record was emitted so failures do not
				// inflate the reported totals.
				loadedTotal++
			}

			if len(msgSlice) > 0 {
				p.logger.Info(fmt.Sprintf("loaded %v records", loadedTotal))
			}
		}
	}

	p.logger.Info(fmt.Sprintf("loaded total: %v", loadedTotal))
	return nil
}
// OutputPluginStdoutConfig is the YAML configuration consumed by
// NewOutputPluginStdoutFromConfig.
type OutputPluginStdoutConfig struct {
// Format selects how each record is encoded before it is logged: "json"
// uses json.Marshal; any other value (including an absent key) falls back
// to fmt's %v formatting.
Format string `yaml:"format"`
}
// NewOutputPluginStdoutFromConfig builds an OutputPluginStdout from a YAML
// configuration document.
//
// The only key read is "format". When it equals "json", each record is encoded
// with json.Marshal; for any other value the record is rendered with fmt's %v
// verb. A YAML parse error is returned unchanged together with a nil plugin.
func NewOutputPluginStdoutFromConfig(configYml []byte) (*OutputPluginStdout, error) {
	var cfg OutputPluginStdoutConfig
	if err := yaml.Unmarshal(configYml, &cfg); err != nil {
		return nil, err
	}

	// The format is inspected on every call, exactly as the encoder closure
	// captured it from the parsed configuration.
	encode := func(record interface{}) ([]byte, error) {
		if cfg.Format != "json" {
			return []byte(fmt.Sprintf("%v", record)), nil
		}
		return json.Marshal(record)
	}

	return NewOutputPluginStdout(encode), nil
}