use collector to delete by query

commit fcfabe1de4c1ddd924d697c2775984a63adabc38 1 parent 951a95c
@tjake authored
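This commit replaces the paged search-and-delete loop in IndexWriter.deleteDocuments with a Lucene Collector, so each matching document is deleted as it is collected instead of re-running the query in batches of 1024 until no hits remain. The id-row tombstones are accumulated in a single RowMutation and flushed once through appendMutations, the reader is reopened before searching, and committing is deferred to the caller: SolandraIndexWriter.deleteByQuery now passes autoCommit=false and commits once per shard. Most of the remaining IndexWriter changes are whitespace and line-wrapping cleanup.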
scripts/post.sh (4 changed lines)
@@ -17,9 +17,9 @@
NAME=$1
FILE=$2
-URL=http://localhost:8983/solandra/schema/$NAME
+URL=http://localhost:8983/solandra/$NAME
-curl $URL --data-binary @$FILE -H 'Content-type:text/xml; charset=utf-8'
+curl $URL --data-binary @$FILE -H 'Content-type:text/xml; charset=utf-8' > /dev/null
echo "posted $FILE to $URL"
src/lucandra/IndexWriter.java (208 changed lines)
@@ -26,6 +26,7 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import lucandra.cluster.CassandraIndexManager;
import lucandra.serializers.thrift.DocumentMetadata;
@@ -64,8 +65,7 @@
.makeMap();
private Similarity similarity = Similarity
.getDefault();
- private static final Logger logger = Logger
- .getLogger(IndexWriter.class);
+ private static final Logger logger = Logger.getLogger(IndexWriter.class);
private static TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
public IndexWriter()
@@ -81,8 +81,8 @@ public void addDocument(String indexName, Document doc, Analyzer analyzer, int d
Map<ByteBuffer, RowMutation> workingMutations = new HashMap<ByteBuffer, RowMutation>();
byte[] indexNameBytes = indexName.getBytes("UTF-8");
- ByteBuffer indexTermsKey = CassandraUtils.hashKeyBytes(indexNameBytes, CassandraUtils.delimeterBytes, "terms"
- .getBytes("UTF-8"));
+ ByteBuffer indexTermsKey = CassandraUtils.hashKeyBytes(indexNameBytes, CassandraUtils.delimeterBytes,
+ "terms".getBytes("UTF-8"));
DocumentMetadata allIndexedTerms = new DocumentMetadata();
Map<String, DocumentMetadata> fieldCache = new HashMap<String, DocumentMetadata>(1024);
@@ -98,8 +98,7 @@ public void addDocument(String indexName, Document doc, Analyzer analyzer, int d
{
ThriftTerm firstTerm = null;
-
-
+
// Indexed field
if (field.isIndexed() && field.isTokenized())
{
@@ -108,17 +107,16 @@ public void addDocument(String indexName, Document doc, Analyzer analyzer, int d
if (tokens == null)
{
Reader tokReader = field.readerValue();
-
- if (tokReader == null)
+
+ if (tokReader == null)
tokReader = new StringReader(field.stringValue());
-
+
tokens = analyzer.reusableTokenStream(field.name(), tokReader);
}
-
// collect term information per field
- Map<Term, Map<ByteBuffer, List<Number>>> allTermInformation = new HashMap<Term, Map<ByteBuffer, List<Number>>>();
-
+ Map<Term, Map<ByteBuffer, List<Number>>> allTermInformation = new HashMap<Term, Map<ByteBuffer, List<Number>>>();
+
int lastOffset = 0;
if (position > 0)
{
@@ -139,12 +137,12 @@ public void addDocument(String indexName, Document doc, Analyzer analyzer, int d
// positions
PositionIncrementAttribute posIncrAttribute = null;
if (field.isStorePositionWithTermVector())
- posIncrAttribute = (PositionIncrementAttribute) tokens.addAttribute(PositionIncrementAttribute.class);
+ posIncrAttribute = (PositionIncrementAttribute) tokens
+ .addAttribute(PositionIncrementAttribute.class);
- //term as string
+ // term as string
CharTermAttribute termAttribute = (CharTermAttribute) tokens.addAttribute(CharTermAttribute.class);
-
-
+
// store normalizations of field per term per document rather
// than per field.
// this adds more to write but less to read on other side
@@ -154,12 +152,13 @@ public void addDocument(String indexName, Document doc, Analyzer analyzer, int d
{
tokensInField++;
Term term = new Term(field.name(), termAttribute.toString());
-
- ThriftTerm tterm = new ThriftTerm(term.field()).setText(ByteBuffer.wrap(term.text().getBytes("UTF-8"))).setIs_binary(false);
-
- if(firstTerm == null)
+
+ ThriftTerm tterm = new ThriftTerm(term.field()).setText(
+ ByteBuffer.wrap(term.text().getBytes("UTF-8"))).setIs_binary(false);
+
+ if (firstTerm == null)
firstTerm = tterm;
-
+
allIndexedTerms.addToTerms(tterm);
// fetch all collected information for this term
@@ -223,12 +222,12 @@ public void addDocument(String indexName, Document doc, Analyzer analyzer, int d
if (!field.getOmitNorms())
{
bnorm = new ArrayList<Number>();
-
+
final FieldInvertState invertState = new FieldInvertState();
invertState.setBoost(doc.getBoost() * field.getBoost());
invertState.setLength(tokensInField);
final float norm = similarity.computeNorm(field.name(), invertState);
-
+
bnorm.add(Similarity.getDefault().encodeNormValue(norm));
}
@@ -253,19 +252,21 @@ public void addDocument(String indexName, Document doc, Analyzer analyzer, int d
new LucandraTermInfo(docNumber, term.getValue()).serialize());
// Store all terms under a row
- CassandraUtils.addMutations(workingMutations, CassandraUtils.metaInfoColumnFamily, CassandraUtils
- .createColumnName(term.getKey()), indexTermsKey, ByteBufferUtil.EMPTY_BYTE_BUFFER);
+ CassandraUtils.addMutations(workingMutations, CassandraUtils.metaInfoColumnFamily,
+ CassandraUtils.createColumnName(term.getKey()), indexTermsKey,
+ ByteBufferUtil.EMPTY_BYTE_BUFFER);
}
}
// Untokenized fields go in without a termPosition
if (field.isIndexed() && !field.isTokenized())
{
- ThriftTerm tterm = new ThriftTerm(field.name()).setText(ByteBuffer.wrap(field.stringValue().getBytes("UTF-8"))).setIs_binary(false);
-
- if(firstTerm == null)
+ ThriftTerm tterm = new ThriftTerm(field.name()).setText(
+ ByteBuffer.wrap(field.stringValue().getBytes("UTF-8"))).setIs_binary(false);
+
+ if (firstTerm == null)
firstTerm = tterm;
-
+
allIndexedTerms.addToTerms(tterm);
ByteBuffer key = CassandraUtils.hashKeyBytes(indexName.getBytes("UTF-8"),
@@ -280,25 +281,24 @@ public void addDocument(String indexName, Document doc, Analyzer analyzer, int d
new LucandraTermInfo(docNumber, termMap).serialize());
// Store all terms under a row
- CassandraUtils.addMutations(workingMutations, CassandraUtils.metaInfoColumnFamily, CassandraUtils
- .createColumnName(field), indexTermsKey, ByteBufferUtil.EMPTY_BYTE_BUFFER);
+ CassandraUtils.addMutations(workingMutations, CassandraUtils.metaInfoColumnFamily,
+ CassandraUtils.createColumnName(field), indexTermsKey, ByteBufferUtil.EMPTY_BYTE_BUFFER);
}
// Stores each field as a column under this doc key
if (field.isStored())
- {
+ {
ThriftTerm tt = new ThriftTerm(field.name());
-
+
if (field instanceof NumericField)
{
Number n = ((NumericField) field).getNumericValue();
tt.setLongVal(n.longValue());
}
-
- byte[] value = field.isBinary() ? field.getBinaryValue() : field.stringValue().getBytes("UTF-8");
+
+ byte[] value = field.isBinary() ? field.getBinaryValue() : field.stringValue().getBytes("UTF-8");
tt.setText(ByteBuffer.wrap(value)).setIs_binary(field.isBinary());
-
-
+
// logic to handle multiple fields w/ same name
DocumentMetadata currentValue = fieldCache.get(field.name());
if (currentValue == null)
@@ -306,29 +306,31 @@ public void addDocument(String indexName, Document doc, Analyzer analyzer, int d
currentValue = new DocumentMetadata();
fieldCache.put(field.name(), currentValue);
}
-
+
currentValue.addToTerms(tt);
}
-
- //Store for field cache
- if(firstTerm != null)
- {
- ByteBuffer fieldCacheKey = CassandraUtils.hashKeyBytes(indexNameBytes, CassandraUtils.delimeterBytes, firstTerm.field.getBytes());
- CassandraUtils.addMutations(workingMutations, CassandraUtils.fieldCacheColumnFamily, CassandraUtils.writeVInt(docNumber), fieldCacheKey, firstTerm.text);
-
- if(logger.isDebugEnabled())
- logger.debug(indexName+" - firstTerm: "+ByteBufferUtil.string(fieldCacheKey));
- }
+
+ // Store for field cache
+ if (firstTerm != null)
+ {
+ ByteBuffer fieldCacheKey = CassandraUtils.hashKeyBytes(indexNameBytes, CassandraUtils.delimeterBytes,
+ firstTerm.field.getBytes());
+ CassandraUtils.addMutations(workingMutations, CassandraUtils.fieldCacheColumnFamily,
+ CassandraUtils.writeVInt(docNumber), fieldCacheKey, firstTerm.text);
+
+ if (logger.isDebugEnabled())
+ logger.debug(indexName + " - firstTerm: " + ByteBufferUtil.string(fieldCacheKey));
+ }
}
- ByteBuffer key = CassandraUtils.hashKeyBytes(indexNameBytes, CassandraUtils.delimeterBytes,
- Integer.toHexString(docNumber).getBytes("UTF-8"));
+ ByteBuffer key = CassandraUtils.hashKeyBytes(indexNameBytes, CassandraUtils.delimeterBytes, Integer
+ .toHexString(docNumber).getBytes("UTF-8"));
// Store each field as a column under this docId
for (Map.Entry<String, DocumentMetadata> field : fieldCache.entrySet())
{
- CassandraUtils.addMutations(workingMutations, CassandraUtils.docColumnFamily, field.getKey().getBytes(
- "UTF-8"), key, toBytesUsingThrift(field.getValue()));
+ CassandraUtils.addMutations(workingMutations, CassandraUtils.docColumnFamily,
+ field.getKey().getBytes("UTF-8"), key, toBytesUsingThrift(field.getValue()));
}
// Finally, Store meta-data so we can delete this document
@@ -353,45 +355,63 @@ public void addDocument(String indexName, Document doc, Analyzer analyzer, int d
commit(indexName, false);
}
- public long deleteDocuments(String indexName, Query query, boolean autoCommit) throws CorruptIndexException,
- IOException
+ public long deleteDocuments(final String indexName, Query query, final boolean autoCommit)
+ throws CorruptIndexException, IOException
{
- IndexReader reader = new IndexReader(indexName);
+ IndexReader reader = new IndexReader(indexName).reopen();
IndexSearcher searcher = new IndexSearcher(reader);
+ final AtomicLong numRemoved = new AtomicLong(0);
-
// Also delete the id lookup
- ByteBuffer idKey = CassandraUtils.hashKeyBytes(indexName.getBytes("UTF-8"),
- CassandraUtils.delimeterBytes, "ids".getBytes("UTF-8"));
+ final ByteBuffer idKey = CassandraUtils.hashKeyBytes(indexName.getBytes("UTF-8"), CassandraUtils.delimeterBytes,
+ "ids".getBytes("UTF-8"));
+
+ final Map<ByteBuffer, RowMutation> workingMutations = new HashMap<ByteBuffer, RowMutation>();
+ final RowMutation rm = new RowMutation(CassandraUtils.keySpace, idKey);
+ workingMutations.put(idKey, rm);
+
+ Collector collector = new Collector() {
- RowMutation rm = new RowMutation(CassandraUtils.keySpace, idKey);
+ @Override
+ public void setScorer(Scorer scorer) throws IOException
+ {
-
- TopDocs results = null;
- long total = 0;
- do
- {
- results = searcher.search(query, 1024);
+ }
- for (int i = 0; i < results.totalHits; i++)
+ @Override
+ public void setNextReader(org.apache.lucene.index.IndexReader reader, int docBase) throws IOException
{
- ScoreDoc doc = results.scoreDocs[i];
+
+ }
+
+ @Override
+ public void collect(int docNumber) throws IOException
+ {
+ deleteLucandraDocument(indexName, docNumber, autoCommit);
+ numRemoved.incrementAndGet();
+
+ //delete the id reference
+ rm.delete(new QueryPath(CassandraUtils.schemaInfoColumnFamily, ByteBufferUtil.bytes(Integer.toString(docNumber))), System.currentTimeMillis()-1);
+
+ }
- deleteLucandraDocument(indexName, doc.doc, true);
-
- //Scale the doc ID to the sharded id.
- ByteBuffer buf = ByteBufferUtil.bytes(String.valueOf(doc.doc));
- rm.delete(new QueryPath(CassandraUtils.schemaInfoColumnFamily, buf), System.currentTimeMillis());
+ @Override
+ public boolean acceptsDocsOutOfOrder()
+ {
+ return false;
}
-
-
- CassandraUtils.robustInsert(ConsistencyLevel.QUORUM, rm);
-
- total += results.totalHits;
- }while(results.totalHits > 0);
-
- return total;
+ };
+
+ //collector will perform deletes
+ searcher.search(query, collector);
+
+ appendMutations(indexName, workingMutations);
+
+ if(autoCommit)
+ commit(indexName, false);
+
+ return numRemoved.get();
}
public void deleteDocuments(String indexName, Term term, boolean autoCommit) throws CorruptIndexException,
@@ -432,8 +452,8 @@ private void deleteLucandraDocument(String indexName, int docNumber, boolean aut
ByteBuffer key = CassandraUtils.hashKeyBytes(indexNameBytes, CassandraUtils.delimeterBytes, docId);
- List<Row> rows = CassandraUtils.robustRead(key, CassandraUtils.metaColumnPath, Arrays
- .asList(CassandraUtils.documentMetaFieldBytes), CassandraUtils.consistency);
+ List<Row> rows = CassandraUtils.robustRead(key, CassandraUtils.metaColumnPath,
+ Arrays.asList(CassandraUtils.documentMetaFieldBytes), CassandraUtils.consistency);
if (rows.isEmpty() || rows.get(0).cf == null)
return; // nothing to delete
@@ -445,16 +465,18 @@ private void deleteLucandraDocument(String indexName, int docNumber, boolean aut
DocumentMetadata terms = fromBytesUsingThrift(metaCol.value());
Set<String> fields = new HashSet<String>();
-
+
for (ThriftTerm term : terms.getTerms())
{
- //remove from field cache
- if(!fields.contains(term.getField()))
+ // remove from field cache
+ if (!fields.contains(term.getField()))
{
- ByteBuffer fieldCacheKey = CassandraUtils.hashKeyBytes(indexNameBytes, CassandraUtils.delimeterBytes, term.getField().getBytes());
-
- CassandraUtils.addMutations(workingMutations, CassandraUtils.fieldCacheColumnFamily, CassandraUtils.writeVInt(docNumber), fieldCacheKey, (ByteBuffer) null);
-
+ ByteBuffer fieldCacheKey = CassandraUtils.hashKeyBytes(indexNameBytes, CassandraUtils.delimeterBytes,
+ term.getField().getBytes());
+
+ CassandraUtils.addMutations(workingMutations, CassandraUtils.fieldCacheColumnFamily,
+ CassandraUtils.writeVInt(docNumber), fieldCacheKey, (ByteBuffer) null);
+
fields.add(term.getField());
}
@@ -468,10 +490,10 @@ private void deleteLucandraDocument(String indexName, int docNumber, boolean aut
throw new RuntimeException("JVM doesn't support UTF-8", e);
}
- CassandraUtils.addMutations(workingMutations, CassandraUtils.termVecColumnFamily, CassandraUtils
- .writeVInt(docNumber), key, (ByteBuffer) null);
+ CassandraUtils.addMutations(workingMutations, CassandraUtils.termVecColumnFamily,
+ CassandraUtils.writeVInt(docNumber), key, (ByteBuffer) null);
}
-
+
// finally delete ourselves
ByteBuffer selfKey = CassandraUtils.hashKeyBytes(indexNameBytes, CassandraUtils.delimeterBytes, docId);
CassandraUtils.addMutations(workingMutations, CassandraUtils.docColumnFamily, (ByteBuffer) null, selfKey,
@@ -609,7 +631,7 @@ public static DocumentMetadata fromBytesUsingThrift(ByteBuffer data) throws IOEx
DocumentMetadata docMeta = new DocumentMetadata();
byte[] decompressedData = CassandraUtils.decompress(ByteBufferUtil.getArray(data));
-
+
TTransport trans = new TMemoryInputTransport(decompressedData);
TProtocol deser = protocolFactory.getProtocol(trans);
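
For reference, the core of the deleteDocuments rewrite above is the standard Lucene 3.x Collector callback: IndexSearcher.search(Query, Collector) streams every matching document id into collect(int), so no TopDocs paging or re-querying is needed. Below is a minimal, self-contained sketch of that pattern; DeleteByQuerySketch and DocDeleter are illustrative names rather than part of this commit, and unlike the commit's collector (which passes docNumber straight to deleteLucandraDocument) the sketch applies the per-segment docBase offset that a multi-segment reader would require.

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Scorer;

public class DeleteByQuerySketch
{
    // Stand-in for the real per-document delete (e.g. deleteLucandraDocument).
    public interface DocDeleter
    {
        void delete(int globalDocNumber) throws IOException;
    }

    public static long deleteMatching(IndexSearcher searcher, Query query, final DocDeleter deleter)
            throws IOException
    {
        final AtomicLong numRemoved = new AtomicLong(0);

        searcher.search(query, new Collector()
        {
            private int docBase; // id offset of the segment currently being searched

            @Override
            public void setScorer(Scorer scorer)
            {
                // scores are irrelevant when deleting, nothing to do
            }

            @Override
            public void setNextReader(IndexReader reader, int docBase)
            {
                this.docBase = docBase;
            }

            @Override
            public void collect(int doc) throws IOException
            {
                deleter.delete(docBase + doc); // translate segment-local id to global id
                numRemoved.incrementAndGet();
            }

            @Override
            public boolean acceptsDocsOutOfOrder()
            {
                return false; // collect hits in doc-id order, as the commit does
            }
        });

        return numRemoved.get();
    }
}

The commit's anonymous collector additionally records an id-row tombstone per hit (rm.delete(...)) and flushes all of them once through appendMutations, rather than calling robustInsert after every 1024-hit page as the old loop did.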
src/solandra/SolandraIndexWriter.java (6 changed lines)
@@ -402,7 +402,6 @@ public void deleteByQuery(DeleteUpdateCommand cmd) throws IOException
SolandraCoreInfo coreInfo = SolandraCoreContainer.coreInfo.get();
String indexName = coreInfo.indexName;
- boolean isShard = coreInfo.isShard;
List<String> localShards = new ArrayList<String>();
@@ -447,8 +446,9 @@ public void deleteByQuery(DeleteUpdateCommand cmd) throws IOException
for(String subIndex : localShards)
{
Query q = QueryParsing.parseQuery(cmd.query, schema);
- long total = writer.deleteDocuments(subIndex, q, true);
- logger.info("Deleted "+ total + " Documents");
+ long total = writer.deleteDocuments(subIndex, q, false);
+ commit(subIndex, true);
+ logger.info("Deleted "+ total + " Documents in "+subIndex);
}
madeIt = true;
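
Since deleteDocuments no longer commits internally here, deleteByQuery drives the commit itself: it passes autoCommit=false, calls commit(subIndex, true) once per shard, and the log line now names the shard. The unused isShard local is also removed.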