Permalink
Browse files

small refactor

  • Loading branch information...
1 parent 9fb4f4d commit db9226c63cb381cea67742be30a393c048f4e5f6 @ticktock committed Jun 3, 2010
Showing with 10 additions and 9 deletions.
  1. +1 −0 .gitignore
  2. +9 −9 src/main/java/org/apache/activemq/store/cassandra/CassandraClient.java
View
@@ -4,3 +4,4 @@ qsandra.iws
qsandra.iml
.project
.classpath
+*.patch
@@ -14,12 +14,13 @@
import java.util.concurrent.atomic.AtomicLong;
import static org.apache.activemq.store.cassandra.CassandraIdentifier.*;
+import static org.apache.activemq.store.cassandra.CassandraUtils.*;
import static org.apache.cassandra.thrift.ConsistencyLevel.*;
/**
*
*/
-public class CassandraClient extends CassandraUtils {
+public class CassandraClient {
public static final ColumnPath DESTINATION_QUEUE_SIZE_COLUMN_PATH = new ColumnPath(DESTINATIONS_FAMILY.string());
@@ -329,19 +330,18 @@ public void saveMessage(ActiveMQDestination destination, long id, MessageId mess
saveMutation.put(DESTINATIONS_FAMILY.string(), destinationMutations);
saveMutation.put(MESSAGE_TO_STORE_ID_FAMILY.string(), indexMutations);
saveMutation.put(STORE_IDS_IN_USE_FAMILY.string(), storeIdsMutations);
- long colName = id;
- log.debug("Saving message with id:{}", colName);
+ log.debug("Saving message with id:{}", id);
log.debug("Saving message with brokerSeq id:{}", messageId.getBrokerSequenceId());
long current = timestamp();
- messageMutations.add(getInsertOrUpdateColumnMutation(getBytes(colName), messageBytes, current));
+ messageMutations.add(getInsertOrUpdateColumnMutation(getBytes(id), messageBytes, current));
destinationMutations.add(getInsertOrUpdateColumnMutation(DESTINATION_QUEUE_SIZE_COLUMN.bytes(), getBytes(queueSize.incrementAndGet()), current));
- //Timestamp is ID so max will always be there
+
destinationMutations.add(getInsertOrUpdateColumnMutation(DESTINATION_MAX_STORE_SEQUENCE_COLUMN.bytes(), getBytes(id), current));
destinationMutations.add(getInsertOrUpdateColumnMutation(DESTINATION_MAX_BROKER_SEQUENCE_COLUMN.bytes(), getBytes(messageId.getBrokerSequenceId()), current));
- //TODO what is thre right timestamp here
+
indexMutations.add(getInsertOrUpdateColumnMutation(getMessageIndexKey(messageId), getBytes(id), current));
- storeIdsMutations.add(getInsertOrUpdateColumnMutation(getBytes(colName), new byte[1], current));
+ storeIdsMutations.add(getInsertOrUpdateColumnMutation(getBytes(id), new byte[1], current));
try {
getCassandraConnection().batch_mutate(KEYSPACE.string(), mutations, consistencyLevel);
} catch (Exception e) {
@@ -435,7 +435,7 @@ private void recoverMessagesFromTo(String key, byte[] start, byte[] end, int lim
SliceRange range = new SliceRange(start, end, false, limit);
predicate.setSlice_range(range);
- List<ColumnOrSuperColumn> cols = null;
+ List<ColumnOrSuperColumn> cols;
try {
cols = getCassandraConnection().get_slice(KEYSPACE.string(), key, new ColumnParent(MESSAGES_FAMILY.string()), predicate, consistencyLevel);
@@ -591,7 +591,7 @@ public SubscriptionInfo lookupSubscription(ActiveMQDestination destination, Stri
}
- return info.toArray(new SubscriptionInfo[0]);
+ return info.toArray(new SubscriptionInfo[info.size()]);
} catch (Exception e) {
log.error("Exception in lookupSubscription", e);
discacrdCassandraConnection();

0 comments on commit db9226c

Please sign in to comment.