Skip to content

Commit

Permalink
Count server error when calculating error rate
Browse files Browse the repository at this point in the history
RB=838063
G=si-core-reviewers
R=epham,xnzhang,dhoa,cxu,jwehrwei
A=dhoa
  • Loading branch information
angxu committed Oct 20, 2016
1 parent f442dd9 commit 23c883f
Show file tree
Hide file tree
Showing 13 changed files with 220 additions and 52 deletions.
10 changes: 8 additions & 2 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
8.0.8
----
8.1.1
-----


8.1.0
-----
(RB=838063)
Count server error when calculating error rate in degrader

8.0.7
-----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,12 @@
]
},
"optional": true
},
{
"name": "errorStatusRegex",
"type": "string",
"doc": "Regular expression to match the status code indicates a server-side error.",
"optional": true
}
]
}
2 changes: 2 additions & 0 deletions d2/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,6 @@ configurations {
testArtifacts {
visible = true
}
// exclude slf4j-log4j12 which is pulled in from zookeeper
all*.exclude group: 'org.slf4j', module: 'slf4j-log4j12'
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import com.linkedin.data.ByteString;
import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.r2.message.RequestContext;
import com.linkedin.r2.message.rest.RestException;
import com.linkedin.r2.message.rest.RestRequest;
import com.linkedin.r2.message.rest.RestResponse;
import com.linkedin.r2.message.stream.StreamException;
import com.linkedin.r2.message.stream.StreamRequest;
import com.linkedin.r2.message.stream.StreamResponse;
import com.linkedin.r2.message.stream.entitystream.EntityStream;
Expand All @@ -53,6 +55,9 @@
import java.util.HashMap;
import java.util.Map;

import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -62,33 +67,48 @@

public class TrackerClient implements LoadBalancerClient
{
public static final String DEFAULT_ERROR_STATUS_REGEX = "(5..)";
public static final Pattern DEFAULT_ERROR_STATUS_PATTERN = Pattern.compile(DEFAULT_ERROR_STATUS_REGEX);

private static final Logger _log = LoggerFactory.getLogger(TrackerClient.class);

private final TransportClient _wrappedClient;
// The keys for the maps are partitionIds
private final Map<Integer, PartitionState> _partitionStates;
private final CallTracker _callTracker;
private final URI _uri;
private final Pattern _errorStatusPattern;

public TrackerClient(URI uri, Map<Integer, PartitionData> partitionDataMap, TransportClient wrappedClient)
{
this(uri, partitionDataMap, wrappedClient, SystemClock.instance(), null,
DegraderLoadBalancerStrategyConfig.DEFAULT_UPDATE_INTERVAL_MS);
DegraderLoadBalancerStrategyConfig.DEFAULT_UPDATE_INTERVAL_MS, DEFAULT_ERROR_STATUS_REGEX);
}

public TrackerClient(URI uri, Map<Integer, PartitionData> partitionDataMap, TransportClient wrappedClient,
Clock clock, Config config)
{
this(uri, partitionDataMap, wrappedClient, clock, config,
DegraderLoadBalancerStrategyConfig.DEFAULT_UPDATE_INTERVAL_MS);
DegraderLoadBalancerStrategyConfig.DEFAULT_UPDATE_INTERVAL_MS, DEFAULT_ERROR_STATUS_REGEX);
}

public TrackerClient(URI uri, Map<Integer, PartitionData> partitionDataMap, TransportClient wrappedClient,
Clock clock, Config config, long interval)
Clock clock, Config config, long interval, String errorStatusRegex)
{
_uri = uri;
_wrappedClient = wrappedClient;
_callTracker = new CallTrackerImpl(interval, clock);
Pattern errorPattern;
try
{
errorPattern = Pattern.compile(errorStatusRegex != null ? errorStatusRegex : DEFAULT_ERROR_STATUS_REGEX);
}
catch (PatternSyntaxException ex)
{
_log.warn("Invalid error status regex: {}. Falling back to default regex: {}", errorStatusRegex, DEFAULT_ERROR_STATUS_REGEX);
errorPattern = DEFAULT_ERROR_STATUS_PATTERN;
}
_errorStatusPattern = errorPattern;

if (config == null)
{
Expand Down Expand Up @@ -225,7 +245,7 @@ public String toString()
+ ", _uri=" + _uri + ", _partitionStates=" + _partitionStates + ", _wrappedClient=" + _wrappedClient + "]";
}

private static class TrackerClientRestCallback implements TransportCallback<RestResponse>
private class TrackerClientRestCallback implements TransportCallback<RestResponse>
{
private TransportCallback<RestResponse> _wrappedCallback;
private CallCompletion _callCompletion;
Expand Down Expand Up @@ -254,7 +274,7 @@ public void onResponse(TransportResponse<RestResponse> response)
}
}

