Provides a throttling strategy based on the number of pending blocking tasks. #4073
base: main
Conversation
Codecov Report

Attention: Patch coverage is

Additional details and impacted files:

```
@@             Coverage Diff             @@
##               main    #4073      +/-   ##
============================================
+ Coverage          0   74.00%   +74.00%
- Complexity        0    18099    +18099
============================================
  Files             0     1530     +1530
  Lines             0    67141    +67141
  Branches          0     8479     +8479
============================================
+ Hits              0    49686    +49686
- Misses            0    13392    +13392
- Partials          0     4063     +4063
```

View the full report in Codecov by Sentry.
Force-pushed from e48a0b9 to 30d2de4.
This PR has an inevitable race condition where the number of pending tasks can exceed the desired limit. How about providing an API that wraps a given executor?

```java
myWrappedExecutor = ThrottlingStrategy.wrap(myExecutor, 10 /* or myLimitSupplier */);
Server
    .builder()
    .blockingTaskExecutor(myWrappedExecutor)
    .decorator(myWrappedExecutor.asDecorator())
    // or
    .decorator(ThrottlingService.newDecorator(myWrappedExecutor.throttlingStrategy()))
    ...
```
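One possible shape for such a wrapper, sketched under the assumption that the throttling strategy would consult the wrapper's own counter. The names here (`CountingExecutor`, `pendingTasks`, `shouldAccept`) are hypothetical illustrations, not Armeria API:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

// Hypothetical sketch of the wrapping idea above: the wrapper owns the
// pending-task counter, so the count used for throttling and the tasks
// actually submitted to the executor cannot drift apart.
final class CountingExecutor implements Executor {

    private final Executor delegate;
    private final IntSupplier limitSupplier;
    private final AtomicInteger pending = new AtomicInteger();

    CountingExecutor(Executor delegate, IntSupplier limitSupplier) {
        this.delegate = delegate;
        this.limitSupplier = limitSupplier;
    }

    @Override
    public void execute(Runnable command) {
        pending.incrementAndGet(); // count the task before handing it off
        try {
            delegate.execute(() -> {
                try {
                    command.run();
                } finally {
                    pending.decrementAndGet(); // release once it finishes
                }
            });
        } catch (RuntimeException e) {
            pending.decrementAndGet(); // rejected execution: roll back
            throw e;
        }
    }

    /** The number of tasks submitted but not yet completed. */
    int pendingTasks() {
        return pending.get();
    }

    /** Whether a throttling strategy should accept another request. */
    boolean shouldAccept() {
        return pending.get() < limitSupplier.getAsInt();
    }
}
```

Even with the counter owned by the wrapper, the check-then-submit window still exists; the sketch only narrows it by keeping a single source of truth for the count.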
Let me try! Thank you @trustin

@TheWeaVer Great! Let me stay tuned ❤️
Force-pushed from 2ec2463 to b94e0f4.
```java
@@ -113,6 +116,18 @@ public static <T extends Request> ThrottlingStrategy<T> rateLimiting(
         return new RateLimitingThrottlingStrategy<>(requestsPerSecond, name);
     }

     /**
      * Returns a new {@link ThrottlingStrategy} that provides a throttling strategy based on given
      * {@link SettableIntSupplier} by comparing it to the size of the tasks of the {@link BlockingTaskExecutor}.
```

Suggested change:

```diff
- * {@link SettableIntSupplier} by comparing it to the size of the tasks of the {@link BlockingTaskExecutor}.
+ * {@link IntSupplier} by comparing it to the size of the tasks of the {@link BlockingTaskExecutor}.
```

```java
      * @param limitSupplier the {@link IntSupplier} which indicates limit of the tasks
      * @param name the name of the {@link ThrottlingStrategy}
      */
     public static <T extends Request> ThrottlingStrategy<T> blockingTaskLimiting(
```
Could we also add a version that accepts an `int` instead of `IntSupplier` for a simpler use case where a user just wants to enforce a static limit?
I added a new method 😄
Thanks for the initiative!

I think as it stands, it is difficult to keep the executor state and our DefaultBlockingTaskExecutor in sync.

What do you think of simply following Micrometer's approach and accessing the number of queued tasks directly, like suggested here: #4073 (comment)

For instance, I think it's fine to watch `ThreadPoolExecutor#getQueue#size` or `ForkJoinPool::getQueuedTaskCount` and throttle requests based on this value. If an unsupported `ScheduledExecutorService` is used to throttle requests, I think we can just throw an exception on initialization.

Let me know what you think @line/dx
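The approach described above could be sketched roughly as follows. `QueuedTaskCounts` and `queuedTaskCount` are hypothetical names for illustration; only the JDK calls (`ThreadPoolExecutor#getQueue`, `ForkJoinPool#getQueuedTaskCount`) are real API:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.LongSupplier;

// Sketch of the suggestion above: read the queued-task count directly from
// known pool types, and reject unsupported executors at initialization time
// rather than at throttling time.
final class QueuedTaskCounts {

    static LongSupplier queuedTaskCount(ExecutorService executor) {
        if (executor instanceof ThreadPoolExecutor) {
            // Also covers ScheduledThreadPoolExecutor, which extends it.
            final ThreadPoolExecutor pool = (ThreadPoolExecutor) executor;
            return () -> pool.getQueue().size();
        }
        if (executor instanceof ForkJoinPool) {
            final ForkJoinPool pool = (ForkJoinPool) executor;
            return pool::getQueuedTaskCount;
        }
        // Fail fast on initialization, as proposed in the comment above.
        throw new IllegalArgumentException(
                "unsupported executor type: " + executor.getClass().getName());
    }

    private QueuedTaskCounts() {}
}
```

A throttling strategy could then simply compare `queuedTaskCount(...).getAsLong()` against the configured limit on each request, with no extra counter to keep in sync.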
Nevermind, I saw this comment late.
```java
    try {
        command.run();
    } finally {
        taskCounter.decrementAndGet();
```
I think `taskCounter` will be decremented on each scheduled task invocation, but no increment occurs; this may not correctly reflect the number of pending tasks.
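One way to keep the counter balanced even for periodic tasks, which run many times from a single submission, is to pair each decrement with an increment inside the same invocation. A minimal sketch; `CountedRunnable` is an illustrative name and `taskCounter` is assumed to be the field discussed in the review:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: increment at the start of each invocation rather than only at
// submission time, so every decrement in the finally block has a matching
// increment, no matter how many times a periodic task fires.
final class CountedRunnable implements Runnable {

    private final Runnable command;
    private final AtomicInteger taskCounter;

    CountedRunnable(Runnable command, AtomicInteger taskCounter) {
        this.command = command;
        this.taskCounter = taskCounter;
    }

    @Override
    public void run() {
        taskCounter.incrementAndGet(); // balanced per invocation
        try {
            command.run();
        } finally {
            taskCounter.decrementAndGet();
        }
    }
}
```

Note that with this scheme the counter reflects currently running tasks rather than queued ones, which is a different (and weaker) signal for throttling; counting at submission time needs the complementary cancel handling discussed below.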
```java
        }
    });
    submitted = true;
    return future;
```
If `future.cancel` is called immediately, without the task being given a chance to run, I think it is possible that `taskCounter` is incremented without ever being decremented. I think we may also need to wrap the returned future and check whether `cancel` was called successfully.
i.e. We may need something like this:

```java
class BlockingFuture<T> implements ScheduledFuture<T> {

    private final Future<T> delegate;

    BlockingFuture(Future<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        final boolean cancelled = delegate.cancel(mayInterruptIfRunning);
        if (cancelled) {
            // Release the slot that was reserved at submission time.
            taskCounter.decrementAndGet();
        }
        return cancelled;
    }

    // ... delegate the remaining Future/ScheduledFuture methods ...
}
```
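The cancel-decrement idea can be checked in isolation with a self-contained stand-in. `CancelAwareFuture` and `counter` below are illustrative names, not code from the PR; `Future#cancel` returning `true` only on the first successful cancellation is what prevents a double decrement:

```java
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch: if the counter is incremented at submission, a successful
// cancel() before the task ever runs must also decrement it, or the
// slot leaks and the throttler eventually rejects everything.
final class CancelAwareFuture<T> {

    private final FutureTask<T> delegate;
    private final AtomicInteger counter;

    CancelAwareFuture(FutureTask<T> delegate, AtomicInteger counter) {
        this.delegate = delegate;
        this.counter = counter;
        counter.incrementAndGet(); // slot reserved at submission
    }

    boolean cancel(boolean mayInterruptIfRunning) {
        final boolean cancelled = delegate.cancel(mayInterruptIfRunning);
        if (cancelled) {
            // Only the first successful cancel releases the slot;
            // repeated cancel() calls return false and change nothing.
            counter.decrementAndGet();
        }
        return cancelled;
    }
}
```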
🔍 Build Scan® (commit: b9e34a1)
Can you also check the CI failures? I think this PR should be almost done once my comments are addressed 🙇
Motivation:

Provide a blocking-request throttling strategy to prevent a situation that worsens rapidly when blocking tasks pile up.

Modifications:

Result: