Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

ignore shards already taken by other tokens

  • Loading branch information...
commit da2edc3905d0cd99287ed8fee5f5fe7f9e057428 1 parent 02f35ed
@tjake authored
Showing with 35 additions and 27 deletions.
  1. +35 −27 src/lucandra/cluster/CassandraIndexManager.java
View
62 src/lucandra/cluster/CassandraIndexManager.java
@@ -68,7 +68,6 @@
public final int expirationTime = 120; // seconds
private final ConcurrentMap<String, AllNodeRsvps> indexReserves = new MapMaker().makeMap();
-
private final ConcurrentMap<String, ShardInfo> indexShards = new MapMaker().makeMap();
private final ConcurrentMap<String, ShardInfo> indexUsed = new MapMaker().makeMap();
@@ -76,15 +75,26 @@
private class ShardInfo
{
- public final String indexName;
- public final long ttl = System.currentTimeMillis()
- + (expirationTime * 1000) - 1000;
+ public final String indexName;
+ private long ttl;
public final ConcurrentSkipListMap<Integer, NodeInfo> shards = new ConcurrentSkipListMap<Integer, NodeInfo>();
public ShardInfo(String indexName)
{
this.indexName = indexName;
+ renew();
+ }
+
+ public void renew()
+ {
+ ttl = System.currentTimeMillis() + (expirationTime * 1000) - 1000;
}
+
+ public long ttl()
+ {
+ return ttl;
+ }
+
}
private class NodeInfo
@@ -286,9 +296,9 @@ public CassandraIndexManager(int shardsAtOnce)
randomSeq = shuffle(randomSeq, r);
}
- private ShardInfo getShardInfo(String indexName, boolean force) throws IOException
- {
-
+ private synchronized ShardInfo getShardInfo(String indexName, boolean force) throws IOException
+ {
+
ShardInfo shards = indexShards.get(indexName);
ShardInfo currentShards = shards;
@@ -361,9 +371,12 @@ private ShardInfo getShardInfo(String indexName, boolean force) throws IOExcepti
if(startSeqOffset == seqOffset)
{
- logger.info("Found reserved shard"+shardStr+"("+token+"):"+(offset.get()+1)+" TO " + (randomSeq[seqOffset]+reserveSlabSize));
- allNodeRsvps.rsvpList.add(new RsvpInfo(offset.get()+1, (randomSeq[seqOffset]+reserveSlabSize), nodes.shard, token));
- }
+ if(token.equals(getToken()))
+ {
+ logger.info("Found reserved shard"+shardStr+"("+token+"):"+(offset.get()+1)+" TO " + (randomSeq[seqOffset]+reserveSlabSize));
+ allNodeRsvps.rsvpList.add(new RsvpInfo(offset.get()+1, (randomSeq[seqOffset]+reserveSlabSize), nodes.shard, token));
+ }
+ }
}
}
}
@@ -376,7 +389,7 @@ private ShardInfo getShardInfo(String indexName, boolean force) throws IOExcepti
if (currentShards == null)
{
- currentShards = indexShards.putIfAbsent(indexName, shards);
+ currentShards = indexShards.put(indexName, shards);
if (currentShards == null)
{
@@ -385,21 +398,15 @@ private ShardInfo getShardInfo(String indexName, boolean force) throws IOExcepti
return shards;
}
}
- else if (indexShards.replace(indexName, currentShards, shards))
- {
-
- logger.info(indexName + " has " + shards.shards.size() + " shards");
-
-
- currentShards = shards;
- }
else
{
//Merge together active and new
for (Map.Entry<Integer, NodeInfo> entry : shards.shards.entrySet())
{
- currentShards.shards.put(entry.getKey(), entry.getValue());
+ currentShards.shards.putIfAbsent(entry.getKey(), entry.getValue());
}
+
+ currentShards.renew();
}
AllNodeRsvps currentNodeRsvps = indexReserves.get(indexName);
@@ -758,17 +765,18 @@ private Long nextReservedId(String indexName, NodeInfo[] shards, String myToken)
{
continue;
}
-
- if (c.timestamp() == minTtl && winningToken.compareTo(c.name()) <= 0)
- {
- winningToken = c.name();
- }
-
+
if (c.timestamp() < minTtl)
{
minTtl = c.timestamp();
winningToken = c.name();
- }
+ }
+
+ //incase of a tie the token is the tiebreaker
+ if (c.timestamp() == minTtl && winningToken.compareTo(c.name()) <= 0)
+ {
+ winningToken = c.name();
+ }
}
String winningTokenStr;
Please sign in to comment.
Something went wrong with that request. Please try again.