From 53345cb1cd1eb54e33af65046825f624ba3cadd1 Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Mon, 9 Dec 2019 17:11:58 -0500 Subject: [PATCH] SOLR-12217: Support shards.preference in SolrJ for individual shard requests (#984) --- solr/CHANGES.txt | 2 + .../apache/solr/handler/StreamHandler.java | 22 +++++ .../src/distributed-requests.adoc | 2 +- .../src/streaming-expressions.adoc | 9 ++ solr/solr-ref-guide/src/using-solrj.adoc | 8 ++ .../solrj/impl/BaseCloudSolrClient.java | 82 +++++++++++-------- .../solrj/io/stream/CloudSolrStream.java | 3 +- .../solrj/io/stream/DeepRandomStream.java | 6 +- .../client/solrj/io/stream/StreamContext.java | 20 +++++ .../client/solrj/io/stream/TupleStream.java | 50 +++++++---- .../NodePreferenceRulesComparator.java | 2 +- ...equestReplicaListTransformerGenerator.java | 40 +++++++-- .../org/apache/solr/common/cloud/Replica.java | 2 +- .../solrj/impl/CloudHttp2SolrClientTest.java | 69 ++++++++++++++++ .../solrj/impl/CloudSolrClientTest.java | 71 +++++++++++++++- .../client/solrj/io/stream/StreamingTest.java | 58 ++++++++++++- .../apache/solr/cloud/SolrCloudTestCase.java | 17 ++++ 17 files changed, 396 insertions(+), 67 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 3ae0a792154e..7ef1cc3995a0 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -62,6 +62,8 @@ New Features * SOLR-13912: Add 'countvals' aggregation in JSON FacetModule (hossman, Munendra S N) +* SOLR-12217: Support shards.preference in SolrJ for single shard collections. The parameter is now used by the CloudSolrClient and Streaming Expressions. (Houston Putman, Tomas Fernandez-Lobbe) + Improvements --------------------- diff --git a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java index ccbbb3ae72ea..150219056237 100644 --- a/solr/core/src/java/org/apache/solr/handler/StreamHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/StreamHandler.java @@ -42,7 +42,10 @@ import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter; import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator; +import org.apache.solr.cloud.ZkController; import org.apache.solr.common.SolrException; +import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.CommonParams; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.SolrParams; @@ -162,9 +165,28 @@ public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throw return; } + + final SolrCore core = req.getCore(); // explicit check for null core (temporary?, for tests) + ZkController zkController = core == null ? null : core.getCoreContainer().getZkController(); + RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator; + if (zkController != null) { + requestReplicaListTransformerGenerator = new RequestReplicaListTransformerGenerator( + zkController.getZkStateReader().getClusterProperties() + .getOrDefault(ZkStateReader.DEFAULT_SHARD_PREFERENCES, "") + .toString(), + zkController.getNodeName(), + zkController.getBaseUrl(), + zkController.getSysPropsCacher() + ); + } else { + requestReplicaListTransformerGenerator = new RequestReplicaListTransformerGenerator(); + } + int worker = params.getInt("workerID", 0); int numWorkers = params.getInt("numWorkers", 1); StreamContext context = new StreamContext(); + context.setRequestParams(params); + context.setRequestReplicaListTransformerGenerator(requestReplicaListTransformerGenerator); context.put("shards", getCollectionShards(params)); context.workerID = worker; context.numWorkers = numWorkers; diff --git a/solr/solr-ref-guide/src/distributed-requests.adoc b/solr/solr-ref-guide/src/distributed-requests.adoc index dde0fea64964..4d06728c613c 100644 --- a/solr/solr-ref-guide/src/distributed-requests.adoc +++ b/solr/solr-ref-guide/src/distributed-requests.adoc @@ -160,7 +160,7 @@ Solr allows you to pass an optional string parameter named `shards.preference` t The syntax is: `shards.preference=_property_:__value__`. The order of the properties and the values are significant: the first one is the primary sort, the second is secondary, etc. -IMPORTANT: `shards.preference` only works for distributed queries, i.e., queries targeting multiple shards. Single shard scenarios are not supported. +IMPORTANT: `shards.preference` is supported for single shard scenarios when using the SolrJ clients. The properties that can be specified are as follows: diff --git a/solr/solr-ref-guide/src/streaming-expressions.adoc b/solr/solr-ref-guide/src/streaming-expressions.adoc index d666d7606ccd..57ca17cbda86 100644 --- a/solr/solr-ref-guide/src/streaming-expressions.adoc +++ b/solr/solr-ref-guide/src/streaming-expressions.adoc @@ -114,6 +114,15 @@ unless the jvm has been started with `-DStreamingExpressionMacros=true` (usually Because streaming expressions relies on the `/export` handler, many of the field and field type requirements to use `/export` are also requirements for `/stream`, particularly for `sort` and `fl` parameters. Please see the section <> for details. +=== Request Routing + +Streaming Expressions respect the <> for any call to Solr. + +The value of `shards.preference` that is used to route requests is determined in the following order. The first option available is used. +- Provided as a parameter in the streaming expression (e.g. `search(...., shards.preference="replica.type:PULL")`) +- Provided in the URL Params of the streaming expression (e.g. `http://solr_url:8983/solr/stream?expr=....&shards.preference=replica.type:PULL`) +- Set as a default in the Cluster properties. + === Adding Custom Expressions Creating your own custom expressions can be easily done by implementing the {solr-javadocs}/solr-solrj/org/apache/solr/client/solrj/io/stream/expr/Expressible.html[Expressible] interface. To add a custom expression to the diff --git a/solr/solr-ref-guide/src/using-solrj.adoc b/solr/solr-ref-guide/src/using-solrj.adoc index e4e41f50be51..f60664d34cca 100644 --- a/solr/solr-ref-guide/src/using-solrj.adoc +++ b/solr/solr-ref-guide/src/using-solrj.adoc @@ -120,6 +120,14 @@ include::{example-source-dir}UsingSolrJRefGuideExamplesTest.java[tag=solrj-solrc When these values are not explicitly provided, SolrJ falls back to using the defaults for the OS/environment is running on. +=== Cloud Request Routing + +The SolrJ `CloudSolrClient` implementations (`CloudSolrClient` and `CloudHttp2SolrClient`) respect the <>. +Therefore requests sent to single-sharded collections, using either of the above clients, will route requests the same way that distributed requests are routed to individual shards. +If no `shards.preference` parameter is provided, the clients will default to sorting replicas randomly. + +For update requests, while the replicas are sorted in the order defined by the request, leader replicas will always be sorted first. + == Querying in SolrJ `SolrClient` has a number of `query()` methods for fetching results from Solr. Each of these methods takes in a `SolrParams`,an object encapsulating arbitrary query-parameters. And each method outputs a `QueryResponse`, a wrapper which can be used to access the result documents and other related metadata. diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java index 0461e67f4fae..52038ad8fd80 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/impl/BaseCloudSolrClient.java @@ -44,6 +44,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.solr.client.solrj.ResponseParser; import org.apache.solr.client.solrj.SolrClient; @@ -55,6 +56,8 @@ import org.apache.solr.client.solrj.request.RequestWriter; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.request.V2Request; +import org.apache.solr.client.solrj.routing.ReplicaListTransformer; +import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator; import org.apache.solr.client.solrj.util.ClientUtils; import org.apache.solr.common.SolrException; import org.apache.solr.common.SolrInputDocument; @@ -69,7 +72,6 @@ import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; import org.apache.solr.common.cloud.ZkCoreNodeProps; -import org.apache.solr.common.cloud.ZkNodeProps; import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.ModifiableSolrParams; import org.apache.solr.common.params.ShardParams; @@ -100,6 +102,7 @@ public abstract class BaseCloudSolrClient extends SolrClient { private final boolean updatesToLeaders; private final boolean directUpdatesToLeadersOnly; + private final RequestReplicaListTransformerGenerator requestRLTGenerator; boolean parallelUpdates; //TODO final private ExecutorService threadPool = ExecutorUtil .newMDCAwareCachedThreadPool(new SolrjNamedThreadFactory( @@ -221,6 +224,7 @@ protected BaseCloudSolrClient(boolean updatesToLeaders, boolean parallelUpdates, this.updatesToLeaders = updatesToLeaders; this.parallelUpdates = parallelUpdates; this.directUpdatesToLeadersOnly = directUpdatesToLeadersOnly; + this.requestRLTGenerator = new RequestReplicaListTransformerGenerator(); } /** Sets the cache ttl for DocCollection Objects cached . This is only applicable for collections which are persisted outside of clusterstate.json @@ -467,6 +471,8 @@ private NamedList directUpdate(AbstractUpdateRequest request, String col for(String param : NON_ROUTABLE_PARAMS) { routableParams.remove(param); } + } else { + params = new ModifiableSolrParams(); } if (collection == null) { @@ -492,10 +498,12 @@ private NamedList directUpdate(AbstractUpdateRequest request, String col return null; } + ReplicaListTransformer replicaListTransformer = requestRLTGenerator.getReplicaListTransformer(params); + //Create the URL map, which is keyed on slice name. //The value is a list of URLs for each replica in the slice. //The first value in the list is the leader for the slice. - final Map> urlMap = buildUrlMap(col); + final Map> urlMap = buildUrlMap(col, replicaListTransformer); final Map routes = createRoutes(updateRequest, routableParams, col, router, urlMap, idField); if (routes == null) { if (directUpdatesToLeadersOnly && hasInfoToFindLeaders(updateRequest, idField)) { @@ -616,12 +624,12 @@ protected RouteException getRouteException(SolrException.ErrorCode serverError, return urlMap == null ? null : updateRequest.getRoutesToCollection(router, col, urlMap, routableParams, idField); } - private Map> buildUrlMap(DocCollection col) { + private Map> buildUrlMap(DocCollection col, ReplicaListTransformer replicaListTransformer) { Map> urlMap = new HashMap<>(); Slice[] slices = col.getActiveSlicesArr(); for (Slice slice : slices) { String name = slice.getName(); - List urls = new ArrayList<>(); + List sortedReplicas = new ArrayList<>(); Replica leader = slice.getLeader(); if (directUpdatesToLeadersOnly && leader == null) { for (Replica replica : slice.getReplicas( @@ -638,20 +646,22 @@ private Map> buildUrlMap(DocCollection col) { // take unoptimized general path - we cannot find a leader yet return null; } - ZkCoreNodeProps zkProps = new ZkCoreNodeProps(leader); - String url = zkProps.getCoreUrl(); - urls.add(url); + if (!directUpdatesToLeadersOnly) { for (Replica replica : slice.getReplicas()) { - if (!replica.getNodeName().equals(leader.getNodeName()) && - !replica.getName().equals(leader.getName())) { - ZkCoreNodeProps zkProps1 = new ZkCoreNodeProps(replica); - String url1 = zkProps1.getCoreUrl(); - urls.add(url1); + if (!replica.equals(leader)) { + sortedReplicas.add(replica); } } } - urlMap.put(name, urls); + + // Sort the non-leader replicas according to the request parameters + replicaListTransformer.transform(sortedReplicas); + + // put the leaderUrl first. + sortedReplicas.add(0, leader); + + urlMap.put(name, sortedReplicas.stream().map(Replica::getCoreUrl).collect(Collectors.toList())); } return urlMap; } @@ -1046,6 +1056,8 @@ protected NamedList sendRequest(SolrRequest request, List inputC reqParams = new ModifiableSolrParams(); } + ReplicaListTransformer replicaListTransformer = requestRLTGenerator.getReplicaListTransformer(reqParams); + final Set liveNodes = getClusterStateProvider().getLiveNodes(); final List theUrlList = new ArrayList<>(); // we populate this as follows... @@ -1087,34 +1099,38 @@ protected NamedList sendRequest(SolrRequest request, List inputC } // Gather URLs, grouped by leader or replica - // TODO: allow filtering by group, role, etc - Set seenNodes = new HashSet<>(); - List replicas = new ArrayList<>(); - String joinedInputCollections = StrUtils.join(inputCollections, ','); + List sortedReplicas = new ArrayList<>(); + List replicas = new ArrayList<>(); for (Slice slice : slices.values()) { - for (ZkNodeProps nodeProps : slice.getReplicasMap().values()) { - ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps); - String node = coreNodeProps.getNodeName(); + Replica leader = slice.getLeader(); + for (Replica replica : slice.getReplicas()) { + String node = replica.getNodeName(); if (!liveNodes.contains(node) // Must be a live node to continue - || Replica.State.getState(coreNodeProps.getState()) != Replica.State.ACTIVE) // Must be an ACTIVE replica to continue + || replica.getState() != Replica.State.ACTIVE) // Must be an ACTIVE replica to continue continue; - if (seenNodes.add(node)) { // if we haven't yet collected a URL to this node... - String url = ZkCoreNodeProps.getCoreUrl(nodeProps.getStr(ZkStateReader.BASE_URL_PROP), joinedInputCollections); - if (sendToLeaders && coreNodeProps.isLeader()) { - theUrlList.add(url); // put leaders here eagerly (if sendToLeader mode) - } else { - replicas.add(url); // replicas here - } + if (sendToLeaders && replica.equals(leader)) { + sortedReplicas.add(replica); // put leaders here eagerly (if sendToLeader mode) + } else { + replicas.add(replica); // replicas here } } } - // Shuffle the leaders, if any (none if !sendToLeaders) - Collections.shuffle(theUrlList, rand); + // Sort the leader replicas, if any, according to the request preferences (none if !sendToLeaders) + replicaListTransformer.transform(sortedReplicas); + + // Sort the replicas, if any, according to the request preferences and append to our list + replicaListTransformer.transform(replicas); - // Shuffle the replicas, if any, and append to our list - Collections.shuffle(replicas, rand); - theUrlList.addAll(replicas); + sortedReplicas.addAll(replicas); + + String joinedInputCollections = StrUtils.join(inputCollections, ','); + Set seenNodes = new HashSet<>(); + sortedReplicas.forEach( replica -> { + if (seenNodes.add(replica.getNodeName())) { + theUrlList.add(ZkCoreNodeProps.getCoreUrl(replica.getBaseUrl(), joinedInputCollections)); + } + }); if (theUrlList.isEmpty()) { collectionStateCache.keySet().removeAll(collectionNames); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java index 8464cf3e01f7..222098966d02 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java @@ -371,12 +371,13 @@ public static Slice[] getSlices(String collectionName, ZkStateReader zkStateRead protected void constructStreams() throws IOException { try { - List shardUrls = getShards(this.zkHost, this.collection, this.streamContext); ModifiableSolrParams mParams = new ModifiableSolrParams(params); mParams = adjustParams(mParams); mParams.set(DISTRIB, "false"); // We are the aggregator. + List shardUrls = getShards(this.zkHost, this.collection, this.streamContext, mParams); + for(String shardUrl : shardUrls) { SolrStream solrStream = new SolrStream(shardUrl, mParams); if(streamContext != null) { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java index d2ef18c2aa80..3881a642c20f 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/DeepRandomStream.java @@ -309,12 +309,12 @@ public static Slice[] getSlices(String collectionName, ZkStateReader zkStateRead protected void constructStreams() throws IOException { try { - - List shardUrls = getShards(this.zkHost, this.collection, this.streamContext); - ModifiableSolrParams mParams = new ModifiableSolrParams(params); mParams = adjustParams(mParams); mParams.set(DISTRIB, "false"); // We are the aggregator. + + List shardUrls = getShards(this.zkHost, this.collection, this.streamContext, mParams); + String rows = mParams.get(ROWS); int r = Integer.parseInt(rows); int newRows = r/shardUrls.size(); diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java index 778aacea1101..8243b2af745f 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/StreamContext.java @@ -24,6 +24,8 @@ import org.apache.solr.client.solrj.io.ModelCache; import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator; +import org.apache.solr.common.params.SolrParams; /** * The StreamContext is passed to TupleStreams using the TupleStream.setStreamContext() method. @@ -45,6 +47,8 @@ public class StreamContext implements Serializable{ private SolrClientCache clientCache; private ModelCache modelCache; private StreamFactory streamFactory; + private SolrParams requestParams; + private RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator; public ConcurrentMap getObjectCache() { return this.objectCache; @@ -101,4 +105,20 @@ public Map getTupleContext() { public StreamFactory getStreamFactory() { return this.streamFactory; } + + public void setRequestParams(SolrParams requestParams) { + this.requestParams = requestParams; + } + + public SolrParams getRequestParams() { + return requestParams; + } + + public void setRequestReplicaListTransformerGenerator(RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator) { + this.requestReplicaListTransformerGenerator = requestReplicaListTransformerGenerator; + } + + public RequestReplicaListTransformerGenerator getRequestReplicaListTransformerGenerator() { + return requestReplicaListTransformerGenerator; + } } \ No newline at end of file diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java index 94dd9209ad7b..c2957bc0cd62 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/io/stream/TupleStream.java @@ -21,26 +21,28 @@ import java.io.PrintWriter; import java.io.Serializable; import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.List; -import java.util.Random; +import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.Map; import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.io.SolrClientCache; import org.apache.solr.client.solrj.io.Tuple; import org.apache.solr.client.solrj.io.comp.StreamComparator; import org.apache.solr.client.solrj.io.stream.expr.Explanation; import org.apache.solr.client.solrj.io.stream.expr.StreamFactory; +import org.apache.solr.client.solrj.routing.ReplicaListTransformer; +import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator; import org.apache.solr.common.IteratorWriter; import org.apache.solr.common.MapWriter; import org.apache.solr.common.cloud.ClusterState; import org.apache.solr.common.cloud.Replica; import org.apache.solr.common.cloud.Slice; -import org.apache.solr.common.cloud.ZkCoreNodeProps; import org.apache.solr.common.cloud.ZkStateReader; +import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.SolrParams; /** @@ -118,6 +120,14 @@ public static List getShards(String zkHost, String collection, StreamContext streamContext) throws IOException { + return getShards(zkHost, collection, streamContext, new ModifiableSolrParams()); + } + + public static List getShards(String zkHost, + String collection, + StreamContext streamContext, + SolrParams requestParams) + throws IOException { Map> shardsMap = null; List shards = new ArrayList(); @@ -130,24 +140,34 @@ public static List getShards(String zkHost, shards = shardsMap.get(collection); } else { //SolrCloud Sharding - CloudSolrClient cloudSolrClient = streamContext.getSolrClientCache().getCloudSolrClient(zkHost); + CloudSolrClient cloudSolrClient = + Optional.ofNullable(streamContext.getSolrClientCache()).orElseGet(SolrClientCache::new).getCloudSolrClient(zkHost); ZkStateReader zkStateReader = cloudSolrClient.getZkStateReader(); ClusterState clusterState = zkStateReader.getClusterState(); Slice[] slices = CloudSolrStream.getSlices(collection, zkStateReader, true); Set liveNodes = clusterState.getLiveNodes(); + + + ModifiableSolrParams solrParams = new ModifiableSolrParams(streamContext.getRequestParams()); + solrParams.add(requestParams); + + RequestReplicaListTransformerGenerator requestReplicaListTransformerGenerator = + Optional.ofNullable(streamContext.getRequestReplicaListTransformerGenerator()).orElseGet(RequestReplicaListTransformerGenerator::new); + + ReplicaListTransformer replicaListTransformer = requestReplicaListTransformerGenerator.getReplicaListTransformer(solrParams); + for(Slice slice : slices) { - Collection replicas = slice.getReplicas(); - List shuffler = new ArrayList<>(); - for(Replica replica : replicas) { - if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) - shuffler.add(replica); + List sortedReplicas = new ArrayList<>(); + for(Replica replica : slice.getReplicas()) { + if(replica.getState() == Replica.State.ACTIVE && liveNodes.contains(replica.getNodeName())) { + sortedReplicas.add(replica); + } } - Collections.shuffle(shuffler, new Random()); - Replica rep = shuffler.get(0); - ZkCoreNodeProps zkProps = new ZkCoreNodeProps(rep); - String url = zkProps.getCoreUrl(); - shards.add(url); + replicaListTransformer.transform(sortedReplicas); + if (sortedReplicas.size() > 0) { + shards.add(sortedReplicas.get(0).getCoreUrl()); + } } } diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/routing/NodePreferenceRulesComparator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/routing/NodePreferenceRulesComparator.java index 4fdab0f918a2..bb8cecb7da55 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/routing/NodePreferenceRulesComparator.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/routing/NodePreferenceRulesComparator.java @@ -166,7 +166,7 @@ private static boolean hasReplicaType(Object o, String preferred) { return false; } final String s = ((Replica)o).getType().toString(); - return s.equals(preferred); + return s.equalsIgnoreCase(preferred); } public List getSortRules() { diff --git a/solr/solrj/src/java/org/apache/solr/client/solrj/routing/RequestReplicaListTransformerGenerator.java b/solr/solrj/src/java/org/apache/solr/client/solrj/routing/RequestReplicaListTransformerGenerator.java index 58c8b2e2dfc9..12ce4cf10f3c 100644 --- a/solr/solrj/src/java/org/apache/solr/client/solrj/routing/RequestReplicaListTransformerGenerator.java +++ b/solr/solrj/src/java/org/apache/solr/client/solrj/routing/RequestReplicaListTransformerGenerator.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.Optional; import java.util.Random; import org.apache.solr.common.SolrException; @@ -41,9 +42,13 @@ public class RequestReplicaListTransformerGenerator { (String configSpec, SolrParams requestParams, ReplicaListTransformerFactory fallback) -> shufflingReplicaListTransformer; private final ReplicaListTransformerFactory stableRltFactory; private final ReplicaListTransformerFactory defaultRltFactory; + private final String defaultShardPreferences; + private final String nodeName; + private final String localHostAddress; + private final NodesSysPropsCacher sysPropsCacher; public RequestReplicaListTransformerGenerator() { - this(RANDOM_RLTF); + this(null); } public RequestReplicaListTransformerGenerator(ReplicaListTransformerFactory defaultRltFactory) { @@ -51,16 +56,24 @@ public RequestReplicaListTransformerGenerator(ReplicaListTransformerFactory defa } public RequestReplicaListTransformerGenerator(ReplicaListTransformerFactory defaultRltFactory, ReplicaListTransformerFactory stableRltFactory) { - this.defaultRltFactory = defaultRltFactory; - if (stableRltFactory == null) { - this.stableRltFactory = new AffinityReplicaListTransformerFactory(); - } else { - this.stableRltFactory = stableRltFactory; - } + this(defaultRltFactory, stableRltFactory, null, null, null, null); + } + + public RequestReplicaListTransformerGenerator(String defaultShardPreferences, String nodeName, String localHostAddress, NodesSysPropsCacher sysPropsCacher) { + this(null, null, defaultShardPreferences, nodeName, localHostAddress, sysPropsCacher); + } + + public RequestReplicaListTransformerGenerator(ReplicaListTransformerFactory defaultRltFactory, ReplicaListTransformerFactory stableRltFactory, String defaultShardPreferences, String nodeName, String localHostAddress, NodesSysPropsCacher sysPropsCacher) { + this.defaultRltFactory = Optional.ofNullable(defaultRltFactory).orElse(RANDOM_RLTF); + this.stableRltFactory = Optional.ofNullable(stableRltFactory).orElseGet(AffinityReplicaListTransformerFactory::new); + this.defaultShardPreferences = Optional.ofNullable(defaultShardPreferences).orElse(""); + this.nodeName = nodeName; + this.localHostAddress = localHostAddress; + this.sysPropsCacher = sysPropsCacher; } public ReplicaListTransformer getReplicaListTransformer(final SolrParams requestParams) { - return getReplicaListTransformer(requestParams, ""); + return getReplicaListTransformer(requestParams, null); } public ReplicaListTransformer getReplicaListTransformer(final SolrParams requestParams, String defaultShardPreferences) { @@ -70,6 +83,7 @@ public ReplicaListTransformer getReplicaListTransformer(final SolrParams request public ReplicaListTransformer getReplicaListTransformer(final SolrParams requestParams, String defaultShardPreferences, String nodeName, String localHostAddress, NodesSysPropsCacher sysPropsCacher) { @SuppressWarnings("deprecation") final boolean preferLocalShards = requestParams.getBool(CommonParams.PREFER_LOCAL_SHARDS, false); + defaultShardPreferences = Optional.ofNullable(defaultShardPreferences).orElse(this.defaultShardPreferences); final String shardsPreferenceSpec = requestParams.get(ShardParams.SHARDS_PREFERENCE, defaultShardPreferences); if (preferLocalShards || !shardsPreferenceSpec.isEmpty()) { @@ -84,7 +98,15 @@ public ReplicaListTransformer getReplicaListTransformer(final SolrParams request preferenceRules.add(new PreferenceRule(ShardParams.SHARDS_PREFERENCE_REPLICA_LOCATION, ShardParams.REPLICA_LOCAL)); } - NodePreferenceRulesComparator replicaComp = new NodePreferenceRulesComparator(preferenceRules, requestParams, nodeName, localHostAddress, sysPropsCacher, defaultRltFactory, stableRltFactory); + NodePreferenceRulesComparator replicaComp = + new NodePreferenceRulesComparator( + preferenceRules, + requestParams, + Optional.ofNullable(nodeName).orElse(this.nodeName), + Optional.ofNullable(localHostAddress).orElse(this.localHostAddress), + Optional.ofNullable(sysPropsCacher).orElse(this.sysPropsCacher), + defaultRltFactory, + stableRltFactory); ReplicaListTransformer baseReplicaListTransformer = replicaComp.getBaseReplicaListTransformer(); if (replicaComp.getSortRules() == null) { // only applying base transformation diff --git a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java index bc57176377a7..5ff10c262af7 100644 --- a/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java +++ b/solr/solrj/src/java/org/apache/solr/common/cloud/Replica.java @@ -102,7 +102,7 @@ public enum Type { PULL; public static Type get(String name){ - return name == null ? Type.NRT : Type.valueOf(name); + return name == null ? Type.NRT : Type.valueOf(name.toUpperCase(Locale.ROOT)); } } diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java index 793ee5fa9e55..74b02cb4da79 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudHttp2SolrClientTest.java @@ -27,6 +27,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -488,6 +489,74 @@ private void queryWithShardsPreferenceRules(CloudHttp2SolrClient cloudClient, shardAddresses.size() > 1 && ports.size()==1); } + + + /** + * Tests if the 'shards.preference' parameter works with single-sharded collections. + */ + @Test + public void singleShardedPreferenceRules() throws Exception { + String collectionName = "singleShardPreferenceTestColl"; + + int liveNodes = cluster.getJettySolrRunners().size(); + + // For testing replica.type, we want to have all replica types available for the collection + CollectionAdminRequest.createCollection(collectionName, "conf", 1, liveNodes/3, liveNodes/3, liveNodes/3) + .setMaxShardsPerNode(liveNodes) + .processAndWait(cluster.getSolrClient(), TIMEOUT); + cluster.waitForActiveCollection(collectionName, 1, liveNodes); + + // Add some new documents + new UpdateRequest() + .add(id, "0", "a_t", "hello1") + .add(id, "2", "a_t", "hello2") + .add(id, "3", "a_t", "hello2") + .commit(getRandomClient(), collectionName); + + // Run the actual test for 'queryReplicaType' + queryReplicaType(getRandomClient(), Replica.Type.PULL, collectionName); + queryReplicaType(getRandomClient(), Replica.Type.TLOG, collectionName); + queryReplicaType(getRandomClient(), Replica.Type.NRT, collectionName); + } + + private void queryReplicaType(CloudHttp2SolrClient cloudClient, + Replica.Type typeToQuery, + String collectionName) + throws Exception + { + SolrQuery qRequest = new SolrQuery("*:*"); + + ModifiableSolrParams qParams = new ModifiableSolrParams(); + qParams.add(ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":" + typeToQuery.toString()); + qParams.add(ShardParams.SHARDS_INFO, "true"); + qRequest.add(qParams); + + Map replicaTypeToReplicas = mapReplicasToReplicaType(getCollectionState(collectionName)); + + QueryResponse qResponse = cloudClient.query(collectionName, qRequest); + + Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO); + assertNotNull("Unable to obtain "+ShardParams.SHARDS_INFO, shardsInfo); + + // Iterate over shards-info and check what cores responded + SimpleOrderedMap shardsInfoMap = (SimpleOrderedMap)shardsInfo; + Iterator> itr = shardsInfoMap.asMap(100).entrySet().iterator(); + List shardAddresses = new ArrayList(); + while (itr.hasNext()) { + Map.Entry e = itr.next(); + assertTrue("Did not find map-type value in "+ShardParams.SHARDS_INFO, e.getValue() instanceof Map); + String shardAddress = (String)((Map)e.getValue()).get("shardAddress"); + if (shardAddress.endsWith("/")) { + shardAddress = shardAddress.substring(0, shardAddress.length() - 1); + } + assertNotNull(ShardParams.SHARDS_INFO+" did not return 'shardAddress' parameter", shardAddress); + shardAddresses.add(shardAddress); + } + assertEquals("Shard addresses must be of size 1, since there is only 1 shard in the collection", 1, shardAddresses.size()); + + assertEquals("Make sure that the replica queried was the replicaType desired", typeToQuery.toString().toUpperCase(Locale.ROOT), replicaTypeToReplicas.get(shardAddresses.get(0)).toUpperCase(Locale.ROOT)); + } + private Long getNumRequests(String baseUrl, String collectionName) throws SolrServerException, IOException { return getNumRequests(baseUrl, collectionName, "QUERY", "/select", null, false); diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java index 0025aced05b3..57050ce2e438 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/impl/CloudSolrClientTest.java @@ -27,6 +27,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -427,8 +428,8 @@ public void preferLocalShardsTest() throws Exception { @SuppressWarnings("deprecation") private void queryWithShardsPreferenceRules(CloudSolrClient cloudClient, - boolean useShardsPreference, - String collectionName) + boolean useShardsPreference, + String collectionName) throws Exception { SolrQuery qRequest = new SolrQuery("*:*"); @@ -476,6 +477,72 @@ private void queryWithShardsPreferenceRules(CloudSolrClient cloudClient, shardAddresses.size() > 1 && ports.size()==1); } + /** + * Tests if the 'shards.preference' parameter works with single-sharded collections. + */ + @Test + public void singleShardedPreferenceRules() throws Exception { + String collectionName = "singleShardPreferenceTestColl"; + + int liveNodes = cluster.getJettySolrRunners().size(); + + // For testing replica.type, we want to have all replica types available for the collection + CollectionAdminRequest.createCollection(collectionName, "conf", 1, liveNodes/3, liveNodes/3, liveNodes/3) + .setMaxShardsPerNode(liveNodes) + .processAndWait(cluster.getSolrClient(), TIMEOUT); + cluster.waitForActiveCollection(collectionName, 1, liveNodes); + + // Add some new documents + new UpdateRequest() + .add(id, "0", "a_t", "hello1") + .add(id, "2", "a_t", "hello2") + .add(id, "3", "a_t", "hello2") + .commit(getRandomClient(), collectionName); + + // Run the actual test for 'queryReplicaType' + queryReplicaType(getRandomClient(), Replica.Type.PULL, collectionName); + queryReplicaType(getRandomClient(), Replica.Type.TLOG, collectionName); + queryReplicaType(getRandomClient(), Replica.Type.NRT, collectionName); + } + + private void queryReplicaType(CloudSolrClient cloudClient, + Replica.Type typeToQuery, + String collectionName) + throws Exception + { + SolrQuery qRequest = new SolrQuery("*:*"); + + ModifiableSolrParams qParams = new ModifiableSolrParams(); + qParams.add(ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":" + typeToQuery.toString()); + qParams.add(ShardParams.SHARDS_INFO, "true"); + qRequest.add(qParams); + + Map replicaTypeToReplicas = mapReplicasToReplicaType(getCollectionState(collectionName)); + + QueryResponse qResponse = cloudClient.query(collectionName, qRequest); + + Object shardsInfo = qResponse.getResponse().get(ShardParams.SHARDS_INFO); + assertNotNull("Unable to obtain "+ShardParams.SHARDS_INFO, shardsInfo); + + // Iterate over shards-info and check what cores responded + SimpleOrderedMap shardsInfoMap = (SimpleOrderedMap)shardsInfo; + Iterator> itr = shardsInfoMap.asMap(100).entrySet().iterator(); + List shardAddresses = new ArrayList(); + while (itr.hasNext()) { + Map.Entry e = itr.next(); + assertTrue("Did not find map-type value in "+ShardParams.SHARDS_INFO, e.getValue() instanceof Map); + String shardAddress = (String)((Map)e.getValue()).get("shardAddress"); + if (shardAddress.endsWith("/")) { + shardAddress = shardAddress.substring(0, shardAddress.length() - 1); + } + assertNotNull(ShardParams.SHARDS_INFO+" did not return 'shardAddress' parameter", shardAddress); + shardAddresses.add(shardAddress); + } + assertEquals("Shard addresses must be of size 1, since there is only 1 shard in the collection", 1, shardAddresses.size()); + + assertEquals("Make sure that the replica queried was the replicaType desired", typeToQuery.toString().toUpperCase(Locale.ROOT), replicaTypeToReplicas.get(shardAddresses.get(0)).toUpperCase(Locale.ROOT)); + } + private Long getNumRequests(String baseUrl, String collectionName) throws SolrServerException, IOException { return getNumRequests(baseUrl, collectionName, "QUERY", "/select", null, false); diff --git a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java index 2ecdf24dabc2..475b74d93389 100644 --- a/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java +++ b/solr/solrj/src/test/org/apache/solr/client/solrj/io/stream/StreamingTest.java @@ -48,8 +48,11 @@ import org.apache.solr.client.solrj.io.stream.metrics.SumMetric; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.UpdateRequest; +import org.apache.solr.client.solrj.routing.RequestReplicaListTransformerGenerator; import org.apache.solr.cloud.SolrCloudTestCase; +import org.apache.solr.common.cloud.ZkStateReader; import org.apache.solr.common.params.ModifiableSolrParams; +import org.apache.solr.common.params.ShardParams; import org.apache.solr.common.params.SolrParams; import org.junit.Assume; import org.junit.Before; @@ -68,6 +71,7 @@ public class StreamingTest extends SolrCloudTestCase { public static final String COLLECTIONORALIAS = "streams"; +public static final String MULTI_REPLICA_COLLECTIONORALIAS = "streams-multi-replica"; private static final StreamFactory streamFactory = new StreamFactory() .withFunctionName("search", CloudSolrStream.class) @@ -100,7 +104,8 @@ public static void configureCluster() throws Exception { } else { collection = COLLECTIONORALIAS; } - CollectionAdminRequest.createCollection(collection, "conf", numShards, 1).process(cluster.getSolrClient()); + CollectionAdminRequest.createCollection(collection, "conf", numShards, 1) + .process(cluster.getSolrClient()); cluster.waitForActiveCollection(collection, numShards, numShards); if (useAlias) { CollectionAdminRequest.createAlias(COLLECTIONORALIAS, collection).process(cluster.getSolrClient()); @@ -108,6 +113,20 @@ public static void configureCluster() throws Exception { zkHost = cluster.getZkServer().getZkAddress(); streamFactory.withCollectionZkHost(COLLECTIONORALIAS, zkHost); + + // Set up multi-replica collection + if (useAlias) { + collection = MULTI_REPLICA_COLLECTIONORALIAS + "_collection"; + } else { + collection = MULTI_REPLICA_COLLECTIONORALIAS; + } + CollectionAdminRequest.createCollection(collection, "conf", numShards, 1, 1, 1) + .setMaxShardsPerNode(numShards * 3) + .process(cluster.getSolrClient()); + cluster.waitForActiveCollection(collection, numShards, numShards * 3); + if (useAlias) { + CollectionAdminRequest.createAlias(MULTI_REPLICA_COLLECTIONORALIAS, collection).process(cluster.getSolrClient()); + } } private static final String id = "id"; @@ -2435,6 +2454,43 @@ private void tryWithQt(String which) throws IOException { } } + + @Test + public void testTupleStreamGetShardsPreference() throws Exception { + StreamContext streamContext = new StreamContext(); + streamContext.setSolrClientCache(new SolrClientCache()); + streamContext.setRequestReplicaListTransformerGenerator(new RequestReplicaListTransformerGenerator(ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":TLOG", null, null, null)); + + streamContext.setRequestParams(mapParams(ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":nrt")); + + try { + ZkStateReader zkStateReader = cluster.getSolrClient().getZkStateReader(); + List strings = zkStateReader.aliasesManager.getAliases().resolveAliases(MULTI_REPLICA_COLLECTIONORALIAS); + String collName = strings.size() > 0 ? strings.get(0) : MULTI_REPLICA_COLLECTIONORALIAS; + Map replicaTypeMap = mapReplicasToReplicaType(zkStateReader.getClusterState().getCollectionOrNull(collName)); + + // Test from extra params + SolrParams sParams = mapParams("q", "*:*", ShardParams.SHARDS_PREFERENCE, ShardParams.SHARDS_PREFERENCE_REPLICA_TYPE + ":pull"); + testTupleStreamSorting(streamContext, sParams, "PULL", replicaTypeMap); + + // Test defaults from streamContext.getParams() + testTupleStreamSorting(streamContext, new ModifiableSolrParams(), "NRT", replicaTypeMap); + + // Test defaults from the RLTG + streamContext.setRequestParams(new ModifiableSolrParams()); + testTupleStreamSorting(streamContext, new ModifiableSolrParams(), "TLOG", replicaTypeMap); + } finally { + streamContext.getSolrClientCache().close(); + } + } + + public void testTupleStreamSorting(StreamContext streamContext, SolrParams solrParams, String replicaType, Map replicaTypeMap) throws Exception { + List shards = TupleStream.getShards(cluster.getZkClient().getZkServerAddress(), MULTI_REPLICA_COLLECTIONORALIAS, streamContext, solrParams); + for (String shard : shards) { + assertEquals(shard, replicaType.toUpperCase(Locale.ROOT), replicaTypeMap.getOrDefault(shard, "").toUpperCase(Locale.ROOT)); + } + } + protected List getTuples(TupleStream tupleStream) throws IOException { tupleStream.open(); List tuples = new ArrayList(); diff --git a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java index 4ce7a5ebd823..b8e0798dc0e1 100644 --- a/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java +++ b/solr/test-framework/src/java/org/apache/solr/cloud/SolrCloudTestCase.java @@ -490,4 +490,21 @@ public static void ensureRunningJettys(int nodeCount, int timeoutSeconds) throws cluster.waitForAllNodes(timeoutSeconds); } + public static Map mapReplicasToReplicaType(DocCollection collection) { + Map replicaTypeMap = new HashMap<>(); + for (Slice slice : collection.getSlices()) { + for (Replica replica : slice.getReplicas()) { + String coreUrl = replica.getCoreUrl(); + // It seems replica reports its core URL with a trailing slash while shard + // info returned from the query doesn't. Oh well. We will include both, just in case + replicaTypeMap.put(coreUrl, replica.getType().toString()); + if (coreUrl.endsWith("/")) { + replicaTypeMap.put(coreUrl.substring(0, coreUrl.length() - 1), replica.getType().toString()); + }else { + replicaTypeMap.put(coreUrl + "/", replica.getType().toString()); + } + } + } + return replicaTypeMap; + } }