Implement Throttler #253
Codecov Report
@@ Coverage Diff @@
## master #253 +/- ##
========================================
+ Coverage 84.95% 85.96% +1%
========================================
Files 51 54 +3
Lines 2705 2899 +194
========================================
+ Hits 2298 2492 +194
- Misses 287 288 +1
+ Partials 120 119 -1
An alternative approach would be to lazily set the UUID in the throttler as done here: https://github.com/jaegertracing/jaeger-client-go/blob/master/transport_udp.go#L85
However, this would mean changing the API to IsThrottled(operation, uuid), which could potentially save us a lot of headache. That signature isn't compatible with the Sampler API, although I could duck type the sampler to remove the circular dependency issues, and then we could just initialize everything in the tracer on initialization. I'll play around with this some more.
	"github.com/uber/jaeger-client-go/internal/throttler"
)

// N.B. this class is private as it's a proof of concept of how we would throttle normal
As mentioned in the comment, this is just a proof of concept of how we'd throttle normal sampler traffic. (I'll remove before landing).
	s.throttler.SetUUID(uuid)
}

func (s *throttledSampler) IsSampled(id jaeger.TraceID, operation string) (bool, []jaeger.Tag) {
Some brainstorming is required before we can actually do this. If sampled == true and isThrottled() == true, we need to emit a hint somewhere (meta-span?) so that XYS is aware that a sampled span was dropped.
	}
}

func (s *throttledSampler) SetUUID(uuid string) {
If we want to use the existing Sampler API, we would have to add this function to it. The sampler is wrapped by the throttler in the config directory, but the UUID is set in the tracer, so it's too late to initialize the wrapped sampler in the tracer (and doing so would cause import cycle errors).
duck-typing sounds fine to me. However, I would combine this UUID with service name + process tags (i.e. have an internal Process struct) and have the same SetProcess duck-typed interface shared by reporters and samplers.
Ok, I got it working so that we can use the throttler as a sampler without needing to update the sampler API. The only downside is that we're possibly increasing the surface area of our root Jaeger API. I actually think it makes sense to keep the throttledSampler API private.
Like the implementation overall. Mostly nits.
	t.uuid.Store(uuid)
}

// N.B. This function should be called with the Write Lock
I think you should use a different name if it changes the credits map; something like deductCredits makes more sense.
should/must - https://www.ietf.org/rfc/rfc2119.txt
func (t *Throttler) pollManager() {
	defer t.stopped.Done()
	ticker := time.NewTicker(t.refreshInterval)
	defer ticker.Stop()
I believe stopping the ticker is unnecessary in a case like this.
why is that?
Because it should be garbage collected as a local variable with no references to its channel. Unless this results in a leaked goroutine, which is confusing. For the record, it seems you can leak goroutines but not channels.
func (t *Throttler) fetchCredits() {
	t.mux.RLock()
	// TODO This is probably inefficient, maybe create a static slice of operations?
This may not be as bad as you think. Used to do transformations like this all the time in C++ thanks to std::transform. Even when we hit memory issues in our C++ code, std::transform wasn't the main issue, despite allocating a temporary vector each time like this code does. IMO, allocation issues aren't really a problem for a small collection of temporaries (< 100 elements) as much as long living, tree-like recursive structures or huge blobs.
allocations are a problem when they are on the hot path of the business logic of the application. In the periodic background activities - not so much.
type Throttler struct {
	options

	mux sync.RWMutex
For the record, I used to love the idea of reader-writer locks in all code, then learned that it might actually perform worse than a regular mutex if the contention isn't high enough. RW locks generally have a bit more overhead because the logic to avoid writer/reader starvation is more complex. Might be worth considering and/or profiling.
	mux     sync.RWMutex
	service string
	uuid    *atomic.String
Why bother with an atomic string instead of using a regular constructor here?
tracer_options.go
Outdated
	return func(tracer *Tracer) {
		tracer.throttler = throttler
	}
}
Why are we doing two throttlers upfront? I thought we would use one throttler and just allow the users to define the type of throttling to do.
The options are:
- Two throttler options
- One throttler option, and an enum option of: "DEBUG", "SAMPLING", "BOTH"
I don't mind either one since both will work, but I just chose the former.
I thought we haven't even agreed that throttling regular sampling is not going to mess up xys.
tis a proof of concept, I wanted to make sure we could throttle the sampler without being too invasive. I'll be removing this code.
Cool. I was thinking more like the option based approach if we need it.
	return false
}

func (t *Throttler) pollManager() {
Not sure if polling is truly necessary. On the other hand, I believe it should be required to spawn an asynchronous credit request when an operation runs out of credits (i.e. below 1.0). Otherwise, we run the risk of making requests when we don't need credits and not requesting when we do need credits.
if we do this on demand, and the demand happens to be very high, there will be a lot of these async requests, each only returning a small fraction of accumulated credits from the agent.
also, the agent uses these requests to update the client's updateTime. If we don't ping regularly, we could potentially have the agent give the client way more credits than it's due.
Ya I was wondering if we could combine the two somehow. I guess let's try this and see if it works well.
	"sync"
	"time"

	"github.com/uber-go/atomic"
uber-go/atomic is not an existing dependency (it may be transitive of tchannel, which is not used in production), so let's not introduce it
	_, ok := t.credits[operation]
	if !ok {
		// If it is the first time this operation is being checked, synchronously fetch
		// the credits.
"synchronously fetch": can we make this configurable? Only yab needs it synchronously, normal services could just wait for refresh?
	}
}

func (t *Throttler) fetchCredits() {
refreshCredits
	}
}

func (t *Throttler) fetchCreditsHelper(operations []string) []creditResponse {
fetchCredits
internal/throttler/throttler.go
Outdated
	io.Closer

	// IsThrottled determines whether the operation should be throttled.
	IsThrottled(operation string) bool
Somehow I get a feeling that this is a double negative. Wouldn't "is allowed" be better? If you look at various rate limiter implementations, their APIs are typically expressed in terms of positive outcomes: "is allowed", "how long till it's allowed".
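To illustrate the positively phrased API suggested here, a minimal sketch of a per-operation credit store. The names creditStore, IsAllowed and addCredits, and the cost of 1.0 credit per span, are assumptions for illustration and not this PR's implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// minimumCredits is the assumed cost of starting one debug span.
const minimumCredits = 1.0

// creditStore is a hypothetical per-operation credit balance.
type creditStore struct {
	mux     sync.Mutex
	credits map[string]float64
}

func newCreditStore() *creditStore {
	return &creditStore{credits: make(map[string]float64)}
}

// addCredits is what a periodic refresh would call with the agent's reply.
func (s *creditStore) addCredits(operation string, amount float64) {
	s.mux.Lock()
	defer s.mux.Unlock()
	s.credits[operation] += amount
}

// IsAllowed reports whether the operation may proceed and, if so,
// deducts one credit from its balance.
func (s *creditStore) IsAllowed(operation string) bool {
	s.mux.Lock()
	defer s.mux.Unlock()
	if s.credits[operation] < minimumCredits {
		return false
	}
	s.credits[operation] -= minimumCredits
	return true
}

func main() {
	s := newCreditStore()
	s.addCredits("GET /users", 1.5)
	fmt.Println(s.IsAllowed("GET /users")) // balance covers one span
	fmt.Println(s.IsAllowed("GET /users")) // remaining balance is below the minimum
}
```

With this phrasing the call site reads `if throttler.IsAllowed(op) { startDebugSpan() }` rather than negating IsThrottled.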
internal/throttler/throttler.go
Outdated
// are always sampled, a throttler can be enabled per client to rate limit the amount
// of debug spans a client can start.
type Throttler interface {
	io.Closer
it might be better to duck type this
may I ask why?
Good question. I don't really get when to do one or the other.
internal/throttler/throttler.go
Outdated
	IsThrottled(operation string) bool

	// SetUUID sets the UUID that identifies this client instance.
	SetUUID(uuid string)
did you see my comment this morning about unifying this with SetProcess across the board?
// ProcessSetter sets a process. This can be used by any class that requires
// the process to be set as part of initialization.
// See internal/throttler/remote/throttler.go for an example.
type ProcessSetter interface {
@yurishkuro is this what you had in mind?
Yep
	mux     sync.RWMutex
	service string
	uuid    string
	uuidSet uint32 // 1 means uuid set, 0 means not set
Why not just use atomic.Value for the string?
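A minimal sketch of the atomic.Value suggestion, using only the standard library's sync/atomic. The type name lazyUUID and its methods are hypothetical. Load returns nil until the first Store, so the type assertion doubles as the "is it set" check and the separate uuidSet flag is not needed.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// lazyUUID is a hypothetical holder for an identifier that is set
// some time after construction and read concurrently.
type lazyUUID struct {
	value atomic.Value // holds a string once Set has been called
}

// Set stores the UUID; it is safe to call concurrently with Get.
func (l *lazyUUID) Set(uuid string) { l.value.Store(uuid) }

// Get returns the UUID and whether it has been set yet.
func (l *lazyUUID) Get() (string, bool) {
	uuid, ok := l.value.Load().(string)
	return uuid, ok
}

func main() {
	var id lazyUUID
	_, ok := id.Get()
	fmt.Println("set before Set:", ok)

	id.Set("abc-123")
	uuid, ok := id.Get()
	fmt.Println(uuid, ok)
}
```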
LGTM, but you might want to wait on @yurishkuro.
Signed-off-by: Won Jun Jang <wjang@uber.com>
f0565f2 to c088647
internal/throttler/remote/options.go
Outdated
// SynchronousInitialization creates an Option that determines whether the throttler should synchronously
// fetch credits from the agent when an operation is seen for the first time.
func (options) SynchronousInitialization(b bool) Option {
Could you add some details on when it's a good idea to set this option to the docstring?
	assert.Equal(t, "localhost:5778", options.hostPort)
	assert.Equal(t, time.Second*5, options.refreshInterval)
	assert.NotNil(t, options.metrics)
	assert.NotNil(t, options.logger)
Why don't we test the default SynchronousInitialization?
// Process holds process specific metadata that's relevant to this client.
type Process struct {
	Service string
	UUID    string
Why is this called UUID? We only seem to be using it as an arbitrary identifier.
At one point we actually did use a UUID. Not sure what happened.
I don't think we should continue calling this field UUID if it's not going to be holding a UUID.
See #248
It will be a UUID, I just have to combine this with https://github.com/jaegertracing/jaeger-client-go/pull/248/files
	}
}

func (t *Throttler) fetchCredits(operations []string) []creditResponse {
Why not return a ([]creditResponse, error) ?
Adding a throttler that will allow clients to rate limit the number of debug spans that they can generate.
Signed-off-by: Won Jun Jang wjang@uber.com