-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Move adaptive sampling processor #1179
Conversation
Codecov Report
@@ Coverage Diff @@
## master #1179 +/- ##
==========================================
+ Coverage 99.82% 99.83% +<.01%
==========================================
Files 173 179 +6
Lines 8203 8550 +347
==========================================
+ Hits 8189 8536 +347
Misses 7 7
Partials 7 7
Continue to review full report at Codecov.
|
// samplingCacheEntry keeps track of the probability and whether a service-operation is using adaptive sampling | ||
type samplingCacheEntry struct { | ||
probability float64 | ||
usingAdapative bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/usingAdapative/usingAdaptive
|
||
package adaptive | ||
|
||
// samplingCacheEntry keeps track of the probability and whether a service-operation is using adaptive sampling |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"whether a service-operation is using adaptive sampling" - in what sense, configured to use or observed to use?
|
||
// Factory implements strategystore.Factory for an adaptive strategy store. | ||
type Factory struct { | ||
options *Options |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't need a pointer (faster access)
// before. | ||
if len(p.serviceCache) > 1 { | ||
if e := p.serviceCache[1].Get(service, operation); e != nil { | ||
return e.usingAdapative && e.probability != p.DefaultSamplingProbability |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
call floatEquals for the right side?
return false | ||
} | ||
|
||
// generateStrategyResponses generates a SamplingStrategyResponse from the calculated sampling probabilities. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
generates and caches
"sync" | ||
) | ||
|
||
// weightsCache stores normalizing weights of different lengths. The head of the weights slice |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
weight vectors of different lengths. Could've used 'vector' in the type name as well, makes it much clearer.
} | ||
l := float64(length) | ||
// closed form of sum l^4 ie 1^4 + 2^4 + ... + l^4 | ||
sum := (l / 30) * (l + 1) * ((2 * l) + 1) * ((3 * l * l) + (3 * l) - 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have no idea what's going on here
Signed-off-by: Won Jun Jang <wjang@uber.com>
Signed-off-by: Won Jun Jang <wjang@uber.com>
Signed-off-by: Won Jun Jang <wjang@uber.com>
9b16980
to
b00cd26
Compare
Signed-off-by: Yuri Shkuro <ys@uber.com>
Signed-off-by: Yuri Shkuro <ys@uber.com>
Signed-off-by: Yuri Shkuro <ys@uber.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
once merged, please update status in #365. Aside from wiring into main, is anything else missing?
|
||
// CreateStrategyStore implements strategystore.Factory | ||
func (f *Factory) CreateStrategyStore() (strategystore.StrategyStore, error) { | ||
// TODO |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so there's more to come?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@yurishkuro I can see that in processor.go, the NewProcessor()
(https://github.com/jaegertracing/jaeger/blob/master/plugin/sampling/strategystore/adaptive/processor.go#L119-L152) is already returning a &processor{}
that has implemented the GetSamplingStrategy
(https://github.com/jaegertracing/jaeger/blob/master/plugin/sampling/strategystore/adaptive/processor.go#L155-L162) and satisfies the StrategyStore
interface. We no longer use the Factory interface anywhere inside the adaptive sampling directory (https://github.com/jaegertracing/jaeger/tree/master/plugin/sampling/strategystore/adaptive). Do we need an implementation for Factory{} inside adaptive package or can we proceed without it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
individual factories are used by the meta-factory plugin/sampling/strategystore/factory.go
:
var allSamplingTypes = []string{staticStrategyStoreType} // TODO support adaptive |
// Increase this to reduce the amount of fluctuation in the probability calculation. | ||
QPSEquivalenceThreshold float64 | ||
|
||
// CalculationInterval determines how often new probabilities are calculated. It was doubles as the interval |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It was doubles -> It doubles ?
// the CalculationInterval is 1 minute (each bucket contains 1 minute of thoughput data) and the | ||
// AggregationBuckets is 3, the adaptive sampling processor will keep at most 3 buckets in memory for | ||
// all operations. | ||
// TODO(wjang): Expand on why this is needed when BucketsForCalculation seems to suffice. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, please elaborate on the use case when BucketsForCalculation needs to be less than AggregationBuckets
return math.Abs(actual-expected)/expected < p.DeltaTolerance | ||
} | ||
|
||
func combineProbabilities(p1 map[string]struct{}, p2 map[string]struct{}) map[string]struct{} { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I understand this:
- you are keeping a set of probability values expressed as strings, and here you are joining two sets
- the values are kept in order to decide if the given sampler is using adaptive sampling strategy, by comparing the sampling rate found in the root span's tag with these values
Could you please add this somewhere as a comment, e.g. to the sampling/model Throughput.Probabilities
field?
defaultFollowerProbabilityInterval = 20 * time.Second | ||
|
||
// The number of past entries for samplingCache the leader keeps in memory | ||
serviceCacheSize = 25 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems only serviceCacheSize[0]
and serviceCacheSize[1]
are ever used, is that true? The latter is checked explicitly in isUsingAdaptiveSampling
, and the former is used indirectly via prependServiceCache
when it eventually moves to position 1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes we only use the most recent two entries in the serviceCache. We can probably remove the older entries in the cache. I'll add a TODO for now because there might've been a reason my younger and smarter self did this.
weights = append(weights, w) | ||
sum += w | ||
} | ||
// normalize |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the previous reference to "close form sum" was rather confusing. I changed to explicit normalization step, please verify.
|
||
func (p *processor) calculateProbability(service, operation string, qps float64) float64 { | ||
oldProbability := p.InitialSamplingProbability | ||
// TODO: is this loop overly expensive? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
which loop is this referring to?
|
||
getThroughputErrMsg = "failed to get throughput from storage" | ||
|
||
defaultFollowerProbabilityInterval = 20 * time.Second |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should also be configurable.
} else { | ||
newProbability = p.probabilityCalculator.Calculate(p.TargetSamplesPerSecond, qps, oldProbability) | ||
} | ||
return math.Min(maxSamplingProbability, math.Max(p.MinSamplingProbability, newProbability)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the maxSamplingProbability
const is superfluous, this stmt would read better if you simply use 1.0
going to land and address comments and wire things up in next PR |
Signed-off-by: Won Jun Jang wjang@uber.com
This PR is part 2 of moving adaptive sampling (#365) over to OSS. This is moving over processor and related components.