-
-
Notifications
You must be signed in to change notification settings - Fork 13
/
loader.go
380 lines (312 loc) · 9.53 KB
/
loader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
package machine
import (
"encoding/json"
"fmt"
"reflect"
"time"
"github.com/mitchellh/mapstructure"
)
// Values accepted in StreamSerialization.Type. Load switches on them to decide
// which kind of root stream to construct.
const (
// streamConst selects a stream built with NewStream.
streamConst = "stream"
// subscriptionConst selects a stream built with NewSubscriptionStream.
subscriptionConst = "subscription"
// httpConst selects a stream built with NewHTTPStream.
httpConst = "http"
// websocketConst selects a stream built with NewWebsocketStream.
websocketConst = "websocket"
)
var (
// pluginProviders maps a provider name to the PluginProvider registered under
// it. Entries are written by RegisterPluginProvider and looked up by
// PluginDefinition.Type when a plugin is loaded.
pluginProviders = map[string]PluginProvider{}
)
// PluginProvider is implemented by anything that can turn a PluginDefinition
// into a usable plugin value.
//
// The value returned by Load must be one of the following types, because the
// loader type-asserts it to the kind required by the vertex being built and
// panics when the assertion fails:
//
// Subscription
// Retriever
// Applicative
// Fold
// Fork
// Publisher
type PluginProvider interface {
// Load returns the plugin described by the given definition, or an error
// when it cannot be loaded.
Load(*PluginDefinition) (interface{}, error)
}
// PluginDefinition describes a plugin to be loaded through a registered
// PluginProvider: which provider to use, what to hand it, and which symbol to
// resolve.
type PluginDefinition struct {
// Type is the name of the PluginProvider to use, i.e. the name it was
// registered under with RegisterPluginProvider.
Type string `json:"type" mapstructure:"type"`
// Payload is the location, script, etc provided to load the plugin.
// Its interpretation depends on the PluginProvider.
Payload string `json:"payload" mapstructure:"payload"`
// Symbol is the name of the symbol to be loaded from the plugin.
Symbol string `json:"symbol" mapstructure:"symbol"`
// Attributes is a free-form map of properties passed along to the PluginProvider.
Attributes map[string]interface{} `json:"attributes" mapstructure:"attributes"`
}
// StreamSerialization is the config based definition of a stream. It carries
// the stream-level settings and embeds the root VertexSerialization from which
// the rest of the vertices hang.
type StreamSerialization struct {
// Type is the type of stream to create.
//
// For root serializations valid values are 'http', 'websocket', 'subscription', or 'stream'.
Type string `json:"type,omitempty" mapstructure:"type,omitempty"`
// Interval is the duration in nanoseconds between pulls in a 'subscription' Type. It is only read
// if the Type is 'subscription'.
Interval time.Duration `json:"interval,omitempty" mapstructure:"interval,omitempty"`
// Options are a slice of machine.Option https://godoc.org/github.com/whitaker-io/machine#Option
// and are passed to the stream constructor chosen by Type.
Options []*Option `json:"options,omitempty" mapstructure:"options,omitempty"`
// VertexSerialization is the root vertex; its ID and Provider are promoted
// onto the stream. It is nil on a zero-value StreamSerialization.
*VertexSerialization
}
// VertexSerialization is the config based definition of a single stream vertex
// together with the vertices that follow it.
type VertexSerialization struct {
// ID is the unique identifier for the stream or vertex.
ID string `json:"id,omitempty" mapstructure:"id,omitempty"`
// Provider is the plugin information used to load this vertex's behaviour.
Provider *PluginDefinition `json:"provider,omitempty" mapstructure:"provider,omitempty"`
// next holds the vertices that follow this one, keyed by kind. The loader
// reads the keys "map", "fold_left", "fold_right", "fork", "loop" and
// "publish"; beneath a fork it reads "left" and "right", and beneath a loop
// it reads "in" and "out".
next map[string]*VertexSerialization
}
// RegisterPluginProvider registers p under name so that any PluginDefinition
// whose Type equals name is loaded through it. Registering the same name again
// replaces the previously registered provider.
func RegisterPluginProvider(name string, p PluginProvider) {
pluginProviders[name] = p
}
// Load builds a Stream from its StreamSerialization.
//
// The root stream is created according to serialization.Type ('http',
// 'websocket', 'subscription' or 'stream') and the remaining vertices are then
// attached by walking the nested VertexSerialization tree.
//
// An error is returned when serialization is nil, when its vertex
// configuration is missing, when Type is not one of the supported values, or
// when a vertex cannot be loaded. The plugin helpers used while building the
// stream report failures by panicking; those panics are recovered here and
// surfaced through the error return so a bad configuration cannot crash the
// caller.
func Load(serialization *StreamSerialization) (stream Stream, err error) {
	if serialization == nil {
		return nil, fmt.Errorf("missing stream serialization")
	}

	// Convert panics raised while loading plugins into an ordinary error.
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		stream = nil
		if cause, ok := r.(error); ok {
			err = fmt.Errorf("loading %s stream: %w", serialization.Type, cause)
			return
		}
		err = fmt.Errorf("loading %s stream: %v", serialization.Type, r)
	}()

	switch serialization.Type {
	case httpConst:
		if serialization.VertexSerialization == nil {
			return nil, fmt.Errorf("http stream missing config")
		}
		stream = NewHTTPStream(serialization.ID, serialization.Options...)
	case websocketConst:
		if serialization.VertexSerialization == nil {
			return nil, fmt.Errorf("websocket stream missing config")
		}
		stream = NewWebsocketStream(serialization.ID, serialization.Options...)
	case subscriptionConst:
		if serialization.VertexSerialization == nil {
			return nil, fmt.Errorf("non-terminated subscription")
		}
		stream = NewSubscriptionStream(
			serialization.ID,
			serialization.VertexSerialization.subscription(),
			serialization.Interval,
			serialization.Options...,
		)
	case streamConst:
		if serialization.VertexSerialization == nil {
			return nil, fmt.Errorf("non-terminated stream")
		}
		stream = NewStream(
			serialization.ID,
			serialization.VertexSerialization.retriever(),
			serialization.Options...,
		)
	default:
		return nil, fmt.Errorf("invalid type")
	}

	if err := serialization.VertexSerialization.load(stream.Builder()); err != nil {
		return nil, err
	}

	return stream, nil
}
// load attaches the vertex that follows vs to builder and then recurses into
// that vertex with the builder it produced.
//
// The keys of vs.next are checked in the order "map", "fold_left",
// "fold_right", "fork", "loop", "publish" and only the first one present is
// handled. A fork continues into its "left" and "right" children and a loop
// into its "in" and "out" children, when those exist. A publish ends the
// branch. Nil is returned when nothing follows vs.
func (vs *VertexSerialization) load(builder Builder) error {
	if child, found := vs.next["map"]; found {
		return child.load(builder.Map(child.ID, child.applicative()))
	}

	if child, found := vs.next["fold_left"]; found {
		return child.load(builder.FoldLeft(child.ID, child.fold()))
	}

	if child, found := vs.next["fold_right"]; found {
		return child.load(builder.FoldRight(child.ID, child.fold()))
	}

	if child, found := vs.next["fork"]; found {
		firstBuilder, secondBuilder := builder.Fork(child.ID, child.fork())
		if first := child.next["left"]; first != nil {
			if err := first.load(firstBuilder); err != nil {
				return err
			}
		}
		if second := child.next["right"]; second != nil {
			return second.load(secondBuilder)
		}
		return nil
	}

	if child, found := vs.next["loop"]; found {
		firstBuilder, secondBuilder := builder.Loop(child.ID, child.fork())
		if first := child.next["in"]; first != nil {
			if err := first.load(firstBuilder); err != nil {
				return err
			}
		}
		if second := child.next["out"]; second != nil {
			return second.load(secondBuilder)
		}
		return nil
	}

	if child, found := vs.next["publish"]; found {
		builder.Publish(child.ID, child.publish())
	}

	return nil
}
// subscription loads the vertex's plugin and returns it as a Subscription.
// It panics when the plugin fails to load or is not a Subscription.
func (vs *VertexSerialization) subscription() Subscription {
	loaded, err := vs.Provider.load()
	if err != nil {
		panic(err)
	}

	result, matches := loaded.(Subscription)
	if !matches {
		panic(fmt.Errorf("invalid plugin type not subscription"))
	}

	return result
}
// retriever loads the vertex's plugin and returns it as a Retriever.
// It panics when the plugin fails to load or is not a Retriever.
func (vs *VertexSerialization) retriever() Retriever {
	loaded, err := vs.Provider.load()
	if err != nil {
		panic(err)
	}

	result, matches := loaded.(Retriever)
	if !matches {
		panic(fmt.Errorf("invalid plugin type not retriever"))
	}

	return result
}
// applicative loads the vertex's plugin and returns it as an Applicative.
// It panics when the plugin fails to load or is not an Applicative.
func (vs *VertexSerialization) applicative() Applicative {
	loaded, err := vs.Provider.load()
	if err != nil {
		panic(err)
	}

	result, matches := loaded.(Applicative)
	if !matches {
		panic(fmt.Errorf("invalid plugin type not applicative"))
	}

	return result
}
// fold loads the vertex's plugin and returns it as a Fold.
// It panics when the plugin fails to load or is not a Fold.
func (vs *VertexSerialization) fold() Fold {
	loaded, err := vs.Provider.load()
	if err != nil {
		panic(err)
	}

	result, matches := loaded.(Fold)
	if !matches {
		panic(fmt.Errorf("invalid plugin type not fold"))
	}

	return result
}
// fork loads the vertex's plugin and returns it as a Fork.
// It panics when the plugin fails to load or is not a Fork.
func (vs *VertexSerialization) fork() Fork {
	loaded, err := vs.Provider.load()
	if err != nil {
		panic(err)
	}

	result, matches := loaded.(Fork)
	if !matches {
		panic(fmt.Errorf("invalid plugin type not fork"))
	}

	return result
}
// publish loads the vertex's plugin and returns it as a Publisher.
// It panics when the plugin fails to load or is not a Publisher.
func (vs *VertexSerialization) publish() Publisher {
	loaded, err := vs.Provider.load()
	if err != nil {
		panic(err)
	}

	result, matches := loaded.(Publisher)
	if !matches {
		panic(fmt.Errorf("invalid plugin type not publisher"))
	}

	return result
}
// load looks up the PluginProvider registered under def.Type and asks it to
// load the plugin described by def.
//
// It returns an error when def is nil (a vertex declared without a provider)
// or when no usable provider is registered under def.Type; otherwise it
// returns whatever the provider's Load returns.
func (def *PluginDefinition) load() (interface{}, error) {
	// A vertex without a "provider" entry leaves its Provider nil; report that
	// instead of dereferencing the nil receiver.
	if def == nil {
		return nil, fmt.Errorf("missing plugin definition")
	}

	provider, ok := pluginProviders[def.Type]
	if !ok || provider == nil {
		return nil, fmt.Errorf("missing PluginProvider %s", def.Type)
	}

	return provider.Load(def)
}
// MarshalJSON encodes the stream as JSON by flattening it into a generic map
// with toMap and marshalling that map.
func (s *StreamSerialization) MarshalJSON() ([]byte, error) {
	fields := make(map[string]interface{})
	s.toMap(fields)

	return json.Marshal(fields)
}
// UnmarshalJSON decodes data into a generic map and populates the stream from
// it with fromMap. It returns the JSON decoding error, if any, or the error
// reported by fromMap.
func (s *StreamSerialization) UnmarshalJSON(data []byte) error {
	fields := make(map[string]interface{})

	err := json.Unmarshal(data, &fields)
	if err != nil {
		return err
	}

	return s.fromMap(fields)
}
// MarshalYAML returns the generic map produced by toMap so that the YAML
// encoder serialises that map in place of the struct. The error is always nil.
func (s *StreamSerialization) MarshalYAML() (interface{}, error) {
	fields := make(map[string]interface{})
	s.toMap(fields)

	return fields, nil
}
// UnmarshalYAML uses the supplied unmarshal callback to decode the YAML node
// into a generic map and populates the stream from it with fromMap. It returns
// the callback's error, if any, or the error reported by fromMap.
func (s *StreamSerialization) UnmarshalYAML(unmarshal func(interface{}) error) error {
	fields := make(map[string]interface{})

	err := unmarshal(&fields)
	if err != nil {
		return err
	}

	return s.fromMap(fields)
}
// toMap writes the stream into m in the flat layout used for JSON and YAML:
// "id", "type", "provider" and "options" are always set, "interval" (as int64
// nanoseconds) is set only for the 'subscription' type, and each following
// vertex is written as a nested map under its own key.
//
// The embedded *VertexSerialization is nil on a StreamSerialization that was
// built by hand without one; in that case the vertex fields are emitted as
// their zero values instead of dereferencing nil.
func (s *StreamSerialization) toMap(m map[string]interface{}) {
	vertex := s.VertexSerialization
	if vertex == nil {
		vertex = &VertexSerialization{}
	}

	m["id"] = vertex.ID
	m["type"] = s.Type

	if s.Type == subscriptionConst {
		m["interval"] = int64(s.Interval)
	}

	m["provider"] = vertex.Provider
	m["options"] = s.Options

	for k, v := range vertex.next {
		// A nil child has nothing to serialise and would panic in toMap.
		if v == nil {
			continue
		}
		out := map[string]interface{}{}
		v.toMap(out)
		m[k] = out
	}
}
// fromMap populates s from a decoded JSON or YAML document.
//
// The "type" key is required and must be a string. When the type is
// 'subscription' the "interval" key is required as well; it may be a number of
// nanoseconds or a duration string accepted by time.ParseDuration (for example
// "500ms"). "provider" and "options" are decoded with mapstructure. The
// stream-level keys are then removed from m, which is modified in place, and
// what is left is handed to VertexSerialization.fromMap to read the id and the
// nested vertices.
//
// Malformed input is reported through the returned error. This includes
// panics raised while reading the nested vertices, which are recovered here so
// that unmarshalling a bad document cannot crash the caller.
func (s *StreamSerialization) fromMap(m map[string]interface{}) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("invalid stream serialization: %v", r)
		}
	}()

	s.VertexSerialization = &VertexSerialization{}

	rawType, ok := m["type"]
	if !ok {
		return fmt.Errorf("missing type field")
	}
	typeName, ok := rawType.(string)
	if !ok {
		return fmt.Errorf("invalid type field expecting string found %v", reflect.TypeOf(rawType))
	}
	s.Type = typeName

	if s.Type == subscriptionConst {
		rawInterval, ok := m["interval"]
		if !ok {
			return fmt.Errorf("missing interval field")
		}

		// s.ID is only filled in further down, so take the id for error
		// messages straight from the document.
		id, _ := m["id"].(string)

		switch val := rawInterval.(type) {
		case int64:
			s.Interval = time.Duration(val)
		case int:
			s.Interval = time.Duration(val)
		case float64:
			s.Interval = time.Duration(val)
		case string:
			parsed, parseErr := time.ParseDuration(val)
			if parseErr != nil {
				return fmt.Errorf("invalid interval for %s: %w", id, parseErr)
			}
			s.Interval = parsed
		default:
			return fmt.Errorf(
				"invalid interval type expecting number or duration string for %s found %v",
				id,
				reflect.TypeOf(val),
			)
		}
	}

	if provider, ok := m["provider"]; ok {
		s.VertexSerialization.Provider = &PluginDefinition{}
		if decodeErr := mapstructure.Decode(provider, s.VertexSerialization.Provider); decodeErr != nil {
			return fmt.Errorf("decoding provider: %w", decodeErr)
		}
	}

	if options, ok := m["options"]; ok {
		s.Options = []*Option{}
		if decodeErr := mapstructure.Decode(options, &s.Options); decodeErr != nil {
			return fmt.Errorf("decoding options: %w", decodeErr)
		}
	}

	// Drop the stream-level keys so the vertex pass below only sees the id and
	// the nested vertices.
	delete(m, "type")
	delete(m, "interval")
	delete(m, "provider")
	delete(m, "options")

	s.VertexSerialization.fromMap(m)

	return nil
}
// toMap writes the vertex into m: "id" when it is non-empty, "provider" when
// one is set, and every following vertex as a nested map under its own key.
func (vs *VertexSerialization) toMap(m map[string]interface{}) {
	if id := vs.ID; id != "" {
		m["id"] = id
	}

	if provider := vs.Provider; provider != nil {
		m["provider"] = provider
	}

	for key, child := range vs.next {
		nested := make(map[string]interface{})
		child.toMap(nested)
		m[key] = nested
	}
}
// fromMap populates the vertex from m. "id" is read as a string, "provider" is
// decoded with mapstructure and then removed from m, and every remaining entry
// whose value is itself a map is read recursively as a following vertex stored
// under the same key. It panics when "id" is not a string or when the provider
// cannot be decoded.
func (vs *VertexSerialization) fromMap(m map[string]interface{}) {
	if rawID, present := m["id"]; present {
		vs.ID = rawID.(string)
	}

	if rawProvider, present := m["provider"]; present {
		definition := &PluginDefinition{}
		vs.Provider = definition
		if err := mapstructure.Decode(rawProvider, definition); err != nil {
			panic(err)
		}
		delete(m, "provider")
	}

	children := make(map[string]*VertexSerialization)
	vs.next = children

	for key, raw := range m {
		nested, isMap := raw.(map[string]interface{})
		if !isMap {
			continue
		}

		child := &VertexSerialization{}
		child.fromMap(nested)
		children[key] = child
	}
}