
Better behaviour in the presence of 429s #786

Merged: 30 commits merged into remoting on Sep 13, 2018

Conversation

j-baker (Contributor) commented Aug 2, 2018

If a client sends requests at a rate of 40 per second, but the server
demands only 20 per second by sending back 429s, the default behaviour
should be that the client's load is smoothed down to 20 per second.

Right now this does not work: we use exponential backoff with a maximum
number of retries, and each call backs off independently, so each
individual request is as likely to fail as any other.

What this means is that given sustained attempted load of 40 per second
and a limit of 20 requests per second, eventually some of the calls
will fail.

You can see this most easily right now with an endpoint that takes 1ms
to return and is rate limited to 1 request per second: calls will start
failing within a few seconds.

This PR attempts to fix the issue. It uses Netflix's concurrency-limits
library to apply a TCP-like strategy to request concurrency; basically,
provided a single request per second won't be limited into oblivion,
you can make progress.

I also include a test that will fail immediately unless this concurrency
limiting code exists.

At the moment, the scope is per-endpoint - I don't know how to do
better. We could later on define 'limiting domains' which get somehow
annotated by the service author according to rate limit groupings.

Note that we'd have to modify our internal retrofit codegen to properly
add the path template header.

There is scope to improve this later - particularly to do this on the
server too, and prioritise interactive over batch traffic.

The main downside of this code is that you have to make sure you always
clean up - otherwise you have a resource leak. To mitigate this, there is
code which detects a leaked permit, cleans it up, and logs when it does so.
This should avoid the IOException deadlock pain we've seen in the past.

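As a rough illustration of the acquire/release cycle described above, here is a minimal sketch against the Limiter API from Netflix's concurrency-limits library. The wrapper class and the sendRequest() call are hypothetical, and this is not the code in this PR.

import com.netflix.concurrency.limits.Limiter;
import java.io.IOException;
import java.util.Optional;
import okhttp3.Response;

final class LimitedCallSketch {
    private final Limiter<Void> limiter;  // built elsewhere from the concurrency-limits library

    LimitedCallSketch(Limiter<Void> limiter) {
        this.limiter = limiter;
    }

    Optional<Response> tryCall() throws IOException {
        // Ask for a concurrency permit; an empty Optional means we are over the current limit.
        Optional<Limiter.Listener> maybeListener = limiter.acquire(null);
        if (!maybeListener.isPresent()) {
            return Optional.empty();  // caller should back off and try again later
        }
        Limiter.Listener listener = maybeListener.get();
        try {
            Response response = sendRequest();  // hypothetical: perform the actual HTTP call
            if (response.code() == 429 || response.code() == 503) {
                listener.onDropped();   // server is shedding load: shrink the limit, TCP-style
            } else {
                listener.onSuccess();   // a successful round trip lets the limit grow
            }
            return Optional.of(response);
        } catch (IOException e) {
            listener.onIgnore();        // don't let unrelated I/O failures skew the limit
            throw e;
        }
    }

    private Response sendRequest() throws IOException {
        throw new UnsupportedOperationException("placeholder for the real HTTP call");
    }
}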
@j-baker j-baker requested a review from a team as a code owner August 2, 2018 02:45
j-baker (Contributor, Author) commented Aug 2, 2018

Note - testing is insufficient right now - this is to get validation on the approach, though I would like to get this merged soon :)

@j-baker j-baker changed the title from "Better behaviour in the presence of backing off" to "Better behaviour in the presence of 429s" on Aug 2, 2018
@@ -87,7 +87,7 @@ public static ClientConfiguration of(
.enableGcmCipherSuites(DEFAULT_ENABLE_GCM_CIPHERS)
.proxy(ProxySelector.getDefault())
.proxyCredentials(Optional.empty())
.maxNumRetries(uris.size())
.maxNumRetries(DEFAULT_MAX_NUM_RETRIES)

Contributor:

uhm, I thought we had merged such a change already?

Contributor Author:

yes, but we did it in only one of the two places (see above in this class)


public AsyncLimiter limiter(Request request) {
final String limiterKey;
String pathTemplate = request.header(OkhttpTraceInterceptor.PATH_TEMPLATE_HEADER);
Contributor:

this is a bit dodgy

return limiter(limiterKey);
}

static final class AsyncLimiter {
Contributor:

this construction is pretty complicated. minimally needs some docs to explain what's going on.

private final Queue<SettableFuture<Limiter.Listener>> waitingRequests = new LinkedBlockingQueue<>();
private final Limiter<Void> limiter;

public AsyncLimiter(
Contributor:

why do we need to make the limiter asynchronous? from https://github.com/Netflix/concurrency-limits/blob/master/concurrency-limits-core/src/main/java/com/netflix/concurrency/limits/Limiter.java it seems like Limiter#acquire returns immediately?

Contributor:

oh i see, nvm.

if (head == null) {
acquired.onIgnore();
} else {
head.set(acquired);
Contributor:

does this construction satisfy some basic fairness properties, i.e., every request will get scheduled/acquired eventually?

Contributor Author:

yes, via the FIFO queue

return;
}
Limiter.Listener acquired = maybeAcquired.get();
SettableFuture<Limiter.Listener> head = waitingRequests.poll();
Contributor:

would prefer to make the multi-threadedness here a little easier to understand by calling poll only

SettableFuture<Limiter.Listener> head;
// Note that different threads may be executing processQueue; this is safe because ...
while ((head = waitingRequests.poll()) != null) {
  ..
}

Contributor Author:

ah but if you do that, you're not guaranteed to have any permits to give them

Contributor:

i see. then add a comment, please

Contributor Author:

done
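
For readers following along, here is a hedged reconstruction of the pattern being discussed, pieced together from the snippets quoted in this thread rather than copied from the final diff: a permit is acquired first, and the FIFO queue of waiters is polled only once a permit is in hand, so a waiter is never dequeued without something to give it.

import com.google.common.util.concurrent.SettableFuture;
import com.netflix.concurrency.limits.Limiter;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

final class AsyncLimiterSketch {
    private final Queue<SettableFuture<Limiter.Listener>> waitingRequests = new LinkedBlockingQueue<>();
    private final Limiter<Void> limiter;

    AsyncLimiterSketch(Limiter<Void> limiter) {
        this.limiter = limiter;
    }

    // Several threads may run this concurrently; each iteration first secures a permit and
    // only then dequeues a waiter, so the FIFO order provides the fairness discussed above.
    void processQueue() {
        while (!waitingRequests.isEmpty()) {
            Optional<Limiter.Listener> maybeAcquired = limiter.acquire(null);
            if (!maybeAcquired.isPresent()) {
                return;  // no permit available right now; waiters stay queued
            }
            Limiter.Listener acquired = maybeAcquired.get();
            SettableFuture<Limiter.Listener> head = waitingRequests.poll();
            if (head == null) {
                acquired.onIgnore();  // another thread drained the queue; release the spare permit
            } else {
                head.set(acquired);   // hand the permit to the longest-waiting request
            }
        }
    }
}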

}

@Override
public void onResponse(Call call, Response response) throws IOException {
// Relay successful responses
if (response.code() / 100 <= 2) {
callback.onResponse(call, response);
listener.onSuccess();
Contributor:

the construction here is very brittle because you need to trace that the Listener is released (onSuccess, onDropped) on all code paths.

Contributor Author:

yeah, i'm gonna clean that up
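
One possible shape for that clean-up, sketched here as an assumption rather than what was eventually merged: wrap the Listener so that whichever code path fires first releases the permit exactly once.

import com.netflix.concurrency.limits.Limiter;
import java.util.concurrent.atomic.AtomicBoolean;

final class OnceOnlyListener implements Limiter.Listener {
    private final Limiter.Listener delegate;
    private final AtomicBoolean released = new AtomicBoolean(false);

    OnceOnlyListener(Limiter.Listener delegate) {
        this.delegate = delegate;
    }

    @Override
    public void onSuccess() {
        if (released.compareAndSet(false, true)) {  // only the first release path wins
            delegate.onSuccess();
        }
    }

    @Override
    public void onIgnore() {
        if (released.compareAndSet(false, true)) {
            delegate.onIgnore();
        }
    }

    @Override
    public void onDropped() {
        if (released.compareAndSet(false, true)) {
            delegate.onDropped();
        }
    }
}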

@@ -236,14 +272,18 @@ public Void visit(QosException.Throttle exception) {
}

Duration backoff = exception.getRetryAfter().orElse(nonAdvertizedBackoff.get());
Contributor:

difficult to grok what the interplay is between the Limiter and the backoffStrategy. they seem duplicative.

Contributor Author:

and yet, they are not :)

Contributor:

exactly

* limitations under the License.
*/

package com.palantir.remoting3.okhttp;
Contributor:

@iamdanfox what's the deal with remoting-vs-conjure in PRs?

}

private Limiter.Listener wrap(
Map<Limiter.Listener, Runnable> activeListeners, Limiter.Listener listener) {
Contributor:

remove activeListeners param?

Contributor:

maybe even make ConcurrencyLimiter non-static?

Contributor Author:

done


private final ConcurrentMap<String, ConcurrencyLimiter> limiters = new ConcurrentHashMap<>();

private static Limiter<Void> newLimiter() {
Contributor:

inline in ConcurrencyLimiter constructor?

Contributor Author:

done


ConcurrencyLimiter limiter(Request request) {
final String limiterKey;
String pathTemplate = request.header(OkhttpTraceInterceptor.PATH_TEMPLATE_HEADER);
Contributor:

this is still dodgy

Contributor Author:

I don't really see a way of avoiding this. It seems reasonable to do this by endpoint, and if you do that you end up with this.

Contributor:

could also see this being something that uses a dynamic proxy which makes it much easier to limit per method or per some annotation. think the only sad thing about this is relying on the tracing header, which is only ever passed around internally (never sent across the wire)

Contributor:

OK. But then I'd rename the code bits so that they're no longer "trace"-specific. Probably also need to stop deleting the header in the trace-specific code path
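
To make the per-endpoint scoping concrete, a hedged sketch of the keying being debated here; ConcurrencyLimiter and OkhttpTraceInterceptor are the PR's own classes, and the method-plus-template key and empty-string fallback are assumptions for illustration only.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import okhttp3.Request;

final class ConcurrencyLimitersSketch {
    private final ConcurrentMap<String, ConcurrencyLimiter> limiters = new ConcurrentHashMap<>();

    ConcurrencyLimiter limiter(Request request) {
        // The path template header is only attached internally (never sent over the wire),
        // so it can serve as a per-endpoint key; fall back to one shared key when it is absent.
        String pathTemplate = request.header(OkhttpTraceInterceptor.PATH_TEMPLATE_HEADER);
        String limiterKey = pathTemplate == null
                ? ""                                      // assumption: a single shared limiter as the fallback
                : request.method() + " " + pathTemplate;  // per-endpoint scope
        return limiters.computeIfAbsent(limiterKey, key -> new ConcurrencyLimiter());
    }
}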

public void run() {
for (int i = 0; i < 1001;) {
Limiter.Listener listener = Futures.getUnchecked(limiter.acquire());
//System.out.println(i);
Contributor:

meep

Contributor Author:

I'm going to rewrite these tests into a kinda contract test thing.

public void onFailure(Throwable t) {
callback.onFailure(
RemotingOkHttpCall.this,
new IOException(new AssertionError("This should never happen", t)));
Contributor:

explain a bit more in the message please

Contributor Author:

done


/**
* Remoting calls may observe 429 or 503 responses from the server, at which point they back off in order to
* reduce excess load. Unfortunately this state on backing off is stored per-call, so 429s or 503s in one call do not
Contributor:

maybe just describe how it works rather than lamenting ("Unfortunately") the decomp?

Flow control in Conjure is a collaborative effort between servers and clients: Servers advertise an overloaded state via 429/503 responses, and clients throttle the number of requests they are sending. The latter is implemented as a combination of two techniques, yielding a mechanism similar to flow control in TCP/IP: First, clients use the frequency of 429/503 responses to determine an estimate for the number of permissible concurrent requests. Second, each such request gets scheduled according to an exponential backoff algorithm. This class provides an asynchronous implementation of Netflix's concurrency-limits library for determining the above-mentioned concurrency estimates. [...]

Contributor Author:

done, thanks

uschi2000 (Contributor) left a comment:

I'm fine with the functionality. Decomp-wise, I sort of wonder if we should roll the limit and backoff functionality into the same class? Don't know exactly what that would look like.

Waiting for @iamdanfox 's feedback to see where we should merge this.

j-baker (Contributor, Author) commented Aug 3, 2018

got fixes to this coming in soon

ellisjoe (Contributor) commented Sep 3, 2018

have you seen the concurrency limiting in Dispatcher? https://github.com/square/okhttp/blob/master/okhttp/src/main/java/okhttp3/Dispatcher.java

It also seems like ideally we'd be able to implement the concurrency limiting at the Call level. You'd end up with a ForwardingCall that looks something like:

public Response execute() throws IOException {
    Limiter limiter = limiters.limit(pathTemplateHeader);
    Response response = delegate.execute();
    if (response.code() / 100 == 2) {
        limiter.success();
    } else if (....) {
        limiter.dropped();
    } else {
        limiter.ignored();
    }
    return response;
}

which lets you avoid the async extras. Doesn't necessarily need to be exactly this, but probably worth discussing an option like this?

ellisjoe (Contributor) commented Sep 4, 2018

* An interceptor for limiting the concurrency of requests to an endpoint.
*
* Requests must be tagged (before reaching this point) with a ConcurrencyLimitTag. At this point, we block on
* receiving a permit to run the request, and store the listener in the tag.
Contributor:

I think it would be better to just use the QosHandler directly in here rather than passing around this tag and requiring it to be set

return chain.proceed(chain.request());
}

public static Callback wrapCallback(Callback callback) {
Contributor:

don't follow why this is necessary?

}
}

private static final class ResourceDeallocator extends AsyncTimeout {
Contributor:

don't think it's worth worrying about this case. if clients aren't releasing resources properly they're going to lock things up eventually anyway. At a minimum it should be a separate change from the concurrency limiting

Contributor:

discussed more in person: going to time out on acquiring a limit rather than on releasing a limit, which has the added benefit of always allowing requests through after some period of time
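
A sketch of what that could look like, under the assumption that the limiter hands back a ListenableFuture of a Listener as in the code above; the one-minute timeout is an illustrative value, not the PR's actual setting.

import com.google.common.util.concurrent.ListenableFuture;
import com.netflix.concurrency.limits.Limiter;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class LimitAcquisitionSketch {
    // Wait a bounded time for a permit; if none arrives, give up waiting so the request
    // still goes through, which is the behaviour described in the comment above.
    static Optional<Limiter.Listener> acquireWithTimeout(ListenableFuture<Limiter.Listener> pending)
            throws IOException {
        try {
            return Optional.of(pending.get(1, TimeUnit.MINUTES));
        } catch (TimeoutException e) {
            pending.cancel(false);  // stop waiting; no permit is held, so there is nothing to release later
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted while waiting for a concurrency permit", e);
        } catch (ExecutionException e) {
            throw new IOException("Failed while waiting for a concurrency permit", e);
        }
    }
}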

* <li>Change code style to match Palantir baseline.</li>
* </ol>
*/
final class RemotingConcurrencyLimiter implements Limiter<Void> {
Contributor:

Not sure how to review this. Are the things listed above expected to merge upstream so we can remove this class at some point?

@j-baker j-baker changed the base branch from develop to remoting September 13, 2018 14:34
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (method.getName().equals("close") && !closed) {
closed = true;
listener.onSuccess();
Contributor:

So the closing mechanism is now implicit, in that as long as the response body is closed (either by reading it fully during json object mapping, or via inputstream.close() if streaming and want to finish earlier) we tag as successful?

Is it viable at some point to have more control over this? One thing that occasionally happens when streaming is your stream ends too early because an error was encountered once some data (esp. headers) was already sent. We would preferably mark those as failed.

* 429 and 503 response codes are used for backpressure, whilst 200 -> 399 request codes are used for determining
* new limits and all other codes are not factored in to timings.
* <p>
* Concurrency permits are only released when the response body is closed.
Contributor:

I think this should be more visible; the last sentence on a package-private class is not exactly discoverable. Perhaps javadoc on RetrofitClient.create (since that's the clients we use for streaming, I believe?)

Contributor:

I've just put a section about this in the README too
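
For completeness, a small usage example of the point above (my illustration, not taken from the README): since the permit is released only when the response body is closed, streaming callers should close the body explicitly, e.g. with try-with-resources.

import java.io.IOException;
import okhttp3.Call;
import okhttp3.Response;
import okhttp3.ResponseBody;

final class StreamingUsageSketch {
    static void consume(Call call) throws IOException {
        Response response = call.execute();
        // Reading the body fully or closing it is what releases the concurrency permit,
        // so close it deterministically when streaming.
        try (ResponseBody body = response.body()) {
            // read from body.byteStream() as needed
        }
    }
}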

@iamdanfox iamdanfox merged commit 14d8533 into remoting Sep 13, 2018
dansanduleac pushed a commit that referenced this pull request Sep 13, 2018
* Expose HostMetricsRegistry record methods (#780)

This is so that we can separately implement a HostMetricsSink for it (see #779) such that we can share host metrics when constructing both conjure and remoting3 clients

* publish BOM for jaxrs-client (#789)

* Excavator: Render CircleCI file using template specified in .circleci/template.sh (#791)

* Upgrade OkHttp to 3.11.0. (#794)

* AssertionErrors are converted into service exceptions with type internal (#727)

* No more ThreadLocals, delegate everything to 'palantir/tracing-java' (#799)

* Use BINTRAY_*_REMOTING envs (#802)

The project's default bintray creds are currently set up to publish to `conjure-java-runtime`.
Use these custom env vars to maintain ability to publish http-remoting.

* Better behaviour in the presence of 429s (#786)
@iamdanfox iamdanfox deleted the jbaker/better_429_behaviour branch September 14, 2018 09:40
6 participants