Permalink
Browse files

Support scatter-gather in rest.li client using ScatterGatherStrategy.

RB=1371503
G=si-dev
R=kbalasub,cxu,jodzga,ajing,xzhu,dhoa,mlamure
A=ajing,xzhu,mlamure
  • Loading branch information...
mchen07 committed May 28, 2018
1 parent b9368b7 commit 4834c93f96b331ff5c81848b32f992d035480804
Showing with 1,764 additions and 32 deletions.
  1. +2 −0 CHANGELOG
  2. +3 −3 d2-benchmark/src/jmh/java/com/linkedin/d2/util/hashing/URIMapperVSKeyMapperBenchmark.java
  3. +17 −0 d2/src/main/java/com/linkedin/d2/balancer/util/URIKeyPair.java
  4. +18 −1 d2/src/main/java/com/linkedin/d2/balancer/util/URIMappingResult.java
  5. +19 −19 d2/src/test/java/com/linkedin/d2/balancer/util/hashing/RingBasedURIMapperTest.java
  6. +17 −2 d2/src/test/java/com/linkedin/d2/balancer/util/hashing/URIMapperTestUtil.java
  7. +1 −0 restli-client/build.gradle
  8. +1 −1 restli-client/src/main/java/com/linkedin/restli/client/BatchGetRequest.java
  9. +10 −0 restli-client/src/main/java/com/linkedin/restli/client/BatchPartialUpdateRequest.java
  10. +9 −2 restli-client/src/main/java/com/linkedin/restli/client/BatchPartialUpdateRequestBuilder.java
  11. +5 −0 restli-client/src/main/java/com/linkedin/restli/client/Client.java
  12. +528 −0 restli-client/src/main/java/com/linkedin/restli/client/DefaultScatterGatherStrategy.java
  13. +47 −0 restli-client/src/main/java/com/linkedin/restli/client/RequestInfo.java
  14. +132 −4 restli-client/src/main/java/com/linkedin/restli/client/RestClient.java
  15. +35 −0 restli-client/src/main/java/com/linkedin/restli/client/RestLiScatterGatherException.java
  16. +112 −0 restli-client/src/main/java/com/linkedin/restli/client/ScatterGatherStrategy.java
  17. +13 −0 restli-client/src/main/java/com/linkedin/restli/client/util/RestLiClientConfig.java
  18. +9 −0 restli-client/src/test/java/com/linkedin/restli/client/TestClientBuilders.java
  19. +524 −0 restli-client/src/test/java/com/linkedin/restli/client/TestDefaultScatterGatherStrategy.java
  20. +1 −0 restli-common/src/main/java/com/linkedin/restli/common/RestConstants.java
  21. +261 −0 restli-int-test/src/test/java/com/linkedin/restli/client/TestRestLiScatterGather.java
