Skip to content

Commit

Permalink
Fix a search routing issue in a multi-node cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
vroyer committed Feb 27, 2017
1 parent d35d128 commit abf7e17
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright (c) 2017 Strapdata (http://www.strapdata.com)
* Contains some code from Elasticsearch (http://www.elastic.co)
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.service;

import static com.google.common.collect.Sets.newHashSet;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2389,7 +2389,7 @@ private static Object value(FieldMapper mapper, Object value) throws IOException
}
}

public static int defaultPrecisionStep = 8;
public static int defaultPrecisionStep = 16;
public static Query newTokenRangeQuery(Collection<Range<Token>> tokenRanges) {
Query tokenRangeQuery = null;
if (tokenRanges != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public abstract class Router {
protected final TokenMetadata metadata;
protected final AbstractReplicationStrategy strategy;

public Router(final String index, final String ksName, final Map<UUID, ShardRoutingState> shardStates, final ClusterState clusterState)
public Router(final String index, final String ksName, final Map<UUID, ShardRoutingState> shardStates, final ClusterState clusterState, boolean includeReplica)
{
this.index = index;
this.ksName = ksName;
Expand All @@ -112,10 +112,10 @@ public Router(final String index, final String ksName, final Map<UUID, ShardRout
} else {
this.strategy = null;
this.metadata = null;
this.tokenToNodes.put(TOKEN_MAX, localNode);
}

this.tokens = new ArrayList<Token>(this.tokenToNodes.keys());
this.tokens.add(TOKEN_MAX);
Collections.sort(tokens);
if (logger.isTraceEnabled())
logger.trace("index=[{}] keyspace=[{}] ordered tokens={}",index, ksName, this.tokens);
Expand All @@ -132,14 +132,18 @@ public Router(final String index, final String ksName, final Map<UUID, ShardRout
UUID uuid = StorageService.instance.getHostId(endpoint);
DiscoveryNode node = (uuid == null) ? clusterState.nodes().findByInetAddress(endpoint) : clusterState.nodes().get(uuid.toString());
assert node != null : "Cannot find node with ip = " + endpoint ;
if (node != null && ShardRoutingState.STARTED.equals(shardStates.get(node.uuid()))) {
orphanRange = false;
BitSet bs = greenShards.get(node);
if (bs == null) {
bs = new BitSet(tokens.size() - 1);
greenShards.put(node, bs);
if (node != null) {
if (ShardRoutingState.STARTED.equals(shardStates.get(node.uuid()))) {
orphanRange = false;
BitSet bs = greenShards.get(node);
if (bs == null) {
bs = new BitSet(tokens.size() - 1);
greenShards.put(node, bs);
}
bs.set(i);
if (!includeReplica)
break;
}
bs.set(i);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class PrimaryFirstRouter extends Router {
Route route;

public PrimaryFirstRouter(final String index, final String ksName, final Map<UUID, ShardRoutingState> shardStates, final ClusterState clusterState) {
super(index, ksName, shardStates, clusterState);
super(index, ksName, shardStates, clusterState, false);

if (!StorageService.instance.isJoined() || !Keyspace.isInitialized()) {
// temporary fake routing table in order to start local shards before cassandra services.
Expand All @@ -65,22 +65,6 @@ public Map<DiscoveryNode, BitSet> selectedShards() {
return;
}

// clear replica ranges from bitset of greenShards.
for(DiscoveryNode node : greenShards.keySet()) {
for(Range<Token> primaryRange : StorageService.instance.getPrimaryRangeForEndpointWithinDC(ksName, node.getInetAddress())) {
int rightTokenIndex = this.tokens.indexOf(primaryRange.right);
if (logger.isTraceEnabled())
logger.trace("primaryRange={} idx={} wrapped={}", primaryRange, rightTokenIndex, primaryRange.isWrapAround());

if (primaryRange.isWrapAround())
// remove the higher token even if the current token belongs to another DC
clearReplicaRange(node.getInetAddress(), this.tokens.size() - 1, TOKEN_MAX, clusterState);

if (rightTokenIndex >= 0)
clearReplicaRange(node.getInetAddress(), rightTokenIndex, primaryRange.right, clusterState);
}
}

if (logger.isTraceEnabled())
logger.trace("index={} keyspace={} greenShards={} yellowShards={} redShards={}", index, ksName, greenShards, yellowShards, redShards);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class RandomRouter extends Router {
Random rnd = new Random();

public RandomRouter(final String index, final String ksName, final Map<UUID, ShardRoutingState> shardStates, final ClusterState clusterState) {
super(index, ksName, shardStates, clusterState);
super(index, ksName, shardStates, clusterState, true);
}

@Override
Expand Down

0 comments on commit abf7e17

Please sign in to comment.