Adds Rate limiting sampler #819

Merged
merged 16 commits into from Dec 11, 2018
14 changes: 14 additions & 0 deletions brave/README.md
@@ -243,6 +243,14 @@ By default, there's a global sampler that applies a single rate to all
traced operations. `Tracing.Builder.sampler` is how you indicate this,
and it defaults to trace every request.

For example, to choose 10 traces every second, you'd initialize like so:
```java
tracing = Tracing.newBuilder()
.sampler(RateLimitingSampler.create(10))
--snip--
.build();
```

### Declarative sampling

Some need to sample based on the type or annotations of a Java method.
@@ -990,3 +998,9 @@ recorded (such as is the case on spans intentionally dropped).
### Public namespace
Brave 4's public namespace is more defensive than in the past, using a package
accessor design from [OkHttp](https://github.com/square/okhttp).

### Rate-limiting sampler
`RateLimitingSampler` was made to allow Amazon X-Ray rules to be
expressed in Brave. We considered their [Reservoir design](https://github.com/aws/aws-xray-sdk-java/blob/2.0.1/aws-xray-recorder-sdk-core/src/main/java/com/amazonaws/xray/strategy/sampling/reservoir/Reservoir.java).
Our implementation differs as it removes a race condition and attempts
to be more fair by distributing accept decisions every decisecond.
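
As a rough illustration of what "distributing accept decisions every decisecond" means, the sketch below computes cumulative per-decisecond limits for a given rate. `DecisecondWatermarks` and its `of` method are hypothetical names used only for this example and are not part of Brave's API. It assumes any remainder is made available in the first decisecond, which is what `AtLeast10` in this change does.

```java
/** Hypothetical helper, for illustration only: not part of Brave. */
final class DecisecondWatermarks {
  /** Returns 10 cumulative limits: entry i caps the accepts allowed by the end of decisecond i + 1. */
  static int[] of(int tracesPerSecond) {
    if (tracesPerSecond < 10) throw new IllegalArgumentException("tracesPerSecond < 10");
    int perDecisecond = tracesPerSecond / 10;
    int remainder = tracesPerSecond % 10;
    int[] max = new int[10];
    max[0] = perDecisecond + remainder; // assumption: the remainder is usable from the start
    for (int i = 1; i < 10; i++) {
      max[i] = max[i - 1] + perDecisecond; // each later decisecond adds one even share
    }
    return max;
  }
}
```

For a rate of 103/s, the limits start at 13 and grow by 10 each decisecond until they reach 103 in the last one.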
122 changes: 122 additions & 0 deletions brave/src/main/java/brave/sampler/RateLimitingSampler.java
@@ -0,0 +1,122 @@
package brave.sampler;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
* The rate-limited sampler allows you to choose the number of traces to accept per second. A rate
* of 0 returns {@link Sampler#NEVER_SAMPLE}. Otherwise, the minimum is 1 and there is no maximum.
*
* <p>For example, to allow 10 traces per second, you'd initialize the following:
* <pre>{@code
* tracingBuilder.sampler(RateLimitingSampler.create(10));
* }</pre>
*
* <h3>Appropriate Usage</h3>
*
* <p>If the rate is 10 or more traces per second, an attempt is made to distribute the accept
* decisions equally across the second. For example, if the rate is 100, 10 will pass every
* decisecond as opposed to bunching all pass decisions at the beginning of the second.
*
* <p>This sampler is efficient, but not as efficient as the {@link brave.sampler.BoundarySampler}.
* However, this sampler is insensitive to the trace ID and will operate correctly even if they are
* not perfectly random.
*
* <h3>Implementation</h3>
*
* <p>The implementation uses {@link System#nanoTime} and tracks how many yes decisions occur
* across a second window. When the rate is at least 10/s, the yes decisions are equally split over
* 10 deciseconds, allowing a roll-over of unused yes decisions up until the end of the second.
*/
public class RateLimitingSampler extends Sampler {
public static Sampler create(int tracesPerSecond) {
if (tracesPerSecond < 0) throw new IllegalArgumentException("tracesPerSecond < 0");
if (tracesPerSecond == 0) return Sampler.NEVER_SAMPLE;
return new RateLimitingSampler(tracesPerSecond);
}

static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);
static final int NANOS_PER_DECISECOND = (int) (NANOS_PER_SECOND / 10);

final MaxFunction maxFunction;
final AtomicInteger usage = new AtomicInteger(0);
final AtomicLong nextReset;

RateLimitingSampler(int tracesPerSecond) {
this.maxFunction =
tracesPerSecond < 10 ? new LessThan10(tracesPerSecond) : new AtLeast10(tracesPerSecond);
long now = System.nanoTime();
this.nextReset = new AtomicLong(now + NANOS_PER_SECOND);
}

@Override public boolean isSampled(long ignoredTraceId) {
long now = System.nanoTime();
long updateAt = nextReset.get();

long nanosUntilReset = -(now - updateAt); // because nanoTime can be negative
boolean shouldReset = nanosUntilReset <= 0;
if (shouldReset) {
if (nextReset.compareAndSet(updateAt, updateAt + NANOS_PER_SECOND)) {
usage.set(0);
}
}

int max = maxFunction.max(shouldReset ? 0 : nanosUntilReset);
int prev, next;
do { // same form as java 8 AtomicLong.getAndUpdate
prev = usage.get();
next = prev + 1;
if (next > max) return false;
} while (!usage.compareAndSet(prev, next));
return true;
}

static abstract class MaxFunction {
/** @param nanosUntilReset zero if was just reset */
abstract int max(long nanosUntilReset);
}

/** For a reservoir of less than 10, we permit draining it completely at any time in the second */
static final class LessThan10 extends MaxFunction {
final int tracesPerSecond;

LessThan10(int tracesPerSecond) {
this.tracesPerSecond = tracesPerSecond;
}

@Override int max(long nanosUntilReset) {
return tracesPerSecond;
}
}

/**
* For a reservoir of at least 10, we permit draining up to a decisecond watermark. Because the
* rate could be odd, we may have a remainder, which is arbitrarily available. We allow any
* remainders in the 1st decisecond or any time thereafter.
*
* <p>Ex. If the rate is 10/s then you can use 1 in the first decisecond, another 1 in the 2nd,
* or up to 10 by the last.
*
* <p>Ex. If the rate is 103/s then you can use 13 in the first decisecond, another 10 in the
* 2nd, or up to 103 by the last.
*/
static final class AtLeast10 extends MaxFunction {
final int[] max;

AtLeast10(int tracesPerSecond) {
int tracesPerDecisecond = tracesPerSecond / 10, remainder = tracesPerSecond % 10;
max = new int[10];
max[0] = tracesPerDecisecond + remainder;
for (int i = 1; i < 10; i++) {
max[i] = max[i - 1] + tracesPerDecisecond;
}
}

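@Override int max(long nanosUntilReset) {
if (nanosUntilReset <= 0) return max[0]; // zero means the window was just reset
// Index by deciseconds elapsed in this window so the watermark never shrinks before the reset.
// Clamp, as a racing reset can briefly make nanosUntilReset exceed a second.
long elapsedNanos = NANOS_PER_SECOND - nanosUntilReset;
int index = (int) (elapsedNanos / NANOS_PER_DECISECOND);
return max[Math.max(0, Math.min(9, index))];
}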
}
}
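
The reset-then-count logic in `isSampled` can be hard to follow alongside the decisecond handling, so here is a reduced sketch of just that part: a flat per-second budget, a compare-and-set to open a new window, and a compare-and-set loop to spend the budget. `FixedWindowLimiter`, `tryAcquire` and the injected `nanoClock` are hypothetical and exist only for this example. Injecting the clock is an assumption made here so the sketch can be exercised without mocking `System.nanoTime`; the class in this change reads `System.nanoTime` directly.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical, simplified limiter for illustration: not the class added by this change. */
final class FixedWindowLimiter {
  static final long NANOS_PER_SECOND = 1_000_000_000L;

  final LongSupplier nanoClock; // assumption: stands in for System.nanoTime()
  final int maxPerSecond;
  final AtomicInteger usage = new AtomicInteger();
  final AtomicLong nextReset;

  FixedWindowLimiter(int maxPerSecond, LongSupplier nanoClock) {
    this.maxPerSecond = maxPerSecond;
    this.nanoClock = nanoClock;
    this.nextReset = new AtomicLong(nanoClock.getAsLong() + NANOS_PER_SECOND);
  }

  boolean tryAcquire() {
    long now = nanoClock.getAsLong();
    long updateAt = nextReset.get();
    // Compare by subtraction so the check still works when the clock value is negative
    if (now - updateAt >= 0) {
      // Only the thread that wins this compare-and-set clears the counter for the new window
      if (nextReset.compareAndSet(updateAt, updateAt + NANOS_PER_SECOND)) {
        usage.set(0);
      }
    }
    int prev, next;
    do {
      prev = usage.get();
      next = prev + 1;
      if (next > maxPerSecond) return false; // the budget for this window is spent
    } while (!usage.compareAndSet(prev, next)); // retry if another thread incremented first
    return true;
  }
}
```

With a budget of 2 and a clock held in a `long[]`, the first two calls pass, the third is rejected, and a call made once the clock reaches the reset time passes again.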
6 changes: 5 additions & 1 deletion brave/src/main/java/brave/sampler/Sampler.java
@@ -34,7 +34,11 @@ public abstract class Sampler {
}
};

/** Returns true if the trace ID should be measured. */
/**
* Returns true if the trace ID should be measured.
*
* @param traceId The trace ID to be decided on, can be ignored
*/
public abstract boolean isSampled(long traceId);

/**
114 changes: 114 additions & 0 deletions brave/src/test/java/brave/sampler/RateLimitingSamplerTest.java
@@ -0,0 +1,114 @@
package brave.sampler;

import java.util.Random;
import org.junit.Test;
import org.junit.experimental.theories.DataPoints;
import org.junit.experimental.theories.Theory;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

import static brave.sampler.RateLimitingSampler.NANOS_PER_DECISECOND;
import static brave.sampler.RateLimitingSampler.NANOS_PER_SECOND;
import static brave.sampler.SamplerTest.INPUT_SIZE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.powermock.api.mockito.PowerMockito.when;

@RunWith(PowerMockRunner.class)
// Added to declutter console: tells power mock not to mess with implicit classes we aren't testing
@PowerMockIgnore({"org.apache.logging.*", "javax.script.*"})
@PrepareForTest(RateLimitingSampler.class)
public class RateLimitingSamplerTest {

@Test public void samplesOnlySpecifiedNumber() {
mockStatic(System.class);
when(System.nanoTime()).thenReturn(NANOS_PER_SECOND);
Sampler sampler = RateLimitingSampler.create(2);

when(System.nanoTime()).thenReturn(NANOS_PER_SECOND + 1);
assertThat(sampler.isSampled(0L)).isTrue();
when(System.nanoTime()).thenReturn(NANOS_PER_SECOND + 2);
assertThat(sampler.isSampled(0L)).isTrue();
when(System.nanoTime()).thenReturn(NANOS_PER_SECOND + 2);
assertThat(sampler.isSampled(0L)).isFalse();
}

@Test public void resetsAfterASecond() {
mockStatic(System.class);

when(System.nanoTime()).thenReturn(NANOS_PER_SECOND);
Sampler sampler = RateLimitingSampler.create(10);
assertThat(sampler.isSampled(0L)).isTrue();
assertThat(sampler.isSampled(0L)).isFalse();

when(System.nanoTime()).thenReturn(NANOS_PER_SECOND + NANOS_PER_DECISECOND);
assertThat(sampler.isSampled(0L)).isTrue();
assertThat(sampler.isSampled(0L)).isFalse();

when(System.nanoTime()).thenReturn(NANOS_PER_SECOND + NANOS_PER_DECISECOND * 9);
assertThat(sampler.isSampled(0L)).isTrue();
assertThat(sampler.isSampled(0L)).isTrue();
assertThat(sampler.isSampled(0L)).isTrue();
assertThat(sampler.isSampled(0L)).isTrue();
assertThat(sampler.isSampled(0L)).isTrue();
assertThat(sampler.isSampled(0L)).isTrue();
assertThat(sampler.isSampled(0L)).isTrue();
assertThat(sampler.isSampled(0L)).isTrue();
assertThat(sampler.isSampled(0L)).isFalse();

when(System.nanoTime()).thenReturn(NANOS_PER_SECOND + NANOS_PER_SECOND);
assertThat(sampler.isSampled(0L)).isTrue();
}

@Test public void allowsOddRates() {
mockStatic(System.class);

when(System.nanoTime()).thenReturn(NANOS_PER_SECOND);
Sampler sampler = RateLimitingSampler.create(11);
when(System.nanoTime()).thenReturn(NANOS_PER_SECOND + NANOS_PER_DECISECOND * 9);
for (int i = 0; i < 11; i++) {
assertThat(sampler.isSampled(0L))
.withFailMessage("failed after " + (i + 1))
.isTrue();
}
assertThat(sampler.isSampled(0L)).isFalse();
}

@Test public void worksOnRollover() {
mockStatic(System.class);
when(System.nanoTime()).thenReturn(-NANOS_PER_SECOND);
Sampler sampler = RateLimitingSampler.create(2);
assertThat(sampler.isSampled(0L)).isTrue();

when(System.nanoTime()).thenReturn(-NANOS_PER_SECOND / 2);
assertThat(sampler.isSampled(0L)).isTrue(); // second request

when(System.nanoTime()).thenReturn(-NANOS_PER_SECOND / 4);
assertThat(sampler.isSampled(0L)).isFalse();

when(System.nanoTime()).thenReturn(0L); // reset
assertThat(sampler.isSampled(0L)).isTrue();
}

@DataPoints public static final int[] SAMPLE_RESERVOIRS = {1, 10, 100};

@Theory public void retainsPerSampleRate(int reservoir) {
Sampler sampler = RateLimitingSampler.create(reservoir);

// parallel to ensure there aren't any unsynchronized race conditions
long passed = new Random().longs(INPUT_SIZE).parallel().filter(sampler::isSampled).count();

assertThat(passed).isEqualTo(reservoir);
}

@Test public void zeroMeansDropAllTraces() {
assertThat(RateLimitingSampler.create(0)).isSameAs(Sampler.NEVER_SAMPLE);
}

@Test(expected = IllegalArgumentException.class)
public void tracesPerSecond_cantBeNegative() {
RateLimitingSampler.create(-1);
}
}
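
The `worksOnRollover` test relies on the sampler comparing clock readings by subtraction rather than directly, since `System.nanoTime` values can be negative and may overflow. The sketch below contrasts the two comparisons on a deadline that wraps past `Long.MAX_VALUE`. `NanoTimeCompare`, `reached` and `naiveReached` are hypothetical names introduced only for this example.

```java
/** Hypothetical helper, for illustration only: not part of Brave. */
final class NanoTimeCompare {
  /** True if {@code now} is at or past {@code deadline}, even if the deadline overflowed. */
  static boolean reached(long now, long deadline) {
    return now - deadline >= 0;
  }

  /** Direct comparison, which gives the wrong answer once the deadline wraps around. */
  static boolean naiveReached(long now, long deadline) {
    return now >= deadline;
  }
}
```

Starting 5 nanoseconds before `Long.MAX_VALUE` with a deadline 10 nanoseconds later, the deadline wraps to a large negative number. The subtraction form reports it as not yet reached, while the direct comparison reports it as already passed.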
6 changes: 6 additions & 0 deletions instrumentation/benchmarks/pom.xml
@@ -37,6 +37,12 @@
<artifactId>log4j-core</artifactId>
</dependency>

<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-xray-recorder-sdk-core</artifactId>
<version>2.1.0</version>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>brave-instrumentation-http</artifactId>