Permalink
Browse files

Add URIMapper class

RB=1332535
G=si-core-reviewers
R=xzhu,cxu,mnchen
A=xzhu
  • Loading branch information...
alexjinghn committed Jun 22, 2018
1 parent 4383e0b commit e91b2ac5ef25e25333fe0ff53721be6ac94019fa
Showing with 1,263 additions and 23 deletions.
  1. +6 −1 CHANGELOG
  2. +1 −0 ERROR-CODES.md
  3. +2 −0 d2-benchmark/build.gradle
  4. +201 −0 d2-benchmark/src/jmh/java/com/linkedin/d2/util/hashing/URIMapperVSKeyMapperBenchmark.java
  5. +7 −0 d2/src/main/java/com/linkedin/d2/balancer/Facilities.java
  6. +7 −0 d2/src/main/java/com/linkedin/d2/balancer/LoadBalancerWithFacilitiesDelegator.java
  7. +55 −0 d2/src/main/java/com/linkedin/d2/balancer/URIMapper.java
  8. +22 −3 d2/src/main/java/com/linkedin/d2/balancer/simple/SimpleLoadBalancer.java
  9. +7 −0 d2/src/main/java/com/linkedin/d2/balancer/strategies/LoadBalancerStrategy.java
  10. +8 −0 d2/src/main/java/com/linkedin/d2/balancer/strategies/degrader/DegraderLoadBalancerStrategyV3.java
  11. +8 −0 d2/src/main/java/com/linkedin/d2/balancer/strategies/random/RandomLoadBalancerStrategy.java
  12. +38 −6 d2/src/main/java/com/linkedin/d2/balancer/util/DelegatingFacilities.java
  13. +10 −2 d2/src/main/java/com/linkedin/d2/balancer/util/TogglingLoadBalancer.java
  14. +57 −0 d2/src/main/java/com/linkedin/d2/balancer/util/URIKeyPair.java
  15. +53 −0 d2/src/main/java/com/linkedin/d2/balancer/util/URIMappingResult.java
  16. 0 d2/src/{test → main}/java/com/linkedin/d2/balancer/util/URIRequest.java
  17. +19 −2 d2/src/main/java/com/linkedin/d2/balancer/util/hashing/HashRingProvider.java
  18. +230 −0 d2/src/main/java/com/linkedin/d2/balancer/util/hashing/RingBasedUriMapper.java
  19. +18 −0 d2/src/main/java/com/linkedin/d2/balancer/util/hashing/StaticRingProvider.java
  20. +5 −4 d2/src/main/java/com/linkedin/d2/balancer/util/partitions/PartitionInfoProvider.java
  21. +6 −0 d2/src/main/java/com/linkedin/d2/balancer/zkfs/LastSeenLoadBalancerWithFacilities.java
  22. +14 −2 d2/src/main/java/com/linkedin/d2/balancer/zkfs/ZKFSLoadBalancer.java
  23. +7 −0 d2/src/test/java/com/linkedin/d2/balancer/simple/SimpleLoadBalancerTest.java
  24. +7 −0 d2/src/test/java/com/linkedin/d2/balancer/strategies/degrader/DegraderLoadBalancerTest.java
  25. +7 −0 d2/src/test/java/com/linkedin/d2/balancer/util/WarmUpLoadBalancerTest.java
  26. +7 −1 d2/src/test/java/com/linkedin/d2/balancer/util/hashing/ConsistentHashKeyMapperTest.java
  27. +299 −0 d2/src/test/java/com/linkedin/d2/balancer/util/hashing/RingBasedURIMapperTest.java
  28. +160 −0 d2/src/test/java/com/linkedin/d2/balancer/util/hashing/URIMapperTestUtil.java
  29. +1 −1 gradle.properties
  30. +1 −1 restli-int-test/src/test/java/com/linkedin/restli/client/TestScatterGather.java
