-
Notifications
You must be signed in to change notification settings - Fork 307
/
limiter_setup.go
81 lines (73 loc) · 2.54 KB
/
limiter_setup.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package payload
import (
"context"
"time"
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/utils/mem"
"golang.org/x/sync/errgroup"
)
type AdaptiveLimiterFunc func(int64) int64
// SetupAdaptiveLimiter creates a new AdaptiveLimiter, starts its RunLoop in a goroutine and periodically collects statistics.
func SetupAdaptiveLimiter(ctx context.Context, g *errgroup.Group) AdaptiveLimiterFunc {
var freeMem FreeMemory
if config.GetBool("AdaptivePayloadLimiter.enabled", false) {
freeMem = func() (float64, error) {
s, err := mem.Get()
if err != nil {
return 0, err
}
return s.AvailablePercent, nil
}
}
limiterConfig := AdaptiveLimiterConfig{
FreeMemThresholdLimit: config.GetFloat64("AdaptivePayloadLimiter.freeMemThresholdLimit", 30),
FreeMemCriticalLimit: config.GetFloat64("AdaptivePayloadLimiter.freeMemCriticalLimit", 10),
MaxThresholdFactor: config.GetInt("AdaptivePayloadLimiter.maxThresholdFactor", 9),
Log: logger.NewLogger().Child("payload_limiter"),
FreeMemory: freeMem,
}
// run tick periodically
tickFrequency := config.GetDuration("AdaptivePayloadLimiter.tickFrequency", 1, time.Second)
limiter := NewAdaptiveLimiter(limiterConfig)
g.Go(func() error {
limiter.RunLoop(ctx, func() <-chan time.Time {
return time.After(tickFrequency)
})
return nil
})
// collect statistics
statsFrequency := config.GetDuration("AdaptivePayloadLimiter.statsFrequency", 15, time.Second)
g.Go(func() error {
for {
select {
case <-ctx.Done():
return nil
case <-time.After(statsFrequency):
limiterStats := limiter.Stats()
stats.Default.NewStat(
"adaptive_payload_limiter_state",
stats.GaugeType,
).Gauge(int(limiterStats.State))
stats.Default.NewStat(
"adaptive_payload_limiter_threshold_factor",
stats.GaugeType,
).Gauge(limiterStats.ThresholdFactor)
if memStats, err := mem.Get(); err == nil {
stats.Default.NewStat("mem_total_bytes", stats.GaugeType).
Gauge(memStats.Total)
stats.Default.NewStat("mem_available_bytes", stats.GaugeType).
Gauge(memStats.Available)
stats.Default.NewStat("mem_available_percent", stats.GaugeType).
Gauge(memStats.AvailablePercent)
stats.Default.NewStat("mem_used_bytes", stats.GaugeType).
Gauge(memStats.Used)
stats.Default.NewStat("mem_used_percent", stats.GaugeType).
Gauge(memStats.UsedPercent)
}
}
}
})
return limiter.Limit
}