-
Notifications
You must be signed in to change notification settings - Fork 98
/
options.go
92 lines (77 loc) · 2.37 KB
/
options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
/*
Copyright 2022 The Numaproj Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package forward
import (
"time"
"go.uber.org/zap"
dfv1 "github.com/numaproj/numaflow/pkg/apis/numaflow/v1alpha1"
"github.com/numaproj/numaflow/pkg/shared/logging"
)
// options holds the tunable settings for forwarding messages.
// DefaultOptions builds one with default values; Option functions modify its fields.
type options struct {
// readBatchSize is the read batch size; DefaultOptions seeds it with
// dfv1.DefaultReadBatchSize.
readBatchSize int64
// udfConcurrency sets the concurrency for concurrent map UDF processing.
// NOTE(review): DefaultOptions also seeds this from dfv1.DefaultReadBatchSize;
// confirm that sharing the batch-size default is intended.
udfConcurrency int
// retryInterval is the time.Duration to sleep before retrying.
retryInterval time.Duration
// logger is the logger carried along with the options.
logger *zap.SugaredLogger
// enableMapUdfStream indicates whether message streaming is enabled for map UDF processing.
enableMapUdfStream bool
}
// Option is a functional option: a function that receives a pointer to an
// options value and modifies it in place. The error return lets an option
// report a failure to apply.
type Option func(*options) error
// DefaultOptions returns a freshly allocated options value populated with the
// package defaults: the default read batch size, a one-millisecond retry
// interval, a new logger, and map UDF streaming disabled.
func DefaultOptions() *options {
	defaults := new(options)
	defaults.readBatchSize = dfv1.DefaultReadBatchSize
	// NOTE(review): concurrency is seeded from the read batch size constant;
	// confirm this is intended rather than a dedicated concurrency default.
	defaults.udfConcurrency = dfv1.DefaultReadBatchSize
	defaults.retryInterval = time.Millisecond
	defaults.logger = logging.NewLogger()
	defaults.enableMapUdfStream = false
	return defaults
}
// WithRetryInterval returns an Option that sets the retry interval to f.
// The returned Option always reports a nil error.
func WithRetryInterval(f time.Duration) Option {
	return func(o *options) error {
		// f is already a time.Duration, so it is stored without conversion.
		o.retryInterval = f
		return nil
	}
}
// WithReadBatchSize returns an Option that overrides the read batch size with f.
// The returned Option always reports a nil error.
func WithReadBatchSize(f int64) Option {
	setBatchSize := func(opts *options) error {
		opts.readBatchSize = f
		return nil
	}
	return setBatchSize
}
// WithUDFConcurrency returns an Option that stores f as the concurrency used
// for map UDF processing. The returned Option always reports a nil error.
func WithUDFConcurrency(f int) Option {
	setConcurrency := func(opts *options) error {
		opts.udfConcurrency = f
		return nil
	}
	return setConcurrency
}
// WithLogger returns an Option that replaces the logger held in options with l.
// The value is stored as given (no nil check is performed), and the returned
// Option always reports a nil error.
func WithLogger(l *zap.SugaredLogger) Option {
	setLogger := func(opts *options) error {
		opts.logger = l
		return nil
	}
	return setLogger
}
// WithUDFStreaming returns an Option that turns message streaming for map UDF
// processing on (f == true) or off (f == false). The returned Option always
// reports a nil error.
func WithUDFStreaming(f bool) Option {
	setStreaming := func(opts *options) error {
		opts.enableMapUdfStream = f
		return nil
	}
	return setStreaming
}