Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

fix increase in shards caused by reverves jumping ahead to the wrong …

…slot
  • Loading branch information...
commit 97079db2872cc358032f0aaa3dbeb01185144831 1 parent d1e0ff1
@tjake authored
View
409 src/lucandra/cluster/CassandraIndexManager.java
@@ -38,7 +38,6 @@
import java.util.concurrent.atomic.AtomicLong;
import lucandra.CassandraUtils;
-import lucandra.Pair;
import org.apache.cassandra.db.DeletedColumn;
import org.apache.cassandra.db.ExpiringColumn;
@@ -90,19 +89,18 @@
public ShardInfo(String indexName)
{
this.indexName = indexName;
- renew();
+ ttl = getNewTTL();
}
- public void renew()
+ public void renew(long ttl)
{
- ttl = System.currentTimeMillis() + (expirationTime * 1000) + r.nextInt(5000);
+ this.ttl = ttl;
}
public long ttl()
{
return ttl;
}
-
}
private class NodeInfo
@@ -196,7 +194,10 @@ public Long getNextId()
if (nextId <= info.endId)
{
- return (long) (maxDocsPerShard * info.shard) + nextId;
+ long id = (long) (maxDocsPerShard * info.shard) + nextId;
+ if(id == 1048577)
+ logger.error("FOUND ID: "+nextId + " "+info.shard+" "+getToken());
+ return id;
} else
{
rsvpList.set(pos, null);
@@ -214,14 +215,15 @@ public Long getNextId()
public Integer shard;
public AtomicInteger currentId;
public final int endId;
- public final long ttl = System.currentTimeMillis() + (expirationTime * 1000);
+ public final long ttl;
- public RsvpInfo(int startId, int endId, int shard, String token)
+ public RsvpInfo(int startId, int endId, int shard, String token, final long ttl)
{
currentId = new AtomicInteger(startId);
this.endId = endId;
this.token = token;
this.shard = shard;
+ this.ttl = ttl;
}
public int hashCode()
@@ -297,6 +299,11 @@ public CassandraIndexManager(int shardsAtOnce)
randomSeq = shuffle(randomSeq, r);
}
+ private long getNewTTL()
+ {
+ return System.currentTimeMillis() + (expirationTime * 1000) + r.nextInt(5000);
+ }
+
private synchronized ShardInfo getShardInfo(String indexName, boolean force) throws IOException
{
@@ -329,6 +336,8 @@ private synchronized ShardInfo getShardInfo(String indexName, boolean force) thr
shards = new ShardInfo(indexName);
AllNodeRsvps allNodeRsvps = new AllNodeRsvps();
+ long nextTTL = shards.ttl();
+
if (rows != null && !rows.isEmpty())
{
assert rows.size() == 1;
@@ -371,16 +380,18 @@ private synchronized ShardInfo getShardInfo(String indexName, boolean force) thr
if (offset.get() < (maxDocsPerShard - 1))
{
int seqOffset = getRandomSequenceOffset(offset.get() + 1);
+ int prevSeqOffset = getRandomSequenceOffset(offset.get() - 1);
- if (startSeqOffset == seqOffset)
+ // Only save if this is not on a slot boundry
+ if (startSeqOffset == seqOffset && prevSeqOffset == seqOffset)
{
if (token.equals(getToken()))
{
logger.info("Found reserved shard" + shardStr + "(" + token + "):"
+ (offset.get() + 1) + " TO "
+ (randomSeq[seqOffset] + reserveSlabSize));
- allNodeRsvps.rsvpList.add(new RsvpInfo(offset.get() + 1,
- (randomSeq[seqOffset] + reserveSlabSize), nodes.shard, token));
+ allNodeRsvps.rsvpList.add(new RsvpInfo(offset.get() + 1, (randomSeq[seqOffset]
+ + reserveSlabSize - 1), nodes.shard, token, nextTTL));
}
}
}
@@ -413,18 +424,12 @@ private synchronized ShardInfo getShardInfo(String indexName, boolean force) thr
currentShards.shards.put(shard, entry.getValue());
}
- currentShards.renew();
+ currentShards.renew(nextTTL);
}
- AllNodeRsvps currentNodeRsvps = indexReserves.get(indexName);
-
- for (RsvpInfo rsvp : allNodeRsvps.rsvpList)
- {
- if (!currentNodeRsvps.rsvpList.contains(rsvp))
- {
- currentNodeRsvps.rsvpList.add(rsvp);
- }
- }
+
+ indexReserves.put(indexName, allNodeRsvps);
+
return currentShards;
}
@@ -473,23 +478,22 @@ public Long getId(String indexName, String key) throws IOException
{
return checkForUpdate(indexName, key);
}
-
-
+
public RowMutation getIdMutation(String indexName, String key, Long id) throws IOException
{
-
+
int shard = getShardFromDocId(id);
- ByteBuffer idCol = ByteBufferUtil.bytes(String.valueOf(getShardedDocId(id)));
+ ByteBuffer idCol = ByteBufferUtil.bytes(String.valueOf(getShardedDocId(id)));
ByteBuffer keyCol = ByteBuffer.wrap(key.getBytes("UTF-8"));
-
+
// Permanently mark the id as taken
ByteBuffer idKey = CassandraUtils.hashKeyBytes((indexName + "~" + shard).getBytes("UTF-8"),
CassandraUtils.delimeterBytes, "ids".getBytes("UTF-8"));
RowMutation rm = new RowMutation(CassandraUtils.keySpace, idKey);
- rm.add(new QueryPath(CassandraUtils.schemaInfoColumnFamily, idCol, ByteBuffer.wrap(getToken().getBytes("UTF-8"))), keyCol, System.currentTimeMillis());
-
-
+ rm.add(new QueryPath(CassandraUtils.schemaInfoColumnFamily, idCol, ByteBuffer
+ .wrap(getToken().getBytes("UTF-8"))), keyCol, System.currentTimeMillis());
+
return rm;
}
@@ -555,7 +559,7 @@ public long getNextId(String indexName, String key, RowMutation[] rowMutations)
shards = getShardInfo(indexName, false);
nodes = pickAShard(shards);
- id = nextReservedId(indexName, nodes, myToken);
+ id = nextReservedId(indexName, shards, nodes, myToken);
if (id == null)
{
@@ -634,239 +638,248 @@ public void resetCounter(String indexName) throws IOException
CassandraUtils.robustInsert(ConsistencyLevel.QUORUM, rms.toArray(new RowMutation[] {}));
}
- private Long nextReservedId(String indexName, NodeInfo[] shards, String myToken) throws IOException
+ private Long nextReservedId(String indexName, ShardInfo shards, NodeInfo[] nodes, String myToken)
+ throws IOException
{
- if (logger.isDebugEnabled())
- logger.debug("in reserveIds for index " + indexName);
-
- AllNodeRsvps currentRsvpd = indexReserves.get(indexName);
- if (currentRsvpd != null)
+ synchronized (indexName.intern())
{
- Long nextId = currentRsvpd.getNextId();
-
- if (nextId != null)
- return nextId;
-
- // if (logger.isDebugEnabled())
- logger.debug("need more ids for " + myToken);
- }
- AllNodeRsvps allNewRsvps = new AllNodeRsvps();
- ShardInfo usedShardInfo = indexUsed.get(indexName);
- if (usedShardInfo == null)
- {
- usedShardInfo = new ShardInfo(indexName);
- indexUsed.put(indexName, usedShardInfo);
- }
+ if (logger.isDebugEnabled())
+ logger.debug("in reserveIds for index " + indexName);
- // Pick a new shard
- for (NodeInfo node : shards)
- {
- AtomicInteger offset = node.nodes.get(myToken);
+ AllNodeRsvps currentRsvpd = indexReserves.get(indexName);
- assert offset != null;
+ if (currentRsvpd != null)
+ {
+ Long nextId = currentRsvpd.getNextId();
- int startingOffset = offset.get();
- int nextOffset = startingOffset;
+ if (nextId != null)
+ return nextId;
- // goto next offset marker (unless its the first or last)
- int randomSequenceOffset = getRandomSequenceOffset(startingOffset);
+ //if (logger.isDebugEnabled())
+ logger.info("need more ids for " +indexName+" "+myToken);
+ }
- NodeInfo usedNodeInfo = usedShardInfo.shards.get(node.shard);
- if (usedNodeInfo == null)
+ AllNodeRsvps allNewRsvps = new AllNodeRsvps();
+ ShardInfo usedShardInfo = indexUsed.get(indexName);
+ if (usedShardInfo == null)
{
- usedNodeInfo = new NodeInfo(node.shard);
- usedShardInfo.shards.put(node.shard, usedNodeInfo);
+ usedShardInfo = new ShardInfo(indexName);
+ indexUsed.put(indexName, usedShardInfo);
}
- if (startingOffset != randomSeq[0])
+ // Pick a new shard
+ for (NodeInfo node : nodes)
{
- if (randomSequenceOffset != (offsetSlots - 1))
- {
- nextOffset = randomSeq[randomSequenceOffset + 1];
- } else
+ AtomicInteger offset = node.nodes.get(myToken);
+
+ assert offset != null;
+
+ int startingOffset = offset.get();
+ int nextOffset = startingOffset;
+
+ // goto next offset marker (unless its the first or last)
+ int randomSequenceOffset = getRandomSequenceOffset(startingOffset);
+
+ NodeInfo usedNodeInfo = usedShardInfo.shards.get(node.shard);
+ if (usedNodeInfo == null)
{
- continue;
+ usedNodeInfo = new NodeInfo(node.shard);
+ usedShardInfo.shards.put(node.shard, usedNodeInfo);
}
- }
- if (logger.isTraceEnabled())
- logger.trace(myToken + " startingOffset = " + startingOffset + ", nextOffset = " + nextOffset);
+ if (startingOffset != randomSeq[0])
+ {
+ if (randomSequenceOffset != (offsetSlots - 1))
+ {
+ nextOffset = randomSeq[randomSequenceOffset + 1];
+ } else
+ {
+ continue;
+ }
+ }
- while (true)
- {
+ if (logger.isTraceEnabled())
+ logger.trace(myToken + " startingOffset = " + startingOffset + ", nextOffset = " + nextOffset);
- // Avoid re-checking used slabs
- if (usedNodeInfo != null)
+ while (true)
{
- if (usedNodeInfo.nodes.get("" + nextOffset) != null)
- {
- updateNodeOffset(indexName, myToken, node.shard, nextOffset);
- // try next offset
- int seqOffset = getRandomSequenceOffset(nextOffset);
- if (seqOffset < (offsetSlots - 1))
- {
- nextOffset = randomSeq[seqOffset + 1];
- continue;
- } else
+ // Avoid re-checking used slabs
+ if (usedNodeInfo != null)
+ {
+ if (usedNodeInfo.nodes.get("" + nextOffset) != null)
{
- break;
+ updateNodeOffset(indexName, myToken, node.shard, nextOffset);
+
+ // try next offset
+ int seqOffset = getRandomSequenceOffset(nextOffset);
+ if (seqOffset < (offsetSlots - 1))
+ {
+ nextOffset = randomSeq[seqOffset + 1];
+ continue;
+ } else
+ {
+ break;
+ }
}
}
- }
-
- ByteBuffer key = CassandraUtils.hashKeyBytes((indexName + "~" + node.shard).getBytes("UTF-8"),
- CassandraUtils.delimeterBytes, "rsvp".getBytes("UTF-8"));
- // Write the reserves
- RowMutation rm = new RowMutation(CassandraUtils.keySpace, key);
+ ByteBuffer key = CassandraUtils.hashKeyBytes((indexName + "~" + node.shard).getBytes("UTF-8"),
+ CassandraUtils.delimeterBytes, "rsvp".getBytes("UTF-8"));
- ByteBuffer id = ByteBufferUtil.bytes(String.valueOf(nextOffset));
- ByteBuffer off = id;
+ // Write the reserves
+ RowMutation rm = new RowMutation(CassandraUtils.keySpace, key);
- rm.add(new QueryPath(CassandraUtils.schemaInfoColumnFamily, id, ByteBuffer.wrap(myToken
- .getBytes("UTF-8"))), off, System.currentTimeMillis(), expirationTime);
+ ByteBuffer id = ByteBufferUtil.bytes(String.valueOf(nextOffset));
+ ByteBuffer off = id;
- CassandraUtils.robustInsert(ConsistencyLevel.QUORUM, rm);
+ rm.add(new QueryPath(CassandraUtils.schemaInfoColumnFamily, id, ByteBuffer.wrap(myToken
+ .getBytes("UTF-8"))), off, System.currentTimeMillis(), expirationTime);
- // Give it time to sink in, in-case clocks are off...
- try
- {
- Thread.sleep(100);
- } catch (InterruptedException e1)
- {
- }
+ CassandraUtils.robustInsert(ConsistencyLevel.QUORUM, rm);
- // Read the columns back
- IColumn supercol = null;
- int attempts = 0;
- while (supercol == null && attempts < CassandraUtils.retryAttempts)
- {
+ // Give it time to sink in, in-case clocks are off across
+ // nodes...
try
{
- List<Row> rows = CassandraUtils.robustRead(key, new QueryPath(
- CassandraUtils.schemaInfoColumnFamily), Arrays.asList(id), ConsistencyLevel.QUORUM);
-
- if (rows == null || rows.size() == 0)
- {
- continue;
- }
+ Thread.sleep(100);
+ } catch (InterruptedException e1)
+ {
+ }
- if (rows.size() == 1)
+ // Read the columns back
+ IColumn supercol = null;
+ int attempts = 0;
+ while (supercol == null && attempts < CassandraUtils.retryAttempts)
+ {
+ try
{
- Row row = rows.get(0);
+ List<Row> rows = CassandraUtils.robustRead(key, new QueryPath(
+ CassandraUtils.schemaInfoColumnFamily), Arrays.asList(id), ConsistencyLevel.QUORUM);
- if (row.cf == null || row.cf.isMarkedForDelete())
+ if (rows == null || rows.size() == 0)
{
continue;
}
- supercol = rows.get(0).cf.getColumn(id);
+ if (rows.size() == 1)
+ {
+ Row row = rows.get(0);
+
+ if (row.cf == null || row.cf.isMarkedForDelete())
+ {
+ continue;
+ }
+
+ supercol = rows.get(0).cf.getColumn(id);
+ }
+ } catch (IOException e)
+ {
+ // let's try again...
}
- } catch (IOException e)
- {
- // let's try again...
+
+ attempts++;
}
- attempts++;
- }
+ if (supercol == null)
+ throw new IllegalStateException("just wrote " + offset + ", but didn't read it");
- if (supercol == null)
- throw new IllegalStateException("just wrote " + offset + ", but didn't read it");
+ long minTtl = Long.MAX_VALUE;
+ ByteBuffer winningToken = null;
- long minTtl = Long.MAX_VALUE;
- ByteBuffer winningToken = null;
+ // See which ones we successfully reserved
+ for (IColumn c : supercol.getSubColumns())
+ {
- // See which ones we successfully reserved
- for (IColumn c : supercol.getSubColumns())
- {
+ // someone already took this id
+ if (!(c instanceof ExpiringColumn) && !(c instanceof DeletedColumn))
+ {
+ if (logger.isDebugEnabled())
+ logger.debug(offset + " was taken by " + ByteBufferUtil.string(c.name()));
- // someone already took this id
- if (!(c instanceof ExpiringColumn) && !(c instanceof DeletedColumn))
- {
- if (logger.isDebugEnabled())
- logger.debug(offset + " was taken by " + ByteBufferUtil.string(c.name()));
+ winningToken = null;
+ break;
+ }
- winningToken = null;
- break;
- }
+ // expired reservation
+ if (c.isMarkedForDelete())
+ {
+ continue;
+ }
- // expired reservation
- if (c.isMarkedForDelete())
- {
- continue;
- }
+ if (c.timestamp() < minTtl)
+ {
+ minTtl = c.timestamp();
+ winningToken = c.name();
+ }
- if (c.timestamp() < minTtl)
- {
- minTtl = c.timestamp();
- winningToken = c.name();
+ // incase of a tie the token is the tiebreaker
+ if (c.timestamp() == minTtl && winningToken.compareTo(c.name()) <= 0)
+ {
+ winningToken = c.name();
+ }
}
- // incase of a tie the token is the tiebreaker
- if (c.timestamp() == minTtl && winningToken.compareTo(c.name()) <= 0)
+ String winningTokenStr;
+ try
+ {
+ winningTokenStr = winningToken == null ? "" : ByteBufferUtil.string(winningToken);
+ } catch (CharacterCodingException e)
{
- winningToken = c.name();
+ throw new RuntimeException(e);
}
- }
- String winningTokenStr;
- try
- {
- winningTokenStr = winningToken == null ? "" : ByteBufferUtil.string(winningToken);
- } catch (CharacterCodingException e)
- {
- throw new RuntimeException(e);
- }
-
- // we won!
- if (winningTokenStr.equals(myToken))
- {
- // Mark this as permanently taken
- rm = new RowMutation(CassandraUtils.keySpace, key);
+ // we won!
+ if (winningTokenStr.equals(myToken))
+ {
+ // Mark this as permanently taken
+ rm = new RowMutation(CassandraUtils.keySpace, key);
- rm.add(new QueryPath(CassandraUtils.schemaInfoColumnFamily, id, ByteBuffer.wrap(myToken
- .getBytes("UTF-8"))), off, System.currentTimeMillis());
+ rm.add(new QueryPath(CassandraUtils.schemaInfoColumnFamily, id, ByteBuffer.wrap(myToken
+ .getBytes("UTF-8"))), off, System.currentTimeMillis());
- CassandraUtils.robustInsert(ConsistencyLevel.QUORUM, rm);
+ CassandraUtils.robustInsert(ConsistencyLevel.QUORUM, rm);
- // Add to active rsvp list
- allNewRsvps.rsvpList.add(new RsvpInfo(nextOffset, (nextOffset + reserveSlabSize - 1), node.shard,
- myToken));
+ // Add to active rsvp list
+ // Start just ahead of the first offset because on expiration we check if the offset
+ //is on a boundry point of
+ allNewRsvps.rsvpList.add(new RsvpInfo(nextOffset+1, (nextOffset + reserveSlabSize - 1),
+ node.shard, myToken, shards.ttl()));
- // if (logger.isTraceEnabled())
- logger.info("Reserved " + reserveSlabSize + " ids for " + indexName + "(" + myToken + ") shard "
- + node.shard + " from slot " + getRandomSequenceOffset(nextOffset) + " " + nextOffset
- + " TO " + (nextOffset + reserveSlabSize - 1));
+ // if (logger.isTraceEnabled())
+ logger.info("Reserved " + reserveSlabSize + " ids for " + indexName + "(" + myToken
+ + ") shard " + node.shard + " from slot " + getRandomSequenceOffset(nextOffset) + " "
+ + nextOffset + " TO " + (nextOffset + reserveSlabSize - 1));
- break;
- } else
- {
- // Mark this offset as taken.
- usedNodeInfo.nodes.put("" + nextOffset, new AtomicInteger(1));
-
- // we lost, try try again...
- int seqOffset = getRandomSequenceOffset(nextOffset);
- if (seqOffset < (offsetSlots - 1))
- {
- nextOffset = randomSeq[seqOffset + 1];
+ break;
} else
{
- break;
+ // Mark this offset as taken.
+ usedNodeInfo.nodes.put("" + nextOffset, new AtomicInteger(1));
+
+ // we lost, try try again...
+ int seqOffset = getRandomSequenceOffset(nextOffset);
+ if (seqOffset < (offsetSlots - 1))
+ {
+ nextOffset = randomSeq[seqOffset + 1];
+ } else
+ {
+ break;
+ }
}
}
}
- }
- // store new reserves
- indexReserves.put(indexName, allNewRsvps);
+ // store new reserves
+ indexReserves.put(indexName, allNewRsvps);
- if (logger.isTraceEnabled())
- logger.trace("Reserved " + allNewRsvps.rsvpList.size() + " shards for " + myToken);
+ if (logger.isTraceEnabled())
+ logger.trace("Reserved " + allNewRsvps.rsvpList.size() + " shards for " + myToken);
- return allNewRsvps.getNextId();
+ return allNewRsvps.getNextId();
+ }
}
private int getRandomSequenceOffset(int offset)
@@ -925,7 +938,7 @@ private int getRandomSequenceOffset(int offset)
if (logger.isDebugEnabled())
logger.debug(shards.indexName + "(" + myToken + "): shard = " + shard.getKey() + ", offset = "
- + offset.get() + ", offsetLookup = " + randomSeqOffset + ", offsetSlots = " + offsetSlots);
+ + offset.get() + ", offsetLookup = " + randomSeqOffset + ", offsetSlots = " + offsetSlots);
if (randomSeqOffset != (offsetSlots - 1))
{
View
6 test/lucandra/cluster/IndexManagerTests.java
@@ -177,7 +177,7 @@ public void testCassandraIncrement() throws IOException
Map<Integer, AtomicInteger> shardStats = new HashMap<Integer, AtomicInteger>();
// Add
- for (int i = 0; i < CassandraIndexManager.maxDocsPerShard * 2; i++)
+ for (int i = 0; i < CassandraIndexManager.maxDocsPerShard; i++)
{
Long id = idx.getNextId(indexName, "i" + i);
@@ -206,7 +206,7 @@ public void testCassandraIncrement() throws IOException
assertEquals(3, CassandraIndexManager.getShardFromDocId(idx.getMaxId(indexName)));
// Update
- for (int i = 0; i < CassandraIndexManager.maxDocsPerShard * 2; i++)
+ for (int i = 0; i < CassandraIndexManager.maxDocsPerShard; i++)
{
Long id = idx.getId(indexName, "i" + i);
@@ -275,7 +275,7 @@ public void testCassandraIncrement2() throws Exception
try
{
System.err.println("waiting for reserve expiration");
- Thread.sleep(120 * 1000);
+ Thread.sleep((120 * 1000)+5000);
}
catch (InterruptedException e)
{
Please sign in to comment.
Something went wrong with that request. Please try again.