private static class TrackerClientStreamCallback implements TransportCallback<StreamResponse>
private class TrackerClientStreamCallback implements TransportCallback<StreamResponse>
{
private TransportCallback<StreamResponse> _wrappedCallback;
private CallCompletion _callCompletion;
Expand Down Expand Up @@ -320,9 +340,13 @@ public void onError(Throwable e)
}
}

private static void handleError(CallCompletion callCompletion, Throwable throwable)
private void handleError(CallCompletion callCompletion, Throwable throwable)
{
if (throwable instanceof RemoteInvocationException)
if (isServerError(throwable))
{
callCompletion.endCallWithError(ErrorType.SERVER_ERROR);
}
else if (throwable instanceof RemoteInvocationException)
{
Throwable originalThrowable = LoadBalancerUtil.findOriginalThrowable(throwable);
if (originalThrowable instanceof ConnectException)
Expand All @@ -333,6 +357,10 @@ else if (originalThrowable instanceof ClosedChannelException)
{
callCompletion.endCallWithError(ErrorType.CLOSED_CHANNEL_EXCEPTION);
}
else if (originalThrowable instanceof TimeoutException)
{
callCompletion.endCallWithError(ErrorType.TIMEOUT_EXCEPTION);
}
else
{
callCompletion.endCallWithError(ErrorType.REMOTE_INVOCATION_EXCEPTION);
Expand All @@ -344,6 +372,35 @@ else if (originalThrowable instanceof ClosedChannelException)
}
}

/**
* Returns true if the given throwable indicates a server-side error.
*/
private boolean isServerError(Throwable throwable)
{
if (throwable instanceof RestException)
{
RestException restException = (RestException) throwable;
if (restException.getResponse() != null)
{
return matchErrorStatus(restException.getResponse().getStatus());
}
}
else if (throwable instanceof StreamException)
{
StreamException streamException = (StreamException) throwable;
if (streamException.getResponse() != null)
{
return matchErrorStatus(streamException.getResponse().getStatus());
}
}
// default to false
return false;
}

private boolean matchErrorStatus(int status) {
return _errorStatusPattern.matcher(Integer.toString(status)).matches();
}

// we organize all data of a partition together so we don't have to maintain multiple maps in tracker client
private class PartitionState
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ public static Map<String, Object> toProperties(D2LoadBalancerStrategyProperties
map.put(PropertyKeys.HTTP_LB_QUARANTINE_LATENCY, quarantineInfo.getQuarantineLatency().longValue());
}
}
if (config.hasErrorStatusRegex())
{
map.put(PropertyKeys.HTTP_LB_ERROR_STATUS_REGEX, config.getErrorStatusRegex());
}
return map;
}

Expand Down Expand Up @@ -283,6 +287,10 @@ else if (DegraderRingFactory.MULTI_PROBE_CONSISTENT_HASH.equalsIgnoreCase(consis
}
config.setQuarantineCfg(quarantineInfo);
}
if (properties.containsKey(PropertyKeys.HTTP_LB_ERROR_STATUS_REGEX))
{
config.setErrorStatusRegex(coerce(properties.get(PropertyKeys.HTTP_LB_ERROR_STATUS_REGEX), String.class));
}
return config;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public class PropertyKeys
public static final String HTTP_LB_QUARANTINE_EXECUTOR_SERVICE = "http.loadBalancer.quarantine.executorService";
public static final String HTTP_LB_QUARANTINE_METHOD = "http.loadBalancer.quarantine.method";
public static final String HTTP_LB_QUARANTINE_LATENCY = "http.loadBalancer.quarantine.latency";
public static final String HTTP_LB_ERROR_STATUS_REGEX = "http.loadBalancer.errorStatusRegex";