View
@@ -1,6 +1,11 @@
22.0.6
23.0.1
------
23.0.0
------
(RB=1332535)
Add URIMapper class and changed getPartitionAccessor API (backwards incompatible)
22.0.5
------
(RB=1318132)
View
@@ -10,6 +10,7 @@ No error code can be repeated, and no error code can be referenced more than onc
## D2 10
PEGA_1000 to PEGA_1016 ServiceUnavailableException
PEGA_1017 ServiceUnavailableException
PEGA_1030 ServiceUnavailableException
## R2 11
@@ -4,11 +4,13 @@ plugins {
jmh {
include = '.*ConsistentHashRingBenchmark.*'
include = '.*URIMapperVSKeyMapperBenchmark.*'
zip64 = true
}
dependencies {
testCompile project(':d2').sourceSets.test.output
jmh project(':d2')
jmh externalDependency.jmhCore
jmh externalDependency.jmhAnnotations
@@ -0,0 +1,201 @@
/*
Copyright (c) 2018 LinkedIn Corp.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.linkedin.d2.util.hashing;
import com.linkedin.d2.balancer.KeyMapper;
import com.linkedin.d2.balancer.ServiceUnavailableException;
import com.linkedin.d2.balancer.URIMapper;
import com.linkedin.d2.balancer.util.MapKeyResult;
import com.linkedin.d2.balancer.util.URIKeyPair;
import com.linkedin.d2.balancer.util.URIMappingResult;
import com.linkedin.d2.balancer.util.hashing.ConsistentHashKeyMapper;
import com.linkedin.d2.balancer.util.hashing.HashRingProvider;
import com.linkedin.d2.balancer.util.hashing.RingBasedUriMapper;
import com.linkedin.d2.balancer.util.hashing.URIMapperTestUtil;
import com.linkedin.d2.balancer.util.partitions.PartitionInfoProvider;
import java.net.URI;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import static com.linkedin.d2.balancer.util.hashing.URIMapperTestUtil.*;
@Fork(2)
@Warmup(iterations = 5)
@Measurement(iterations = 5)
public class URIMapperVSKeyMapperBenchmark
{
private static final URIMapperTestUtil testUtil = new URIMapperTestUtil();
@State(Scope.Benchmark)
public static class Case1_StickyAndPartitioned_100Hosts_10Partition_1000Requests_State
{
URIMapper _uriMapper;
KeyMapper _keyMapper;
// uriMapper argument
List<URIKeyPair<Integer>> _uriMapperRequests;
// keyMapper argument
URI _serviceURI;
Iterable<Integer> _keys;
public Case1_StickyAndPartitioned_100Hosts_10Partition_1000Requests_State()
{
try {
HashRingProvider hashRingProvider = createStaticHashRingProvider(100, 10, getHashFunction(true));
PartitionInfoProvider infoProvider = createPartitionInfoProvider(10);
_uriMapper = new RingBasedUriMapper(hashRingProvider, infoProvider);
_uriMapperRequests = testUtil.generateRequests(10, 100);
_keyMapper = new ConsistentHashKeyMapper(hashRingProvider, infoProvider);
_serviceURI = new URI("d2://testService");
_keys = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
} catch (Exception e) {
// ignore exceptions
}
}
}
@Benchmark
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public URIMappingResult<Integer> Case1MeasureURIMapper(
Case1_StickyAndPartitioned_100Hosts_10Partition_1000Requests_State state) throws ServiceUnavailableException
{
return state._uriMapper.mapUris(state._uriMapperRequests);
}
@Benchmark
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public MapKeyResult<URI, Integer> Case1MeasureKeymapper(
Case1_StickyAndPartitioned_100Hosts_10Partition_1000Requests_State state) throws ServiceUnavailableException
{
return state._keyMapper.mapKeysV2(state._serviceURI, state._keys);
}
@State(Scope.Benchmark)
public static class Case2_Sticky_100Hosts_1Partition_10000Requests_State
{
URIMapper _uriMapper;
KeyMapper _keyMapper;
// uriMapper argument
List<URIKeyPair<Integer>> _uriMapperRequests;
// keyMapper argument
URI _serviceURI;
Iterable<Integer> _keys;
public Case2_Sticky_100Hosts_1Partition_10000Requests_State()
{
try {
HashRingProvider hashRingProvider = createStaticHashRingProvider(100, 1, getHashFunction(true));
PartitionInfoProvider infoProvider = createPartitionInfoProvider(1);
_uriMapper = new RingBasedUriMapper(hashRingProvider, infoProvider);
_uriMapperRequests = testUtil.generateRequests(1, 10000);
_keyMapper = new ConsistentHashKeyMapper(hashRingProvider, infoProvider);
_serviceURI = new URI("d2://testService");
_keys = IntStream.range(0, 10000).boxed().collect(Collectors.toList());
} catch (Exception e) {
// ignore exceptions
}
}
}
@Benchmark
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public URIMappingResult<Integer> Case2MeasureURIMapper(Case2_Sticky_100Hosts_1Partition_10000Requests_State state)
throws ServiceUnavailableException
{
return state._uriMapper.mapUris(state._uriMapperRequests);
}
@Benchmark
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public MapKeyResult<URI, Integer> Case2MeasureKeymapper(Case2_Sticky_100Hosts_1Partition_10000Requests_State state)
throws ServiceUnavailableException
{
return state._keyMapper.mapKeysV2(state._serviceURI, state._keys);
}
@State(Scope.Benchmark)
public static class Case3_Partitioned_100Hosts_10Partition_10000Requests_State
{
URIMapper _uriMapper;
KeyMapper _keyMapper;
// uriMapper argument
List<URIKeyPair<Integer>> _uriMapperRequests;
// keyMapper argument
URI _serviceURI;
Iterable<Integer> _keys;
public Case3_Partitioned_100Hosts_10Partition_10000Requests_State()
{
try {
HashRingProvider hashRingProvider = createStaticHashRingProvider(100, 10, getHashFunction(false));
PartitionInfoProvider infoProvider = createPartitionInfoProvider(10);
_uriMapper = new RingBasedUriMapper(hashRingProvider, infoProvider);
_uriMapperRequests = testUtil.generateRequests(10, 1000);
_keyMapper = new ConsistentHashKeyMapper(hashRingProvider, infoProvider);
_serviceURI = new URI("d2://testService");
_keys = IntStream.range(0, 10000).boxed().collect(Collectors.toList());
} catch (Exception e) {
// ignore exceptions
}
}
}
@Benchmark
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public URIMappingResult<Integer> Case3MeasureURIMapper(
Case3_Partitioned_100Hosts_10Partition_10000Requests_State state) throws ServiceUnavailableException
{
return state._uriMapper.mapUris(state._uriMapperRequests);
}
@Benchmark
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public MapKeyResult<URI, Integer> Case3MeasureKeymapper(
Case3_Partitioned_100Hosts_10Partition_10000Requests_State state) throws ServiceUnavailableException
{
return state._keyMapper.mapKeysV2(state._serviceURI, state._keys);
}
}
@@ -20,6 +20,7 @@
package com.linkedin.d2.balancer;
import com.linkedin.d2.balancer.util.hashing.HashRingProvider;
import com.linkedin.d2.balancer.util.partitions.PartitionInfoProvider;
import com.linkedin.r2.transport.common.TransportClientFactory;
@@ -43,6 +44,12 @@
*/
PartitionInfoProvider getPartitionInfoProvider();
/**
* Obtain hashRing provider
* @return HashRingProvider
*/
HashRingProvider getHashRingProvider();
/**
* Obtain d2 key mapping facility
* @return KeyMapper
@@ -3,6 +3,7 @@
import com.linkedin.common.callback.Callback;
import com.linkedin.common.util.None;
import com.linkedin.d2.balancer.properties.ServiceProperties;
import com.linkedin.d2.balancer.util.hashing.HashRingProvider;
import com.linkedin.d2.balancer.util.partitions.PartitionInfoProvider;
import com.linkedin.d2.discovery.event.PropertyEventThread;
import com.linkedin.r2.message.Request;
@@ -36,6 +37,12 @@ public PartitionInfoProvider getPartitionInfoProvider()
return _loadBalancer.getPartitionInfoProvider();
}
@Override
public HashRingProvider getHashRingProvider()
{
return _loadBalancer.getHashRingProvider();
}
@Override
public KeyMapper getKeyMapper()
{
@@ -0,0 +1,55 @@
/*
Copyright (c) 2018 LinkedIn Corp.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.linkedin.d2.balancer;
import com.linkedin.d2.balancer.util.URIKeyPair;
import com.linkedin.d2.balancer.util.URIMappingResult;
import java.util.List;
public interface URIMapper
{
/**
* This methods tries to map d2 requests to hosts based on the underlying strategy. e.g. consistent hashing
*
* The requests should be destined for the same service.
*
* Each request in the input list will appear in exactly one routing group in the output. Requests in the same routing group will be routed to the same host.
* The requests in output routing groups are mutually exclusive and collectively exhaustive.
*
* NOTE: in context of sticky routing, the routing decision will be made based on request uri instead of keys. This achieves universal stickiness.
*
* @param <KEY> type of resource key
* @param requestUriKeyPairs a list of URIKeyPair, each contains a d2 request uri and a resource key. The resource keys should be unique.
* @return {@link URIMappingResult} that contains a mapping of host to a set of keys whose corresponding requests will be sent to that host
* and a set of unmapped keys.
* @throws ServiceUnavailableException if the requested service cannot be found
*/
<KEY> URIMappingResult<KEY> mapUris(List<URIKeyPair<KEY>> requestUriKeyPairs) throws ServiceUnavailableException;
/**
* Returns true if sticky routing is enabled (inclusive) OR the cluster of the service has more than one partitions.
*
* If sticky routing is enabled, scatter-gather is needed since different keys can be routed to different hosts.
* If cluster has more than one partitions, scatter-gather is needed since different keys can be routed to different partitions.
*
* @param serviceName
* @return true if sticky routing OR partitioning is enabled.
* @throws ServiceUnavailableException if the requested service cannot be found
*/
boolean needScatterGather(String serviceName) throws ServiceUnavailableException;
}
@@ -36,12 +36,15 @@
import com.linkedin.d2.balancer.properties.ServiceProperties;
import com.linkedin.d2.balancer.properties.UriProperties;
import com.linkedin.d2.balancer.strategies.LoadBalancerStrategy;
import com.linkedin.d2.balancer.strategies.degrader.DegraderLoadBalancerStrategyV3;
import com.linkedin.d2.balancer.util.ClientFactoryProvider;
import com.linkedin.d2.balancer.util.HostToKeyMapper;
import com.linkedin.d2.balancer.util.KeysAndHosts;
import com.linkedin.d2.balancer.util.LoadBalancerUtil;
import com.linkedin.d2.balancer.util.MapKeyResult;
import com.linkedin.d2.balancer.util.hashing.HashFunction;
import com.linkedin.d2.balancer.util.hashing.HashRingProvider;
import com.linkedin.d2.balancer.util.hashing.RandomHash;
import com.linkedin.d2.balancer.util.hashing.Ring;
import com.linkedin.d2.balancer.util.partitions.PartitionAccessException;
import com.linkedin.d2.balancer.util.partitions.PartitionAccessor;
@@ -334,6 +337,23 @@ public TransportClientFactory getClientFactory(String scheme)
}
}
@Override
public HashFunction<Request> getRequestHashFunction(String serviceName) throws ServiceUnavailableException
{
ServiceProperties service = listenToServiceAndCluster(serviceName);
List<LoadBalancerState.SchemeStrategyPair> orderedStrategies =
_state.getStrategiesForService(serviceName, service.getPrioritizedSchemes());
if (!orderedStrategies.isEmpty())
{
return orderedStrategies.get(0).getStrategy().getHashFunction();
}
else
{
throw new ServiceUnavailableException(serviceName, "PEGA_1017. Unable to find a load balancer strategy" +
"Server Schemes: [" + String.join(", ", service.getPrioritizedSchemes()) + ']');
}
}
private void listenToServiceAndCluster(String serviceName, Callback<ServiceProperties> callback)
{
@@ -586,11 +606,10 @@ private ClusterProperties getClusterProperties(String serviceName,
}
@Override
public PartitionAccessor getPartitionAccessor(URI serviceUri)
public PartitionAccessor getPartitionAccessor(String serviceName)
throws ServiceUnavailableException
{
ServiceProperties service = listenToServiceAndCluster(serviceUri);
String serviceName = service.getServiceName();
ServiceProperties service = listenToServiceAndCluster(serviceName);
String clusterName = service.getClusterName();
return getPartitionAccessor(serviceName, clusterName);
}
Oops, something went wrong.

0 comments on commit e91b2ac

Please sign in to comment.