forked from redpanda-data/connect
-
Notifications
You must be signed in to change notification settings - Fork 1
/
function_set.go
208 lines (185 loc) · 6.19 KB
/
function_set.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
package query
import (
"errors"
"fmt"
"regexp"
"sort"
)
// FunctionSet contains an explicit set of functions to be available in a
// Bloblang query.
//
// Both internal registers are keyed by function name and are written together
// by Add.
type FunctionSet struct {
// disableCtors, when true, makes Init return a stand-in function that fails
// on execution instead of invoking the registered constructor.
disableCtors bool
// constructors maps a function name to the constructor used by Init.
constructors map[string]FunctionCtor
// specs maps a function name to the spec it was registered with.
specs map[string]FunctionSpec
}
// NewFunctionSet returns an empty function set whose constructor and spec
// registers are allocated and ready to receive functions via Add.
func NewFunctionSet() *FunctionSet {
	set := new(FunctionSet)
	set.constructors = make(map[string]FunctionCtor)
	set.specs = make(map[string]FunctionSpec)
	return set
}
var (
// nameRegexpRaw is the pattern a name must match to be accepted by Add: one
// or more runs of lowercase ASCII letters and digits, joined by single
// underscores (no leading, trailing or doubled underscores).
nameRegexpRaw = `^[a-z0-9]+(_[a-z0-9]+)*$`
// nameRegexp is nameRegexpRaw compiled once at package initialisation.
nameRegexp = regexp.MustCompile(nameRegexpRaw)
)
// Add a new function to this set by providing a spec (name and documentation),
// a constructor to be called for each instantiation of the function, and
// information regarding the arguments of the function.
//
// An error is returned, and the set is left unmodified, when the spec name
// does not match nameRegexpRaw or when the spec params fail validation. Adding
// a name that is already present replaces the earlier constructor and spec.
//
// Add may be called on a zero-value FunctionSet: the internal registers are
// allocated on first use instead of panicking on a nil map write.
func (f *FunctionSet) Add(spec FunctionSpec, ctor FunctionCtor) error {
	if !nameRegexp.MatchString(spec.Name) {
		return fmt.Errorf("function name '%v' does not match the required regular expression /%v/", spec.Name, nameRegexpRaw)
	}
	if err := spec.Params.validate(); err != nil {
		return err
	}

	// A FunctionSet that was not built by NewFunctionSet has nil maps, and
	// assigning to an entry of a nil map panics.
	if f.constructors == nil {
		f.constructors = map[string]FunctionCtor{}
	}
	if f.specs == nil {
		f.specs = map[string]FunctionSpec{}
	}

	f.constructors[spec.Name] = ctor
	f.specs[spec.Name] = spec
	return nil
}
// Docs returns the spec of every function in the set, ordered by ascending
// function name. The returned slice is freshly allocated on each call.
func (f *FunctionSet) Docs() []FunctionSpec {
	docs := make([]FunctionSpec, len(f.specs))
	idx := 0
	for _, spec := range f.specs {
		docs[idx] = spec
		idx++
	}

	// Map iteration order is random, so impose a stable ordering by name.
	byName := func(a, b int) bool { return docs[a].Name < docs[b].Name }
	sort.Slice(docs, byName)
	return docs
}
// Params looks up the argument specification of the named function. When the
// name is not registered in this set it returns VariadicParams() together
// with the error produced by badFunctionErr.
func (f *FunctionSet) Params(name string) (Params, error) {
	if spec, ok := f.specs[name]; ok {
		return spec.Params, nil
	}
	return VariadicParams(), badFunctionErr(name)
}
// Init instantiates the named function of this set with the provided
// arguments.
//
// An unknown name yields the error from badFunctionErr. When constructors are
// disabled for this set a stand-in from disabledFunction is returned instead
// of calling the registered constructor.
func (f *FunctionSet) Init(name string, args *ParsedParams) (Function, error) {
	ctor, ok := f.constructors[name]
	switch {
	case !ok:
		return nil, badFunctionErr(name)
	case f.disableCtors:
		return disabledFunction(name), nil
	default:
		return wrapCtorWithDynamicArgs(name, args, ctor)
	}
}
// Without returns an independent copy of the function set that omits every
// function named in the variadic list. The copy owns fresh registers, so it
// can be mutated without affecting the receiver; calling it with no names
// yields a full copy.
func (f *FunctionSet) Without(functions ...string) *FunctionSet {
	dropped := make(map[string]struct{}, len(functions))
	for _, name := range functions {
		dropped[name] = struct{}{}
	}
	isDropped := func(name string) bool {
		_, found := dropped[name]
		return found
	}

	ctors := make(map[string]FunctionCtor, len(f.constructors))
	for name, ctor := range f.constructors {
		if isDropped(name) {
			continue
		}
		ctors[name] = ctor
	}

	specs := map[string]FunctionSpec{}
	for _, spec := range f.specs {
		if isDropped(spec.Name) {
			continue
		}
		specs[spec.Name] = spec
	}

	clone := &FunctionSet{
		disableCtors: f.disableCtors,
		constructors: ctors,
		specs:        specs,
	}
	return clone
}
// OnlyPure returns an independent copy of the function set, built via
// Without, from which every function whose spec is flagged Impure has been
// removed.
func (f *FunctionSet) OnlyPure() *FunctionSet {
	var impure []string
	for _, spec := range f.specs {
		if !spec.Impure {
			continue
		}
		impure = append(impure, spec.Name)
	}
	return f.Without(impure...)
}
// NoMessage returns an independent copy of the function set, built via
// Without, from which every function in the FunctionCategoryMessage category
// has been removed.
func (f *FunctionSet) NoMessage() *FunctionSet {
	var messageFns []string
	for _, spec := range f.specs {
		if spec.Category != FunctionCategoryMessage {
			continue
		}
		messageFns = append(messageFns, spec.Name)
	}
	return f.Without(messageFns...)
}
// Deactivated returns a version of the function set where constructors are
// disabled, allowing mappings to be parsed and validated but not executed.
//
// Only the struct itself is copied: the underlying registers of functions are
// shared with the receiver, so a function added to the returned set is also
// added to the still activated set. Use the Without method (with empty args
// if applicable) to obtain a deep copy that is independent of the source.
func (f *FunctionSet) Deactivated() *FunctionSet {
	clone := new(FunctionSet)
	*clone = *f
	clone.disableCtors = true
	return clone
}
//------------------------------------------------------------------------------
// AllFunctions is a set containing every single function declared by this
// package, and any globally declared plugin methods.
//
// It is the set that registerFunction and registerSimpleFunction add to, and
// the one that InitFunctionHelper and FunctionDocs read from.
var AllFunctions = NewFunctionSet()
// registerFunction adds a function to AllFunctions under the given spec and
// panics if the set rejects it. The empty struct result carries no
// information; presumably it exists so calls can be used as package-level
// variable initialisers.
func registerFunction(spec FunctionSpec, ctor FunctionCtor) struct{} {
	wrapped := func(args *ParsedParams) (Function, error) {
		return ctor(args)
	}
	err := AllFunctions.Add(spec, wrapped)
	if err != nil {
		panic(err)
	}
	return struct{}{}
}
// registerSimpleFunction adds an argument-less style function to AllFunctions:
// the registered constructor ignores its parsed params and wraps fn in a
// ClosureFunction named "function <spec.Name>" with no targets closure. It
// panics if the set rejects the spec.
func registerSimpleFunction(spec FunctionSpec, fn func(ctx FunctionContext) (any, error)) struct{} {
	ctor := func(*ParsedParams) (Function, error) {
		return ClosureFunction("function "+spec.Name, fn, nil), nil
	}
	if err := AllFunctions.Add(spec, ctor); err != nil {
		panic(err)
	}
	return struct{}{}
}
// InitFunctionHelper initialises a function from AllFunctions by name, using
// a plain list of positional argument values; this is convenient for writing
// tests. It fails when the name is not registered or when the values cannot
// be populated into the function's params.
func InitFunctionHelper(name string, args ...any) (Function, error) {
	if spec, found := AllFunctions.specs[name]; found {
		parsed, err := spec.Params.PopulateNameless(args...)
		if err != nil {
			return nil, err
		}
		return AllFunctions.Init(name, parsed)
	}
	return nil, badFunctionErr(name)
}
// FunctionDocs returns the specs of all functions registered in AllFunctions,
// as produced by its Docs method.
func FunctionDocs() []FunctionSpec {
	docs := AllFunctions.Docs()
	return docs
}
//------------------------------------------------------------------------------
// disabledFunction builds the stand-in used when constructors are disabled:
// a ClosureFunction named "function <name>" whose execution always fails, and
// whose targets closure passes the context through with no target paths.
func disabledFunction(name string) Function {
	exec := func(FunctionContext) (any, error) {
		return nil, errors.New("this function has been disabled")
	}
	targets := func(ctx TargetsContext) (TargetsContext, []TargetPath) {
		return ctx, nil
	}
	return ClosureFunction("function "+name, exec, targets)
}
// wrapCtorWithDynamicArgs instantiates a function from its constructor while
// accounting for arguments reported by args.dynamic().
//
// When there are no such arguments the constructor is invoked once, right
// away, with args. Otherwise a ClosureFunction is returned which, on every
// execution, resolves the dynamic arguments against the execution context,
// constructs a fresh function from the resolved arguments and executes it.
// Its targets are aggregated from the dynamic arguments.
func wrapCtorWithDynamicArgs(name string, args *ParsedParams, fn FunctionCtor) (Function, error) {
	dynArgs := args.dynamic()
	if len(dynArgs) == 0 {
		return fn(args)
	}

	exec := func(ctx FunctionContext) (any, error) {
		resolved, err := args.ResolveDynamic(ctx)
		if err != nil {
			return nil, err
		}
		inst, err := fn(resolved)
		if err != nil {
			return nil, err
		}
		return inst.Exec(ctx)
	}
	return ClosureFunction("function "+name, exec, aggregateTargetPaths(dynArgs...)), nil
}