Permalink
Browse files

fix id overwrite bug when indexing from many nodes at once

  • Loading branch information...
1 parent 9dc5269 commit 02f35ed3a42f1f6599a63cc72ac55ddb160d353a @tjake committed Jul 6, 2011
Showing with 7 additions and 13 deletions.
  1. +7 −13 src/lucandra/cluster/CassandraIndexManager.java
@@ -49,8 +49,7 @@
{
// To increase throughput we distribute docs across a number of shards at
- // once
- // The idea being different shards live on different boxes
+ // once. The idea being different shards live on different boxes
protected final int shardsAtOnce;
private int[] randomSeq;
@@ -180,7 +179,7 @@ public Long getNextId()
int nextId = info.currentId.incrementAndGet();
if (nextId <= info.endId)
- {
+ {
return (long) (maxDocsPerShard * info.shard) + nextId;
}
else
@@ -805,7 +804,6 @@ private Long nextReservedId(String indexName, NodeInfo[] shards, String myToken)
else
{
//Mark this offset as taken.
- CassandraUtils.robustInsert(ConsistencyLevel.QUORUM, updateNodeOffset(indexName, myToken, node.shard, nextOffset));
usedNodeInfo.nodes.put(""+nextOffset, new AtomicInteger(1));
// we lost, try try again...
@@ -902,9 +900,8 @@ private int getRandomSequenceOffset(int offset)
// initialize shards we didn't know about
if (offset == null)
{
- RowMutation rm = updateNodeOffset(shards.indexName, myToken, shard.getKey(), -1);
+ updateNodeOffset(shards.indexName, myToken, shard.getKey(), -1);
offset = nodes.nodes.get(myToken);
- CassandraUtils.robustInsert(ConsistencyLevel.QUORUM, rm);
}
int randomSeqOffset = getRandomSequenceOffset(offset.get());
@@ -927,16 +924,15 @@ private int getRandomSequenceOffset(int offset)
// new shards
for (int i = pickedShard; i < shardsAtOnce; i++)
{
- picked[i] = addNewShard(shards.indexName);
+ picked[i] = addNewShard(shards);
}
return picked;
}
- private NodeInfo addNewShard(String indexName) throws IOException
+ private NodeInfo addNewShard(ShardInfo shards) throws IOException
{
- ShardInfo shards = getShardInfo(indexName, true);
// get max shard
Integer maxShard = -1;
@@ -963,11 +959,9 @@ private NodeInfo addNewShard(String indexName) throws IOException
NodeInfo dupNodes = null;
if ((dupNodes = shards.shards.putIfAbsent(nodes.shard, nodes)) == null)
{
- logger.info("added new shard for " + indexName + "("+getToken()+") " + nodes.shard);
+ logger.info("added new shard for " + shards.indexName + "("+getToken()+") " + nodes.shard);
- RowMutation rm = updateNodeOffset(indexName, getToken(), nodes.shard, -1);
-
- CassandraUtils.robustInsert(ConsistencyLevel.QUORUM, rm);
+ updateNodeOffset(shards.indexName, getToken(), nodes.shard, -1);
}

0 comments on commit 02f35ed

Please sign in to comment.