-
Notifications
You must be signed in to change notification settings - Fork 0
/
flowconfig.go
130 lines (120 loc) · 3.16 KB
/
flowconfig.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package youtubetoolkit
import (
"encoding/csv"
"encoding/json"
"fmt"
"io"
"strings"
"text/tabwriter"
)
type (
	// FlowOption configures a flow by mutating its flowconfig. Options are
	// applied in order by options2flowconfig, so a later option replaces the
	// source or sink chosen by an earlier one.
	FlowOption func(*flowconfig)

	// flowconfig holds the source and sink selected through FlowOptions.
	flowconfig struct {
		// stringSource starts the input side of the flow and returns the
		// channel of strings it produces; failures are sent on errors.
		stringSource func(errors chan<- error) <-chan string
		// itemSink consumes every Item received on input; failures are sent
		// on errors.
		itemSink func(errors chan<- error, input <-chan Item)
	}
)
// SingleStringSource sets the source to only emit the param input string.
//
// Each time the source is started it returns a fresh channel, buffered for
// exactly one value, that already holds input and is already closed; no
// goroutine is started and no error is ever reported.
func SingleStringSource(input string) FlowOption {
	source := func(_ chan<- error) <-chan string {
		ch := make(chan string, 1)
		ch <- input
		close(ch)
		return ch
	}
	return func(cfg *flowconfig) {
		cfg.stringSource = source
	}
}
// CSVFirstFieldOnlySource sets a CSV reader as source and emits only the
// first field/column of every record. A line containing no comma or quote is
// a single-field record, so plain "one value per line" input yields each
// whole line. Records may have differing numbers of fields, since only the
// first one is used.
//
// Malformed records (*csv.ParseError) are reported on errors and skipped.
// Any other read error is reported and ends the source. The output channel is
// closed whenever the source stops, including at end of input.
func CSVFirstFieldOnlySource(input io.Reader) FlowOption {
	return func(ic *flowconfig) {
		ic.stringSource = func(errors chan<- error) <-chan string {
			output := make(chan string)
			go func() {
				// Close on every exit path so downstream range loops terminate.
				defer close(output)

				reader := csv.NewReader(input)
				// Only column 0 is consumed, so do not reject rows whose field
				// count differs from the first row (csv.ErrFieldCount).
				reader.FieldsPerRecord = -1

				for {
					record, err := reader.Read()
					if err == io.EOF {
						return
					}
					if err != nil {
						errors <- fmt.Errorf("csv read error: %w", err)
						if _, isParse := err.(*csv.ParseError); isParse {
							// The reader resumes at the next record after a
							// parse error, so skipping this one is safe.
							continue
						}
						// A non-parse error (e.g. from the underlying reader)
						// would be hit again on every retry and spin forever.
						return
					}
					if len(record) > 0 {
						output <- record[0]
					}
				}
			}()
			return output
		}
	}
}
// CSVSink sets a CSV writer as sink. The columns param selects the fields of
// Item (through Item.AsRecord) to be written in the CSV output, one record
// per Item.
//
// A failed write is reported on errors and the remaining items are still
// drained from input. Once input is closed the writer is flushed, and any
// error it reports is sent on errors as well.
func CSVSink(output io.Writer, columns *[]string) FlowOption {
	return func(cfg *flowconfig) {
		cfg.itemSink = func(errors chan<- error, input <-chan Item) {
			cw := csv.NewWriter(output)
			for item := range input {
				// FIXME fatal?
				if err := cw.Write(item.AsRecord(columns)); err != nil {
					errors <- fmt.Errorf("csv write error: %w", err)
				}
			}

			cw.Flush()
			if err := cw.Error(); err != nil {
				errors <- fmt.Errorf("csv write error: %w", err)
			}
		}
	}
}
// TableSink sets a text/tabwriter as sink for a human readable output.
// The columns param selects the fields of Item to be written in the output.
//
// Each Item becomes one tab-separated row. The tabwriter aligns the columns
// when it is flushed after input is closed. Write and flush failures are
// reported on errors, and input keeps being drained after a failed write.
func TableSink(output io.Writer, columns *[]string) FlowOption {
	return func(cfg *flowconfig) {
		cfg.itemSink = func(errors chan<- error, input <-chan Item) {
			const (
				minWidth = 0
				tabWidth = 8
				padding  = 2
				padChar  = ' '
				flags    = 0
			)
			tw := tabwriter.NewWriter(output, minWidth, tabWidth, padding, padChar, flags)

			for item := range input {
				row := strings.Join(item.AsRecord(columns), "\t")
				if _, err := fmt.Fprintln(tw, row); err != nil {
					// FIXME fatal?
					errors <- fmt.Errorf("table write error: %w", err)
				}
			}

			if err := tw.Flush(); err != nil {
				errors <- fmt.Errorf("table write error: %w", err)
			}
		}
	}
}
// NullSink sets a discard sink: it receives from input until the channel is
// closed and drops every Item. It never reports an error.
func NullSink() FlowOption {
	return func(cfg *flowconfig) {
		cfg.itemSink = func(_ chan<- error, input <-chan Item) {
			for {
				if _, ok := <-input; !ok {
					return
				}
			}
		}
	}
}
// JSONLinesSink sets a JSON Lines as sink: every Item received on input is
// encoded with encoding/json as one newline-terminated JSON value on output.
// Encoding or write failures are reported on errors, and the remaining items
// are still consumed.
func JSONLinesSink(output io.Writer) FlowOption {
	return func(cfg *flowconfig) {
		cfg.itemSink = func(errors chan<- error, input <-chan Item) {
			encoder := json.NewEncoder(output)
			for {
				item, ok := <-input
				if !ok {
					return
				}
				err := encoder.Encode(item)
				if err == nil {
					continue
				}
				errors <- fmt.Errorf("jsonl write error: %w", err)
			}
		}
	}
}
// options2flowconfig builds a flowconfig by starting from the zero value and
// applying each option in the order given. When several options set the same
// field, the last one wins.
func options2flowconfig(cfgs ...FlowOption) flowconfig {
	result := flowconfig{}
	for i := range cfgs {
		apply := cfgs[i]
		apply(&result)
	}
	return result
}