forked from benthosdev/benthos
/
common.go
433 lines (375 loc) · 12.2 KB
/
common.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
package mongodb
import (
"context"
"errors"
"fmt"
"strconv"
"time"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
"github.com/nehal119/benthos-119/pkg/docs"
"github.com/nehal119/benthos-119/public/bloblang"
"github.com/nehal119/benthos-119/public/service"
)
// JSONMarshalMode represents the way in which BSON should be marshalled to JSON.
// It is a string type whose supported values are the constants declared below.
type JSONMarshalMode string
// Supported JSONMarshalMode values.
const (
// JSONMarshalModeCanonical selects the "canonical" BSON to JSON marshal mode.
JSONMarshalModeCanonical JSONMarshalMode = "canonical"
// JSONMarshalModeRelaxed selects the "relaxed" BSON to JSON marshal mode.
JSONMarshalModeRelaxed JSONMarshalMode = "relaxed"
)
//------------------------------------------------------------------------------
const (
// Common client fields: the config field names declared by clientFields and
// read back by getClient when building the MongoDB client.
commonFieldClientURL = "url"
commonFieldClientDatabase = "database"
commonFieldClientUsername = "username"
commonFieldClientPassword = "password"
)
// clientFields returns the config field specs used to describe a MongoDB
// connection: url, database, username and password (in that order).
// The username and password default to the empty string, and the password is
// marked as a secret.
func clientFields() []*service.ConfigField {
	urlField := service.NewURLField(commonFieldClientURL).
		Description("The URL of the target MongoDB server.").
		Example("mongodb://localhost:27017")

	databaseField := service.NewStringField(commonFieldClientDatabase).
		Description("The name of the target MongoDB database.")

	usernameField := service.NewStringField(commonFieldClientUsername).
		Description("The username to connect to the database.").
		Default("")

	passwordField := service.NewStringField(commonFieldClientPassword).
		Description("The password to connect to the database.").
		Default("").
		Secret()

	return []*service.ConfigField{
		urlField,
		databaseField,
		usernameField,
		passwordField,
	}
}
// getClient builds a MongoDB client from the parsed connection fields and
// returns it together with a handle to the configured database.
//
// Every config field (url, username, password, database) is read before the
// client is created, so a config error can never leave behind a connected
// client that the caller has no reason to close. On any error both returned
// handles are nil.
//
// An explicit credential is only set on the client options when both the
// username and the password are non-empty.
func getClient(parsedConf *service.ParsedConfig) (client *mongo.Client, database *mongo.Database, err error) {
	var url, username, password, databaseStr string
	if url, err = parsedConf.FieldString(commonFieldClientURL); err != nil {
		return nil, nil, err
	}
	if username, err = parsedConf.FieldString(commonFieldClientUsername); err != nil {
		return nil, nil, err
	}
	if password, err = parsedConf.FieldString(commonFieldClientPassword); err != nil {
		return nil, nil, err
	}
	// Resolve the database name up front: failing here after Connect would
	// return an error alongside a live client nobody disconnects.
	if databaseStr, err = parsedConf.FieldString(commonFieldClientDatabase); err != nil {
		return nil, nil, err
	}

	opt := options.Client().
		SetConnectTimeout(10 * time.Second).
		SetSocketTimeout(30 * time.Second).
		SetServerSelectionTimeout(30 * time.Second).
		ApplyURI(url)

	if username != "" && password != "" {
		opt.SetAuth(options.Credential{
			Username: username,
			Password: password,
		})
	}

	// Bound the connection attempt so a misconfigured target cannot block forever.
	ctx, done := context.WithTimeout(context.Background(), time.Minute)
	defer done()

	if client, err = mongo.Connect(ctx, opt); err != nil {
		return nil, nil, err
	}
	return client, client.Database(databaseStr), nil
}
//------------------------------------------------------------------------------
// Operation represents the operation that will be performed by MongoDB.
type Operation string

// Supported Operation values, plus OperationInvalid which NewOperation
// returns for any string it does not recognise.
const (
	// OperationInsertOne is the insert-one operation.
	OperationInsertOne Operation = "insert-one"
	// OperationDeleteOne is the delete-one operation.
	OperationDeleteOne Operation = "delete-one"
	// OperationDeleteMany is the delete-many operation.
	OperationDeleteMany Operation = "delete-many"
	// OperationReplaceOne is the replace-one operation.
	OperationReplaceOne Operation = "replace-one"
	// OperationUpdateOne is the update-one operation.
	OperationUpdateOne Operation = "update-one"
	// OperationFindOne is the find-one operation.
	OperationFindOne Operation = "find-one"
	// OperationInvalid marks an unrecognised operation.
	OperationInvalid Operation = "invalid"
)

