Need help understanding why rate limiter doesn't behave like I think it should? #336

Closed
johanhaleby opened this issue Feb 8, 2019 · 24 comments



johanhaleby commented Feb 8, 2019

Hi,

I'm using resilience4j and the rate-limiting module together with Spring project reactor. The code I have looks essentially like this:

public class App {


    private final WebClient webClient;
    private final InMemoryRateLimiterRegistry rateLimiterRegistry;

    public App(int port, int rps) {
        webClient = WebClient.builder()
                .baseUrl("http://localhost:" + port)
                .build();

        rateLimiterRegistry = new InMemoryRateLimiterRegistry(RateLimiterConfig.ofDefaults());
        rateLimiterRegistry.rateLimiter("test", RateLimiterConfig.custom()
                .limitRefreshPeriod(Duration.ofSeconds(1))
                .limitForPeriod(rps)
                .timeoutDuration(Duration.ofHours(1))
                .build());
    }

    public Mono<String> makeRequest() {
        RateLimiter rateLimiter = rateLimiterRegistry.rateLimiter("test");
        Mono<String> result = webClient.post()
                .uri("/testing")
                .accept(ALL)
                .contentType(TEXT_PLAIN)
                .syncBody("hello world")
                .retrieve().bodyToMono(String.class);

        return result.transform(RateLimiterOperator.of(rateLimiter));
    }
}

I've tried creating a test case in which I'd like to verify that the rate limiter actually limits the requests per second to 10. The test case looks like this:

public class AppTest {

    @Rule
    public WireMockRule wireMockRule = new WireMockRule();

    @Test
    public void whyDoesntThisWorkQuestionMarkQuestionMark() {
        // Given
        int numberOfRequests = 30;
        int rps = 10;
        App app = new App(wireMockRule.port(), rps);

        wireMockRule.addStubMapping(stubFor(post(urlPathEqualTo("/testing"))
                .willReturn(aResponse().withStatus(200).withBody("hello hello!"))));

        // When
        ParallelFlux<String> flux = Flux.range(0, numberOfRequests)
                .parallel()
                .runOn(Schedulers.parallel())
                .flatMap(___ -> app.makeRequest());

        long startTime = new Date().getTime();
        StepVerifier.create(flux).expectNextCount(numberOfRequests).verifyComplete();
        long endTime = new Date().getTime();

        assertThat(endTime - startTime)
                .describedAs("I don't understand why this is not taking longer than this")
                .isGreaterThanOrEqualTo((numberOfRequests / rps) * 1000);
    }
}

But this fails with:

whyDoesntThisWorkQuestionMarkQuestionMark(se.haleby.AppTest)  Time elapsed: 2.376 sec  <<< FAILURE!
java.lang.AssertionError: [I don't understand why this is not taking longer than this] 
Expecting:
 <1466L>
to be greater than or equal to:
 <3000L> 
        at se.haleby.AppTest.whyDoesntThisWorkQuestionMarkQuestionMark(AppTest.java:43)

I don't really understand why the test only takes roughly 1.5 seconds instead of the expected 3 seconds (since I'm making 30 requests with a limit of 10 requests per second). I'm probably missing something quite essential here and I'm ready to be schooled :).

My code can be found here: https://github.com/johanhaleby/resilience4j-ratelimiter-question


johanhaleby commented Feb 14, 2019

Not sure, but could this actually be a bug?


storozhukBM commented Mar 6, 2019

Hi @johanhaleby
To me it also looks like a bug. Maybe @madgnome can clarify.


madgnome commented Mar 7, 2019

It shouldn't take 3s but 2:

Given 30 requests with a maximum of 10 requests per second:
T0: 10 requests
T1: 20 requests
T2: 30 requests

I need to investigate more to see why we are 600ms short.


johanhaleby commented Mar 7, 2019

@madgnome Thanks for your answer. Although I have to say that I still don't quite get it.

Given that limitForPeriod is 10 and limitRefreshPeriod is 1s, I would assume this (assuming the counts are not cumulative):

T0: 10 requests
T1: 10 requests
T2: 10 requests

How come T1 actually allows 20 requests when "limitForPeriod" is 10? Or maybe I'm misreading you? Either way I don't see why it should only take 2 seconds.


johanhaleby commented Mar 29, 2019

Anything I can do to help?


RobWin commented Apr 3, 2019

Unfortunately @madgnome seems to be busy. Can you try to debug and find the bug?


RobWin commented May 9, 2019

I think the issue is related to #311


madgnome commented May 9, 2019

How come T1 actually allows 20 requests when "limitForPeriod" is 10? Or maybe I'm misreading you?

I wasn't really clear in my explanation. Let me try again.

T0: 10 requests
T1: 20 requests
T2: 30 requests

This shows the total number of requests executed since the beginning.
At 0 s we can execute 10 requests; after 1 s we can execute 10 more (so 20 requests have been executed already); after another second (0+1+1) we can execute 10 more, and we are at 30 requests executed in 2 seconds and a bit.

The test would take 3 s if we needed to accumulate some credit between 0 and 1 s before being able to make any call. That is not the case: at T0 we can make 10 requests from the get-go.
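
Spelled out as arithmetic, the expected lower bound looks roughly like this (a back-of-the-envelope sketch, not code from the project):

int requests = 30;
int limitForPeriod = 10;      // permits per refresh period
long refreshPeriodMs = 1_000; // limitRefreshPeriod

// The first batch of permits is available immediately; every further batch
// becomes available one refresh period later.
int batches = (requests + limitForPeriod - 1) / limitForPeriod; // 3
long minimumElapsedMs = (batches - 1) * refreshPeriodMs;        // 2000 ms, not 3000 ms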


johanhaleby commented May 10, 2019

@madgnome Got it, thanks a lot for the explanation.


RobWin commented May 17, 2019

The behavior changes in 0.15.0 because of #311.
The rate limiter will only rate-limit subscriptions, not individual events, anymore.
Your test should work as expected with the upcoming release.
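
Put differently, a minimal sketch of the new behaviour (assuming a rateLimiter instance like the one from the registry above; the Flux here is only for illustration):

// In 0.15.0 the operator consumes one permit when the stream is subscribed to;
// the individual emitted elements no longer consume permits.
Flux<Integer> limited = Flux.range(1, 100)
        .compose(RateLimiterOperator.of(rateLimiter));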

RobWin self-assigned this May 17, 2019
RobWin added this to the 0.15.0 milestone May 17, 2019

johanhaleby commented May 17, 2019

@RobWin What's the new behavior?


RobWin commented May 17, 2019

What you initially expected.
The MonoRateLimiter tries to acquire a permission before it subscribes to the upstream Mono.

If the rate limiter rejects it, a RequestNotPermitted is forwarded to the downstream subscriber.
If the rate limiter permits it, the subscriber is allowed to subscribe to the upstream Mono.

In your case that means you should use compose instead of transform. The RateLimiterOperator then protects your upstream Mono (the WebClient call) from being subscribed to when the rate limit is exceeded.

public Mono<String> makeRequest() {
    RateLimiter rateLimiter = rateLimiterRegistry.rateLimiter("test");
    Mono<String> result = webClient.post()
            .uri("/testing")
            .accept(ALL)
            .contentType(TEXT_PLAIN)
            .syncBody("hello world")
            .retrieve().bodyToMono(String.class);

    return result.compose(RateLimiterOperator.of(rateLimiter));
}


johanhaleby commented May 17, 2019

@RobWin Thank you!

johanhaleby added this to Done in Release 0.15.0 via automation May 17, 2019
johanhaleby reopened this May 20, 2019

johanhaleby commented May 20, 2019

@RobWin It still doesn't seem to work the way I would expect. From what I understand, you should now catch the RequestNotPermitted exception and perform the backoff yourself. I've tried updating the example (https://github.com/johanhaleby/resilience4j-ratelimiter-question) to version 0.15.0, but as far as I can tell it's still too fast. Here are the essentials of the code now:

public class App {


    private final WebClient webClient;
    private final InMemoryRateLimiterRegistry rateLimiterRegistry;

    public App(int port, int rps) {
        webClient = WebClient.builder()
                .baseUrl("http://localhost:" + port)
                .build();

        rateLimiterRegistry = new InMemoryRateLimiterRegistry(RateLimiterConfig.ofDefaults());
        rateLimiterRegistry.rateLimiter("test", RateLimiterConfig.custom()
                .limitRefreshPeriod(Duration.ofSeconds(1))
                .limitForPeriod(rps)
                .timeoutDuration(Duration.ofHours(1))
                .build());
    }

    public Mono<String> makeRequest() {
        RateLimiter rateLimiter = rateLimiterRegistry.rateLimiter("test");
        Mono<String> result = webClient.post()
                .uri("/testing")
                .accept(ALL)
                .contentType(TEXT_PLAIN)
                .syncBody("hello world")
                .retrieve().bodyToMono(String.class);

        return result
                .compose(RateLimiterOperator.of(rateLimiter))
                .retryWhen(anyOf(RequestNotPermitted.class).exponentialBackoff(Duration.ofMillis(50), Duration.ofMillis(100)));
    }
}

The test still looks the same:

public class AppTest {

    @Rule
    public WireMockRule wireMockRule = new WireMockRule();

    @Test
    public void whyDoesntThisWorkQuestionMarkQuestionMark() {
        // Given
        int numberOfRequests = 30;
        int rps = 10;
        App app = new App(wireMockRule.port(), rps);

        wireMockRule.addStubMapping(stubFor(post(urlPathEqualTo("/testing"))
                .willReturn(aResponse().withStatus(200).withBody("hello hello!"))));

        // When
        ParallelFlux<String> flux = Flux.range(0, numberOfRequests)
                .parallel()
                .runOn(Schedulers.parallel())
                .flatMap(___ -> app.makeRequest());

        long startTime = new Date().getTime();
        StepVerifier.create(flux).expectNextCount(numberOfRequests).verifyComplete();
        long endTime = new Date().getTime();

        assertThat(endTime - startTime)
                .describedAs("I don't understand why this is not taking longer")
                .isGreaterThanOrEqualTo((numberOfRequests / rps) * 1000);
    }
}

where anyOf is a method imported from the reactor-extra project.

What am I doing wrong or misunderstanding?


RobWin commented May 20, 2019

Just to make sure there is no misunderstanding:
Our RateLimiter is not an implementation of a leaky bucket algorithm, which leaks out at a constant rate. It uses a fixed-window algorithm, which can have burst effects at time-window boundaries.
The RateLimiter allows 10 requests per second in your case and rejects every further call with a RequestNotPermitted.
Since reactive streams must be non-blocking, the timeoutDuration is not taken into account, but instead it's always 0.
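
To make the fixed-window behaviour concrete, here is a standalone sketch (it uses the plain acquirePermission() API of newer resilience4j versions; the limiter name "sketch" is illustrative):

RateLimiter limiter = RateLimiter.of("sketch", RateLimiterConfig.custom()
        .limitRefreshPeriod(Duration.ofSeconds(1))
        .limitForPeriod(10)
        .timeoutDuration(Duration.ZERO) // the reactive operator behaves as if this were 0
        .build());

for (int i = 1; i <= 12; i++) {
    // Within one window the first 10 calls are permitted; calls 11 and 12 are
    // rejected immediately, which the operator surfaces as RequestNotPermitted.
    System.out.println("call " + i + ": " + limiter.acquirePermission());
}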

The implementation is described here:
https://resilience4j.readme.io/docs/ratelimiter

This is a nice blog post in general: https://konghq.com/blog/how-to-design-a-scalable-rate-limiting-algorithm/

I'll debug your test app tomorrow and get back to you.


RobWin commented May 21, 2019

Some further information.
The first time window (refresh period) starts when the RateLimiter is initialized, not when the first call asks for a permission.
Since your WireMock Stub does not simulate any delay, the test can fire requests very fast.
That means it's possible that a burst of 10 requests could be processed at the end of the first time window (refresh period). The measured total time would be less than a second for the first time window, for example 500 ms.

The second time window can process 10 requests in 1 second.
The third time window can potentially process another burst of 10 requests in less than a second, for example 500ms.

The total measured time of your test would be ~2 seconds.
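
(Plugging in the numbers from the original failure as a rough example: if the first refresh boundary was about 466 ms away when the test started, the timeline is roughly 466 ms to finish the partially elapsed first window, plus one full 1000 ms window, plus a near-instant burst at the start of the third window, which matches the 1466 ms the test measured.)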


johanhaleby commented May 21, 2019

@RobWin Oh, that probably explains it. I think I get it now and it makes sense. Thanks a lot for taking the time to explain this, really appreciated. And thanks for all the work on resilience4j.


RobWin commented May 21, 2019

You can see this better when you increase the limitRefreshPeriod to 5 or 10 seconds.

When I use 5 seconds, the total measured time is: ~13 seconds (~4+5+4)
When I use 10 seconds, the total measured time is: ~23 seconds (~5+10+8)


RobWin commented May 21, 2019

Anyway, a leaky bucket and a sliding window implementation could be interesting as well.


martin-tarjanyi commented May 21, 2019

Since reactive streams must be non-blocking, the timeoutDuration is not taken into account, but instead it's always 0.

The timeout seems to be quite an essential part of the rate limiter. In most of our use cases we don't want to reject calls, but rather wait until we have permission again. Wouldn't it be possible to implement the waiting in a non-blocking way in the reactor module?

For this use case, with the current implementation you need to retry on the not-permitted exception (as is done in @johanhaleby's example code), which is not fair, since new calls can get ahead of older calls.


RobWin commented May 21, 2019

We could modify the rate limiter and test delaySubscription(Duration delay).
Not sure yet how it behaves.
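
For reference, the call-site usage would look something like this (a minimal sketch, not the actual change; the duration is illustrative):

// delaySubscription postpones the act of subscribing, so the HTTP request is
// not sent until the delay elapses and no thread is blocked while waiting.
Mono<String> delayed = result.delaySubscription(Duration.ofMillis(250));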


RobWin commented May 24, 2019

I've created a PR where I test delaying the subscription.

@martin-tarjanyi

Please see:

long waitDuration = rateLimiter.reservePermission();
if (waitDuration >= 0) {
    if (waitDuration > 0) {
        // A permission is available, but only after waitDuration nanoseconds:
        // delay the subscription instead of blocking the thread.
        Mono.delay(Duration.ofNanos(waitDuration))
            .subscribe((w) -> source.subscribe(new RateLimiterSubscriber<>(actual)));
    } else {
        // A permission is available right now: subscribe immediately.
        source.subscribe(new RateLimiterSubscriber<>(actual));
    }
} else {
    // No permission can be reserved: signal RequestNotPermitted downstream.
    Operators.error(actual, new RequestNotPermitted(rateLimiter));
}


RobWin commented May 24, 2019

That unfortunately means that the subscription happens on another thread, because I can't use Schedulers.immediate(). This scheduler is not capable of time-based scheduling.

Is that something you would expect? Mono.delay uses Schedulers.parallel() internally.
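
A quick standalone way to see the thread switch (my own sketch, not code from the PR):

// The delayed signal is emitted on a Schedulers.parallel() worker, so everything
// downstream of Mono.delay runs on that thread unless you switch schedulers.
String thread = Mono.delay(Duration.ofMillis(100))
        .map(tick -> Thread.currentThread().getName())
        .block();
System.out.println(thread); // prints something like "parallel-1"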


martin-tarjanyi commented May 24, 2019

Awesome!

I don't see any issue with the thread handling right now. I plan to give it a try with my code over the weekend and see how it behaves. I will share my findings here.

Thanks!
