Skip to content
This repository has been archived by the owner on Nov 22, 2017. It is now read-only.

Commit

Permalink
integrate cassandra 0.7.1
Browse files Browse the repository at this point in the history
(cherry picked from commit 9a80d0c)
  • Loading branch information
tjake committed Feb 14, 2011
1 parent 852e037 commit 5a35df7
Show file tree
Hide file tree
Showing 13 changed files with 104 additions and 86 deletions.
4 changes: 2 additions & 2 deletions ivy.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
~ under the License.
-->
<ivy-module version="2.0">
<info organisation="lucandra" module="lucandra"/>
<info organisation="solandra" module="solandra"/>

<dependencies>
<dependency org="junit" name="junit" rev="4.6" conf="* -> *,!sources,!javadoc" />
Expand All @@ -33,7 +33,7 @@
<dependency org="org.mortbay.jetty" name="jetty-util" rev="6.1.6" conf="* -> *,!sources,!javadoc" />
<dependency org="org.mortbay.jetty" name="jsp-2.1" rev="6.1.6" conf="* -> *,!sources,!javadoc" />

<dependency org="org.apache.cassandra" name="cassandra-all" rev="0.7.0" conf="* -> *,!sources,!javadoc" />
<dependency org="org.apache.cassandra" name="cassandra-all" rev="0.7.1" conf="* -> *,!sources,!javadoc" />
<dependency org="net.java.dev.jna" name="jna" rev="3.2.7" conf="* -> *,!sources,!javadoc" />
</dependencies>
</ivy-module>
Expand Down
1 change: 1 addition & 0 deletions ivysettings.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
<settings defaultResolver="ibiblio"/>
<resolvers>
<chain name="chain" dual="true">
<ibiblio name="apache" root="https://repository.apache.org/content/repositories/orgapachecassandra-056" m2compatible="true"/>
<ibiblio name="java.net2" root="http://download.java.net/maven/2/" m2compatible="true"/>
<ibiblio name="cloudera" root="https://repository.cloudera.com/content/repositories/releases/" m2compatible="true" />
<ibiblio name="ibiblio" m2compatible="true" />
Expand Down
6 changes: 0 additions & 6 deletions resources/log4j.properties

This file was deleted.

4 changes: 3 additions & 1 deletion resources/start-solandra.sh
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ fi
if [ "x$pidpath" != "x" ]; then
solandra_parms="$solandra_parms -Dcassandra-pidfile=$pidpath"
fi

solandra_parms="$solandra_parms -Dlog4j.configuration=log4j-solandra.properties -Dlog4j.defaultInitOverride=true"

# The solandra-foreground option will tell Cassandra not
# to close stdout/stderr, but it's up to us not to background.
Expand All @@ -126,6 +128,6 @@ then
sleep 1
echo "Waiting 10 seconds for solandra to start before bootstrapping schema..."
sleep 10
cd cassandra-tools && ./cassandra-cli --host=localhost < solandra.cml
cd cassandra-tools && ./cassandra-cli --host localhost < solandra.cml
echo "Solandra ready"
fi
4 changes: 2 additions & 2 deletions src/lucandra/CassandraUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public static final ByteBuffer intVectorToByteArray(List<Number> intVector)
{

if (intVector.size() == 0)
return FBUtilities.EMPTY_BYTE_BUFFER;
return ByteBufferUtil.EMPTY_BYTE_BUFFER;

if (intVector.get(0) instanceof Byte)
return ByteBuffer.wrap(new byte[] { intVector.get(0).byteValue() });
Expand Down Expand Up @@ -357,7 +357,7 @@ public static List<Row> robustRead(ConsistencyLevel cl, ReadCommand... rc) throw
{
try
{
rows = StorageProxy.readProtocol(Arrays.asList(rc), cl);
rows = StorageProxy.read(Arrays.asList(rc), cl);
break;
}
catch (UnavailableException e1)
Expand Down
4 changes: 2 additions & 2 deletions src/lucandra/IndexReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ public Document document(int docNum, FieldSelector selector) throws CorruptIndex
{
// get all columns ( except this skips meta info )
readCommands.add(new SliceFromReadCommand(CassandraUtils.keySpace, key, columnParent,
FBUtilities.EMPTY_BYTE_BUFFER, CassandraUtils.finalTokenBytes, false, Integer.MAX_VALUE));
ByteBufferUtil.EMPTY_BYTE_BUFFER, CassandraUtils.finalTokenBytes, false, Integer.MAX_VALUE));
}
else
{
Expand All @@ -275,7 +275,7 @@ public Document document(int docNum, FieldSelector selector) throws CorruptIndex
}
}