//used by service metadata properties
public static final String SERVICE_FOLLOW_REDIRECTION_MAX_HOP = "followRedirection.maxHop";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,7 @@ protected void handlePut(final String listenTo, final UriProperties discoveryPro
}

long trackerClientInterval = getTrackerClientInterval (serviceProperties.getProperty());
String errorStatusPattern = getErrorStatusPattern(serviceProperties.getProperty());
for (URI uri : discoveryProperties.Uris())
{
Map<Integer, PartitionData> partitionDataMap = discoveryProperties.getPartitionDataMap(uri);
Expand All @@ -926,7 +927,8 @@ protected void handlePut(final String listenTo, final UriProperties discoveryPro
partitionDataMap,
config,
clk,
trackerClientInterval);
trackerClientInterval,
errorStatusPattern);

if (client != null)
{
Expand Down Expand Up @@ -1172,7 +1174,8 @@ protected void handleRemove(final String listenTo)
}

private TrackerClient getTrackerClient(String serviceName, URI uri, Map<Integer, PartitionData> partitionDataMap,
DegraderImpl.Config config, Clock clk, long callTrackerInterval)
DegraderImpl.Config config, Clock clk, long callTrackerInterval,
String errorStatusPattern)
{
Map<String,TransportClient> clientsByScheme = _serviceClients.get(serviceName);
if (clientsByScheme == null)
Expand All @@ -1191,7 +1194,7 @@ private TrackerClient getTrackerClient(String serviceName, URI uri, Map<Integer,
new Object[]{uri.getScheme(), serviceName, uri, partitionDataMap });
return null;
}
TrackerClient trackerClient = new TrackerClient(uri, partitionDataMap, client, clk, config, callTrackerInterval);
TrackerClient trackerClient = new TrackerClient(uri, partitionDataMap, client, clk, config, callTrackerInterval, errorStatusPattern);
return trackerClient;
}

Expand Down Expand Up @@ -1306,6 +1309,17 @@ private static long getTrackerClientInterval(ServiceProperties serviceProperties
return trackerClientInterval;
}

private static String getErrorStatusPattern(ServiceProperties serviceProperties)
{
String pattern = TrackerClient.DEFAULT_ERROR_STATUS_REGEX;
if (serviceProperties.getLoadBalancerStrategyProperties() != null)
{
pattern = MapUtil.getWithDefault(serviceProperties.getLoadBalancerStrategyProperties(),
PropertyKeys.HTTP_LB_ERROR_STATUS_REGEX, TrackerClient.DEFAULT_ERROR_STATUS_REGEX, String.class);
}
return pattern;
}

void refreshTransportClientsPerService(ServiceProperties serviceProperties)
{
String serviceName = serviceProperties.getServiceName();
Expand Down Expand Up @@ -1351,10 +1365,11 @@ void refreshTransportClientsPerService(ServiceProperties serviceProperties)
newTrackerClients = new ConcurrentHashMap<URI, TrackerClient>(
CollectionUtils.getMapInitialCapacity(uris.size(), 0.75f), 0.75f, 1);
long trackerClientInterval = getTrackerClientInterval (serviceProperties);
String errorStatusPattern = getErrorStatusPattern(serviceProperties);
for (URI uri : uris)
{
TrackerClient trackerClient = getTrackerClient(serviceName, uri, uriProperties.getPartitionDataMap(uri),
config, clk, trackerClientInterval);
config, clk, trackerClientInterval, errorStatusPattern);
if (trackerClient != null)
{
newTrackerClients.put(uri, trackerClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public TrackerClient getClient(String serviceName, URI uri)
else
{
// shorten the update interval to 20ms in order to increase the possibility of deadlock
_trackerClients.putIfAbsent(uri, new TrackerClient(uri, _partitionDescriptions.get(uri), null, new SettableClock(), null, 20));
_trackerClients.putIfAbsent(uri, new TrackerClient(uri, _partitionDescriptions.get(uri), null, new SettableClock(), null, 20, null));
}

return _trackerClients.get(uri);
Expand Down
Loading

0 comments on commit 23c883f

Please sign in to comment.