Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Deadlock fix

thread 1 gets lock on indexName, from getNextId, for instance
thread 2 calls synch'd getShardInfo method
thread 1 calls getShardInfo inside of indexName lock, blocks waiting
for thread 2 to finish
thread 2 hits the indexName lock inside of getShardInfo and deadlocks

traced all paths and made some lock changes:
- nextReservedId is only called from inside indexName lock, so no need
to lock it in that method
- almost all calls to getShardInfo were inside of an indexName lock, so
took out the indexName lock in that method
- added an indexName lock around the getShardInfo call inside of
getMaxId, for safety's sake
  • Loading branch information...
commit b533b818f3bcde9cc93ccbdb0e3572af96a998f3 1 parent 7b18f06
Steven Scott authored
Showing with 257 additions and 263 deletions.
  1. +257 −263 src/lucandra/cluster/CassandraIndexManager.java
View
520 src/lucandra/cluster/CassandraIndexManager.java
@@ -304,7 +304,6 @@ private long getNewTTL()
private synchronized ShardInfo getShardInfo(String indexName, boolean force) throws IOException
{
-
ShardInfo shards = indexShards.get(indexName);
ShardInfo currentShards = shards;
@@ -319,128 +318,124 @@ private synchronized ShardInfo getShardInfo(String indexName, boolean force) thr
}
}
- synchronized (indexName.intern())
- {
+ ByteBuffer key = CassandraUtils.hashKeyBytes(indexName.getBytes("UTF-8"), CassandraUtils.delimeterBytes,
+ "shards".getBytes("UTF-8"));
- ByteBuffer key = CassandraUtils.hashKeyBytes(indexName.getBytes("UTF-8"), CassandraUtils.delimeterBytes,
- "shards".getBytes("UTF-8"));
+ ReadCommand cmd = new SliceFromReadCommand(CassandraUtils.keySpace, key, new ColumnParent(
+ CassandraUtils.schemaInfoColumnFamily), ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ ByteBufferUtil.EMPTY_BYTE_BUFFER, false, Integer.MAX_VALUE);
- ReadCommand cmd = new SliceFromReadCommand(CassandraUtils.keySpace, key, new ColumnParent(
- CassandraUtils.schemaInfoColumnFamily), ByteBufferUtil.EMPTY_BYTE_BUFFER,
- ByteBufferUtil.EMPTY_BYTE_BUFFER, false, Integer.MAX_VALUE);
+ List<Row> rows = CassandraUtils.robustRead(ConsistencyLevel.QUORUM, cmd);
- List<Row> rows = CassandraUtils.robustRead(ConsistencyLevel.QUORUM, cmd);
+ shards = new ShardInfo(indexName);
+ AllNodeRsvps allNodeRsvps = new AllNodeRsvps();
- shards = new ShardInfo(indexName);
- AllNodeRsvps allNodeRsvps = new AllNodeRsvps();
+ long nextTTL = shards.ttl();
- long nextTTL = shards.ttl();
+ if (rows != null && !rows.isEmpty())
+ {
+ assert rows.size() == 1;
- if (rows != null && !rows.isEmpty())
+ Row row = rows.get(0);
+
+ if (row.cf != null && !row.cf.isMarkedForDelete())
{
- assert rows.size() == 1;
- Row row = rows.get(0);
+ assert row.cf.getSortedColumns() != null;
- if (row.cf != null && !row.cf.isMarkedForDelete())
+ // Each column represents each shard and latest id for each
+ // node
+ // {"shard1" : {"node1" : 1234}}
+ for (IColumn c : row.cf.getSortedColumns())
{
-
- assert row.cf.getSortedColumns() != null;
-
- // Each column represents each shard and latest id for each
- // node
- // {"shard1" : {"node1" : 1234}}
- for (IColumn c : row.cf.getSortedColumns())
+ String shardStr = ByteBufferUtil.string(c.name());
+ Integer shardNum = null;
+
+ try
{
- String shardStr = ByteBufferUtil.string(c.name());
- Integer shardNum = null;
+ shardNum = Integer.valueOf(shardStr);
+ }
+ catch(NumberFormatException e)
+ {
+ logger.warn("invalid shard name encountered: "+shardStr+" "+c.getSubColumns().size());
+ continue;
+ }
- try
- {
- shardNum = Integer.valueOf(shardStr);
- }
- catch(NumberFormatException e)
- {
- logger.warn("invalid shard name encountered: "+shardStr+" "+c.getSubColumns().size());
- continue;
- }
-
- assert c instanceof SuperColumn;
+ assert c instanceof SuperColumn;
- NodeInfo nodes = new NodeInfo(shardNum);
+ NodeInfo nodes = new NodeInfo(shardNum);
- for (IColumn subCol : c.getSubColumns())
- {
+ for (IColumn subCol : c.getSubColumns())
+ {
- String token = ByteBufferUtil.string(subCol.name());
+ String token = ByteBufferUtil.string(subCol.name());
- AtomicInteger offset = new AtomicInteger(Integer.valueOf(ByteBufferUtil.string(subCol
- .value())));
- int startSeqOffset = getRandomSequenceOffset(offset.get());
+ AtomicInteger offset = new AtomicInteger(Integer.valueOf(ByteBufferUtil.string(subCol
+ .value())));
+ int startSeqOffset = getRandomSequenceOffset(offset.get());
- // Leave a mark at each shard so we track the
- // offsets
- // hit.
- nodes.nodes.put(token, offset);
- shards.shards.put(shardNum, nodes);
+ // Leave a mark at each shard so we track the
+ // offsets
+ // hit.
+ nodes.nodes.put(token, offset);
+ shards.shards.put(shardNum, nodes);
- // Load this reserve if there is at least 100 more to go.
- if ((offset.get()+100) < (maxDocsPerShard - 1))
- {
- int seqOffset = getRandomSequenceOffset(offset.get() + 100);
- int prevSeqOffset = getRandomSequenceOffset(offset.get() - 1);
+ // Load this reserve if there is at least 100 more to go.
+ if ((offset.get()+100) < (maxDocsPerShard - 1))
+ {
+ int seqOffset = getRandomSequenceOffset(offset.get() + 100);
+ int prevSeqOffset = getRandomSequenceOffset(offset.get() - 1);
- // Only save if this is not on a slot boundry
- if (startSeqOffset == seqOffset && prevSeqOffset == seqOffset)
+ // Only save if this is not on a slot boundry
+ if (startSeqOffset == seqOffset && prevSeqOffset == seqOffset)
+ {
+ if (token.equals(getToken()))
{
- if (token.equals(getToken()))
- {
- logger.info("Found reserved shard" + shardStr + "(" + token + "):"
- + (offset.get() + 100) + " TO "
- + (randomSeq[seqOffset] + reserveSlabSize));
- allNodeRsvps.rsvpList.add(new RsvpInfo(offset.get() + 100 , (randomSeq[seqOffset]
- + reserveSlabSize - 1), nodes.shard, token, nextTTL));
- }
+ logger.info("Found reserved shard" + shardStr + "(" + token + "):"
+ + (offset.get() + 100) + " TO "
+ + (randomSeq[seqOffset] + reserveSlabSize));
+ allNodeRsvps.rsvpList.add(new RsvpInfo(offset.get() + 100 , (randomSeq[seqOffset]
+ + reserveSlabSize - 1), nodes.shard, token, nextTTL));
}
}
}
}
}
- } else
- {
- logger.info("No shard info found for :" + indexName);
}
+ } else
+ {
+ logger.info("No shard info found for :" + indexName);
+ }
+
+ if (currentShards == null)
+ {
+ currentShards = indexShards.put(indexName, shards);
if (currentShards == null)
{
- currentShards = indexShards.put(indexName, shards);
+ indexReserves.put(indexName, allNodeRsvps);
- if (currentShards == null)
- {
- indexReserves.put(indexName, allNodeRsvps);
-
- return shards;
- }
- } else
+ return shards;
+ }
+ } else
+ {
+ // Merge together active and new shards
+ for (Map.Entry<Integer, NodeInfo> entry : shards.shards.entrySet())
{
- // Merge together active and new shards
- for (Map.Entry<Integer, NodeInfo> entry : shards.shards.entrySet())
- {
- Integer shard = entry.getKey();
-
- if (currentShards.shards.get(shard) == null)
- currentShards.shards.put(shard, entry.getValue());
- }
+ Integer shard = entry.getKey();
- currentShards.renew(nextTTL);
+ if (currentShards.shards.get(shard) == null)
+ currentShards.shards.put(shard, entry.getValue());
}
-
- indexReserves.put(indexName, allNodeRsvps);
-
-
- return currentShards;
+ currentShards.renew(nextTTL);
}
+
+
+ indexReserves.put(indexName, allNodeRsvps);
+
+
+ return currentShards;
}
// TODO
@@ -452,7 +447,11 @@ public void deleteId(String indexName, long id)
public long getMaxId(String indexName) throws IOException
{
// find the max shard
- ShardInfo shards = getShardInfo(indexName, false);
+ ShardInfo shards;
+ synchronized(indexName.intern())
+ {
+ shards = getShardInfo(indexName, false);
+ }
if (shards.shards.isEmpty())
return 0;
@@ -649,245 +648,240 @@ public void resetCounter(String indexName) throws IOException
private Long nextReservedId(String indexName, ShardInfo shards, NodeInfo[] nodes, String myToken)
throws IOException
{
+ if (logger.isDebugEnabled())
+ logger.debug("in reserveIds for index " + indexName);
- synchronized (indexName.intern())
+ AllNodeRsvps currentRsvpd = indexReserves.get(indexName);
+
+ if (currentRsvpd != null)
{
+ Long nextId = currentRsvpd.getNextId();
- if (logger.isDebugEnabled())
- logger.debug("in reserveIds for index " + indexName);
+ if (nextId != null)
+ return nextId;
- AllNodeRsvps currentRsvpd = indexReserves.get(indexName);
+ //if (logger.isDebugEnabled())
+ logger.info("need more ids for " +indexName+" "+myToken);
+ }
- if (currentRsvpd != null)
- {
- Long nextId = currentRsvpd.getNextId();
+ AllNodeRsvps allNewRsvps = new AllNodeRsvps();
+ ShardInfo usedShardInfo = indexUsed.get(indexName);
+ if (usedShardInfo == null)
+ {
+ usedShardInfo = new ShardInfo(indexName);
+ indexUsed.put(indexName, usedShardInfo);
+ }
- if (nextId != null)
- return nextId;
+ // Pick a new shard
+ for (NodeInfo node : nodes)
+ {
+ AtomicInteger offset = node.nodes.get(myToken);
- //if (logger.isDebugEnabled())
- logger.info("need more ids for " +indexName+" "+myToken);
- }
+ assert offset != null;
+
+ int startingOffset = offset.get();
+ int nextOffset = startingOffset;
+
+ // goto next offset marker (unless its the first or last)
+ int randomSequenceOffset = getRandomSequenceOffset(startingOffset);
- AllNodeRsvps allNewRsvps = new AllNodeRsvps();
- ShardInfo usedShardInfo = indexUsed.get(indexName);
- if (usedShardInfo == null)
+ NodeInfo usedNodeInfo = usedShardInfo.shards.get(node.shard);
+ if (usedNodeInfo == null)
{
- usedShardInfo = new ShardInfo(indexName);
- indexUsed.put(indexName, usedShardInfo);
+ usedNodeInfo = new NodeInfo(node.shard);
+ usedShardInfo.shards.put(node.shard, usedNodeInfo);
}
- // Pick a new shard
- for (NodeInfo node : nodes)
+ if (startingOffset != randomSeq[0])
{
- AtomicInteger offset = node.nodes.get(myToken);
-
- assert offset != null;
-
- int startingOffset = offset.get();
- int nextOffset = startingOffset;
-
- // goto next offset marker (unless its the first or last)
- int randomSequenceOffset = getRandomSequenceOffset(startingOffset);
-
- NodeInfo usedNodeInfo = usedShardInfo.shards.get(node.shard);
- if (usedNodeInfo == null)
+ if (randomSequenceOffset != (offsetSlots - 1))
{
- usedNodeInfo = new NodeInfo(node.shard);
- usedShardInfo.shards.put(node.shard, usedNodeInfo);
- }
-
- if (startingOffset != randomSeq[0])
+ nextOffset = randomSeq[randomSequenceOffset + 1];
+ } else
{
- if (randomSequenceOffset != (offsetSlots - 1))
- {
- nextOffset = randomSeq[randomSequenceOffset + 1];
- } else
- {
- continue;
- }
+ continue;
}
+ }
+
+ if (logger.isTraceEnabled())
+ logger.trace(myToken + " startingOffset = " + startingOffset + ", nextOffset = " + nextOffset);
- if (logger.isTraceEnabled())
- logger.trace(myToken + " startingOffset = " + startingOffset + ", nextOffset = " + nextOffset);
+ while (true)
+ {
- while (true)
+ // Avoid re-checking used slabs
+ if (usedNodeInfo != null)
{
-
- // Avoid re-checking used slabs
- if (usedNodeInfo != null)
+ if (usedNodeInfo.nodes.get("" + nextOffset) != null)
{
- if (usedNodeInfo.nodes.get("" + nextOffset) != null)
- {
- updateNodeOffset(indexName, myToken, node.shard, nextOffset);
+ updateNodeOffset(indexName, myToken, node.shard, nextOffset);
- // try next offset
- int seqOffset = getRandomSequenceOffset(nextOffset);
- if (seqOffset < (offsetSlots - 1))
- {
- nextOffset = randomSeq[seqOffset + 1];
- continue;
- } else
- {
- break;
- }
+ // try next offset
+ int seqOffset = getRandomSequenceOffset(nextOffset);
+ if (seqOffset < (offsetSlots - 1))
+ {
+ nextOffset = randomSeq[seqOffset + 1];
+ continue;
+ } else
+ {
+ break;
}
}
+ }
- ByteBuffer key = CassandraUtils.hashKeyBytes((indexName + "~" + node.shard).getBytes("UTF-8"),
- CassandraUtils.delimeterBytes, "rsvp".getBytes("UTF-8"));
+ ByteBuffer key = CassandraUtils.hashKeyBytes((indexName + "~" + node.shard).getBytes("UTF-8"),
+ CassandraUtils.delimeterBytes, "rsvp".getBytes("UTF-8"));
- // Write the reserves
- RowMutation rm = new RowMutation(CassandraUtils.keySpace, key);
+ // Write the reserves
+ RowMutation rm = new RowMutation(CassandraUtils.keySpace, key);
- ByteBuffer id = ByteBufferUtil.bytes(String.valueOf(nextOffset));
- ByteBuffer off = id;
+ ByteBuffer id = ByteBufferUtil.bytes(String.valueOf(nextOffset));
+ ByteBuffer off = id;
- rm.add(new QueryPath(CassandraUtils.schemaInfoColumnFamily, id, ByteBuffer.wrap(myToken
- .getBytes("UTF-8"))), off, System.currentTimeMillis(), expirationTime);
+ rm.add(new QueryPath(CassandraUtils.schemaInfoColumnFamily, id, ByteBuffer.wrap(myToken
+ .getBytes("UTF-8"))), off, System.currentTimeMillis(), expirationTime);
- CassandraUtils.robustInsert(ConsistencyLevel.QUORUM, rm);
+ CassandraUtils.robustInsert(ConsistencyLevel.QUORUM, rm);
- // Give it time to sink in, in-case clocks are off across
- // nodes...
+ // Give it time to sink in, in-case clocks are off across
+ // nodes...
+ try
+ {
+ Thread.sleep(100);
+ } catch (InterruptedException e1)
+ {
+ }
+
+ // Read the columns back
+ IColumn supercol = null;
+ int attempts = 0;
+ while (supercol == null && attempts < CassandraUtils.retryAttempts)
+ {
try
{
- Thread.sleep(100);
- } catch (InterruptedException e1)
- {
- }
+ List<Row> rows = CassandraUtils.robustRead(key, new QueryPath(
+ CassandraUtils.schemaInfoColumnFamily), Arrays.asList(id), ConsistencyLevel.QUORUM);
- // Read the columns back
- IColumn supercol = null;
- int attempts = 0;
- while (supercol == null && attempts < CassandraUtils.retryAttempts)
- {
- try
+ if (rows == null || rows.size() == 0)
{
- List<Row> rows = CassandraUtils.robustRead(key, new QueryPath(
- CassandraUtils.schemaInfoColumnFamily), Arrays.asList(id), ConsistencyLevel.QUORUM);
+ continue;
+ }
- if (rows == null || rows.size() == 0)
+ if (rows.size() == 1)
+ {
+ Row row = rows.get(0);
+
+ if (row.cf == null || row.cf.isMarkedForDelete())
{
continue;
}
- if (rows.size() == 1)
- {
- Row row = rows.get(0);
-
- if (row.cf == null || row.cf.isMarkedForDelete())
- {
- continue;
- }
-
- supercol = rows.get(0).cf.getColumn(id);
- }
- } catch (IOException e)
- {
- // let's try again...
+ supercol = rows.get(0).cf.getColumn(id);
}
-
- attempts++;
+ } catch (IOException e)
+ {
+ // let's try again...
}
- if (supercol == null)
- throw new IllegalStateException("just wrote " + offset + ", but didn't read it");
-
- long minTtl = Long.MAX_VALUE;
- ByteBuffer winningToken = null;
-
- // See which ones we successfully reserved
- for (IColumn c : supercol.getSubColumns())
- {
+ attempts++;
+ }
- // someone already took this id
- if (!(c instanceof ExpiringColumn) && !(c instanceof DeletedColumn))
- {
- if (logger.isDebugEnabled())
- logger.debug(offset + " was taken by " + ByteBufferUtil.string(c.name()));
+ if (supercol == null)
+ throw new IllegalStateException("just wrote " + offset + ", but didn't read it");
- winningToken = null;
- break;
- }
+ long minTtl = Long.MAX_VALUE;
+ ByteBuffer winningToken = null;
- // expired reservation
- if (c.isMarkedForDelete())
- {
- continue;
- }
+ // See which ones we successfully reserved
+ for (IColumn c : supercol.getSubColumns())
+ {
- if (c.timestamp() < minTtl)
- {
- minTtl = c.timestamp();
- winningToken = c.name();
- }
+ // someone already took this id
+ if (!(c instanceof ExpiringColumn) && !(c instanceof DeletedColumn))
+ {
+ if (logger.isDebugEnabled())
+ logger.debug(offset + " was taken by " + ByteBufferUtil.string(c.name()));
- // incase of a tie the token is the tiebreaker
- if (c.timestamp() == minTtl && winningToken.compareTo(c.name()) <= 0)
- {
- winningToken = c.name();
- }
+ winningToken = null;
+ break;
}
- String winningTokenStr;
- try
+ // expired reservation
+ if (c.isMarkedForDelete())
{
- winningTokenStr = winningToken == null ? "" : ByteBufferUtil.string(winningToken);
- } catch (CharacterCodingException e)
+ continue;
+ }
+
+ if (c.timestamp() < minTtl)
{
- throw new RuntimeException(e);
+ minTtl = c.timestamp();
+ winningToken = c.name();
}
- // we won!
- if (winningTokenStr.equals(myToken))
+ // incase of a tie the token is the tiebreaker
+ if (c.timestamp() == minTtl && winningToken.compareTo(c.name()) <= 0)
{
- // Mark this as permanently taken
- rm = new RowMutation(CassandraUtils.keySpace, key);
+ winningToken = c.name();
+ }
+ }
- rm.add(new QueryPath(CassandraUtils.schemaInfoColumnFamily, id, ByteBuffer.wrap(myToken
- .getBytes("UTF-8"))), off, System.currentTimeMillis());
+ String winningTokenStr;
+ try
+ {
+ winningTokenStr = winningToken == null ? "" : ByteBufferUtil.string(winningToken);
+ } catch (CharacterCodingException e)
+ {
+ throw new RuntimeException(e);
+ }
- CassandraUtils.robustInsert(ConsistencyLevel.QUORUM, rm);
+ // we won!
+ if (winningTokenStr.equals(myToken))
+ {
+ // Mark this as permanently taken
+ rm = new RowMutation(CassandraUtils.keySpace, key);
- // Add to active rsvp list
- // Start just ahead of the first offset because on expiration we check if the offset
- //is on a boundry point of
- allNewRsvps.rsvpList.add(new RsvpInfo(nextOffset+1, (nextOffset + reserveSlabSize - 1),
- node.shard, myToken, shards.ttl()));
+ rm.add(new QueryPath(CassandraUtils.schemaInfoColumnFamily, id, ByteBuffer.wrap(myToken
+ .getBytes("UTF-8"))), off, System.currentTimeMillis());
- // if (logger.isTraceEnabled())
- logger.info("Reserved " + reserveSlabSize + " ids for " + indexName + "(" + myToken
- + ") shard " + node.shard + " from slot " + getRandomSequenceOffset(nextOffset) + " "
- + nextOffset + " TO " + (nextOffset + reserveSlabSize - 1));
+ CassandraUtils.robustInsert(ConsistencyLevel.QUORUM, rm);
- break;
+ // Add to active rsvp list
+ // Start just ahead of the first offset because on expiration we check if the offset
+ //is on a boundry point of
+ allNewRsvps.rsvpList.add(new RsvpInfo(nextOffset+1, (nextOffset + reserveSlabSize - 1),
+ node.shard, myToken, shards.ttl()));
+
+ // if (logger.isTraceEnabled())
+ logger.info("Reserved " + reserveSlabSize + " ids for " + indexName + "(" + myToken
+ + ") shard " + node.shard + " from slot " + getRandomSequenceOffset(nextOffset) + " "
+ + nextOffset + " TO " + (nextOffset + reserveSlabSize - 1));
+
+ break;
+ } else
+ {
+ // Mark this offset as taken.
+ usedNodeInfo.nodes.put("" + nextOffset, new AtomicInteger(1));
+
+ // we lost, try try again...
+ int seqOffset = getRandomSequenceOffset(nextOffset);
+ if (seqOffset < (offsetSlots - 1))
+ {
+ nextOffset = randomSeq[seqOffset + 1];
} else
{
- // Mark this offset as taken.
- usedNodeInfo.nodes.put("" + nextOffset, new AtomicInteger(1));
-
- // we lost, try try again...
- int seqOffset = getRandomSequenceOffset(nextOffset);
- if (seqOffset < (offsetSlots - 1))
- {
- nextOffset = randomSeq[seqOffset + 1];
- } else
- {
- break;
- }
+ break;
}
}
}
+ }
- // store new reserves
- indexReserves.put(indexName, allNewRsvps);
+ // store new reserves
+ indexReserves.put(indexName, allNewRsvps);
- if (logger.isTraceEnabled())
- logger.trace("Reserved " + allNewRsvps.rsvpList.size() + " shards for " + myToken);
+ if (logger.isTraceEnabled())
+ logger.trace("Reserved " + allNewRsvps.rsvpList.size() + " shards for " + myToken);
- return allNewRsvps.getNextId();
- }
+ return allNewRsvps.getNextId();
}
private int getRandomSequenceOffset(int offset)
Please sign in to comment.
Something went wrong with that request. Please try again.