rows = StorageProxy.readProtocol(readCommands, ConsistencyLevel.ONE);
rows = CassandraUtils.robustRead(ConsistencyLevel.ONE, readCommands.toArray(new ReadCommand[]{}));

// allow lookup by row
Map<ByteBuffer, Row> rowMap = new HashMap<ByteBuffer, Row>(keyMap.size());
Expand Down
53 changes: 19 additions & 34 deletions src/lucandra/IndexWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.cassandra.db.*;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
import org.apache.log4j.Logger;
Expand Down Expand Up @@ -227,7 +228,7 @@ public void addDocument(String indexName, Document doc, Analyzer analyzer, int d

// Store all terms under a row
CassandraUtils.addMutations(workingMutations, CassandraUtils.metaInfoColumnFamily, CassandraUtils
.createColumnName(term.getKey()), indexTermsKey, FBUtilities.EMPTY_BYTE_BUFFER);
.createColumnName(term.getKey()), indexTermsKey, ByteBufferUtil.EMPTY_BYTE_BUFFER);
}
}

Expand All @@ -249,7 +250,7 @@ public void addDocument(String indexName, Document doc, Analyzer analyzer, int d

// Store all terms under a row
CassandraUtils.addMutations(workingMutations, CassandraUtils.metaInfoColumnFamily, CassandraUtils
.createColumnName(field), indexTermsKey, FBUtilities.EMPTY_BYTE_BUFFER);
.createColumnName(field), indexTermsKey, ByteBufferUtil.EMPTY_BYTE_BUFFER);
}

// Stores each field as a column under this doc key
Expand Down Expand Up @@ -340,46 +341,30 @@ public void deleteDocuments(String indexName, Query query, boolean autoCommit) t
public void deleteDocuments(String indexName, Term term, boolean autoCommit) throws CorruptIndexException,
IOException
{
try
{

ColumnParent cp = new ColumnParent(CassandraUtils.termVecColumnFamily);
ColumnParent cp = new ColumnParent(CassandraUtils.termVecColumnFamily);

ByteBuffer key = CassandraUtils.hashKeyBytes(indexName.getBytes(), CassandraUtils.delimeterBytes, term
.field().getBytes(), CassandraUtils.delimeterBytes, term.text().getBytes("UTF-8"));
ByteBuffer key = CassandraUtils.hashKeyBytes(indexName.getBytes(), CassandraUtils.delimeterBytes, term
.field().getBytes(), CassandraUtils.delimeterBytes, term.text().getBytes("UTF-8"));

ReadCommand rc = new SliceFromReadCommand(CassandraUtils.keySpace, key, cp, FBUtilities.EMPTY_BYTE_BUFFER,
FBUtilities.EMPTY_BYTE_BUFFER, false, Integer.MAX_VALUE);
ReadCommand rc = new SliceFromReadCommand(CassandraUtils.keySpace, key, cp, ByteBufferUtil.EMPTY_BYTE_BUFFER,
ByteBufferUtil.EMPTY_BYTE_BUFFER, false, Integer.MAX_VALUE);

List<Row> rows = StorageProxy.readProtocol(Arrays.asList(rc), ConsistencyLevel.ONE);
List<Row> rows = CassandraUtils.robustRead(ConsistencyLevel.ONE, rc);

// delete by documentId
for (Row row : rows)
// delete by documentId
for (Row row : rows)
{
if (row.cf != null)
{
if (row.cf != null)
Collection<IColumn> columns = row.cf.getSortedColumns();

for (IColumn col : columns)
{
Collection<IColumn> columns = row.cf.getSortedColumns();

for (IColumn col : columns)
{
deleteLucandraDocument(indexName, CassandraUtils.readVInt(col.name()), autoCommit);
}
}
deleteLucandraDocument(indexName, CassandraUtils.readVInt(col.name()), autoCommit);
}
}

}
catch (TimeoutException e)
{
throw new RuntimeException(e);
}
catch (UnavailableException e)
{
throw new RuntimeException(e);
}
catch (InvalidRequestException e)
{
throw new RuntimeException(e);
}

}

private void deleteLucandraDocument(String indexName, int docNumber, boolean autoCommit) throws IOException
Expand Down
4 changes: 2 additions & 2 deletions src/lucandra/LucandraAllTermDocs.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ private void fillDocBuffer() throws IOException
ByteBuffer key = CassandraUtils.hashKeyBytes(indexName.getBytes(), CassandraUtils.delimeterBytes, "ids".getBytes());

