This repository has been archived by the owner. It is now read-only.
Permalink
Browse files

fix bug in *:* documents are deleted then re-added

  • Loading branch information...
tjake committed Jul 21, 2011
1 parent cb0fcbe commit d1e0ff1df1161c5f067beff980043e615170e6e8
@@ -508,9 +508,8 @@ public void updateDocument(String indexName, Term updateTerm, Document doc, Anal
boolean autoCommit) throws CorruptIndexException, IOException
{
- deleteDocuments(indexName, updateTerm, autoCommit);
- addDocument(indexName, doc, analyzer, docNumber, autoCommit, null);
-
+ deleteDocuments(indexName, updateTerm, false);
+ addDocument(indexName, doc, analyzer, docNumber, autoCommit, null);
}
public int docCount()
@@ -592,6 +591,14 @@ private void appendMutations(String indexName, Map<ByteBuffer, RowMutation> muta
mutationQ.right.addAll(mutations.values());
}
+ // append complete mutations to the list
+ public void appendMutations(String indexName, RowMutation... mutations)
+ {
+ Pair<AtomicInteger, ConcurrentLinkedQueue<RowMutation>> mutationQ = getMutationQueue(indexName);
+
+ mutationQ.right.addAll(Arrays.asList(mutations));
+ }
+
private Pair<AtomicInteger, ConcurrentLinkedQueue<RowMutation>> getMutationQueue(String indexName)
{
@@ -25,17 +25,29 @@
import java.nio.charset.CharacterCodingException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
-import java.util.*;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Random;
+import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import lucandra.CassandraUtils;
-
-import com.google.common.collect.MapMaker;
-
-import org.apache.cassandra.db.*;
+import lucandra.Pair;
+
+import org.apache.cassandra.db.DeletedColumn;
+import org.apache.cassandra.db.ExpiringColumn;
+import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.SliceFromReadCommand;
+import org.apache.cassandra.db.SuperColumn;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.ColumnParent;
@@ -461,6 +473,25 @@ public Long getId(String indexName, String key) throws IOException
{
return checkForUpdate(indexName, key);
}
+
+
+ public RowMutation getIdMutation(String indexName, String key, Long id) throws IOException
+ {
+
+ int shard = getShardFromDocId(id);
+ ByteBuffer idCol = ByteBufferUtil.bytes(String.valueOf(getShardedDocId(id)));
+ ByteBuffer keyCol = ByteBuffer.wrap(key.getBytes("UTF-8"));
+
+ // Permanently mark the id as taken
+ ByteBuffer idKey = CassandraUtils.hashKeyBytes((indexName + "~" + shard).getBytes("UTF-8"),
+ CassandraUtils.delimeterBytes, "ids".getBytes("UTF-8"));
+
+ RowMutation rm = new RowMutation(CassandraUtils.keySpace, idKey);
+ rm.add(new QueryPath(CassandraUtils.schemaInfoColumnFamily, idCol, ByteBuffer.wrap(getToken().getBytes("UTF-8"))), keyCol, System.currentTimeMillis());
+
+
+ return rm;
+ }
public Long checkForUpdate(String indexName, String key) throws IOException
{
@@ -23,20 +23,23 @@
import java.net.InetAddress;
import java.net.URL;
import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import lucandra.CassandraUtils;
+import lucandra.Pair;
import lucandra.cluster.CassandraIndexManager;
import lucandra.cluster.IndexManagerService;
-import com.google.common.collect.MapMaker;
-
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
-
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.Row;
@@ -46,9 +49,11 @@
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.log4j.Logger;
import org.apache.lucene.index.Term;
-import org.apache.lucene.search.*;
+import org.apache.lucene.search.Query;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
import org.apache.solr.common.SolrException;
@@ -59,7 +64,14 @@
import org.apache.solr.core.SolrCore;
import org.apache.solr.schema.SchemaField;
import org.apache.solr.search.QueryParsing;
-import org.apache.solr.update.*;
+import org.apache.solr.update.AddUpdateCommand;
+import org.apache.solr.update.CommitUpdateCommand;
+import org.apache.solr.update.DeleteUpdateCommand;
+import org.apache.solr.update.MergeIndexesCommand;
+import org.apache.solr.update.RollbackUpdateCommand;
+import org.apache.solr.update.UpdateHandler;
+
+import com.google.common.collect.MapMaker;
public class SolandraIndexWriter extends UpdateHandler
{
@@ -225,16 +237,18 @@ public int addDoc(AddUpdateCommand cmd) throws IOException
Long docId = null;
RowMutation[] rms = null;
-
+
if (!coreInfo.bulk && !cmd.allowDups)
docId = IndexManagerService.instance.getId(coreInfo.indexName, key);
boolean isUpdate = false;
if (docId != null)
{
isUpdate = true;
+
if(logger.isDebugEnabled())
logger.debug("update for document " + docId);
+
}
else
{
@@ -259,12 +273,17 @@ public int addDoc(AddUpdateCommand cmd) throws IOException
Term idTerm = this.idTerm.createTerm(cmd.indexedId);
if (isUpdate)
+ {
writer.updateDocument(indexName, idTerm, cmd.getLuceneDocument(schema), schema.getAnalyzer(),
shardedId, false);
+
+ writer.appendMutations(indexName, IndexManagerService.instance.getIdMutation(coreInfo.indexName, key, docId));
+ }
else
+ {
writer.addDocument(indexName, cmd.getLuceneDocument(schema), schema.getAnalyzer(), shardedId, false,
rms);
-
+ }
rc = 1;
// Notify readers
@@ -0,0 +1,94 @@
+package solandra;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.impl.CommonsHttpSolrServer;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrInputDocument;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class DeleteTests extends SolandraTestRunner
+{
+ static String indexName = String.valueOf(System.nanoTime());
+
+ // Set test schema
+ static String schemaXml = "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>\n"
+ + "<schema name=\"wikipedia\" version=\"1.1\">\n"
+ + "<types>\n"
+ + "<fieldType name=\"tint\" class=\"solr.TrieIntField\" precisionStep=\"8\" omitNorms=\"true\" positionIncrementGap=\"0\"/>\n"
+ + "<fieldType name=\"text\" class=\"solr.TextField\">\n"
+ + "<analyzer><tokenizer class=\"solr.StandardTokenizerFactory\"/></analyzer>\n"
+ + "</fieldType>\n"
+ + "<fieldType name=\"string\" class=\"solr.StrField\"/>\n"
+ + "<fieldType name=\"sint\" class=\"solr.SortableIntField\" omitNorms=\"true\"/>\n"
+ + "</types>\n"
+ + "<fields>\n"
+ + "<field name=\"url\" type=\"string\" indexed=\"true\" stored=\"true\"/>\n"
+ + "<field name=\"text\" type=\"text\" indexed=\"true\" stored=\"true\" termVectors=\"true\" termPositions=\"true\" termOffsets=\"true\"/>\n"
+ + "<field name=\"title\" type=\"text\" indexed=\"true\" stored=\"true\"/>\n"
+ + "<field name=\"price\" type=\"tint\" indexed=\"true\" stored=\"true\"/>\n"
+ + "<dynamicField name=\"*_i\" stored=\"false\" type=\"sint\" multiValued=\"false\" indexed=\"true\"/>"
+ + "</fields>\n" + "<uniqueKey>url</uniqueKey>\n" + "<defaultSearchField>title</defaultSearchField>\n"
+ + "</schema>\n";
+
+
+ @BeforeClass
+ public static void init() throws Exception
+ {
+ addSchema(indexName, schemaXml);
+ getSolrClient(indexName);
+ }
+
+ @Test
+ public void testMatchAll() throws Exception
+ {
+ CommonsHttpSolrServer client = getSolrClient(indexName);
+
+ List<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
+ for(int i=0; i<5; i++)
+ {
+ docs.add(createDoc(false));
+ }
+
+ //Add
+ client.add(docs);
+ client.commit(true,true);
+
+ //query
+ SolrQuery q = new SolrQuery().setQuery("*:*");
+ QueryResponse r = client.query(q);
+ assertEquals(5, r.getResults().getNumFound());
+
+ //delete
+ client.deleteByQuery("title:foo");
+ client.commit(true,true);
+
+ //Add
+ client.add(docs);
+ client.commit(true,true);
+
+
+ //query
+ r = client.query(q);
+ assertEquals(5, r.getResults().getNumFound());
+ }
+
+
+ private SolrInputDocument createDoc(boolean extra)
+ {
+ SolrInputDocument doc = new SolrInputDocument();
+
+ doc.addField("url", "" + System.nanoTime());
+ doc.addField("title", "foo");
+
+ if(extra)
+ doc.addField("text", "bar");
+
+ return doc;
+ }
+}

0 comments on commit d1e0ff1

Please sign in to comment.