generated from oracle/template-repo
-
Notifications
You must be signed in to change notification settings - Fork 3
/
processors.go
572 lines (477 loc) · 20 KB
/
processors.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
/*
* Copyright (c) 2022, 2024 Oracle and/or its affiliates.
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
*/
package processors
import (
"github.com/oracle/coherence-go-client/coherence/extractors"
"github.com/oracle/coherence-go-client/coherence/filters"
"math/big"
)
// Prefixes from which the "@class" type names below are assembled.
const (
	processorPrefix = "processor."
	extractorPrefix = "extractor."
	queuePrefix     = "internal.net.queue.processor."
)

// Type names of the entry processors.
const (
	compositeProcessorType         = processorPrefix + "CompositeProcessor"
	conditionalProcessorType       = processorPrefix + "ConditionalProcessor"
	conditionalPutProcessorType    = processorPrefix + "ConditionalPut"
	conditionalPutAllProcessorType = processorPrefix + "ConditionalPutAll"
	conditionalRemoveProcessorType = processorPrefix + "ConditionalRemove"
	extractorProcessorType         = processorPrefix + "ExtractorProcessor"
	incrementProcessorType         = processorPrefix + "NumberIncrementor"
	methodInvocationProcessorType  = processorPrefix + "MethodInvocationProcessor"
	multiplierProcessorType        = processorPrefix + "NumberMultiplier"
	preloadProcessorType           = processorPrefix + "PreloadRequest"
	touchProcessorType             = processorPrefix + "TouchProcessor"
	updateProcessorType            = processorPrefix + "UpdaterProcessor"
	versionedPutProcessorType      = processorPrefix + "VersionedPut"
	versionedPutAllProcessorType   = processorPrefix + "VersionedPutAll"
)

// Type names of the updaters referenced by the update, increment and multiply processors.
const (
	compositeUpdaterType = extractorPrefix + "CompositeUpdater"
	universalUpdaterType = extractorPrefix + "UniversalUpdater"
)