ReadCommand cmd = new SliceFromReadCommand(CassandraUtils.keySpace, key,
new ColumnParent(CassandraUtils.schemaInfoColumnFamily), FBUtilities.EMPTY_BYTE_BUFFER,
FBUtilities.EMPTY_BYTE_BUFFER, false, Integer.MAX_VALUE);
new ColumnParent(CassandraUtils.schemaInfoColumnFamily), ByteBufferUtil.EMPTY_BYTE_BUFFER,
ByteBufferUtil.EMPTY_BYTE_BUFFER, false, Integer.MAX_VALUE);


List<Row> rows = CassandraUtils.robustRead(ConsistencyLevel.ONE, cmd);
Expand Down
10 changes: 9 additions & 1 deletion src/lucandra/LucandraTermInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package lucandra;

import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -97,7 +98,14 @@ else if(e.getKey().equals(CassandraUtils.termFrequencyKeyBytes))

else
{
throw new IllegalArgumentException(ByteBufferUtil.string(e.getKey()));
try
{
throw new IllegalArgumentException(ByteBufferUtil.string(e.getKey()));
}
catch (CharacterCodingException e1)
{
throw new IllegalArgumentException(e.getKey().toString());
}
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/lucandra/TermCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public Pair<Term,Term> bufferTerms(Term startTerm, int bufferSize) throws IOExce
// Scan range of terms in this field (reversed, so we have a exit point)
List<Row> rows = CassandraUtils.robustRead(ConsistencyLevel.ONE,
new SliceFromReadCommand(CassandraUtils.keySpace, termsListKey, fieldColumnFamily, CassandraUtils.createColumnName(startTerm),
FBUtilities.EMPTY_BYTE_BUFFER, false, bufferSize));
ByteBufferUtil.EMPTY_BYTE_BUFFER, false, bufferSize));

ColumnParent columnParent = new ColumnParent(CassandraUtils.termVecColumnFamily);

Expand Down Expand Up @@ -189,7 +189,7 @@ public Pair<Term,Term> bufferTerms(Term startTerm, int bufferSize) throws IOExce


reads.add((ReadCommand) new SliceFromReadCommand(CassandraUtils.keySpace, rowKey, columnParent,
FBUtilities.EMPTY_BYTE_BUFFER, FBUtilities.EMPTY_BYTE_BUFFER, false, Integer.MAX_VALUE));
ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, Integer.MAX_VALUE));
}

rows = CassandraUtils.robustRead(ConsistencyLevel.ONE, reads.toArray(new ReadCommand[] {}));
Expand All @@ -200,7 +200,7 @@ public Pair<Term,Term> bufferTerms(Term startTerm, int bufferSize) throws IOExce
if (logger.isDebugEnabled())
{
logger.debug("Found " + rows.size() + " rows in range:" + startTerm + " to "
+ ByteBufferUtil.string(FBUtilities.EMPTY_BYTE_BUFFER) + " in "
+ "" + " in "
+ (System.currentTimeMillis() - start) + "ms");

}
Expand Down
61 changes: 35 additions & 26 deletions src/lucandra/TermFreqVector.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -117,46 +118,54 @@ public TermFreqVector(String indexName, String field, int docI)

int i = 0;

for (Row row : rows)
try
{
String rowKey = ByteBufferUtil.string(row.key.key, CassandraUtils.UTF_8);
for (Row row : rows)
{
String rowKey = ByteBufferUtil.string(row.key.key, CassandraUtils.UTF_8);

String termStr = rowKey.substring(rowKey.indexOf(CassandraUtils.delimeter)
+ CassandraUtils.delimeter.length());
String termStr = rowKey.substring(rowKey.indexOf(CassandraUtils.delimeter)
+ CassandraUtils.delimeter.length());

Term t = CassandraUtils.parseTerm(termStr);
Term t = CassandraUtils.parseTerm(termStr);

terms[i] = t.text();
terms[i] = t.text();

// Find the offsets and positions
LucandraTermInfo termInfo = null;
// Find the offsets and positions
LucandraTermInfo termInfo = null;

if (row.cf != null)
{
termInfo = new LucandraTermInfo(0, row.cf.getSortedColumns().iterator().next().value());
if (row.cf != null)
{
termInfo = new LucandraTermInfo(0, row.cf.getSortedColumns().iterator().next().value());

termPositions[i] = termInfo.positions;
}
termPositions[i] = termInfo.positions;
}

freqVec[i] = termPositions[i].length;
freqVec[i] = termPositions[i].length;

if (termInfo == null || !termInfo.hasOffsets)
{
termOffsets[i] = TermVectorOffsetInfo.EMPTY_OFFSET_INFO;
}
else
{
if (termInfo == null || !termInfo.hasOffsets)
{
termOffsets[i] = TermVectorOffsetInfo.EMPTY_OFFSET_INFO;
}
else
{

int[] offsets = termInfo.offsets;
int[] offsets = termInfo.offsets;

termOffsets[i] = new TermVectorOffsetInfo[freqVec[i]];
for (int j = 0, k = 0; j < offsets.length; j += 2, k++)
{
termOffsets[i][k] = new TermVectorOffsetInfo(offsets[j], offsets[j + 1]);
termOffsets[i] = new TermVectorOffsetInfo[freqVec[i]];
for (int j = 0, k = 0; j < offsets.length; j += 2, k++)
{
termOffsets[i][k] = new TermVectorOffsetInfo(offsets[j], offsets[j + 1]);
}
}

i++;
}
}
catch (CharacterCodingException e)
{
throw new RuntimeException(e);

i++;
}

}
Expand Down
31 changes: 25 additions & 6 deletions src/lucandra/cluster/CassandraIndexManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
Expand Down Expand Up @@ -150,7 +151,7 @@ private ShardInfo getShardInfo(String indexName, boolean force) throws IOExcepti
.getBytes());

