PinUntilErrorChannel doesn't switch on 429 #661

Merged 6 commits on Apr 17, 2020
6 changes: 6 additions & 0 deletions changelog/@unreleased/pr-661.v2.yml
@@ -0,0 +1,6 @@
+type: fix
+fix:
+  description: PinUntilErrorChannel doesn't switch on 429, to unblock transactional
+    workflows
+  links:
+  - https://github.com/palantir/dialogue/pull/661
@@ -129,11 +129,13 @@ public Optional<ListenableFuture<Response>> maybeExecute(Endpoint endpoint, Requ
                .map(future -> DialogueFutures.addDirectCallback(future, new FutureCallback<Response>() {
                    @Override
                    public void onSuccess(Response response) {
-                       if (Responses.isQosStatus(response) || Responses.isServerError(response)) {
+                       // We specifically don't switch 429 responses to support transactional
+                       // workflows where it is important for a large number of requests to all land on the same node,
+                       // even if a couple of them get rate limited in the middle.
+                       if (Responses.isServerError(response)
+                               || (Responses.isQosStatus(response) && !Responses.isTooManyRequests(response))) {
                            OptionalInt next = incrementHostIfNecessary(currentIndex);
                            instrumentation.receivedErrorStatus(currentIndex, channel, response, next);
                            // TODO(dfox): handle 308 See Other somehow, as we currently don't have a host -> channel
                            // mapping
                        } else {
                            instrumentation.successfulResponse(currentIndex);
                        }
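
For readers skimming the diff, the rule the new condition encodes is: switch away from the pinned node on any server error (5xx), and on QoS statuses other than 429. A minimal standalone sketch of that predicate follows; the helper name and the concrete QoS status set (308/429/503) are assumptions for illustration, not Dialogue's actual Responses API.

    // Hypothetical illustration only, not the Dialogue implementation.
    static boolean shouldSwitchNode(int status) {
        boolean serverError = status >= 500 && status <= 599;                 // Responses.isServerError
        boolean qosStatus = status == 308 || status == 429 || status == 503;  // Responses.isQosStatus (assumed set)
        boolean tooManyRequests = status == 429;                              // Responses.isTooManyRequests
        // 429 alone no longer triggers a switch, so transactional workflows
        // stay pinned to the node they started on.
        return serverError || (qosStatus && !tooManyRequests);
    }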
@@ -87,12 +87,31 @@ public void channels_are_shuffled_initially_and_successful_requests_stay_on_channel

     @Test
     public void various_error_status_codes_cause_node_switch() {
-        testStatusCausesNodeSwitch(429);
         testStatusCausesNodeSwitch(308);
         testStatusCausesNodeSwitch(503);
+        for (int errorStatus = 500; errorStatus < 600; errorStatus++) {
+            testStatusCausesNodeSwitch(errorStatus);
+        }
     }

+    @Test
+    void http_429_responses_do_not_cause_node_switch() {
+        setResponse(channel1, 100);
+        setResponse(channel2, 204);
+
+        assertThat(IntStream.range(0, 6).map(number -> getCode(pinUntilErrorWithoutReshuffle)))
+                .describedAs("Should be locked on to channel2 initially")
+                .contains(204, 204, 204, 204, 204, 204);
+
+        setResponse(channel2, 429);
+
+        assertThat(IntStream.range(0, 6).map(number -> getCode(pinUntilErrorWithoutReshuffle)))
+                .describedAs("Even after receiving a 429, we must stay pinned on the same channel to support "
+                        + "transactional workflows like the internal atlas-replacement, which rely on all requests "
+                        + "hitting the same node. See PDS-117063 for an example.")
+                .contains(429, 429, 429, 429, 429, 429);
+    }
+
     private void testStatusCausesNodeSwitch(int errorStatus) {
         before();
         setResponse(channel1, 100);
2 changes: 1 addition & 1 deletion simulation/src/test/resources/report.md
@@ -17,7 +17,7 @@
live_reloading[CONCURRENCY_LIMITER_ROUND_ROBIN].txt: success=89.9% client_mean=PT3.8310808S server_cpu=PT1H55M21.66S client_received=2500/2500 server_resps=2500 codes={200=2248, 500=252}
live_reloading[UNLIMITED_ROUND_ROBIN].txt: success=60.2% client_mean=PT2.84698S server_cpu=PT1H58M37.45S client_received=2500/2500 server_resps=2500 codes={200=1504, 500=996}
one_big_spike[CONCURRENCY_LIMITER_BLACKLIST_ROUND_ROBIN].txt: success=79.0% client_mean=PT1.478050977S server_cpu=PT1M59.71393673S client_received=1000/1000 server_resps=790 codes={200=790, Failed to make a request=210}
-one_big_spike[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].txt: success=100.0% client_mean=PT1.286733552S server_cpu=PT2M48.75S client_received=1000/1000 server_resps=1125 codes={200=1000}
+one_big_spike[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].txt: success=100.0% client_mean=PT1.135007332S server_cpu=PT2M49.65S client_received=1000/1000 server_resps=1131 codes={200=1000}
iamdanfox (Contributor, Author) commented:
As expected, the graph shows that one big spike of requests (i.e. exactly this use case) remains pinned to one URI: https://github.com/palantir/dialogue/blob/dfox/pin-until-error-fix/simulation/src/test/resources/report.md#one_big_spikeconcurrency_limiter_pin_until_error

A reviewer (Contributor) commented:
We may want to update this simulation to respond 429 instead of 503

iamdanfox (Contributor, Author) replied on Apr 17, 2020:
It's intended to be representative of this exact workflow, so it responds 429 above some threshold:

 public void one_big_spike() {
        int capacity = 100;
        servers = servers(
                SimulationServer.builder()
                        .serverName("node1")
                        .simulation(simulation)
                        .handler(h -> h.respond200UntilCapacity(429, capacity).responseTime(Duration.ofMillis(150)))
                        .build(),
                SimulationServer.builder()
                        .serverName("node2")
                        .simulation(simulation)
                        .handler(h -> h.respond200UntilCapacity(429, capacity).responseTime(Duration.ofMillis(150)))
                        .build());
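
For intuition, a respond200UntilCapacity-style handler can be read roughly as the sketch below (assumed names and semantics, not the actual SimulationServer code): it answers 200 while the node has spare capacity and the configured status (429 here) once it is saturated.

    import java.util.concurrent.atomic.AtomicInteger;

    // Rough sketch of the assumed semantics only.
    final class CapacityLimitedHandler {
        private final int capacity;        // e.g. 100
        private final int saturatedStatus; // e.g. 429
        private final AtomicInteger active = new AtomicInteger();

        CapacityLimitedHandler(int capacity, int saturatedStatus) {
            this.capacity = capacity;
            this.saturatedStatus = saturatedStatus;
        }

        /** Returns the HTTP status this simulated node would respond with. */
        int handle(Runnable simulatedWork) {
            if (active.incrementAndGet() > capacity) {
                active.decrementAndGet();
                return saturatedStatus; // node is over capacity -> 429
            }
            try {
                simulatedWork.run();    // stands in for the 150ms response time
                return 200;
            } finally {
                active.decrementAndGet();
            }
        }
    }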

one_big_spike[CONCURRENCY_LIMITER_ROUND_ROBIN].txt: success=100.0% client_mean=PT0.73895799S server_cpu=PT2M45S client_received=1000/1000 server_resps=1100 codes={200=1000}
one_big_spike[UNLIMITED_ROUND_ROBIN].txt: success=100.0% client_mean=PT1.115837367S server_cpu=PT8M3.3S client_received=1000/1000 server_resps=3222 codes={200=1000}
one_endpoint_dies_on_each_server[CONCURRENCY_LIMITER_PIN_UNTIL_ERROR].txt: success=64.4% client_mean=PT2.6866096S server_cpu=PT25M client_received=2500/2500 server_resps=2500 codes={200=1611, 500=889}
@@ -1 +1 @@
-success=100.0% client_mean=PT1.286733552S server_cpu=PT2M48.75S client_received=1000/1000 server_resps=1125 codes={200=1000}
+success=100.0% client_mean=PT1.135007332S server_cpu=PT2M49.65S client_received=1000/1000 server_resps=1131 codes={200=1000}