-
Notifications
You must be signed in to change notification settings - Fork 0
/
fc.go
165 lines (137 loc) · 3.45 KB
/
fc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package fc
import (
"fmt"
"io"
)
type FC struct {
InputFilters map[string]InputFilter
Filters map[string]Filter
OutputFilters map[string]OutputFilter
}
type BaseFilter interface {
Description() string
setFC(*FC)
}
type baseFilter struct{}
func (baseFilter) setFC(*FC) {}
type InputFilter interface {
BaseFilter
Input(input io.Reader, output interface{}, args ...string) error
}
type Filter interface {
BaseFilter
Filter(input interface{}, args ...string) (interface{}, error)
}
type OutputFilter interface {
BaseFilter
Output(output io.Writer, input interface{}, args ...string) error
}
type Pipeline struct {
fc *FC
inputFilter InputFilter
inputArgs []string
filters []Filter
filterArgs [][]string
outputFilter OutputFilter
outputArgs []string
}
func NewFC() *FC {
return &FC{
InputFilters: make(map[string]InputFilter),
Filters: make(map[string]Filter),
OutputFilters: make(map[string]OutputFilter),
}
}
func (f *FC) AddInputFilter(filter InputFilter, names ...string) error {
for _, name := range names {
if _, ok := f.InputFilters[name]; ok {
return fmt.Errorf("Cannot add InputFilter, filter '%s' already exists", name)
}
}
for _, name := range names {
f.InputFilters[name] = filter
}
filter.setFC(f)
return nil
}
func (f *FC) AddFilter(filter Filter, names ...string) error {
for _, name := range names {
if _, ok := f.Filters[name]; ok {
return fmt.Errorf("Cannot add Filter, filter '%s' already exists", name)
}
}
for _, name := range names {
f.Filters[name] = filter
}
filter.setFC(f)
return nil
}
func (f *FC) AddOutputFilter(filter OutputFilter, names ...string) error {
for _, name := range names {
if _, ok := f.OutputFilters[name]; ok {
return fmt.Errorf("Cannot add OutputFilter, filter '%s' already exists", name)
}
}
for _, name := range names {
f.OutputFilters[name] = filter
}
filter.setFC(f)
return nil
}
func (f *FC) NewPipeline() *Pipeline {
return &Pipeline{
fc: f,
}
}
func (f *FC) GetInputFilter(name string) (filter InputFilter, err error) {
var ok bool
filter, ok = f.InputFilters[name]
if !ok {
err = fmt.Errorf("Unknown input filter '%s'", name)
}
return
}
func (p *Pipeline) SetInputFilter(inputFilter string, args ...string) error {
var ok bool
if p.inputFilter, ok = p.fc.InputFilters[inputFilter]; !ok {
return fmt.Errorf("Unknown InputFilter '%s'", inputFilter)
}
p.inputArgs = args
return nil
}
func (p *Pipeline) SetOutputFilter(outputFilter string, args ...string) error {
var ok bool
if p.outputFilter, ok = p.fc.OutputFilters[outputFilter]; !ok {
return fmt.Errorf("Unknown OutputFilter '%s'", outputFilter)
}
p.outputArgs = args
return nil
}
func (p *Pipeline) AddFilter(filterName string, args ...string) error {
var (
f Filter
ok bool
)
if f, ok = p.fc.Filters[filterName]; !ok {
return fmt.Errorf("Unknown Filter '%s'", filterName)
}
p.filters = append(p.filters, f)
p.filterArgs = append(p.filterArgs, args)
return nil
}
func (p *Pipeline) Process(in io.Reader, out io.Writer) error {
var (
data interface{}
err error
)
if err = p.inputFilter.Input(in, &data, p.inputArgs...); err != nil {
return fmt.Errorf("Error while processing input: %s", err)
}
for k, filter := range p.filters {
if data, err = filter.Filter(data, p.filterArgs[k]...); err != nil {
return fmt.Errorf("Error while processing filter: %s", err)
}
}
return p.outputFilter.Output(out, data, p.outputArgs...)
}
var DefaultFC = NewFC()