ReadCommand cmd = new SliceFromReadCommand(CassandraUtils.keySpace, key, new ColumnParent(
CassandraUtils.schemaInfoColumnFamily), FBUtilities.EMPTY_BYTE_BUFFER, FBUtilities.EMPTY_BYTE_BUFFER,
CassandraUtils.schemaInfoColumnFamily), ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBufferUtil.EMPTY_BYTE_BUFFER,
false, 100);

List<Row> rows = CassandraUtils.robustRead(ConsistencyLevel.QUORUM, cmd);
Expand Down Expand Up @@ -178,8 +179,8 @@ private ShardInfo getShardInfo(String indexName, boolean force) throws IOExcepti
// goto each shard and get local offset
cmd = new SliceFromReadCommand(CassandraUtils.keySpace, CassandraUtils.hashKeyBytes((indexName
+ "~" + shardStr).getBytes(), CassandraUtils.delimeterBytes, "shards".getBytes()),
new ColumnParent(CassandraUtils.schemaInfoColumnFamily), FBUtilities.EMPTY_BYTE_BUFFER,
FBUtilities.EMPTY_BYTE_BUFFER, false, 100);
new ColumnParent(CassandraUtils.schemaInfoColumnFamily), ByteBufferUtil.EMPTY_BYTE_BUFFER,
ByteBufferUtil.EMPTY_BYTE_BUFFER, false, 100);

List<Row> lrows = CassandraUtils.robustRead(ConsistencyLevel.QUORUM, cmd);

Expand Down Expand Up @@ -358,7 +359,7 @@ public long getNextId(String indexName, String key, RowMutation[] rowMutations)
ByteBuffer idVal = ByteBuffer.wrap(val.toString().getBytes());

RowMutation rm2 = new RowMutation(CassandraUtils.keySpace, keyKey);
rm2.add(new QueryPath(CassandraUtils.schemaInfoColumnFamily, keyCol, idVal), FBUtilities.EMPTY_BYTE_BUFFER,
rm2.add(new QueryPath(CassandraUtils.schemaInfoColumnFamily, keyCol, idVal), ByteBufferUtil.EMPTY_BYTE_BUFFER,
System.nanoTime());

// Update last offset info for this shard
Expand Down Expand Up @@ -571,7 +572,14 @@ public int compare(IdInfo o1, IdInfo o2)
if (!(c instanceof ExpiringColumn) && !(c instanceof DeletedColumn))
{
if (logger.isDebugEnabled())
logger.debug(offset + " was taken by " + ByteBufferUtil.string(c.name()));
try
{
logger.debug(offset + " was taken by " + ByteBufferUtil.string(c.name()));
}
catch (CharacterCodingException e)
{

}

winningToken = null;
break;
Expand All @@ -593,8 +601,19 @@ public int compare(IdInfo o1, IdInfo o2)
}
}


String winningTokenStr;
try
{
winningTokenStr = ByteBufferUtil.string(winningToken);
}
catch (CharacterCodingException e)
{
throw new RuntimeException(e);
}

// we won!
if (winningToken != null && ByteBufferUtil.string(winningToken).equals(myToken))
if (winningToken != null && winningTokenStr.equals(myToken))
{
int numReserved = 0;
for (int i = nextOffset; i == nextOffset || i % reserveSlabSize != 0; i++)
Expand Down
Loading

0 comments on commit 5a35df7

Please sign in to comment.