// Type names of the queue processors.
const (
	queueNameHashType = queuePrefix + "QueueNameHash"
	queueOfferType    = queuePrefix + "QueueOffer"
	queuePollType     = queuePrefix + "QueuePoll"
	queuePeekType     = queuePrefix + "QueuePeek"
)
// Processor is the interface implemented by every entry processor in this package.
// It allows Processors to be composed. An instance of a Processor should be created
// using the various factory functions rather than constructed directly.
type Processor interface {
// AndThen creates a Processor that executes the current Processor followed by the
// specified 'next' Processor.
AndThen(next Processor) Processor
// When creates a Processor that executes only if the specified Filter passes. If the
// underlying filter expects to evaluate existent entries only, it should be combined
// with a filter that tests for presence, like and(present).
When(filter filters.Filter) Processor
}
// Number is the type constraint for values that can be incremented or multiplied;
// it is the constraint used by Increment and Multiply. The ~ terms also admit named
// types whose underlying type is one of the listed built-in numeric types, while
// big.Rat and big.Int are admitted as exactly those types.
type Number interface {
~float32 | ~float64 | ~int | ~int8 | ~int16 | ~int32 | ~int64 |
~uint | ~uint8 | ~uint16 | ~uint32 | ~uint64 |
~complex64 | ~complex128 | big.Rat | big.Int
}
// ConditionalPut returns a Processor that puts value when filter evaluates to true.
// The same conditional insert could be written with direct key-based QueryMap
// operations, but this form is more efficient and enforces concurrency control
// without explicit locking.
func ConditionalPut[V any](filter filters.Filter, value V) Processor {
	proc := newConditionalPutProcessor(filter, value)
	return proc
}
// ConditionalPutAll returns a Processor that inserts the given entries when filter
// evaluates to true. The same conditional insert could be written with direct
// key-based QueryMap operations, but this form is more efficient and enforces
// concurrency control without explicit locking.
func ConditionalPutAll[K comparable, V any](filter filters.Filter, entries map[K]V) Processor {
	proc := newConditionalPutAllProcessor(filter, entries)
	return proc
}
// ConditionalRemove returns a Processor that removes the values when filter evaluates
// to true. The same conditional remove could be written with direct key-based QueryMap
// operations, but this form is more efficient and enforces concurrency control without
// explicit locking. If returnCurrent is set to true and the remove does not occur,
// then the current value will be returned. Only the first returnCurrent value is used.
func ConditionalRemove(filter filters.Filter, returnCurrent ...bool) Processor {
	proc := newConditionalRemoveProcessor(filter, returnCurrent...)
	return proc
}
// Extractor returns a Processor that extracts the named property from an entry's
// value. If the property contains a "." (period), then a chained extractor is created.
func Extractor[E any](property string) Processor {
	proc := newExtractorProcessor[E](property)
	return proc
}
// InvokeAccessor returns a Processor that invokes an accessor method on an entry.
// The named method is invoked with the supplied arguments. The returned Processor
// can be used for further composition.
func InvokeAccessor(method string, args ...interface{}) Processor {
	const isMutator = false
	proc := newMethodInvocationProcessor(method, isMutator, args)
	return proc
}
// InvokeMutator returns a Processor that invokes a mutator method. The named method
// is invoked with the supplied arguments. The returned Processor can be used for
// further composition.
func InvokeMutator(method string, args ...interface{}) Processor {
	const isMutator = true
	proc := newMethodInvocationProcessor(method, isMutator, args)
	return proc
}
// Increment returns a Processor that increments the numeric value of the named
// property by value. If postInc is true the Processor returns the value as it was
// before it was incremented; if false it returns the value as it is after it is
// incremented. Only the first postInc value is used; it defaults to false.
func Increment[I Number](property string, value I, postInc ...bool) Processor {
	proc := newNumberIncrementor(property, value, postInc...)
	return proc
}
// Multiply returns a Processor that multiplies the numeric value of the named
// property by value. If postInc is true the Processor returns the value as it was
// before it was multiplied; if false it returns the value as it is after it is
// multiplied. Only the first postInc value is used; it defaults to false.
func Multiply[I Number](property string, value I, postInc ...bool) Processor {
	proc := newNumberMultiplier(property, value, postInc...)
	return proc
}
// Preload returns a Processor that loads an entry into a NamedMap. It provides a
// means to "pre-load" an entry or a collection of entries into the cache without
// incurring the cost of sending the value(s) over the network. If the corresponding
// entry (or entries) already exists in the map, or if the map does not have a loader,
// then invoking this Processor has no effect.
func Preload() Processor {
	proc := newPreloadProcessor()
	return proc
}
// Touch returns a Processor that touches an entry (if present) in order to trigger
// interceptor re-evaluation and possibly increment expiry time.
func Touch() Processor {
	proc := newTouchProcessor()
	return proc
}
// Update returns a Processor that sets the named property of an entry to value.
// The processor will return a bool indicating if the entry to be updated was present.
func Update[V any](property string, value V) Processor {
	proc := newUpdaterProcessor(property, value)
	return proc
}
// VersionedPut returns a Processor that inserts value if the version specified by
// the new value matches the version of the current value. If insertion occurs the
// version of the entry within the map will be incremented. canInsert and
// returnCurrent are passed through as the processor's "insert" and "return" attributes.
func VersionedPut[V any](value V, canInsert, returnCurrent bool) Processor {
	proc := newVersionedPutProcessor(value, canInsert, returnCurrent)
	return proc
}
// VersionedPutAll returns a Processor that inserts the specified values if the
// version specified by each new value matches the version of the current value.
// If insertion occurs the version of the entry within the map will be incremented.
// canInsert and returnCurrent are passed through as the processor's "insert" and
// "return" attributes.
func VersionedPutAll[K comparable, V any](entries map[K]V, canInsert, returnCurrent bool) Processor {
	proc := newVersionedPutAllProcessor(entries, canInsert, returnCurrent)
	return proc
}
// QueueNameHashProcessor returns a Processor that determines the hash for the
// given queue name.
func QueueNameHashProcessor(queueName string) Processor {
	proc := newQueueNameHashProcessor(queueName)
	return proc
}
// QueueOfferProcessor returns a Processor that places value at the tail of the queue.
func QueueOfferProcessor[V any](value V) Processor {
	proc := newQueueOfferProcessor(value)
	return proc
}
// QueuePollProcessor returns a Processor that retrieves an item from the head of
// the queue.
func QueuePollProcessor() Processor {
	proc := newQueuePollProcessor()
	return proc
}
// QueuePeekProcessor returns a Processor that peeks at the first item from the
// head of the queue.
func QueuePeekProcessor() Processor {
	proc := newQueuePeekProcessor()
	return proc
}
// abstractProcessor carries the state common to the processors in this package.
// The concrete processor types embed it and thereby pick up its AndThen and When methods.
type abstractProcessor struct {
// Type is written as the "@class" JSON attribute and is left out when empty.
Type string `json:"@class,omitempty"`
// delegate is what AndThen and When wrap. It is unexported, so encoding/json
// does not serialize it.
delegate Processor // delegate processor whose type name is set in the Type
}
// newAbstractProcessor builds the shared base for a processor: typeName becomes the
// "@class" attribute and delegate is the processor that AndThen and When will wrap.
func newAbstractProcessor(typeName string, delegate Processor) *abstractProcessor {
	base := new(abstractProcessor)
	base.Type = typeName
	base.delegate = delegate
	return base
}
// AndThen returns a new composite Processor that runs the delegate of this
// processor first and then the supplied next Processor.
func (ap *abstractProcessor) AndThen(next Processor) Processor {
	first := ap.delegate
	return newCompositeProcessor(first, next)
}
// When returns a new conditional Processor that runs the delegate of this processor
// only if the supplied Filter passes.
func (ap *abstractProcessor) When(filter filters.Filter) Processor {
	guarded := ap.delegate
	return newConditionalProcessor(filter, guarded)
}
// compositeProcessor is a Processor made of an ordered list of other Processors.
type compositeProcessor struct {
*abstractProcessor
// Processors holds the chained processors in the order they were added; it is
// serialized as "processors" and omitted when empty.
Processors []Processor `json:"processors,omitempty"`
}
// newCompositeProcessor creates a composite whose processor list is left followed
// by right. The composite is registered as its own delegate.
func newCompositeProcessor(left, right Processor) *compositeProcessor {
	composite := &compositeProcessor{Processors: []Processor{left, right}}
	composite.abstractProcessor = newAbstractProcessor(compositeProcessorType, composite)
	return composite
}
// AndThen appends next to this composite's processor list and returns the same
// composite, so a chain of AndThen calls keeps extending a single compositeProcessor
// instead of nesting composites. Note that the receiver is modified in place.
func (cp *compositeProcessor) AndThen(next Processor) Processor {
	cp.Processors = append(cp.Processors, next)
	return cp
}
// conditionalProcessor pairs a Processor with the Filter that guards it; it is the
// type returned by abstractProcessor.When.
type conditionalProcessor struct {
*abstractProcessor
// Filter is serialized as "filter".
Filter filters.Filter `json:"filter,omitempty"`
// Processor is the wrapped processor, serialized as "processor".
Processor Processor `json:"processor,omitempty"`
}
// newConditionalProcessor wraps proc together with the filter that guards it.
// The new processor is registered as its own delegate.
func newConditionalProcessor(filter filters.Filter, proc Processor) *conditionalProcessor {
	conditional := new(conditionalProcessor)
	conditional.Filter = filter
	conditional.Processor = proc
	conditional.abstractProcessor = newAbstractProcessor(conditionalProcessorType, conditional)
	return conditional
}
// conditionalPutAllProcessor is the serializable form of the ConditionalPutAll processor.
type conditionalPutAllProcessor[K comparable, V any] struct {
*abstractProcessor
// Filter is serialized as "filter".
Filter filters.Filter `json:"filter,omitempty"`
// Entries wraps the key/value pairs; it is serialized as an "entries" object that
// itself contains an "entries" list (see putAllEntries).
Entries putAllEntries[K, V] `json:"entries,omitempty"`
// RetCurrent is always serialized, as "return".
RetCurrent bool `json:"return"`
}
// putAllEntry is a single key/value pair in the payload of a put-all style processor.
// Both attributes are always serialized: a zero-valued key such as 0, "" or false is
// a legitimate map key and must not be dropped from the JSON.
type putAllEntry[K comparable, V any] struct {
	// Key is the map key, serialized as "key".
	Key K `json:"key"`

	// Value is the value to associate with Key, serialized as "value".
	Value V `json:"value"`
}
// putAllEntries wraps a list of putAllEntry so that it serializes as an object with
// a single "entries" attribute (omitted when the list is empty).
type putAllEntries[K comparable, V any] struct {
Entries []*putAllEntry[K, V] `json:"entries,omitempty"`
}
// newConditionalPutAllProcessor converts entries into a list of key/value pairs and
// wraps it, together with filter, in a conditionalPutAllProcessor. Only the first
// returnCurrent value is used; it defaults to false. The order of the resulting list
// follows Go map iteration and is therefore unspecified.
func newConditionalPutAllProcessor[K comparable, V any](filter filters.Filter, entries map[K]V, returnCurrent ...bool) *conditionalPutAllProcessor[K, V] {
	pairs := make([]*putAllEntry[K, V], 0, len(entries))
	for key, val := range entries {
		pairs = append(pairs, &putAllEntry[K, V]{Key: key, Value: val})
	}

	proc := &conditionalPutAllProcessor[K, V]{
		Filter:     filter,
		Entries:    putAllEntries[K, V]{Entries: pairs},
		RetCurrent: len(returnCurrent) > 0 && returnCurrent[0],
	}
	proc.abstractProcessor = newAbstractProcessor(conditionalPutAllProcessorType, proc)
	return proc
}
// conditionalPutProcessor is the serializable form of the ConditionalPut processor.
type conditionalPutProcessor[V any] struct {
	*abstractProcessor

	// Filter is serialized as "filter".
	Filter filters.Filter `json:"filter,omitempty"`

	// Value is the value to put, serialized as "value". It is always serialized:
	// with omitempty a zero value (0, "", false, ...) would be silently dropped from
	// the payload even though the caller explicitly asked for it to be stored. This
	// matches the "value" tags used by the other value-carrying processors in this file.
	Value V `json:"value"`

	// RetCurrent is always serialized, as "return".
	RetCurrent bool `json:"return"`
}
// newConditionalPutProcessor creates the processor behind ConditionalPut. Only the
// first returnCurrent value is used; it defaults to false. The new processor is
// registered as its own delegate.
func newConditionalPutProcessor[V any](filter filters.Filter, value V, returnCurrent ...bool) *conditionalPutProcessor[V] {
	proc := &conditionalPutProcessor[V]{
		Filter:     filter,
		Value:      value,
		RetCurrent: len(returnCurrent) > 0 && returnCurrent[0],
	}
	proc.abstractProcessor = newAbstractProcessor(conditionalPutProcessorType, proc)
	return proc
}
// ReturnCurrent switches on the "return" attribute of this processor, marking that
// it should return the current value, and returns the same processor for chaining.
func (p *conditionalPutProcessor[V]) ReturnCurrent() Processor {
	p.RetCurrent = true
	return p
}
// conditionalRemoveProcessor is the serializable form of the ConditionalRemove processor.
type conditionalRemoveProcessor struct {
*abstractProcessor
// Filter is serialized as "filter".
Filter filters.Filter `json:"filter,omitempty"`
// RetCurrent is always serialized, as "return".
RetCurrent bool `json:"return"`
}
// newConditionalRemoveProcessor creates the processor behind ConditionalRemove.
// Only the first returnCurrent value is used; it defaults to false. The new
// processor is registered as its own delegate.
func newConditionalRemoveProcessor(filter filters.Filter, returnCurrent ...bool) *conditionalRemoveProcessor {
	proc := &conditionalRemoveProcessor{
		Filter:     filter,
		RetCurrent: len(returnCurrent) > 0 && returnCurrent[0],
	}
	proc.abstractProcessor = newAbstractProcessor(conditionalRemoveProcessorType, proc)
	return proc
}
// ReturnCurrent sets the "return" attribute of this processor, which marks whether
// it should return the current value, and returns the same processor for chaining.
func (p *conditionalRemoveProcessor) ReturnCurrent(returnCurrent bool) Processor {
	p.RetCurrent = returnCurrent
	return p
}
// extractorProcessor is the serializable form of the Extractor processor.
type extractorProcessor[E any] struct {
*abstractProcessor
// Extractor is serialized as "extractor".
Extractor extractors.ValueExtractor[any, E] `json:"extractor,omitempty"`
// Name is the property name, serialized as "name".
Name string `json:"name,omitempty"`
// Params is always serialized; newExtractorProcessor never sets it, so it is
// encoded as null for processors built there.
Params []interface{} `json:"params"`
// Target is omitted from the JSON when it is 0.
Target int `json:"target,omitempty"` // 1 for key extractor, 0 for value extractor
}
// newExtractorProcessor creates the processor behind Extractor. The property is
// recorded as the processor's name and is also used to build its value extractor.
// The new processor is registered as its own delegate.
func newExtractorProcessor[E any](property string) *extractorProcessor[E] {
	proc := &extractorProcessor[E]{
		Extractor: extractors.Extract[E](property),
		Name:      property,
	}
	proc.abstractProcessor = newAbstractProcessor(extractorProcessorType, proc)
	return proc
}
// methodInvocationProcessor is the serializable form of the processors returned by
// InvokeAccessor and InvokeMutator.
type methodInvocationProcessor struct {
*abstractProcessor
// MethodName is the method to invoke, serialized as "methodName".
MethodName string `json:"methodName,omitempty"`
// IsMutator is always serialized, as "mutator"; InvokeMutator sets it to true
// and InvokeAccessor to false.
IsMutator bool `json:"mutator"`
// Args holds the invocation arguments and is always serialized, as "args".
Args []interface{} `json:"args"`
}
// newMethodInvocationProcessor creates the processor behind InvokeAccessor and
// InvokeMutator. property is the name of the method to invoke. The new processor
// is registered as its own delegate.
func newMethodInvocationProcessor(property string, isMutator bool, args []interface{}) *methodInvocationProcessor {
	proc := &methodInvocationProcessor{MethodName: property, IsMutator: isMutator, Args: args}
	if proc.Args == nil {
		// A nil slice would be encoded as JSON null; use an empty slice so that
		// "args" is encoded as [] instead.
		proc.Args = []interface{}{}
	}
	proc.abstractProcessor = newAbstractProcessor(methodInvocationProcessorType, proc)
	return proc
}
// numberIncrementor is the serializable form of the Increment processor.
type numberIncrementor[I Number] struct {
	*abstractProcessor

	// Manipulator identifies the property to read and update, serialized as "manipulator".
	Manipulator *compositeIncrementor[I] `json:"manipulator,omitempty"`

	// IncBy is the amount to add, serialized as "increment". It is always serialized:
	// with omitempty an explicit increment of zero would be dropped from the payload.
	IncBy I `json:"increment"`

	// ReturnPostInc is always serialized, as "postInc".
	ReturnPostInc bool `json:"postInc"`
}
// newNumberIncrementor creates the processor behind Increment for the named
// property. Only the first postInc value is used; it defaults to false. The new
// processor is registered as its own delegate.
func newNumberIncrementor[I Number](propertyName string, incBy I, postInc ...bool) *numberIncrementor[I] {
	inc := &numberIncrementor[I]{
		IncBy:         incBy,
		ReturnPostInc: len(postInc) > 0 && postInc[0],
	}
	inc.abstractProcessor = newAbstractProcessor(incrementProcessorType, inc)
	inc.Manipulator = newCompositeIncrementor[I](propertyName)
	return inc
}
// numberMultiplier is the serializable form of the Multiply processor.
type numberMultiplier[I Number] struct {
	*abstractProcessor

	// Manipulator identifies the property to read and update, serialized as "manipulator".
	Manipulator *compositeIncrementor[I] `json:"manipulator,omitempty"`

	// MultiplyBy is the factor, serialized as "multiplier". It is always serialized:
	// multiplying by zero is a meaningful request, and with omitempty a zero factor
	// would be dropped from the payload.
	MultiplyBy I `json:"multiplier"`

	// ReturnPostValue is always serialized, as "postMultiplication".
	ReturnPostValue bool `json:"postMultiplication"`
}
// newNumberMultiplier creates the processor behind Multiply for the named property.
// Only the first postInc value is used; it defaults to false. The new processor is
// registered as its own delegate.
func newNumberMultiplier[I Number](propertyName string, multiplyBy I, postInc ...bool) *numberMultiplier[I] {
	mul := &numberMultiplier[I]{
		MultiplyBy:      multiplyBy,
		ReturnPostValue: len(postInc) > 0 && postInc[0],
	}
	mul.abstractProcessor = newAbstractProcessor(multiplierProcessorType, mul)
	mul.Manipulator = newCompositeIncrementor[I](propertyName)
	return mul
}
// preloadProcessor is the serializable form of the Preload processor. It has no
// attributes of its own beyond those of the embedded abstractProcessor.
type preloadProcessor struct {
*abstractProcessor
}
// newPreloadProcessor creates the processor behind Preload and registers it as its
// own delegate.
func newPreloadProcessor() *preloadProcessor {
	proc := new(preloadProcessor)
	proc.abstractProcessor = newAbstractProcessor(preloadProcessorType, proc)
	return proc
}
// touchProcessor is the serializable form of the Touch processor. It has no
// attributes of its own beyond those of the embedded abstractProcessor.
type touchProcessor struct {
*abstractProcessor
}
// newTouchProcessor creates the processor behind Touch and registers it as its own
// delegate.
func newTouchProcessor() *touchProcessor {
	proc := new(touchProcessor)
	proc.abstractProcessor = newAbstractProcessor(touchProcessorType, proc)
	return proc
}
// updaterProcessor is the serializable form of the Update processor.
type updaterProcessor[V any] struct {
*abstractProcessor
// Updater identifies the property to update, serialized as "updater".
Updater *compositeUpdater `json:"updater,omitempty"`
// Value is the new property value; it is always serialized, as "value".
Value V `json:"value"`
}
// newUpdaterProcessor creates the processor behind Update. Its updater is a
// composite of an identity extractor and a universal updater for propertyName.
// The new processor is registered as its own delegate.
func newUpdaterProcessor[V any](propertyName string, value V) *updaterProcessor[V] {
	proc := new(updaterProcessor[V])
	proc.Value = value
	proc.abstractProcessor = newAbstractProcessor(updateProcessorType, proc)

	identity := extractors.Identity[any]()
	updater := new(compositeUpdater)
	updater.Type = compositeUpdaterType
	updater.Extractor = &identity
	updater.Updater = newUniversalUpdater(propertyName)
	proc.Updater = updater
	return proc
}
// compositeUpdater is the serializable pairing of an extractor and an updater used
// by updaterProcessor.
type compositeUpdater struct {
// Type is written as the "@class" JSON attribute.
Type string `json:"@class,omitempty"`
// Extractor is serialized as "extractor".
Extractor *extractors.ValueExtractor[any, any] `json:"extractor,omitempty"`
// Updater is serialized as "updater".
Updater *universalUpdater `json:"updater,omitempty"`
}
// compositeIncrementor is the serializable pairing of a typed extractor and an
// updater used by numberIncrementor and numberMultiplier. newCompositeIncrementor
// gives it the same "@class" value as compositeUpdater.
type compositeIncrementor[E any] struct {
// Type is written as the "@class" JSON attribute.
Type string `json:"@class,omitempty"`
// Extractor is serialized as "extractor".
Extractor *extractors.ValueExtractor[any, E] `json:"extractor,omitempty"`
// Updater is serialized as "updater".
Updater *universalUpdater `json:"updater,omitempty"`
}
// universalUpdater is the serializable description of an updater that targets a
// property by name.
type universalUpdater struct {
// Type is written as the "@class" JSON attribute.
Type string `json:"@class,omitempty"`
// Name is the property name, serialized as "name".
Name string `json:"name,omitempty"`
}
// newUniversalUpdater creates a universalUpdater for the named property.
func newUniversalUpdater(propertyName string) *universalUpdater {
	return &universalUpdater{
		Type: universalUpdaterType,
		Name: propertyName,
	}
}
// newCompositeIncrementor creates the manipulator used by the increment and multiply
// processors: an extractor and a universal updater that are both built from
// propertyName.
func newCompositeIncrementor[I Number](propertyName string) *compositeIncrementor[I] {
	extractor := extractors.Extract[I](propertyName)

	ci := new(compositeIncrementor[I])
	ci.Type = compositeUpdaterType
	ci.Extractor = &extractor
	ci.Updater = newUniversalUpdater(propertyName)
	return ci
}
// versionedPutAllEntriesWrapper wraps the key/value pairs of a VersionedPutAll
// processor together with a "@class" attribute; newVersionedPutAllProcessor sets
// that attribute to "java.util.HashMap".
type versionedPutAllEntriesWrapper[K comparable, V any] struct {
// Type is written as the "@class" JSON attribute.
Type string `json:"@class,omitempty"`
// Entries is serialized as "entries" and omitted when empty.
Entries []*putAllEntry[K, V] `json:"entries,omitempty"`
}
// newVersionedPutAllEntriesWrapper wraps entries together with typeStr, which
// becomes the wrapper's "@class" attribute.
func newVersionedPutAllEntriesWrapper[K comparable, V any](typeStr string, entries []*putAllEntry[K, V]) *versionedPutAllEntriesWrapper[K, V] {
	wrapper := new(versionedPutAllEntriesWrapper[K, V])
	wrapper.Type = typeStr
	wrapper.Entries = entries
	return wrapper
}
// versionedPutAllProcessor is the serializable form of the VersionedPutAll processor.
type versionedPutAllProcessor[K comparable, V any] struct {
*abstractProcessor
// Entries holds the wrapped key/value pairs, serialized as "entries".
Entries *versionedPutAllEntriesWrapper[K, V] `json:"entries,omitempty"`
// CanInsert is always serialized, as "insert".
CanInsert bool `json:"insert"`
// RetCurrent is always serialized, as "return".
RetCurrent bool `json:"return"`
}
// newVersionedPutAllProcessor creates the processor behind VersionedPutAll. The
// entries are converted to a list of key/value pairs and wrapped with the
// "java.util.HashMap" type name. The order of the list follows Go map iteration and
// is therefore unspecified. The new processor is registered as its own delegate.
func newVersionedPutAllProcessor[K comparable, V any](entries map[K]V, canInsert, returnCurrent bool) *versionedPutAllProcessor[K, V] {
	pairs := make([]*putAllEntry[K, V], 0, len(entries))
	for key, val := range entries {
		pairs = append(pairs, &putAllEntry[K, V]{Key: key, Value: val})
	}

	proc := new(versionedPutAllProcessor[K, V])
	proc.Entries = newVersionedPutAllEntriesWrapper[K, V]("java.util.HashMap", pairs)
	proc.CanInsert = canInsert
	proc.RetCurrent = returnCurrent
	proc.abstractProcessor = newAbstractProcessor(versionedPutAllProcessorType, proc)
	return proc
}
// ReturnCurrent sets the "return" attribute of this processor, which marks whether
// it should return the current value, and returns the same processor for chaining.
func (cp *versionedPutAllProcessor[K, V]) ReturnCurrent(returnCurrent bool) *versionedPutAllProcessor[K, V] {
	cp.RetCurrent = returnCurrent

	return cp
}
// AllowInsert sets the "insert" attribute of this processor, which marks whether it
// should allow inserts, and returns the same processor for chaining.
func (cp *versionedPutAllProcessor[K, V]) AllowInsert(allowInsert bool) *versionedPutAllProcessor[K, V] {
	cp.CanInsert = allowInsert

	return cp
}
// versionedPutProcessor is the serializable form of the VersionedPut processor.
type versionedPutProcessor[V any] struct {
*abstractProcessor
// Value is the value to put, serialized as "value".
// NOTE(review): omitempty leaves "value" out of the JSON when V's value is empty
// in the encoding/json sense (nil pointer, 0, "", false, ...) — confirm that is intended.
Value V `json:"value,omitempty"`
// CanInsert is always serialized, as "insert".
CanInsert bool `json:"insert"`
// RetCurrent is always serialized, as "return".
RetCurrent bool `json:"return"`
}
// newVersionedPutProcessor creates the processor behind VersionedPut and registers
// it as its own delegate.
func newVersionedPutProcessor[V any](value V, canInsert, returnCurrent bool) *versionedPutProcessor[V] {
	proc := new(versionedPutProcessor[V])
	proc.Value = value
	proc.CanInsert = canInsert
	proc.RetCurrent = returnCurrent
	proc.abstractProcessor = newAbstractProcessor(versionedPutProcessorType, proc)
	return proc
}
// ReturnCurrent sets the "return" attribute of this processor, which marks whether
// it should return the current value, and returns the same processor for chaining.
func (cp *versionedPutProcessor[V]) ReturnCurrent(returnCurrent bool) Processor {
	cp.RetCurrent = returnCurrent

	return cp
}
// AllowInsert sets the "insert" attribute of this processor, which marks whether it
// should allow inserts, and returns the same processor for chaining.
func (cp *versionedPutProcessor[V]) AllowInsert(allowInsert bool) Processor {
	cp.CanInsert = allowInsert

	return cp
}
// queueNameHashProcessor is the serializable form of the QueueNameHashProcessor processor.
type queueNameHashProcessor struct {
*abstractProcessor
// Name is the queue name; it is always serialized, as "name".
Name string `json:"name"`
}
// newQueueNameHashProcessor creates the processor behind QueueNameHashProcessor for
// the given queue name and registers it as its own delegate.
func newQueueNameHashProcessor(queueName string) *queueNameHashProcessor {
	proc := new(queueNameHashProcessor)
	proc.Name = queueName
	proc.abstractProcessor = newAbstractProcessor(queueNameHashType, proc)
	return proc
}
// queueOfferProcessor is the serializable form of the QueueOfferProcessor processor.
type queueOfferProcessor[V any] struct {
*abstractProcessor
// Value is the item being offered; it is always serialized, as "value".
Value V `json:"value"`
}
// newQueueOfferProcessor creates the processor behind QueueOfferProcessor carrying
// value and registers it as its own delegate.
func newQueueOfferProcessor[V any](value V) *queueOfferProcessor[V] {
	proc := new(queueOfferProcessor[V])
	proc.Value = value
	proc.abstractProcessor = newAbstractProcessor(queueOfferType, proc)
	return proc
}
// queuePollProcessor is the serializable form of the QueuePollProcessor processor.
// It has no attributes of its own beyond those of the embedded abstractProcessor.
type queuePollProcessor struct {
*abstractProcessor
}
// newQueuePollProcessor creates the processor behind QueuePollProcessor and
// registers it as its own delegate.
func newQueuePollProcessor() *queuePollProcessor {
	proc := new(queuePollProcessor)
	proc.abstractProcessor = newAbstractProcessor(queuePollType, proc)
	return proc
}
// queuePeekProcessor is the serializable form of the QueuePeekProcessor processor.
// It has no attributes of its own beyond those of the embedded abstractProcessor.
type queuePeekProcessor struct {
*abstractProcessor
}
// newQueuePeekProcessor creates the processor behind QueuePeekProcessor and
// registers it as its own delegate.
func newQueuePeekProcessor() *queuePeekProcessor {
	proc := new(queuePeekProcessor)
	proc.abstractProcessor = newAbstractProcessor(queuePeekType, proc)
	return proc
}