/
config.go
148 lines (122 loc) · 5.36 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package scheduler
import "time"
// Config holds the settings of a running scheduler: the timeouts that govern
// order catch-up and zombie handling, and the metric callbacks the scheduler
// reports to. Create one with NewConfig and adjust it via the With* methods.
type Config struct {
	// Timing.

	// orderCatchupTimeout is the duration after which a delay-order is
	// considered "lost": because of a possible waiter change the order might
	// never get executed, so it is executed right away instead.
	orderCatchupTimeout time.Duration
	// orderZombieTTL is the duration after which the scheduler no longer
	// waits for an order to be executed and reschedules it. It should be
	// greater than a usual redeployment time or the retention time configured
	// for the kafka topic. Orders can also end up in this state when the
	// scheduler's waiter intervals are changed, which leaves already
	// scheduled orders hanging in the queue.
	// Defaults to one minute.
	orderZombieTTL time.Duration

	// Counters.

	// mxZombieEvicted is incremented whenever a zombie order gets evicted.
	mxZombieEvicted Count
	// mxThrottleDuplicate is incremented whenever a throttling order ignores
	// a duplicate.
	mxThrottleDuplicate Count
	// mxThrottleFirstRescheduled is incremented whenever a throttling order
	// is being rescheduled.
	mxThrottleFirstRescheduled Count
	// mxExecutionDropped counts executions that were dropped because the
	// order was missing. This can happen if the scheduler was offline for too
	// long and orders are being re-scheduled; there may then be duplicate
	// executions, of which only one succeeds.
	mxExecutionDropped Count
	// mxRescheduled is incremented whenever a waiter waits for a message.
	// The type passed along is the waiter's max wait time in milliseconds,
	// which is not the time the waiter actually waits (that is probably less).
	mxRescheduled CountForType

	// Observers.

	// mxExecuteRoundTrips observes the number of wait-iterations an order
	// has run through before being executed.
	mxExecuteRoundTrips Observe
	// mxExecutionTimeDeviation observes by how many seconds the actual
	// execution time deviates from the planned execution time.
	mxExecutionTimeDeviation Observe
	// mxPlaceOrderLag observes the current lag, in seconds, of processing
	// order placements.
	mxPlaceOrderLag Observe
}
// Metric callback types accepted by Config.
type (
	// Observe records a single observed value, e.g. into a histogram or a
	// summary.
	Observe func(float64)

	// Count adds a value to a counter, from which a rate can be calculated.
	Count func(float64)

	// CountForType adds a value to the counter of events belonging to the
	// given type.
	CountForType func(string, float64)
)
// NewConfig returns a Config populated with defaults: every metric callback
// is a no-op, the order catch-up timeout is 10 seconds and the order zombie
// TTL is one minute. Use the With* methods to override individual settings.
func NewConfig() *Config {
	// Dummy sinks, so that every metric field is non-nil from the start.
	var (
		discardCount        Count        = func(float64) {}
		discardObserve      Observe      = func(float64) {}
		discardCountForType CountForType = func(string, float64) {}
	)

	cfg := new(Config)

	cfg.mxZombieEvicted = discardCount
	cfg.mxThrottleDuplicate = discardCount
	cfg.mxThrottleFirstRescheduled = discardCount
	cfg.mxRescheduled = discardCountForType
	cfg.mxExecuteRoundTrips = discardObserve
	cfg.mxExecutionTimeDeviation = discardObserve
	cfg.mxPlaceOrderLag = discardObserve
	cfg.mxExecutionDropped = discardCount

	// After 10 seconds a catch-up is attempted.
	cfg.orderCatchupTimeout = 10 * time.Second
	cfg.orderZombieTTL = time.Minute

	return cfg
}
// WithMxZombieEvicted sets the counter that measures the number of zombie
// orders being evicted.
//
// A nil cnt is replaced by a no-op counter, so the stored callback is never
// nil. It returns c to allow chaining.
func (c *Config) WithMxZombieEvicted(cnt Count) *Config {
	if cnt == nil {
		// Calling a nil func value panics; store a no-op instead.
		cnt = func(float64) {}
	}
	c.mxZombieEvicted = cnt
	return c
}
// WithMxExecuteRoundTrips sets the observer that measures the number of
// round trips an order has done before finally being executed. This will
// usually be a histogram or summary.
//
// A nil h is replaced by a no-op observer, so the stored callback is never
// nil. It returns c to allow chaining.
func (c *Config) WithMxExecuteRoundTrips(h Observe) *Config {
	if h == nil {
		// Calling a nil func value panics; store a no-op instead.
		h = func(float64) {}
	}
	c.mxExecuteRoundTrips = h
	return c
}
// WithMxExecutionTimeDeviation sets the observer that measures the deviation,
// in seconds, between planned and actual execution. This will usually be a
// histogram or summary. Observed values can be negative: if there is no
// waiter small enough for the last iteration, the order is executed before
// its actual deadline.
//
// A nil h is replaced by a no-op observer, so the stored callback is never
// nil. It returns c to allow chaining.
func (c *Config) WithMxExecutionTimeDeviation(h Observe) *Config {
	if h == nil {
		// Calling a nil func value panics; store a no-op instead.
		h = func(float64) {}
	}
	c.mxExecutionTimeDeviation = h
	return c
}
// WithMxThrottleDuplicate sets the counter that measures the number of
// duplicates/throttles for a throttling order.
//
// A nil cnt is replaced by a no-op counter, so the stored callback is never
// nil. It returns c to allow chaining.
func (c *Config) WithMxThrottleDuplicate(cnt Count) *Config {
	if cnt == nil {
		// Calling a nil func value panics; store a no-op instead.
		cnt = func(float64) {}
	}
	c.mxThrottleDuplicate = cnt
	return c
}
// WithMxThrottleFirstRescheduled sets the counter that measures the number of
// reschedules for orders configured with ThrottleFirstReschedule.
//
// A nil cnt is replaced by a no-op counter, so the stored callback is never
// nil. It returns c to allow chaining.
func (c *Config) WithMxThrottleFirstRescheduled(cnt Count) *Config {
	if cnt == nil {
		// Calling a nil func value panics; store a no-op instead.
		cnt = func(float64) {}
	}
	c.mxThrottleFirstRescheduled = cnt
	return c
}
// WithMxRescheduled sets the counter that measures the total number of
// reschedules. The counter is invoked with a type string and the value to add.
//
// A nil cnt is replaced by a no-op counter, so the stored callback is never
// nil. It returns c to allow chaining.
func (c *Config) WithMxRescheduled(cnt CountForType) *Config {
	if cnt == nil {
		// Calling a nil func value panics; store a no-op instead.
		cnt = func(string, float64) {}
	}
	c.mxRescheduled = cnt
	return c
}
// WithMxExecutionDropped sets the counter that tracks executions dropped due
// to retention or rescheduling issues.
//
// A nil cnt is replaced by a no-op counter, so the stored callback is never
// nil. It returns c to allow chaining.
func (c *Config) WithMxExecutionDropped(cnt Count) *Config {
	if cnt == nil {
		// Calling a nil func value panics; store a no-op instead.
		cnt = func(float64) {}
	}
	c.mxExecutionDropped = cnt
	return c
}
// WithMxPlaceOrderLag sets the observer that measures the lag, in seconds, of
// order placement.
//
// A nil o is replaced by a no-op observer, so the stored callback is never
// nil. It returns c to allow chaining.
func (c *Config) WithMxPlaceOrderLag(o Observe) *Config {
	if o == nil {
		// Calling a nil func value panics; store a no-op instead.
		o = func(float64) {}
	}
	c.mxPlaceOrderLag = o
	return c
}
// WithOrderCatchupTimeout sets the order catch-up timeout: the duration after
// which a delay-order is considered lost and is executed right away.
// NewConfig initializes it to 10 seconds. It returns c to allow chaining.
func (c *Config) WithOrderCatchupTimeout(timeout time.Duration) *Config {
	c.orderCatchupTimeout = timeout

	return c
}
// WithOrderZombieTTL sets the order zombie TTL: the duration after which the
// scheduler no longer waits for an order to be executed and reschedules it.
// NewConfig initializes it to one minute. It returns c to allow chaining.
func (c *Config) WithOrderZombieTTL(ttl time.Duration) *Config {
	c.orderZombieTTL = ttl

	return c
}