Permalink
Browse files

No more supercolumns; upgrade to the latest 0.7 RC

  • Loading branch information...
1 parent 0cdcabd commit 89797a155d83182a1337f748e84aa89f3e611ac2 @tjake committed Dec 20, 2010
View
@@ -36,7 +36,7 @@
<dependency org="snakeyaml" name="snakeyaml" rev="1.6" conf="* -> *,!sources,!javadoc" />
<dependency org="org.safehaus.jug" name="jug-asl" rev="2.0.0" conf="* -> *,!sources,!javadoc" />
<dependency org="libthrift" name="libthrift" rev="0.5" conf="* -> *,!sources,!javadoc" />
- <dependency org="org.apache.cassandra" name="apache-cassandra" rev="0.7.0-beta3" conf="* -> *,!sources,!javadoc">
+ <dependency org="org.apache.cassandra" name="apache-cassandra" rev="0.7.0-rc2" conf="* -> *,!sources,!javadoc">
<exclude module="hadoop-core"/>
<exclude module="hadoop-streaming"/>
<exclude module="avro"/>
@@ -1,11 +1,30 @@
-create keyspace L with replication_factor=1 and placement_strategy='org.apache.cassandra.locator.SimpleStrategy'
+create keyspace L
+ with replication_factor=1 and
+ placement_strategy='org.apache.cassandra.locator.SimpleStrategy';
-use L
+use L;
-create column family Docs with comparator=BytesType and keys_cached=10000000 and rows_cached=1000 and comment='Stores the document and field data for each doc with docId as key'
+create column family Docs
+ with comparator=BytesType and
+ keys_cached=10000000 and
+ rows_cached=1000 and
+ comment='Stores the document and field data for each doc with docId as key';
-create column family TI with comparator = 'lucandra.VIntType' and column_type = Super and keys_cached = 10000000 and rows_cached = 1000 and comment = 'Stores term information with indexName/field/term as composite key'
+create column family TI
+ with comparator = 'lucandra.VIntType' and
+ keys_cached = 10000000 and
+ rows_cached = 1000 and
+ comment = 'Stores term information with indexName/field/term as composite key';
-create column family TL with comparator = BytesType and keys_cached = 1000000 and rows_cached = 1000 and comment = 'Stores ordered list of terms for a given field with indexName/field as composite key'
+create column family TL
+ with comparator = BytesType and
+ keys_cached = 1000000 and
+ rows_cached = 1000 and
+ comment = 'Stores ordered list of terms for a given field with indexName/field as composite key';
-create column family SI with comparator = BytesType and column_type = Super and keys_cached = 100000 and rows_cached = 1000 and comment = 'Stores solr and index id information'
+create column family SI
+ with comparator = BytesType and
+ column_type = Super and
+ keys_cached = 100000 and
+ rows_cached = 1000 and
+ comment = 'Stores solr and index id information';
@@ -19,7 +19,7 @@
#
#Increase this to increase the number of shards loaded at once
#
-SHARDS_AT_ONCE="1"
+SHARDS_AT_ONCE="2"
#
#Log to stderr
@@ -75,7 +75,7 @@
public static final int maxDocsPerShard = (int) Math.pow(2, 17);
- public static final List<Number> emptyArray = Arrays.asList(new Number[] { 0 });
+ public static final List<Number> emptyArray = Arrays.asList(new Number[] {});
public static final String delimeter = new String("\uffff");
public static final byte[] delimeterBytes;
@@ -219,31 +219,21 @@ public static final ByteBuffer intVectorToByteArray(List<Number> intVector)
if (intVector.get(0) instanceof Byte)
return ByteBuffer.wrap(new byte[] { intVector.get(0).byteValue() });
- ByteBuffer buffer = ByteBuffer.allocate(4 * intVector.size());
+
+
+ ByteBuffer buffer = ByteBuffer.allocate(4 * (intVector.size()+1));
+ //Number of int's
+ buffer.putInt(intVector.size());
+
for (Number i : intVector)
{
buffer.putInt(i.intValue());
}
- buffer.rewind();
+ buffer.flip();
return buffer;
}
- public static boolean compareByteArrays(byte[] a, byte[] b)
- {
-
- if (a.length != b.length)
- return false;
-
- for (int i = 0; i < a.length; i++)
- {
- if (a[i] != b[i])
- return false;
- }
-
- return true;
-
- }
public static final int[] byteArrayToIntArray(ByteBuffer b)
{
@@ -263,42 +253,24 @@ public static boolean compareByteArrays(byte[] a, byte[] b)
return intArray;
}
- public static final byte[] encodeLong(long l)
- {
- ByteBuffer buffer = ByteBuffer.allocate(8);
+
- buffer.putLong(l);
-
- return buffer.array();
- }
-
- public static final long decodeLong(byte[] bytes)
- {
-
- if (bytes.length != 8)
- throw new RuntimeException("must be 8 bytes");
-
- ByteBuffer buffer = ByteBuffer.wrap(bytes);
-
- return buffer.getLong();
- }
-
-
+
public static void addMutations(Map<ByteBuffer, RowMutation> mutationList, String columnFamily, byte[] column,
- ByteBuffer key, byte[] value, Map<ByteBuffer, List<Number>> superColumns)
+ ByteBuffer key, byte[] value)
{
- addMutations(mutationList, columnFamily, ByteBuffer.wrap(column), key, ByteBuffer.wrap(value), superColumns);
+ addMutations(mutationList, columnFamily, ByteBuffer.wrap(column), key, ByteBuffer.wrap(value));
}
public static void addMutations(Map<ByteBuffer, RowMutation> mutationList, String columnFamily, byte[] column,
- ByteBuffer key, ByteBuffer value, Map<ByteBuffer, List<Number>> superColumns)
+ ByteBuffer key, ByteBuffer value)
{
- addMutations(mutationList, columnFamily, ByteBuffer.wrap(column), key, value, superColumns);
+ addMutations(mutationList, columnFamily, ByteBuffer.wrap(column), key, value);
}
public static void addMutations(Map<ByteBuffer, RowMutation> mutationList, String columnFamily, ByteBuffer column,
- ByteBuffer key, ByteBuffer value, Map<ByteBuffer, List<Number>> superColumns)
+ ByteBuffer key, ByteBuffer value)
{
// Find or create row mutation
@@ -311,12 +283,12 @@ public static void addMutations(Map<ByteBuffer, RowMutation> mutationList, Strin
mutationList.put(key, rm);
}
- if (value == null && superColumns == null)
+ if (value == null)
{ // remove
if (column != null)
{
- rm.delete(new QueryPath(columnFamily, column), System.currentTimeMillis());
+ rm.delete(new QueryPath(columnFamily, null, column), System.currentTimeMillis());
}
else
{
@@ -327,21 +299,8 @@ public static void addMutations(Map<ByteBuffer, RowMutation> mutationList, Strin
else
{ // insert
- if (superColumns == null)
- {
-
- rm.add(new QueryPath(columnFamily, null, column), value, System.currentTimeMillis());
-
- }
- else
- {
-
- for (Map.Entry<ByteBuffer, List<Number>> e : superColumns.entrySet())
- {
- rm.add(new QueryPath(columnFamily, column, e.getKey()), intVectorToByteArray(e.getValue()), System
- .currentTimeMillis());
- }
- }
+ rm.add(new QueryPath(columnFamily, null, column), value, System.currentTimeMillis());
+
}
}
@@ -365,6 +324,7 @@ public static void robustInsert(ConsistencyLevel cl, RowMutation... mutations )
{
}
+
try
{
@@ -516,14 +476,32 @@ public static ByteBuffer hashKeyBytes(byte[]... keys)
return ByteBuffer.wrap(hashedKey);
}
- public static int readVInt(ByteBuffer buf)
+ public static int mreadVInt(ByteBuffer buf)
{
int length = buf.remaining();
+
+ if(length == 0)
+ return 0;
+
+ byte b = buf.get();
+ int i = b & 0x7F;
+ for (int pos = 1, shift = 7; (b & 0x80) != 0 && pos < length; shift += 7, pos++)
+ {
+ b = buf.get();
+ i |= (b & 0x7F) << shift;
+ }
+ return i;
+ }
+
+ public static int readVInt(ByteBuffer buf)
+ {
+ int length = buf.remaining();
+
if(length == 0)
return 0;
- byte b = buf.array()[buf.position() + buf.arrayOffset()];
+ byte b = buf.array()[buf.position()+buf.arrayOffset()];
int i = b & 0x7F;
for (int pos = 1, shift = 7; (b & 0x80) != 0 && pos < length; shift += 7, pos++)
{
@@ -534,7 +512,7 @@ public static int readVInt(ByteBuffer buf)
return i;
}
- public static ByteBuffer writeVInt(int i)
+ public static byte[] writeVInt(int i)
{
int length = 0;
int p = i;
@@ -556,6 +534,6 @@ public static ByteBuffer writeVInt(int i)
}
buf[pos] = (byte) i;
- return ByteBuffer.wrap(buf);
+ return buf;
}
}
@@ -438,30 +438,23 @@ public TermEnum terms(Term term) throws IOException {
return termEnum;
}
- public void addDocumentNormalizations(Collection<IColumn> allDocs, String field) {
+ public void addDocumentNormalizations(LucandraTermInfo[] allDocs, String field) {
Map<String, byte[]> fieldNorms = getFieldNorms();
byte[] norms = fieldNorms.get(field);
- for (IColumn docInfo : allDocs) {
+ for (LucandraTermInfo docInfo : allDocs) {
- int idx = CassandraUtils.readVInt(docInfo.name());
+ int idx = docInfo.docId;
if (idx > numDocs)
throw new IllegalStateException("numDocs reached");
getDocsHit().set(idx);
- Byte norm = null;
- IColumn normCol = docInfo.getSubColumn(CassandraUtils.normsKeyBytes);
- if (normCol != null) {
- if (normCol.value().remaining() != 1)
- throw new IllegalStateException("Norm for field '" + field + "' must be a single byte, currently "+normCol.value().remaining());
-
- norm = normCol.value().array()[normCol.value().position() + normCol.value().arrayOffset()];
- }
-
+ Byte norm = docInfo.norm;
+
if (norm == null)
norm = defaultNorm;
@@ -98,7 +98,7 @@ public void addDocument(Document doc, Analyzer analyzer, String indexName, int d
docNumber = docNumber % CassandraUtils.maxDocsPerShard;
- ByteBuffer docId = CassandraUtils.writeVInt(docNumber);
+ ByteBuffer docId = ByteBuffer.wrap(CassandraUtils.writeVInt(docNumber));
int position = 0;
for (Fieldable field : (List<Fieldable>) doc.getFields()) {
@@ -226,9 +226,8 @@ public void addDocument(Document doc, Analyzer analyzer, String indexName, int d
term.getValue().put(CassandraUtils.normsKeyBytes, bnorm);
}
- CassandraUtils.addMutations(getMutationList(), CassandraUtils.termVecColumnFamily, docId, key, null, term.getValue());
- CassandraUtils.addMutations(getMutationList(), CassandraUtils.metaInfoColumnFamily, term.getKey().text().getBytes("UTF-8"), termkey, FBUtilities.EMPTY_BYTE_BUFFER, null);
-
+ CassandraUtils.addMutations(getMutationList(), CassandraUtils.termVecColumnFamily, docId, key, new LucandraTermInfo(docNumber, term.getValue()).serialize());
+ CassandraUtils.addMutations(getMutationList(), CassandraUtils.metaInfoColumnFamily, term.getKey().text().getBytes("UTF-8"), termkey, FBUtilities.EMPTY_BYTE_BUFFER);
}
}
@@ -247,8 +246,8 @@ public void addDocument(Document doc, Analyzer analyzer, String indexName, int d
termMap.put(CassandraUtils.termFrequencyKeyBytes, CassandraUtils.emptyArray);
termMap.put(CassandraUtils.positionVectorKeyBytes, CassandraUtils.emptyArray);
- CassandraUtils.addMutations(getMutationList(), CassandraUtils.termVecColumnFamily, docId, key, null, termMap);
- CassandraUtils.addMutations(getMutationList(), CassandraUtils.metaInfoColumnFamily, field.stringValue().getBytes("UTF-8"), termkey, FBUtilities.EMPTY_BYTE_BUFFER, null);
+ CassandraUtils.addMutations(getMutationList(), CassandraUtils.termVecColumnFamily, docId, key, new LucandraTermInfo(docNumber, termMap).serialize());
+ CassandraUtils.addMutations(getMutationList(), CassandraUtils.metaInfoColumnFamily, field.stringValue().getBytes("UTF-8"), termkey, FBUtilities.EMPTY_BYTE_BUFFER);
}
// Stores each field as a column under this doc key
@@ -283,12 +282,12 @@ public void addDocument(Document doc, Analyzer analyzer, String indexName, int d
// Store each field as a column under this docId
for (Map.Entry<String, byte[]> field : fieldCache.entrySet()) {
- CassandraUtils.addMutations(getMutationList(), CassandraUtils.docColumnFamily, field.getKey().getBytes("UTF-8"), key, field.getValue(), null);
+ CassandraUtils.addMutations(getMutationList(), CassandraUtils.docColumnFamily, field.getKey().getBytes("UTF-8"), key, field.getValue());
}
// Finally, Store meta-data so we can delete this document
CassandraUtils.addMutations(getMutationList(), CassandraUtils.docColumnFamily, CassandraUtils.documentMetaFieldBytes, key, CassandraUtils
- .toBytes(allIndexedTerms), null);
+ .toBytes(allIndexedTerms));
if (isAutoCommit()) {
CassandraUtils.robustInsert(ConsistencyLevel.ONE, getMutationList().values().toArray(new RowMutation[]{}));
@@ -378,12 +377,12 @@ private void deleteLucandraDocument(byte[] docId) {
throw new RuntimeException("JVM doesn't support UTF-8", e);
}
- CassandraUtils.addMutations(getMutationList(), CassandraUtils.termVecColumnFamily, docId, key, (ByteBuffer)null, null);
+ CassandraUtils.addMutations(getMutationList(), CassandraUtils.termVecColumnFamily, docId, key, (ByteBuffer)null);
}
// finally delete ourselves
ByteBuffer selfKey = CassandraUtils.hashKeyBytes(getIndexName().getBytes(), CassandraUtils.delimeterBytes, docId);
- CassandraUtils.addMutations(getMutationList(), CassandraUtils.docColumnFamily, (ByteBuffer)null, selfKey, (ByteBuffer)null, null);
+ CassandraUtils.addMutations(getMutationList(), CassandraUtils.docColumnFamily, (ByteBuffer)null, selfKey, (ByteBuffer)null);
if (isAutoCommit()){
CassandraUtils.robustInsert(ConsistencyLevel.ONE, getMutationList().values().toArray(new RowMutation[]{}));
@@ -392,9 +391,7 @@ private void deleteLucandraDocument(byte[] docId) {
}
public void updateDocument(Term updateTerm, Document doc, Analyzer analyzer, int docNumber) throws CorruptIndexException, IOException {
-
-
-
+
deleteDocuments(updateTerm);
addDocument(doc, analyzer, docNumber);
@@ -50,7 +50,7 @@ public DocIdSet getDocIdSet(IndexReader reader) throws IOException {
List<ByteBuffer> filteredValues = new ArrayList<ByteBuffer>();
for(int i=0; i<docsHit.capacity(); i++){
if(docsHit.fastGet(i))
- filteredValues.add(CassandraUtils.writeVInt(i));
+ filteredValues.add(ByteBuffer.wrap(CassandraUtils.writeVInt(i)));
}
if (filteredValues.size() == 0)
@@ -59,7 +59,7 @@ public DocIdSet getDocIdSet(IndexReader reader) throws IOException {
LucandraTermDocs termDocs = (LucandraTermDocs) reader.termDocs();
for (Term term : terms) {
- IColumn[] terms = termDocs.filteredSeek(term, filteredValues);
+ LucandraTermInfo[] terms = termDocs.filteredSeek(term, filteredValues);
// This is a conjunction and at least one value must match
if (terms == null)
return null;
Oops, something went wrong.

0 comments on commit 89797a1

Please sign in to comment.