Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
Merge branch 'master' of github.com:rantav/hector
Browse files Browse the repository at this point in the history
  • Loading branch information
zznate committed Jan 23, 2011
2 parents afbd77a + de1becb commit 85b5072
Show file tree
Hide file tree
Showing 9 changed files with 674 additions and 113 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG
Expand Up @@ -12,7 +12,7 @@ Added class-level default for failoverPolicy
Rudimentary (very much so) JPA 2.0 support for save and load via openjpa
Ligthweight ORM for JPA 1.0 annotations (additional improvements to BTodd's hector-object-mapper merged in)
Some cleanup of bytebuffer handling in serializers
Initial cut at virtual keyspaces (aka client-level multi-tennancy)
Initial cut at virtual keyspaces (aka client-level multi-tenancy), see https://github.com/rantav/hector/wiki/Virtual-Keyspaces
CassandraHostConfigurator and related are now all correctly serializable (patch by Thor Carpenter)

0.7.0-23
Expand Down
Expand Up @@ -5,17 +5,17 @@
import me.prettyprint.cassandra.connection.HConnectionManager;
import me.prettyprint.cassandra.service.FailoverPolicy;
import me.prettyprint.cassandra.service.KeyspaceService;
import me.prettyprint.cassandra.service.PrefixedKeyspaceServiceImpl;
import me.prettyprint.cassandra.service.VirtualKeyspaceServiceImpl;
import me.prettyprint.hector.api.ConsistencyLevelPolicy;
import me.prettyprint.hector.api.Serializer;
import me.prettyprint.hector.api.exceptions.HectorException;

public class ExecutingPrefixedKeyspace<E> extends ExecutingKeyspace {
public class ExecutingVirtualKeyspace<E> extends ExecutingKeyspace {

E keyPrefix;
Serializer<E> keyPrefixSerializer;

public ExecutingPrefixedKeyspace(String keyspace, E keyPrefix,
public ExecutingVirtualKeyspace(String keyspace, E keyPrefix,
Serializer<E> keyPrefixSerializer, HConnectionManager connectionManager,
ConsistencyLevelPolicy consistencyLevelPolicy,
FailoverPolicy failoverPolicy) {
Expand All @@ -25,7 +25,7 @@ public ExecutingPrefixedKeyspace(String keyspace, E keyPrefix,
this.keyPrefixSerializer = keyPrefixSerializer;
}

public ExecutingPrefixedKeyspace(String keyspace, E keyPrefix,
public ExecutingVirtualKeyspace(String keyspace, E keyPrefix,
Serializer<E> keyPrefixSerializer, HConnectionManager connectionManager,
ConsistencyLevelPolicy consistencyLevelPolicy,
FailoverPolicy failoverPolicy, Map<String, String> credentials) {
Expand All @@ -41,7 +41,7 @@ public <T> ExecutionResult<T> doExecute(KeyspaceOperationCallback<T> koc)
throws HectorException {
KeyspaceService ks = null;
try {
ks = new PrefixedKeyspaceServiceImpl(keyspace, keyPrefix,
ks = new VirtualKeyspaceServiceImpl(keyspace, keyPrefix,
keyPrefixSerializer, consistencyLevelPolicy, connectionManager,
failoverPolicy, credentials);
return koc.doInKeyspaceAndMeasure(ks);
Expand Down
Expand Up @@ -6,40 +6,36 @@
import java.util.List;
import java.util.Map;

import me.prettyprint.cassandra.model.thrift.ThriftConverter;
import me.prettyprint.cassandra.service.ExceptionsTranslator;
import me.prettyprint.cassandra.service.ExceptionsTranslatorImpl;
import me.prettyprint.cassandra.service.Operation;
import me.prettyprint.cassandra.service.OperationType;
import me.prettyprint.cassandra.service.KeyspaceService;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.Serializer;
import me.prettyprint.hector.api.beans.OrderedRows;
import me.prettyprint.hector.api.exceptions.HectorException;
import me.prettyprint.hector.api.query.QueryResult;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.IndexClause;
import org.apache.cassandra.thrift.IndexExpression;
import org.apache.cassandra.thrift.IndexOperator;
import org.apache.cassandra.thrift.KeySlice;

/**
* Uses new secondary indexes. Your CF must be configured for such to use this.
* The following creates an Indexed CF with the "birthday" column indexed
* (where birthdate represents a timestamp as it is validated by the LongType):
* The following creates an Indexed CF with the "birthday" column indexed (where
* birthdate represents a timestamp as it is validated by the LongType):
*
* <pre>
* - name: Indexed1
* column_metadata:
* - name: birthdate
* validator_class: LongType
* index_type: KEYS
*</pre>
*
* </pre>
*
* @author zznate (nate@riptano.com)
*/
public class IndexedSlicesQuery<K,N,V> extends AbstractSliceQuery<K,N,V,OrderedRows<K,N,V>> {
public class IndexedSlicesQuery<K, N, V> extends
AbstractSliceQuery<K, N, V, OrderedRows<K, N, V>> {

private final IndexClause indexClause;

Expand All @@ -49,38 +45,41 @@ public IndexedSlicesQuery(Keyspace k, Serializer<K> keySerializer,
indexClause = new IndexClause();
}

public IndexedSlicesQuery<K,N,V> addEqualsExpression(N columnName, V columnValue) {
indexClause.addToExpressions(new IndexExpression(columnNameSerializer.toByteBuffer(columnName),
IndexOperator.EQ,
valueSerializer.toByteBuffer(columnValue)));
public IndexedSlicesQuery<K, N, V> addEqualsExpression(N columnName,
V columnValue) {
indexClause.addToExpressions(new IndexExpression(columnNameSerializer
.toByteBuffer(columnName), IndexOperator.EQ, valueSerializer
.toByteBuffer(columnValue)));
return this;
}

public IndexedSlicesQuery<K,N,V> addLteExpression(N columnName, V columnValue) {
indexClause.addToExpressions(new IndexExpression(columnNameSerializer.toByteBuffer(columnName),
IndexOperator.LTE,
valueSerializer.toByteBuffer(columnValue)));
public IndexedSlicesQuery<K, N, V> addLteExpression(N columnName,
V columnValue) {
indexClause.addToExpressions(new IndexExpression(columnNameSerializer
.toByteBuffer(columnName), IndexOperator.LTE, valueSerializer
.toByteBuffer(columnValue)));
return this;
}

public IndexedSlicesQuery<K,N,V> addGteExpression(N columnName, V columnValue) {
indexClause.addToExpressions(new IndexExpression(columnNameSerializer.toByteBuffer(columnName),
IndexOperator.GTE,
valueSerializer.toByteBuffer(columnValue)));
public IndexedSlicesQuery<K, N, V> addGteExpression(N columnName,
V columnValue) {
indexClause.addToExpressions(new IndexExpression(columnNameSerializer
.toByteBuffer(columnName), IndexOperator.GTE, valueSerializer
.toByteBuffer(columnValue)));
return this;
}

public IndexedSlicesQuery<K,N,V> addLtExpression(N columnName, V columnValue) {
indexClause.addToExpressions(new IndexExpression(columnNameSerializer.toByteBuffer(columnName),
IndexOperator.LT,
valueSerializer.toByteBuffer(columnValue)));
public IndexedSlicesQuery<K, N, V> addLtExpression(N columnName, V columnValue) {
indexClause.addToExpressions(new IndexExpression(columnNameSerializer
.toByteBuffer(columnName), IndexOperator.LT, valueSerializer
.toByteBuffer(columnValue)));
return this;
}

public IndexedSlicesQuery<K,N,V> addGtExpression(N columnName, V columnValue) {
indexClause.addToExpressions(new IndexExpression(columnNameSerializer.toByteBuffer(columnName),
IndexOperator.GT,
valueSerializer.toByteBuffer(columnValue)));
public IndexedSlicesQuery<K, N, V> addGtExpression(N columnName, V columnValue) {
indexClause.addToExpressions(new IndexExpression(columnNameSerializer
.toByteBuffer(columnName), IndexOperator.GT, valueSerializer
.toByteBuffer(columnValue)));
return this;
}

Expand Down Expand Up @@ -109,54 +108,43 @@ public IndexedSlicesQuery<K, N, V> setReturnKeysOnly() {
return this;
}

public IndexedSlicesQuery<K,N,V> setStartKey(K startKey) {
public IndexedSlicesQuery<K, N, V> setStartKey(K startKey) {
indexClause.setStart_key(keySerializer.toByteBuffer(startKey));
return this;
}

@Override
public IndexedSlicesQuery<K,N,V> setColumnFamily(String cf) {
public IndexedSlicesQuery<K, N, V> setColumnFamily(String cf) {
super.setColumnFamily(cf);
return this;
}
public IndexedSlicesQuery<K,N,V> setRowCount(int rowCount) {

public IndexedSlicesQuery<K, N, V> setRowCount(int rowCount) {
indexClause.setCount(rowCount);
return this;
}

@Override
public QueryResult<OrderedRows<K,N, V>> execute() {
public QueryResult<OrderedRows<K, N, V>> execute() {

return new QueryResultImpl<OrderedRows<K,N,V>>(keyspace.doExecuteOperation(
new Operation <OrderedRows<K,N,V>>(OperationType.READ) {
return new QueryResultImpl<OrderedRows<K, N, V>>(
keyspace.doExecute(new KeyspaceOperationCallback<OrderedRows<K, N, V>>() {
@Override
public OrderedRows<K,N,V> execute(Cassandra.Client client) throws HectorException {
LinkedHashMap<ByteBuffer, List<Column>> ret = null;
try {
if (!indexClause.isSetStart_key()) {
indexClause.setStart_key(new byte[0]);
}
ColumnParent columnParent = new ColumnParent(columnFamilyName);
List<KeySlice> keySlices = client.get_indexed_slices(columnParent, indexClause,
getPredicate(), ThriftConverter.consistencyLevel(consistencyLevelPolicy.get(operationType)));

ret = (keySlices == null || keySlices.isEmpty() ) ? new LinkedHashMap<ByteBuffer, List<Column>>(0) :
new LinkedHashMap<ByteBuffer, List<Column>>(
keySlices.size());
for (KeySlice keySlice : keySlices) {
ret.put(ByteBuffer.wrap(keySlice.getKey()), ThriftConverter.getColumnList(keySlice.getColumns()));
}
} catch (Exception e) {
HectorException he = keyspace.getExceptionsTranslator().translate(e);
setException(he);
throw he;
public OrderedRows<K, N, V> doInKeyspace(KeyspaceService ks)
throws HectorException {
Map<ByteBuffer, List<Column>> ret = null;
if (!indexClause.isSetStart_key()) {
indexClause.setStart_key(new byte[0]);
}
ColumnParent columnParent = new ColumnParent(columnFamilyName);
ret = ks
.getIndexedSlices(columnParent, indexClause, getPredicate());
Map<K, List<Column>> thriftRet = keySerializer.fromBytesMap(ret);
return new OrderedRowsImpl<K,N,V>((LinkedHashMap<K, List<Column>>) thriftRet, columnNameSerializer, valueSerializer);
return new OrderedRowsImpl<K, N, V>(
(LinkedHashMap<K, List<Column>>) thriftRet,
columnNameSerializer, valueSerializer);
}
}), this);
}



}

0 comments on commit 85b5072

Please sign in to comment.