-
Notifications
You must be signed in to change notification settings - Fork 0
/
ratelimitedworker.go
147 lines (120 loc) · 3.21 KB
/
ratelimitedworker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package ratelimitedworker
import (
"fmt"
"log"
"math"
"sync/atomic"
"time"
"go.uber.org/ratelimit"
)
// RLW is a rate-limited worker handle. It pairs a per-second limiter (RL) with
// a per-minute request counter (history) that Track compares against TargetRPM.
type RLW struct {
// ID identifies the job this worker belongs to; Create also embeds it in the log prefix.
ID string
// RL is the per-second limiter; Track calls RL.Take() before counting a request.
RL ratelimit.Limiter
// TargetRPM is the maximum number of requests allowed per minute.
TargetRPM uint32
// history counts the requests recorded by Track since the last reset;
// the goroutine launched in start() zeroes it once a minute.
history uint32
verbose bool // enables logging
logger *log.Logger // logger used when verbose is true; Create leaves it nil otherwise
targetRPS int // internally computed from TargetRPM and `hasty` param in Create()
active bool // set by start(); Track uses it to run start() only on the first call
// TODO: maybe have a channel which can send the message to stop processing
// TODO: work on cleanup
}
// Create builds a rate-limited worker.
//
//   - id: primary identifier of the job the work is associated with; it is
//     also used as the log prefix when verbose is true.
//   - targetRPM: the maximum number of requests allowed per minute. Negative
//     values are treated as 0 (with a limit of 0, Track never admits work).
//   - hasty: if true the per-second rate is rounded up (good for work that
//     takes long); if false it is rounded down so the rate stays below the
//     limit. The per-second rate is never lower than 1.
//   - verbose: enable logging.
//
// The returned RLW does not start any background work until the first call
// to Track.
func Create(
	id string,
	targetRPM int,
	hasty bool,
	verbose bool,
) RLW {
	// A negative budget would wrap around to a huge value once stored as uint32.
	if targetRPM < 0 {
		targetRPM = 0
	}

	// Divide in floating point: an integer division would truncate before
	// Ceil/Floor ever saw the fraction, making `hasty` a no-op.
	perSecond := float64(targetRPM) / 60
	var targetRPS int
	if hasty {
		targetRPS = int(math.Ceil(perSecond))
	} else {
		targetRPS = int(math.Floor(perSecond))
	}
	// The limiter needs a positive rate (a rate of 0 is not usable with
	// ratelimit.New). The per-minute cap enforced in Track still bounds the
	// total, so clamping to 1 rps does not exceed targetRPM.
	if targetRPS < 1 {
		targetRPS = 1
	}

	var logger *log.Logger
	if verbose {
		// Use a dedicated logger that writes to the standard logger's
		// destination. Calling SetPrefix/SetFlags on log.Default() would
		// reconfigure the process-wide logger shared by every worker and by
		// all other users of the log package.
		logPrefix := fmt.Sprintf("RatelimitedWorkerId : %s | ", id)
		logger = log.New(log.Default().Writer(), logPrefix, log.Ltime|log.Lshortfile)
	}

	return RLW{
		ID:        id,
		RL:        ratelimit.New(targetRPS), // param is rps
		TargetRPM: uint32(targetRPM),
		verbose:   verbose,
		history:   0,
		logger:    logger,
		targetRPS: targetRPS,
	}
}
// Track blocks until the caller is allowed to perform one more unit of work,
// then records that unit against the current minute.
//
// On the first call it launches the background goroutines via start(). It
// then waits, polling once per second, while the per-minute counter is at or
// above TargetRPM, takes a slot from the per-second limiter, and finally
// increments the counter.
//
// NOTE: `active` is a plain bool, so the very first call is not guarded
// against concurrent callers; make the first Track call from one goroutine.
func (rlw *RLW) Track() {
	if !rlw.active {
		// init
		rlw.start()
		if rlw.verbose {
			rlw.logger.Println("Init Success")
		}
	}

	// In case we hit the per-minute limit too soon, wait until the counter is
	// reset. history is written atomically by other goroutines, so it must be
	// read atomically too; one snapshot per iteration keeps the check and the
	// log line consistent.
	for {
		sent := atomic.LoadUint32(&rlw.history)
		if sent < rlw.TargetRPM {
			break
		}
		if rlw.verbose {
			rlw.logger.Printf(
				"Reached RPM waiting for history refresh: {history : %d , allowedRPM: %d }\n",
				sent, rlw.TargetRPM,
			)
		}
		time.Sleep(1 * time.Second)
	}

	rlw.RL.Take()
	atomic.AddUint32(&rlw.history, 1)
}
// start marks the worker as active and launches two background goroutines:
// one that reports throughput every `window` seconds when verbose is set, and
// one that resets the per-minute counter every minute.
//
// Neither ticker is ever stopped and neither loop exits, so both goroutines
// run for the remaining lifetime of the process.
func (rlw *RLW) start() {
	rlw.active = true
	const window = 5 // length of the reporting window, in seconds

	// optional logger ticker just for understanding
	go func() {
		ticker := time.NewTicker(window * time.Second)
		prev := atomic.LoadUint32(&rlw.history)
		for range ticker.C {
			// Take a single atomic snapshot per tick. Re-reading history
			// between the comparison and the subtraction could observe the
			// minute reset in between and underflow the unsigned counter.
			cur := atomic.LoadUint32(&rlw.history)

			// When the counter was reset (or did not advance) since the last
			// tick, report 0 rather than a wrapped-around difference.
			var counter uint32
			if cur > prev {
				counter = cur - prev
			}
			if rlw.verbose {
				rlw.logger.Printf(
					"Completed %d tasks in the last window(%d seconds) , targetRPS = %d , currentRPS = %d , completedTaskCountThisMinute = %d \n",
					counter, window, rlw.targetRPS, counter/uint32(window), cur)
			}
			// update prev to track window counter
			prev = cur
		}
	}()

	// essential, this resets history and helps stay within RPMLimit
	go func() {
		monitor := time.NewTicker(1 * time.Minute)
		for range monitor.C {
			// Read and reset in one atomic step so the reported count is
			// exactly the count that is being discarded.
			sent := atomic.SwapUint32(&rlw.history, 0)
			if rlw.verbose {
				rlw.logger.Printf(
					"Period Stats(last 1 minute): noOfReqSent = %d , rateLimit(per min) = %d \n",
					sent, rlw.TargetRPM,
				)
			}
		}
	}()
}