// isDocumentAllowed reports whether the operation takes a document:
// insert-one, replace-one and update-one.
func (op Operation) isDocumentAllowed() bool {
	return op == OperationInsertOne ||
		op == OperationReplaceOne ||
		op == OperationUpdateOne
}

// isFilterAllowed reports whether the operation takes a filter:
// delete-one, delete-many, replace-one, update-one and find-one.
func (op Operation) isFilterAllowed() bool {
	return op == OperationDeleteOne ||
		op == OperationDeleteMany ||
		op == OperationReplaceOne ||
		op == OperationUpdateOne ||
		op == OperationFindOne
}

// isHintAllowed reports whether the operation takes a hint:
// delete-one, delete-many, replace-one, update-one and find-one.
func (op Operation) isHintAllowed() bool {
	return op == OperationDeleteOne ||
		op == OperationDeleteMany ||
		op == OperationReplaceOne ||
		op == OperationUpdateOne ||
		op == OperationFindOne
}

// isUpsertAllowed reports whether the operation supports upsert:
// replace-one and update-one.
func (op Operation) isUpsertAllowed() bool {
	return op == OperationReplaceOne || op == OperationUpdateOne
}

// NewOperation converts a string operation to a strongly-typed Operation.
// Any string that is not one of the supported operation names yields
// OperationInvalid.
func NewOperation(op string) Operation {
	switch parsed := Operation(op); parsed {
	case OperationInsertOne,
		OperationDeleteOne,
		OperationDeleteMany,
		OperationReplaceOne,
		OperationUpdateOne,
		OperationFindOne:
		return parsed
	}
	return OperationInvalid
}
const (
// Common operation fields: the config field name that operationFromParsed
// reads the operation string from.
commonFieldOperation = "operation"
)
// processorOperationDocs returns the operation field spec for processors: the
// spec produced by outputOperationDocs with find-one appended to its options.
func processorOperationDocs(defaultOperation Operation) docs.FieldSpec {
	spec := outputOperationDocs(defaultOperation)
	withFindOne := append(spec.Options, string(OperationFindOne))
	return spec.HasOptions(withFindOne...)
}
// outputOperationDocs returns the operation field spec for outputs. The field
// accepts insert-one, delete-one, delete-many, replace-one and update-one, and
// defaults to defaultOperation.
//
// The field is named with commonFieldOperation, the same constant that
// operationFromParsed reads, so the documented name and the parsed name
// cannot drift apart.
func outputOperationDocs(defaultOperation Operation) docs.FieldSpec {
	return docs.FieldString(
		commonFieldOperation,
		"The mongodb operation to perform.",
	).HasOptions(
		string(OperationInsertOne),
		string(OperationDeleteOne),
		string(OperationDeleteMany),
		string(OperationReplaceOne),
		string(OperationUpdateOne),
	).HasDefault(string(defaultOperation))
}
// operationFromParsed reads the operation field from pConf and converts it to
// an Operation via NewOperation.
//
// It returns the error from reading the field, or an "unknown operation"
// error (together with OperationInvalid) when the string is not recognised.
// The error message lists every operation NewOperation accepts, including
// find-one.
func operationFromParsed(pConf *service.ParsedConfig) (operation Operation, err error) {
	var operationStr string
	if operationStr, err = pConf.FieldString(commonFieldOperation); err != nil {
		return
	}
	if operation = NewOperation(operationStr); operation == OperationInvalid {
		err = fmt.Errorf("mongodb operation '%s' unknown: must be insert-one, delete-one, delete-many, replace-one, update-one or find-one", operationStr)
	}
	return
}
//------------------------------------------------------------------------------
const (
// Common write concern fields: the name of the write concern object and of
// its child fields, as declared by writeConcernDocs and read by
// writeConcernCollectionOptionFromParsed.
commonFieldWriteConcern = "write_concern"
commonFieldWriteConcernW = "w"
commonFieldWriteConcernJ = "j"
commonFieldWriteConcernWTimeout = "w_timeout"
)
// writeConcernDocs returns the field spec for the write_concern object, with
// its three children: w (string, default ""), j (bool, default false) and
// w_timeout (string, default "").
func writeConcernDocs() docs.FieldSpec {
	wField := docs.FieldString(
		commonFieldWriteConcernW,
		"W requests acknowledgement that write operations propagate to the specified number of mongodb instances.",
	).HasDefault("")

	jField := docs.FieldBool(
		commonFieldWriteConcernJ,
		"J requests acknowledgement from MongoDB that write operations are written to the journal.",
	).HasDefault(false)

	wTimeoutField := docs.FieldString(
		commonFieldWriteConcernWTimeout,
		"The write concern timeout.",
	).HasDefault("")

	return docs.FieldObject(commonFieldWriteConcern, "The write concern settings for the mongo connection.").
		WithChildren(wField, jField, wTimeoutField)
}
// writeConcernCollectionOptionFromParsed reads the write_concern namespace of
// pConf and returns collection options carrying the resulting write concern.
//
// The w value is stored as an int when it parses as a decimal integer and as
// the raw string otherwise. The w_timeout field is only parsed as a duration
// when it is a non-empty string; otherwise the timeout is left at zero.
func writeConcernCollectionOptionFromParsed(pConf *service.ParsedConfig) (opt *options.CollectionOptions, err error) {
	wcConf := pConf.Namespace(commonFieldWriteConcern)

	w, err := wcConf.FieldString(commonFieldWriteConcernW)
	if err != nil {
		return nil, err
	}

	journal, err := wcConf.FieldBool(commonFieldWriteConcernJ)
	if err != nil {
		return nil, err
	}

	var wTimeout time.Duration
	// Probe the raw string first; the probe's own error is deliberately
	// ignored so that an unreadable or empty value simply means "no timeout".
	if rawTimeout, _ := wcConf.FieldString(commonFieldWriteConcernWTimeout); rawTimeout != "" {
		if wTimeout, err = wcConf.FieldDuration(commonFieldWriteConcernWTimeout); err != nil {
			return nil, err
		}
	}

	writeConcern := &writeconcern.WriteConcern{
		Journal:  &journal,
		WTimeout: wTimeout,
	}

	// Default to the raw string and switch to an int when w is numeric.
	writeConcern.W = w
	if wInt, convErr := strconv.Atoi(w); convErr == nil {
		writeConcern.W = wInt
	}

	return options.Collection().SetWriteConcern(writeConcern), nil
}
//------------------------------------------------------------------------------
const (
// Common write map fields: the config field names declared by writeMapsFields
// and read by writeMapsFromParsed.
commonFieldDocumentMap = "document_map"
commonFieldFilterMap = "filter_map"
commonFieldHintMap = "hint_map"
commonFieldUpsert = "upsert"
)
// writeMapsFields returns the config field specs for the write mappings:
// document_map, filter_map and hint_map (bloblang fields defaulting to the
// empty string) followed by the upsert flag (bool, default false).
func writeMapsFields() []*service.ConfigField {
	documentMapField := service.NewBloblangField(commonFieldDocumentMap).
		Description("A bloblang map representing the records in the mongo db. Used to generate the document for mongodb by " +
			"mapping the fields in the message to the mongodb fields. The document map is required for the operations " +
			"insert-one, replace-one and update-one.").
		Examples(mapExamples()...).
		Default("")

	filterMapField := service.NewBloblangField(commonFieldFilterMap).
		Description("A bloblang map representing the filter for the mongo db command. The filter map is required for all operations except " +
			"insert-one. It is used to find the document(s) for the operation. For example in a delete-one case, the filter map should " +
			"have the fields required to locate the document to delete.").
		Examples(mapExamples()...).
		Default("")

	hintMapField := service.NewBloblangField(commonFieldHintMap).
		Description("A bloblang map representing the hint for the mongo db command. This map is optional and is used with all operations " +
			"except insert-one. It is used to improve performance of finding the documents in the mongodb.").
		Examples(mapExamples()...).
		Default("")

	upsertField := service.NewBoolField(commonFieldUpsert).
		Description("The upsert setting is optional and only applies for update-one and replace-one operations. If the filter specified in filter_map matches, the document is updated or replaced accordingly, otherwise it is created.").
		Version("3.60.0").
		Default(false)

	return []*service.ConfigField{
		documentMapField,
		filterMapField,
		hintMapField,
		upsertField,
	}
}
// writeMaps holds the compiled mappings and the upsert flag parsed from the
// write map config fields by writeMapsFromParsed.
type writeMaps struct {
// filterMap is the compiled filter_map mapping; nil when the field is empty.
filterMap *bloblang.Executor
// documentMap is the compiled document_map mapping; nil when the field is empty.
documentMap *bloblang.Executor
// hintMap is the compiled hint_map mapping; nil when the field is empty.
hintMap *bloblang.Executor
// upsert is the value of the upsert config field.
upsert bool
}
// writeMapsFromParsed parses filter_map, document_map, hint_map and upsert
// from conf and validates them against the given operation.
//
// A mapping is only compiled when its field is a non-empty string; otherwise
// the corresponding executor stays nil. Validation then enforces, in order:
// filter_map is present exactly when the operation allows a filter,
// document_map is present exactly when the operation allows a document,
// hint_map is absent when the operation does not allow a hint, and upsert is
// false when the operation does not allow upsert.
func writeMapsFromParsed(conf *service.ParsedConfig, operation Operation) (maps writeMaps, err error) {
	// compileIfSet returns a nil executor for an empty field. The error from
	// the string probe is deliberately ignored: an unreadable value is treated
	// the same as an empty one.
	compileIfSet := func(name string) (*bloblang.Executor, error) {
		if raw, _ := conf.FieldString(name); raw == "" {
			return nil, nil
		}
		return conf.FieldBloblang(name)
	}

	if maps.filterMap, err = compileIfSet(commonFieldFilterMap); err != nil {
		return
	}
	if maps.documentMap, err = compileIfSet(commonFieldDocumentMap); err != nil {
		return
	}
	if maps.hintMap, err = compileIfSet(commonFieldHintMap); err != nil {
		return
	}
	if maps.upsert, err = conf.FieldBool(commonFieldUpsert); err != nil {
		return
	}

	switch filterAllowed := operation.isFilterAllowed(); {
	case filterAllowed && maps.filterMap == nil:
		err = errors.New("mongodb filter_map must be specified")
		return
	case !filterAllowed && maps.filterMap != nil:
		err = fmt.Errorf("mongodb filter_map not allowed for '%s' operation", operation)
		return
	}

	switch documentAllowed := operation.isDocumentAllowed(); {
	case documentAllowed && maps.documentMap == nil:
		err = errors.New("mongodb document_map must be specified")
		return
	case !documentAllowed && maps.documentMap != nil:
		err = fmt.Errorf("mongodb document_map not allowed for '%s' operation", operation)
		return
	}

	if maps.hintMap != nil && !operation.isHintAllowed() {
		err = fmt.Errorf("mongodb hint_map not allowed for '%s' operation", operation)
		return
	}

	if maps.upsert && !operation.isUpsertAllowed() {
		err = fmt.Errorf("mongodb upsert not allowed for '%s' operation", operation)
		return
	}
	return
}
// extractFromMessage runs the configured mappings against message i of batch
// and returns the structured results that the given operation needs.
//
// filterJSON is produced only when the operation allows a filter, docJSON only
// when it allows a document, and hintJSON only when a hint_map is configured.
// Anything that is not produced is returned as nil, and all three results are
// nil whenever err is non-nil.
//
// An error is returned when a mapping fails to execute (the cause is wrapped),
// when a wanted filter or document mapping yields no message, or when a
// mapping result cannot be read as structured data. A hint mapping that yields
// no message is treated as "no hint" rather than dereferenced: the hint is
// optional, and hintJSON is already nil whenever no hint_map is configured.
func (w writeMaps) extractFromMessage(operation Operation, i int, batch service.MessageBatch) (
	docJSON, filterJSON, hintJSON any, err error,
) {
	filterValWanted := operation.isFilterAllowed()
	documentValWanted := operation.isDocumentAllowed()

	var filterVal, documentVal *service.Message

	if filterValWanted {
		if filterVal, err = batch.BloblangQuery(i, w.filterMap); err != nil {
			return nil, nil, nil, fmt.Errorf("failed to execute filter_map: %w", err)
		}
		// A query can succeed yet produce no message; without a filter there
		// is nothing to operate on, so stop before running the document map.
		if filterVal == nil {
			return nil, nil, nil, fmt.Errorf("failed to generate filterVal")
		}
	}

	if documentValWanted {
		if documentVal, err = batch.BloblangQuery(i, w.documentMap); err != nil {
			return nil, nil, nil, fmt.Errorf("failed to execute document_map: %w", err)
		}
		if documentVal == nil {
			return nil, nil, nil, fmt.Errorf("failed to generate documentVal")
		}
	}

	if filterValWanted {
		if filterJSON, err = filterVal.AsStructured(); err != nil {
			return nil, nil, nil, err
		}
	}
	if documentValWanted {
		if docJSON, err = documentVal.AsStructured(); err != nil {
			return nil, nil, nil, err
		}
	}

	if w.hintMap != nil {
		var hintVal *service.Message
		if hintVal, err = batch.BloblangQuery(i, w.hintMap); err != nil {
			return nil, nil, nil, fmt.Errorf("failed to execute hint_map: %w", err)
		}
		// Guard the same nil-message case the filter and document paths
		// check for; calling AsStructured on a nil message would panic.
		if hintVal != nil {
			if hintJSON, err = hintVal.AsStructured(); err != nil {
				return nil, nil, nil, err
			}
		}
	}

	return docJSON, filterJSON, hintJSON, nil
}
// mapExamples returns the example bloblang mapping attached to the
// document_map, filter_map and hint_map fields. A fresh slice is returned on
// every call.
func mapExamples() []any {
	return []any{
		"root.a = this.foo\nroot.b = this.bar",
	}
}