Permalink
Browse files

Account for IDs not yet reflected on disk

  • Loading branch information...
1 parent c9056c1 commit a10ea4081e864392cd280044b07c05d7f3273e99 @tjake committed Jul 24, 2011
@@ -19,16 +19,16 @@ solandra.cache.invalidation.check.interval = 1000
#
#*NOTE* This value should not be changed once documents are indexed
#*NOTE* This value must be a power of 2
-solandra.maximum.docs.per.shard = 1048576
+solandra.maximum.docs.per.shard = 2097152
#The number of index ids to reserve at a time
#*NOTE* this value must be a power of 2
-solandra.index.id.reserve.size = 16384
+solandra.index.id.reserve.size = 65536
#The number of shards to write to at once
#This should roughly equal the number of
#nodes in your cluster
-solandra.shards.at.once = 4
+solandra.shards.at.once = 8
#The number of documents to buffer per index
#before forcing a commit.
@@ -194,11 +194,9 @@ public Long getNextId()
if (nextId <= info.endId)
{
- long id = (long) (maxDocsPerShard * info.shard) + nextId;
- if(id == 1048577)
- logger.error("FOUND ID: "+nextId + " "+info.shard+" "+getToken());
- return id;
- } else
+ return (long) (maxDocsPerShard * info.shard) + nextId;
+ }
+ else
{
rsvpList.set(pos, null);
}
@@ -379,7 +377,7 @@ private synchronized ShardInfo getShardInfo(String indexName, boolean force) thr
// Load this reserve if there are more ids remaining in the shard.
if (offset.get() < (maxDocsPerShard - 1))
{
- int seqOffset = getRandomSequenceOffset(offset.get() + 1);
+ int seqOffset = getRandomSequenceOffset(offset.get() + 100);
int prevSeqOffset = getRandomSequenceOffset(offset.get() - 1);
// Only save if this is not on a slot boundary
@@ -388,9 +386,9 @@ private synchronized ShardInfo getShardInfo(String indexName, boolean force) thr
if (token.equals(getToken()))
{
logger.info("Found reserved shard" + shardStr + "(" + token + "):"
- + (offset.get() + 1) + " TO "
+ + (offset.get() + 100) + " TO "
+ (randomSeq[seqOffset] + reserveSlabSize));
- allNodeRsvps.rsvpList.add(new RsvpInfo(offset.get() + 1, (randomSeq[seqOffset]
+ allNodeRsvps.rsvpList.add(new RsvpInfo(offset.get() + 100, (randomSeq[seqOffset]
+ reserveSlabSize - 1), nodes.shard, token, nextTTL));
}
}
@@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
+import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -56,6 +57,7 @@
private static Random random = new Random(System.currentTimeMillis());
private static Map<String,CommonsHttpSolrServer> streamingClients = new HashMap<String,CommonsHttpSolrServer>();
+
private static Runnable getRunnable() {
try {
@@ -73,7 +75,7 @@ private SolrInputDocument getDocument(){
SolrInputDocument doc = new SolrInputDocument();
doc.addField("text", text);
doc.addField("type", types[random.nextInt(types.length-1)]);
- doc.addField("id", ""+System.nanoTime()+Math.random());
+ doc.addField("id", UUID.randomUUID());
return doc;
}
@@ -106,7 +108,7 @@ public void run() {
if(indexName.equals(""))
fullUrl = urls[myThreadId % urls.length] + ":" + port + "/solr";
else
- fullUrl = urls[myThreadId % urls.length] + ":" + port + "/solandra/"+indexName;
+ fullUrl = urls[myThreadId % urls.length] + ":" + port + "/solandra/" + (type == Type.write ? "~" : "") + indexName;
if(type == Type.write)
solrClient = getStreamingServer(fullUrl);
@@ -225,7 +225,7 @@ public void testCassandraIncrement() throws IOException
//Tests id generations across many nodes and many shards
//waits for reserves to time out
- @Test
+ //@Test
public void testCassandraIncrement2() throws Exception
{
@@ -322,7 +322,7 @@ public void testCassandraIncrement2() throws Exception
}
//Tests the number of shards created
- @Test
+ //@Test
public void testCassandraIncrement4() throws Exception
{
@@ -347,7 +347,7 @@ public Object call()
long startTime = System.currentTimeMillis();
- for (int i = 0; i < CassandraIndexManager.maxDocsPerShard/32; i++)
+ for (int i = 0; i < CassandraIndexManager.maxDocsPerShard; i++)
{
String iname = indexName+rnd.nextInt(3);

0 comments on commit a10ea40

Please sign in to comment.