View
@@ -1,5 +1,7 @@
23.0.9
------
(RB=1371503)
Support scatter-gather in rest.li client using ScatterGatherStrategy.
23.0.8
------
@@ -69,7 +69,7 @@ public Case1_StickyAndPartitioned_100Hosts_10Partition_1000Requests_State()
{
try {
HashRingProvider hashRingProvider = createStaticHashRingProvider(100, 10, getHashFunction(true));
PartitionInfoProvider infoProvider = createPartitionInfoProvider(10);
PartitionInfoProvider infoProvider = createRangeBasedPartitionInfoProvider(10);
_uriMapper = new RingBasedUriMapper(hashRingProvider, infoProvider);
_uriMapperRequests = testUtil.generateRequests(10, 100);
@@ -118,7 +118,7 @@ public Case2_Sticky_100Hosts_1Partition_10000Requests_State()
{
try {
HashRingProvider hashRingProvider = createStaticHashRingProvider(100, 1, getHashFunction(true));
PartitionInfoProvider infoProvider = createPartitionInfoProvider(1);
PartitionInfoProvider infoProvider = createRangeBasedPartitionInfoProvider(1);
_uriMapper = new RingBasedUriMapper(hashRingProvider, infoProvider);
_uriMapperRequests = testUtil.generateRequests(1, 10000);
@@ -167,7 +167,7 @@ public Case3_Partitioned_100Hosts_10Partition_10000Requests_State()
{
try {
HashRingProvider hashRingProvider = createStaticHashRingProvider(100, 10, getHashFunction(false));
PartitionInfoProvider infoProvider = createPartitionInfoProvider(10);
PartitionInfoProvider infoProvider = createRangeBasedPartitionInfoProvider(10);
_uriMapper = new RingBasedUriMapper(hashRingProvider, infoProvider);
_uriMapperRequests = testUtil.generateRequests(10, 1000);
@@ -19,6 +19,7 @@
import com.linkedin.d2.balancer.util.partitions.DefaultPartitionAccessor;
import com.linkedin.util.ArgumentUtil;
import java.net.URI;
import java.util.Objects;
/**
@@ -54,4 +55,20 @@ public URI getRequestUri()
{
return _requestUri;
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
URIKeyPair<?> that = (URIKeyPair<?>) o;
return Objects.equals(_key, that._key) &&
Objects.equals(_requestUri, that._requestUri);
}
@Override
public int hashCode()
{
return Objects.hash(_key, _requestUri);
}
}
@@ -19,6 +19,7 @@
import java.net.URI;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -41,7 +42,7 @@ public URIMappingResult(Map<URI, Set<KEY>> mappingResults, Set<KEY> unmappedKeys
_unmappedKeys = Collections.unmodifiableSet(unmappedKeys);
}
public Map<URI, Set<KEY>> getMappedResults()
public Map<URI, Set<KEY>> getMappedKeys()
{
return _mappedKeys;
}
@@ -50,4 +51,20 @@ public URIMappingResult(Map<URI, Set<KEY>> mappingResults, Set<KEY> unmappedKeys
{
return _unmappedKeys;
}
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
URIMappingResult<?> that = (URIMappingResult<?>) o;
return Objects.equals(_mappedKeys, that._mappedKeys) &&
Objects.equals(_unmappedKeys, that._unmappedKeys);
}
@Override
public int hashCode()
{
return Objects.hash(_mappedKeys, _unmappedKeys);
}
}
@@ -51,25 +51,25 @@ public void testNeedScatterGather() throws ServiceUnavailableException
{
// Both sticky and partitioned
HashRingProvider ringProvider = createStaticHashRingProvider(100, 10, getHashFunction(true));
PartitionInfoProvider infoProvider = createPartitionInfoProvider(10);
PartitionInfoProvider infoProvider = createRangeBasedPartitionInfoProvider(10);
URIMapper mapper = new RingBasedUriMapper(ringProvider, infoProvider);
Assert.assertTrue(mapper.needScatterGather(TEST_SERVICE));
// Only sticky
ringProvider = createStaticHashRingProvider(100, 1, getHashFunction(true));
infoProvider = createPartitionInfoProvider(1);
infoProvider = createRangeBasedPartitionInfoProvider(1);
mapper = new RingBasedUriMapper(ringProvider, infoProvider);
Assert.assertTrue(mapper.needScatterGather(TEST_SERVICE));
// Only partitioned
ringProvider = createStaticHashRingProvider(100, 10, getHashFunction(false));
infoProvider = createPartitionInfoProvider(10);
infoProvider = createRangeBasedPartitionInfoProvider(10);
mapper = new RingBasedUriMapper(ringProvider, infoProvider);
Assert.assertTrue(mapper.needScatterGather(TEST_SERVICE));
// neither
ringProvider = createStaticHashRingProvider(100, 1, getHashFunction(false));
infoProvider = createPartitionInfoProvider(1);
infoProvider = createRangeBasedPartitionInfoProvider(1);
mapper = new RingBasedUriMapper(ringProvider, infoProvider);
Assert.assertFalse(mapper.needScatterGather(TEST_SERVICE));
}
@@ -83,13 +83,13 @@ public void testMapUrisPartitionedOnly() throws ServiceUnavailableException
HashRingProvider ringProvider =
createStaticHashRingProvider(totalHostCount, partitionCount, getHashFunction(false));
PartitionInfoProvider infoProvider = createPartitionInfoProvider(partitionCount);
PartitionInfoProvider infoProvider = createRangeBasedPartitionInfoProvider(partitionCount);
URIMapper mapper = new RingBasedUriMapper(ringProvider, infoProvider);
List<URIKeyPair<Integer>> requests = testUtil.generateRequests(partitionCount, requestPerPartition);
URIMappingResult<Integer> results = mapper.mapUris(requests);
Map<URI, Set<Integer>> mapping = results.getMappedResults();
Map<URI, Set<Integer>> mapping = results.getMappedKeys();
// No unmapped keys
Assert.assertTrue(results.getUnmappedKeys().isEmpty());
@@ -117,7 +117,7 @@ public void testMapUrisStickyRoutingOnly() throws ServiceUnavailableException, P
int totalHostCount = 100;
HashRingProvider ringProvider = createStaticHashRingProvider(totalHostCount, partitionCount, getHashFunction(true));
PartitionInfoProvider infoProvider = createPartitionInfoProvider(partitionCount);
PartitionInfoProvider infoProvider = createRangeBasedPartitionInfoProvider(partitionCount);
URIMapper mapper = new RingBasedUriMapper(ringProvider, infoProvider);
List<URIKeyPair<Integer>> requests = testUtil.generateRequests(partitionCount, requestPerPartition);
@@ -126,10 +126,10 @@ public void testMapUrisStickyRoutingOnly() throws ServiceUnavailableException, P
URIMappingResult<Integer> results2 = mapper.mapUris(requests);
// Sticky routing between two runs
Assert.assertEquals(results1.getMappedResults(), results2.getMappedResults());
Assert.assertEquals(results1.getMappedKeys(), results2.getMappedKeys());
Assert.assertEquals(results1.getUnmappedKeys(), results2.getUnmappedKeys());
Map<URI, Set<Integer>> mapping = results1.getMappedResults();
Map<URI, Set<Integer>> mapping = results1.getMappedKeys();
// Testing universal stickiness, take out 50 requests randomly and make sure they would be resolved to the same host as does URIMapper
Collections.shuffle(requests);
@@ -151,13 +151,13 @@ public void testStickyAndPartitioning() throws ServiceUnavailableException
int totalHostCount = 100;
HashRingProvider ringProvider = createStaticHashRingProvider(totalHostCount, partitionCount, getHashFunction(true));
PartitionInfoProvider infoProvider = createPartitionInfoProvider(partitionCount);
PartitionInfoProvider infoProvider = createRangeBasedPartitionInfoProvider(partitionCount);
URIMapper mapper = new RingBasedUriMapper(ringProvider, infoProvider);
List<URIKeyPair<Integer>> requests = testUtil.generateRequests(partitionCount, requestPerPartition);
URIMappingResult<Integer> results = mapper.mapUris(requests);
Map<URI, Set<Integer>> mapping = results.getMappedResults();
Map<URI, Set<Integer>> mapping = results.getMappedKeys();
Set<Integer> unmappedKeys = results.getUnmappedKeys();
Assert.assertTrue(unmappedKeys.isEmpty());
@@ -173,13 +173,13 @@ public void testNonStickyAndNonPartitioning() throws ServiceUnavailableException
HashRingProvider ringProvider =
createStaticHashRingProvider(totalHostCount, partitionCount, getHashFunction(false));
PartitionInfoProvider infoProvider = createPartitionInfoProvider(partitionCount);
PartitionInfoProvider infoProvider = createRangeBasedPartitionInfoProvider(partitionCount);
URIMapper mapper = new RingBasedUriMapper(ringProvider, infoProvider);
List<URIKeyPair<Integer>> requests = testUtil.generateRequests(partitionCount, requestPerPartition);
URIMappingResult<Integer> results = mapper.mapUris(requests);
Map<URI, Set<Integer>> mapping = results.getMappedResults();
Map<URI, Set<Integer>> mapping = results.getMappedKeys();
Set<Integer> unmappedKeys = results.getUnmappedKeys();
Assert.assertTrue(unmappedKeys.isEmpty());
@@ -206,13 +206,13 @@ public void testSameHostSupportingMultiplePartitions() throws ServiceUnavailable
StaticRingProvider ringProvider = new StaticRingProvider(rings);
ringProvider.setHashFunction(new RandomHash());
PartitionInfoProvider infoProvider = createPartitionInfoProvider(partitionCount);
PartitionInfoProvider infoProvider = createRangeBasedPartitionInfoProvider(partitionCount);
URIMapper mapper = new RingBasedUriMapper(ringProvider, infoProvider);
List<URIKeyPair<Integer>> requests = testUtil.generateRequests(partitionCount, requestPerPartition);
URIMappingResult<Integer> results = mapper.mapUris(requests);
Map<URI, Set<Integer>> mapping = results.getMappedResults();
Map<URI, Set<Integer>> mapping = results.getMappedKeys();
Set<Integer> unmappedKeys = results.getUnmappedKeys();
Assert.assertTrue(unmappedKeys.isEmpty());
@@ -227,7 +227,7 @@ public void testErrorHandling() throws ServiceUnavailableException, URISyntaxExc
int totalHostCount = 100;
HashRingProvider ringProvider = createStaticHashRingProvider(totalHostCount, partitionCount, getHashFunction(true));
PartitionInfoProvider infoProvider = createPartitionInfoProvider(partitionCount);
PartitionInfoProvider infoProvider = createRangeBasedPartitionInfoProvider(partitionCount);
URIMapper mapper = new RingBasedUriMapper(ringProvider, infoProvider);
URIKeyPair<Integer> requestWithoutPartitionId = new URIKeyPair<>(42, new URI("d2://badService/2"));
@@ -238,7 +238,7 @@ public void testErrorHandling() throws ServiceUnavailableException, URISyntaxExc
Arrays.asList(requestWithoutKey, requestWithoutPartitionId, requestWithoutBoth);
URIMappingResult<Integer> result = mapper.mapUris(requests);
Assert.assertTrue(result.getMappedResults().isEmpty());
Assert.assertTrue(result.getMappedKeys().isEmpty());
Assert.assertTrue(result.getUnmappedKeys().contains(42));
Assert.assertTrue(result.getUnmappedKeys().contains(43));
Assert.assertTrue(result.getUnmappedKeys().contains(44));
@@ -252,7 +252,7 @@ public void testUniversalStickiness() throws ServiceUnavailableException, URISyn
HashRingProvider ringProvider = createStaticHashRingProvider(totalHostCount, partitionCount, getHashFunction(true));
HashFunction<Request> hashFunction = ringProvider.getRequestHashFunction(TEST_SERVICE);
PartitionInfoProvider infoProvider = createPartitionInfoProvider(partitionCount);
PartitionInfoProvider infoProvider = createRangeBasedPartitionInfoProvider(partitionCount);
URIMapper mapper = new RingBasedUriMapper(ringProvider, infoProvider);
URIKeyPair<Integer> request1 = new URIKeyPair<>(1, new URI("d2://testService/1")); // no partition, will be unmapped
@@ -289,7 +289,7 @@ public void testUniversalStickiness() throws ServiceUnavailableException, URISyn
// they should have the same results
Assert.assertEquals(uriMapperResult.getUnmappedKeys(), normalUnmapped);
for (Map.Entry<URI, Set<Integer>> resolvedKeys : uriMapperResult.getMappedResults().entrySet()) {
for (Map.Entry<URI, Set<Integer>> resolvedKeys : uriMapperResult.getMappedKeys().entrySet()) {
Set<Integer> uriMapperKeySet = resolvedKeys.getValue();
Assert.assertTrue(normalHostToKeySet.containsKey(resolvedKeys.getKey()));
Set<Integer> normalKeySet = normalHostToKeySet.get(resolvedKeys.getKey());
@@ -18,8 +18,10 @@
import com.google.common.collect.Lists;
import com.linkedin.d2.balancer.ServiceUnavailableException;
import com.linkedin.d2.balancer.properties.HashBasedPartitionProperties;
import com.linkedin.d2.balancer.properties.RangeBasedPartitionProperties;
import com.linkedin.d2.balancer.util.URIKeyPair;
import com.linkedin.d2.balancer.util.partitions.HashBasedPartitionAccessor;
import com.linkedin.d2.balancer.util.partitions.PartitionInfoProvider;
import com.linkedin.d2.balancer.util.partitions.RangeBasedPartitionAccessor;
import com.linkedin.r2.message.Request;
@@ -86,16 +88,29 @@ public static HashRingProvider createStaticHashRingProvider(int totalHostCount,
/**
* Create a mock PartitionInfoProvider that returns {@link RangeBasedPartitionAccessor} for testing
*/
public static PartitionInfoProvider createPartitionInfoProvider(int partitionCount) throws ServiceUnavailableException
public static PartitionInfoProvider createRangeBasedPartitionInfoProvider(int partitionCount) throws ServiceUnavailableException
{
PartitionInfoProvider infoProvider = Mockito.mock(PartitionInfoProvider.class);
RangeBasedPartitionProperties properties =
new RangeBasedPartitionProperties(PARTITION_KEY_REGEX, 0, 1, partitionCount);
new RangeBasedPartitionProperties(PARTITION_KEY_REGEX, 0, 1, partitionCount);
RangeBasedPartitionAccessor accessor = new RangeBasedPartitionAccessor(properties);
Mockito.when(infoProvider.getPartitionAccessor(anyObject())).thenReturn(accessor);
return infoProvider;
}
/**
* Create a mock PartitionInfoProvider that returns {@link HashBasedPartitionAccessor} for testing
*/
public static PartitionInfoProvider createHashBasedPartitionInfoProvider(int partitionCount, String regex) throws ServiceUnavailableException
{
PartitionInfoProvider infoProvider = Mockito.mock(PartitionInfoProvider.class);
HashBasedPartitionProperties properties =
new HashBasedPartitionProperties(regex, partitionCount, HashBasedPartitionProperties.HashAlgorithm.MODULO);
HashBasedPartitionAccessor accessor = new HashBasedPartitionAccessor(properties);
Mockito.when(infoProvider.getPartitionAccessor(anyObject())).thenReturn(accessor);
return infoProvider;
}
/**
* Generate a list of requests for {@link com.linkedin.d2.balancer.URIMapper}, each with a unique key
*/
@@ -2,6 +2,7 @@ dependencies {
compile project(':data')
compile project(':data-transform')
compile project(':pegasus-common')
compile project(':d2')
compile project(':r2-core')
compile project(':restli-common')
compile project(':restli-disruptor')
@@ -70,4 +70,4 @@
{
return super.getFields();
}
}
}
@@ -33,6 +33,7 @@
import com.linkedin.restli.internal.client.BatchUpdateResponseDecoder;
import java.net.HttpCookie;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -45,6 +46,8 @@
public class BatchPartialUpdateRequest<K, V extends RecordTemplate> extends
com.linkedin.restli.client.BatchRequest<BatchKVResponse<K, UpdateStatus>>
{
private final Map<K, PatchRequest<V>> _partialUpdateInputMap;
@SuppressWarnings("unchecked")
BatchPartialUpdateRequest(Map<String, String> headers,
List<HttpCookie> cookies,
@@ -55,6 +58,7 @@
String baseUriTemplate,
Map<String, Object> pathKeys,
RestliRequestOptions requestOptions,
Map<K, PatchRequest<V>> patchInputMap,
List<Object> streamingAttachments)
{
super(ResourceMethod.BATCH_PARTIAL_UPDATE,
@@ -71,5 +75,11 @@
pathKeys,
requestOptions,
streamingAttachments);
_partialUpdateInputMap = Collections.unmodifiableMap(patchInputMap);
}
public Map<K, PatchRequest<V>> getPartialUpdateInputMap()
{
return _partialUpdateInputMap;
}
}
@@ -23,6 +23,7 @@
import com.linkedin.data.DataMap;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.internal.common.util.CollectionUtils;
import com.linkedin.restli.common.CollectionRequest;
import com.linkedin.restli.common.KeyValueRecord;
import com.linkedin.restli.common.KeyValueRecordFactory;
@@ -166,19 +167,24 @@ public BatchPartialUpdateRequestBuilder(String baseUriTemplate,
{
ensureBatchKeys();
Map<K, PatchRequest<V>> readOnlyPartialUpdateInputMap = new HashMap<>(
CollectionUtils.getMapInitialCapacity(_partialUpdateInputMap.size(), 0.75f), 0.75f);
CollectionRequest<KeyValueRecord<K, PatchRequest<V>>> readOnlyInput = buildReadOnlyInput(readOnlyPartialUpdateInputMap);
return new BatchPartialUpdateRequest<K, V>(buildReadOnlyHeaders(),
buildReadOnlyCookies(),
buildReadOnlyInput(),
readOnlyInput,
buildReadOnlyQueryParameters(),
getQueryParamClasses(),
_resourceSpec,
getBaseUriTemplate(),
buildReadOnlyPathKeys(),
getRequestOptions(),
readOnlyPartialUpdateInputMap,
_streamingAttachments == null ? null : Collections.unmodifiableList(_streamingAttachments));
}
private CollectionRequest<KeyValueRecord<K, PatchRequest<V>>> buildReadOnlyInput()
private CollectionRequest<KeyValueRecord<K, PatchRequest<V>>> buildReadOnlyInput(Map<K, PatchRequest<V>> readOnlyInputEntities)
{
try
{
@@ -190,6 +196,7 @@ public BatchPartialUpdateRequestBuilder(String baseUriTemplate,
{
K key = getReadOnlyOrCopyKey(inputEntityEntry.getKey());
PatchRequest<V> entity = getReadOnlyOrCopyDataTemplate(inputEntityEntry.getValue());
readOnlyInputEntities.put(key, entity);
KeyValueRecord<K, PatchRequest<V>> keyValueRecord = _keyValueRecordFactory.create(key, entity);
keyValueRecord.data().setReadOnly();
input.getElements().add(keyValueRecord);
Oops, something went wrong.

0 comments on commit 4834c93

Please sign in to comment.