Permalink
Browse files

Adding 'fast recovery' mode to help host recover in low QPS situation

When average QPS is low for individual host, it is generally harder for the host to recover from
bad state because it has less chance to get the requests. RecoveryMap comes to help by increasing
the host points even though its callCount is 0. However the host is removed whenever it gets traffic once.
Therefore it can stay in bad state for long time if its point is still low.

This change tries to keep the host in recoveryMap for longer time even if it get traffic, until its point reaches the threshold.

RB=1048086
G=si-core-reviewers
R=ssheng,bfeng,dhoa
A=ssheng,dhoa
  • Loading branch information...
ChaoLinkedIn committed Jul 20, 2017
1 parent fc6b58c commit b4266ff07bf74d47e90d1982fcf42ec2eb0d9dfd
View
@@ -1,5 +1,8 @@
16.0.2
------
(RB=1048086)
Adding 'fast recovery' mode to help host recover in low QPS situation
16.0.1
------

Large diffs are not rendered by default.

Oops, something went wrong.
@@ -20,6 +20,7 @@
import com.linkedin.d2.discovery.event.SynchronousExecutorService;
import com.linkedin.d2.discovery.stores.mock.MockStore;
import com.linkedin.r2.message.RequestContext;
import com.linkedin.r2.message.rest.RestException;
import com.linkedin.r2.message.rest.RestRequest;
import com.linkedin.r2.message.rest.RestRequestBuilder;
import com.linkedin.r2.message.rest.RestResponse;
@@ -38,6 +39,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Delayed;
@@ -57,7 +59,6 @@
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import static org.testng.Assert.assertFalse;
/**
@@ -90,7 +91,7 @@
private final SimpleLoadBalancer _loadBalancer;
private final SimpleLoadBalancerState _loadBalancerState;
private final TimedValueGenerator<String> _delayGenerator;
private final TimedValueGenerator<String, Long> _delayGenerator;
private final QPSGenerator _qpsGenerator;
private final ClockedExecutor _clockedExecutor;
@@ -121,13 +122,13 @@
/**
* For a stream of values which changes periodically, get the value at the specific time
*/
interface TimedValueGenerator<T>
interface TimedValueGenerator<T, R>
{
long getValue(T t, long time, TimeUnit unit);
R getValue(T t, long time, TimeUnit unit);
}
LoadBalancerSimulator(ServiceProperties serviceProperties, ClusterProperties clusterProperties,
UriProperties uriProperties, TimedValueGenerator<String> delayGenerator,
UriProperties uriProperties, TimedValueGenerator<String, Long> delayGenerator,
QPSGenerator qpsGenerator, EventEmitter eventEmitter) throws ExecutionException, InterruptedException
{
_executorService = new SynchronousExecutorService();
@@ -297,7 +298,8 @@ public SimpleLoadBalancerState getLoadBalancerState()
URI serviceUri = URI.create("d2://" + serviceName);
Ring<URI> ring = _loadBalancer.getRings(serviceUri).get(partition);
Map<URI, Integer> pointsMap = new HashMap<>();
Iterator<URI> iter = ring.getIterator(0);
Random random = new Random();
Iterator<URI> iter = ring.getIterator(random.nextInt());
iter.forEachRemaining(uri -> pointsMap.compute(uri, (k, v) -> v == null ? 1: v + 1));
@@ -400,7 +402,7 @@ public void run()
}
TransportCallback<RestResponse> restCallback = (response) -> {
assertFalse(response.hasError());
// assertFalse(response.hasError());
_log.debug("Got response for {} @ {}", response.getResponse(), _clockedExecutor.currentTimeMillis());
// Do nothing for now for the response
};
@@ -428,9 +430,10 @@ public void run()
public TransportClient getClient(Map<String, ? extends Object> properties)
{
ClockedExecutor clockedExecutor = (ClockedExecutor) properties.get("ClockedExecutor");
TimedValueGenerator<String> delayGen = (TimedValueGenerator<String>) properties.get("DelayGenerator");
TimedValueGenerator<String, Long> delayGen = (TimedValueGenerator<String, Long>) properties.get("DelayGenerator");
TimedValueGenerator<String, String> errorGen = (TimedValueGenerator<String, String>) properties.get("ErrorGenerator");
return new DelayClient(clockedExecutor, delayGen);
return new DelayClient(clockedExecutor, delayGen, errorGen);
}
/**
@@ -439,12 +442,14 @@ public TransportClient getClient(Map<String, ? extends Object> properties)
private class DelayClient implements TransportClient
{
final private ClockedExecutor _clockedExecutor;
final private TimedValueGenerator<String> _delayGen;
final private TimedValueGenerator<String, Long> _delayGen;
final private TimedValueGenerator<String, String> _errorGen;
DelayClient(ClockedExecutor executor, TimedValueGenerator<String> delayGen)
DelayClient(ClockedExecutor executor, TimedValueGenerator<String, Long> delayGen, TimedValueGenerator<String, String> errorGen)
{
_clockedExecutor = executor;
_delayGen = delayGen;
_errorGen = errorGen;
}
@Override
@@ -468,8 +473,19 @@ public void restRequest(RestRequest request,
@Override
public void run()
{
RestResponse restResponse = new RestResponseBuilder().setEntity(request.getURI().getRawPath().getBytes()).build();
callback.onResponse(TransportResponseImpl.success(restResponse));
RestResponseBuilder restResponseBuilder = new RestResponseBuilder().setEntity(request.getURI().getRawPath().getBytes());
if (_errorGen != null) {
String retError = _errorGen.getValue(request.getURI().getAuthority(), _clockedExecutor.currentTimeMillis(),
TimeUnit.MILLISECONDS);
if (retError != null)
{
restResponseBuilder.setStatus(500); // only 500 errors are counted
RestException restException = new RestException(restResponseBuilder.build(), new Throwable(retError));
callback.onResponse(TransportResponseImpl.error(restException));
return;
}
}
callback.onResponse(TransportResponseImpl.success(restResponseBuilder.build()));
}
}, delay, TimeUnit.MILLISECONDS);
}
Oops, something went wrong.

0 comments on commit b4266ff

Please sign in to comment.