-
Notifications
You must be signed in to change notification settings - Fork 0
/
stdstream.go
132 lines (117 loc) · 2.59 KB
/
stdstream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package stdstream
import (
"bufio"
"context"
"encoding/json"
"fmt"
"github.com/saylorsolutions/nomlog/pkg/entries"
"github.com/saylorsolutions/nomlog/pkg/iterator"
"github.com/saylorsolutions/nomlog/plugin"
"github.com/saylorsolutions/nomlog/runtime/dsl"
"os"
)
// Plugin returns a fresh instance of the "std" plugin, which exposes the
// process's standard streams as a log source and sinks.
func Plugin() plugin.Plugin {
	return &stdplugin{}
}
// Compile-time assertion that *stdplugin satisfies plugin.Plugin.
var _ plugin.Plugin = (*stdplugin)(nil)
// stdplugin is the stateless plugin implementation for the standard streams.
type stdplugin struct{}

// ID reports the identifier of this plugin, "std".
func (*stdplugin) ID() string {
	return "std"
}
// Register adds the "std" source (In) and sinks (Out, Err) to reg, each
// followed immediately by its documentation text.
func (*stdplugin) Register(reg *plugin.Registration) {
	const (
		ns = "std"

		inDoc = `std.In
Reads each line of STDIN as a log entry. The input may be a valid JSON object, or completely unstructured.`
		outDoc = `std.Out
Writes each log entry as a line to STDOUT.`
		errDoc = `std.Err
Writes each log entry as a line to STDERR.`
	)

	// Source: STDIN.
	reg.RegisterSource(ns, "In", SourceIn)
	reg.DocumentSource(ns, "In", inDoc)

	// Sink: STDOUT.
	reg.RegisterSink(ns, "Out", SinkOut)
	reg.DocumentSink(ns, "Out", outDoc)

	// Sink: STDERR.
	reg.RegisterSink(ns, "Err", SinkErr)
	reg.DocumentSink(ns, "Err", errDoc)
}
// Stopping is a no-op for this plugin and always returns a nil error.
func (*stdplugin) Stopping() error {
	return nil
}
// SourceIn returns an iterator over the lines of STDIN. Each line is converted
// to a log entry with entries.FromString and delivered on an unbuffered
// channel that backs the returned iterator. Any dsl arguments are ignored.
//
// The reader goroutine stops, and the channel is closed, when STDIN is
// exhausted, when the scanner fails, or when ctx is cancelled. The returned
// error is always nil.
//
// Cancellation is observed through ctx directly (ctx.Err / ctx.Done) rather
// than through a flag shared between goroutines, so there is no data race and
// no helper goroutine left waiting on a context that is never cancelled.
func SourceIn(ctx context.Context, _ ...*dsl.Arg) (iterator.Iterator, error) {
	ch := make(chan entries.LogEntry)
	go func() {
		defer close(ch)

		scanner := bufio.NewScanner(os.Stdin)
		for scanner.Scan() {
			// Drop the line that was just read if the context has already been
			// cancelled.
			if ctx.Err() != nil {
				return
			}
			entry := entries.FromString(scanner.Text())

			// Never block forever on the send: if the consumer has stopped
			// reading, cancellation must still be able to release this goroutine.
			select {
			case <-ctx.Done():
				return
			case ch <- entry:
			}
		}
		// NOTE(review): scanner.Err() is not surfaced because the iterator has
		// no error path here; a read failure or a line longer than
		// bufio.MaxScanTokenSize simply ends the stream. A blocking read on
		// STDIN itself also cannot be interrupted by ctx.
	}()
	return iterator.FromChannel(ch), nil
}
// jsonify encodes entry with json.Marshal and returns the result as a string.
// On a marshalling failure it returns the empty string and the error.
func jsonify(entry entries.LogEntry) (string, error) {
	encoded, marshalErr := json.Marshal(entry)
	if marshalErr != nil {
		return "", marshalErr
	}

	text := string(encoded)
	return text, nil
}
// SinkOut writes every entry produced by src to STDOUT as one JSON-encoded
// line. Any dsl arguments are ignored.
//
// Once ctx is cancelled the iteration callback returns iterator.ErrAtEnd
// instead of writing. If src.Iterate reports an error, src is drained with
// iterator.Drain and that error is returned; otherwise the result is nil.
//
// Cancellation is checked with ctx.Err() inside the callback. This replaces a
// boolean flag that was set from a separate goroutine without synchronization
// (a data race) and whose goroutine never exited if ctx was never cancelled.
func SinkOut(ctx context.Context, src iterator.Iterator, _ ...*dsl.Arg) error {
	err := src.Iterate(func(entry entries.LogEntry, _ int) error {
		if ctx.Err() != nil {
			return iterator.ErrAtEnd
		}
		str, err := jsonify(entry)
		if err != nil {
			return err
		}
		_, err = fmt.Fprintf(os.Stdout, "%s\n", str)
		return err
	})
	if err != nil {
		// Consume whatever is left so the upstream producer is not left blocked.
		iterator.Drain(src)
		return err
	}
	return nil
}
// SinkErr writes every entry produced by src to STDERR as one JSON-encoded
// line. Any dsl arguments are ignored.
//
// Once ctx is cancelled the iteration callback returns iterator.ErrAtEnd
// instead of writing. If src.Iterate reports an error, src is drained with
// iterator.Drain and that error is returned; otherwise the result is nil.
//
// Cancellation is checked with ctx.Err() inside the callback. This replaces a
// boolean flag that was set from a separate goroutine without synchronization
// (a data race) and whose goroutine never exited if ctx was never cancelled.
func SinkErr(ctx context.Context, src iterator.Iterator, _ ...*dsl.Arg) error {
	err := src.Iterate(func(entry entries.LogEntry, _ int) error {
		if ctx.Err() != nil {
			return iterator.ErrAtEnd
		}
		str, err := jsonify(entry)
		if err != nil {
			return err
		}
		_, err = fmt.Fprintf(os.Stderr, "%s\n", str)
		return err
	})
	if err != nil {
		// Consume whatever is left so the upstream producer is not left blocked.
		iterator.Drain(src)
		return err
	}
	return nil
}