Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

working client and cleanup

  • Loading branch information...
commit d633fe51adbd47de07b6d079d128f6e1cde90bfe 1 parent 3341ac1
@ticktock authored
View
763 src/main/java/org/apache/activemq/store/cassandra/CassandraClient.java
@@ -1,763 +0,0 @@
-package org.apache.activemq.store.cassandra;
-
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.SubscriptionInfo;
-import org.apache.cassandra.thrift.*;
-import org.apache.cassandra.utils.BloomFilter;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static org.apache.activemq.store.cassandra.CassandraIdentifier.*;
-import static org.apache.activemq.store.cassandra.CassandraUtils.*;
-import static org.apache.cassandra.thrift.ConsistencyLevel.*;
-
-/**
- *
- */
-public class CassandraClient {
-
-
- public static final ColumnPath DESTINATION_QUEUE_SIZE_COLUMN_PATH = new ColumnPath(DESTINATIONS_FAMILY.string());
- public static final ColumnPath BROKER_DESTINATION_COUNT_COLUMN_PATH = new ColumnPath(BROKER_FAMILY.string());
-
- static {
- DESTINATION_QUEUE_SIZE_COLUMN_PATH.setColumn(DESTINATION_QUEUE_SIZE_COLUMN.bytes());
- BROKER_DESTINATION_COUNT_COLUMN_PATH.setColumn(BROKER_DESTINATION_COUNT.bytes());
- }
-
- /*Subscriptions Column Family Constants*/
- public static final String SUBSCRIPTIONS_CLIENT_SUBSCRIPTION_DELIMITER = "~~~~~";
- public static final String SUBSCRIPTIONS_DEFAULT_SUBSCRIPTION_NAME = "@NOT_SET@";
-
- public static final byte[] ZERO = getBytes(0L);
-
-
- private static ThreadLocal<Cassandra.Client> cassandraClients = new ThreadLocal<Cassandra.Client>();
- private static ThreadLocal<TTransport> cassandraTransports = new ThreadLocal<TTransport>();
- private static Logger log = LoggerFactory.getLogger(CassandraClient.class);
-
- private String cassandraHost;
- private int cassandraPort;
-
- private ConsistencyLevel consistencyLevel = QUORUM;
-
-
- public ConsistencyLevel getConsistencyLevel() {
- return consistencyLevel;
- }
-
- public void setConsistencyLevel(ConsistencyLevel consistencyLevel) {
- if (!EnumSet.of(QUORUM, DCQUORUM, DCQUORUMSYNC).contains(consistencyLevel)) {
- throw new IllegalArgumentException("Only QUORUM or DCQUORUM or DCQUORUMSYNC are supported consistency levels.");
- }
- this.consistencyLevel = consistencyLevel;
- }
-
- public String getCassandraHost() {
- return cassandraHost;
- }
-
- public void setCassandraHost(String cassandraHost) {
- this.cassandraHost = cassandraHost;
- }
-
- public int getCassandraPort() {
- return cassandraPort;
- }
-
- public void setCassandraPort(int cassandraPort) {
- this.cassandraPort = cassandraPort;
- }
-
- Cassandra.Client getCassandraConnection() {
- Cassandra.Client client = cassandraClients.get();
- TTransport tr = cassandraTransports.get();
- if (client == null || !tr.isOpen()) {
- tr = new TSocket(cassandraHost, cassandraPort);
- TProtocol proto = new TBinaryProtocol(tr);
- client = new Cassandra.Client(proto);
- try {
- tr.open();
- } catch (TTransportException e) {
- log.error("Unable to open transport", e);
- throw new RuntimeException(e);
- }
- cassandraTransports.set(tr);
- cassandraClients.set(client);
- }
- return client;
- }
-
- void discacrdCassandraConnection() {
- cassandraClients.remove();
- cassandraTransports.remove();
- }
-
- /*Broker CF Methods*/
-
-
- public int getDestinationCount() {
- try {
- ColumnOrSuperColumn cosc = getCassandraConnection().get(KEYSPACE.string(), BROKER_KEY.string(), BROKER_DESTINATION_COUNT_COLUMN_PATH, consistencyLevel);
- return getInt(cosc.getColumn().getValue());
- } catch (NotFoundException e) {
- log.warn("Broker Destination Count not found, inserting 0");
- } catch (Exception e) {
- log.error("Exception in getDestinationCount", e);
- throw new RuntimeException(e);
- }
-
- try {
- insertDestinationCount(0);
- return 0;
- } catch (Exception e) {
- log.error("Exception in getDestinationCount while inserting 0", e);
- throw new RuntimeException(e);
- }
-
- }
-
- public void insertDestinationCount(int count) throws TException, TimedOutException, InvalidRequestException, UnavailableException {
- getCassandraConnection().insert(KEYSPACE.string(), BROKER_KEY.string(), BROKER_DESTINATION_COUNT_COLUMN_PATH, getBytes(count), timestamp(), consistencyLevel);
- }
-
- public BloomFilter getMessageIdFilterFor(ActiveMQDestination destination, long size) {
- BloomFilter bloomFilter = BloomFilter.getFilter(Math.max(size, 100000), 0.01d);
- byte[] start = new byte[0];
- byte[] end = new byte[0];
- long count = 0;
- while (count < size) {
- SlicePredicate slicePredicate = new SlicePredicate();
- SliceRange sliceRange = new SliceRange(start, end, false, 10000);
- slicePredicate.setSlice_range(sliceRange);
- List<ColumnOrSuperColumn> orSuperColumns = null;
- try {
- orSuperColumns = getCassandraConnection().get_slice(KEYSPACE.string(), getDestinationKey(destination), new ColumnParent(MESSAGE_TO_STORE_ID_FAMILY.string()), slicePredicate, consistencyLevel);
- for (ColumnOrSuperColumn orSuperColumn : orSuperColumns) {
- Column column = orSuperColumn.getColumn();
- start = column.getName();
- count++;
- bloomFilter.add(start);
- }
- } catch (Exception e) {
- log.error("Exception while populating bloom filter for :" + destination.getQualifiedName(), e);
- throw new RuntimeException(e);
- }
-
- }
- return bloomFilter;
- }
-
- /*Destination CF Methods*/
-
- public boolean createDestination(String name, boolean isTopic, AtomicInteger destinationCount) {
- ColumnPath columnPath = new ColumnPath(DESTINATIONS_FAMILY.string());
- columnPath.setColumn(DESTINATION_IS_TOPIC_COLUMN.bytes());
- try {
- getCassandraConnection().get(KEYSPACE.string(), name, columnPath, consistencyLevel);
- log.info("Destination {} Exists", name);
- return false;
- } catch (NotFoundException e) {
- log.warn("Destination {} not found, Creating, topic:{} ", name, isTopic);
- try {
- Map<String, Map<String, List<Mutation>>> mutations = map();
- Map<String, List<Mutation>> mutation = map();
- List<Mutation> destinationMutations = list();
- destinationMutations.add(getInsertOrUpdateColumnMutation(DESTINATION_IS_TOPIC_COLUMN.string(), isTopic));
- destinationMutations.add(getInsertOrUpdateColumnMutation(DESTINATION_MAX_STORE_SEQUENCE_COLUMN.string(), 0L));
- mutation.put(DESTINATIONS_FAMILY.string(), destinationMutations);
- mutations.put(name, mutation);
- destinationMutations.add(getInsertOrUpdateColumnMutation(DESTINATION_QUEUE_SIZE_COLUMN.bytes(), ZERO, timestamp()));
- Map<String, List<Mutation>> brokerMutation = map();
- List<Mutation> brokerMutations = list();
- brokerMutations.add(getInsertOrUpdateColumnMutation(BROKER_DESTINATION_COUNT.bytes(), getBytes(destinationCount.incrementAndGet()), timestamp()));
- brokerMutation.put(BROKER_FAMILY.string(), brokerMutations);
- mutations.put(BROKER_KEY.string(), brokerMutation);
-
- getCassandraConnection().batch_mutate(KEYSPACE.string(), mutations, consistencyLevel);
-
-
- return true;
- } catch (Exception e2) {
- destinationCount.decrementAndGet();
- throw new RuntimeException(e2);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
-
- public Set<ActiveMQDestination> getDestinations() {
- SlicePredicate predicate = new SlicePredicate();
- predicate.setSlice_range(new SliceRange(new byte[0], new byte[0], false, Integer.MAX_VALUE));
- ColumnParent parent = new ColumnParent(DESTINATIONS_FAMILY.string());
- KeyRange keyRange = new KeyRange();
- keyRange.setStart_key(getString(new byte[0]));
- keyRange.setEnd_key(getString(new byte[0]));
- try {
- List<KeySlice> slices = getCassandraConnection().get_range_slices(KEYSPACE.string(), parent, predicate, keyRange, consistencyLevel);
-
- Set<ActiveMQDestination> destinations = set();
- for (KeySlice slice : slices) {
- destinations.add(ActiveMQDestination.createDestination(slice.getKey(), ActiveMQDestination.QUEUE_TYPE));
- }
- return destinations;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
-
- public void deleteQueue(ActiveMQDestination destination, AtomicInteger destinationCount) {
- try {
- String key = getDestinationKey(destination);
- getCassandraConnection().remove(KEYSPACE.string(), key, new ColumnPath(MESSAGES_FAMILY.string()), timestamp(), consistencyLevel);
- getCassandraConnection().remove(KEYSPACE.string(), key, new ColumnPath(DESTINATIONS_FAMILY.string()), timestamp(), consistencyLevel);
- getCassandraConnection().remove(KEYSPACE.string(), key, new ColumnPath(MESSAGE_TO_STORE_ID_FAMILY.string()), timestamp(), consistencyLevel);
- getCassandraConnection().remove(KEYSPACE.string(), key, new ColumnPath(STORE_IDS_IN_USE_FAMILY.string()), timestamp(), consistencyLevel);
- getCassandraConnection().remove(KEYSPACE.string(), key, new ColumnPath(SUBSCRIPTIONS_FAMILY.string()), timestamp(), consistencyLevel);
- Map<String, Map<String, List<Mutation>>> mutations = map();
- Map<String, List<Mutation>> mutation = map();
- List<Mutation> brokerMutations = list();
- mutations.put(BROKER_KEY.string(), mutation);
- mutation.put(BROKER_FAMILY.string(), brokerMutations);
- brokerMutations.add(getInsertOrUpdateColumnMutation(BROKER_DESTINATION_COUNT.bytes(), getBytes(destinationCount.decrementAndGet()), timestamp()));
- getCassandraConnection().batch_mutate(KEYSPACE.string(), mutations, consistencyLevel);
- } catch (Exception e) {
- destinationCount.incrementAndGet();
- log.error("Exception in deleteQueue", e);
- throw new RuntimeException(e);
- }
- }
-
- public void deleteTopic(ActiveMQTopic destination, AtomicInteger destinationCount) {
- deleteQueue(destination, destinationCount);
- }
-
-
- public DestinationMaxIds getMaxStoreId() {
-
- int destinations = getDestinations().size();
- if (destinations == 0) {
- return new DestinationMaxIds(null, 0, 0);
- }
- ColumnParent columnParent = new ColumnParent(DESTINATIONS_FAMILY.string());
- SlicePredicate slicePredicate = new SlicePredicate();
- slicePredicate.setColumn_names(Collections.singletonList(DESTINATION_MAX_STORE_SEQUENCE_COLUMN.bytes()));
- KeyRange keyRange = new KeyRange();
- keyRange.setStart_key("");
- keyRange.setEnd_key("");
- keyRange.setCount(destinations);
- try {
- List<KeySlice> cols = getCassandraConnection().get_range_slices(KEYSPACE.string(), columnParent, slicePredicate, keyRange, consistencyLevel);
- DestinationMaxIds max = new DestinationMaxIds(null, 0, 0);
- long storeVal = 0;
- long broker = 0;
- for (KeySlice col : cols) {
- String key = col.getKey();
- for (ColumnOrSuperColumn columnOrSuperColumn : col.getColumns()) {
- if (Arrays.equals(columnOrSuperColumn.getColumn().getName(), DESTINATION_MAX_STORE_SEQUENCE_COLUMN.bytes())) {
- storeVal = getLong(columnOrSuperColumn.getColumn().getValue());
- } else if (Arrays.equals(columnOrSuperColumn.getColumn().getName(), DESTINATION_MAX_BROKER_SEQUENCE_COLUMN.bytes())) {
- broker = getLong(columnOrSuperColumn.getColumn().getValue());
- }
- }
- if (storeVal > max.getMaxStoreId()) {
- max = new DestinationMaxIds(ActiveMQDestination.createDestination(key, ActiveMQDestination.QUEUE_TYPE), storeVal, broker);
- }
- }
-
-
- return max;
- } catch (Exception e) {
- log.error("Error getting Max Store ID", e);
- throw new RuntimeException(e);
- }
-
- }
-
-
- private byte[] getMessageIndexKey(MessageId id) {
- return getBytes(getMessageIndexKeyString(id));
- }
-
- private String getMessageIndexKeyString(MessageId id) {
- return id.toString();
- }
-
- public long getStoreId(ActiveMQDestination destination, MessageId identity) {
- ColumnPath path = new ColumnPath(MESSAGE_TO_STORE_ID_FAMILY.string());
- path.setColumn(getMessageIndexKey(identity));
- String key = getDestinationKey(destination);
- try {
- ColumnOrSuperColumn cosc = getCassandraConnection().get(KEYSPACE.string(), key, path, consistencyLevel);
- return getLong(cosc.getColumn().getValue());
- } catch (Exception e) {
- log.error("Exception in getStoreId", e);
- throw new RuntimeException(e);
- }
- }
-
- /*Messages CF Methods*/
-
- public byte[] getMessage(ActiveMQDestination destination, long storeId) {
- ColumnPath path = new ColumnPath(MESSAGES_FAMILY.string());
- path.setColumn(getBytes(storeId));
- try {
- ColumnOrSuperColumn cosc = getCassandraConnection().get(KEYSPACE.string(), getDestinationKey(destination), path, consistencyLevel);
- byte[] messageBytes = cosc.getColumn().getValue();
- if (messageBytes.length == 0) {
- throw new NotFoundException();
- }
- return messageBytes;
- } catch (NotFoundException e) {
- log.error("Message Not Found");
- throw new RuntimeException(e);
- } catch (Exception e) {
- log.error("Exception getting message", e);
- throw new RuntimeException(e);
- }
- }
-
-
- public void saveMessage(ActiveMQDestination destination, long id, MessageId messageId, byte[] messageBytes, AtomicLong queueSize, BloomFilter duplicateDetector) {
- if (duplicateDetector.isPresent(getMessageIndexKey(messageId))) {
- ColumnPath path = new ColumnPath(MESSAGE_TO_STORE_ID_FAMILY.string());
- path.setColumn(getMessageIndexKey(messageId));
- try {
- ColumnOrSuperColumn column = getCassandraConnection().get(KEYSPACE.string(), getDestinationKey(destination), path, consistencyLevel);
- log.warn("Duplicate Message Save recieved from broker for {}...ignoring", messageId);
- return;
- } catch (NotFoundException e) {
- log.warn("NotFoundException while confirming duplicate, BloomFilter false positive, continuing");
- } catch (Exception e) {
- log.error("Exception whhile confiring duplicate detected by BloomFilter", e);
- }
-
- }
- Map<String, Map<String, List<Mutation>>> mutations = map();
- Map<String, List<Mutation>> saveMutation = map();
-
- String cassandraKey = getDestinationKey(destination);
- mutations.put(cassandraKey, saveMutation);
-
- List<Mutation> messageMutations = list();
- List<Mutation> destinationMutations = list();
- List<Mutation> indexMutations = list();
- List<Mutation> storeIdsMutations = list();
- saveMutation.put(MESSAGES_FAMILY.string(), messageMutations);
- saveMutation.put(DESTINATIONS_FAMILY.string(), destinationMutations);
- saveMutation.put(MESSAGE_TO_STORE_ID_FAMILY.string(), indexMutations);
- saveMutation.put(STORE_IDS_IN_USE_FAMILY.string(), storeIdsMutations);
- log.debug("Saving message with id:{}", id);
- log.debug("Saving message with brokerSeq id:{}", messageId.getBrokerSequenceId());
- long current = timestamp();
- messageMutations.add(getInsertOrUpdateColumnMutation(getBytes(id), messageBytes, current));
- destinationMutations.add(getInsertOrUpdateColumnMutation(DESTINATION_QUEUE_SIZE_COLUMN.bytes(), getBytes(queueSize.incrementAndGet()), current));
-
-
- destinationMutations.add(getInsertOrUpdateColumnMutation(DESTINATION_MAX_STORE_SEQUENCE_COLUMN.bytes(), getBytes(id), current));
- destinationMutations.add(getInsertOrUpdateColumnMutation(DESTINATION_MAX_BROKER_SEQUENCE_COLUMN.bytes(), getBytes(messageId.getBrokerSequenceId()), current));
-
- indexMutations.add(getInsertOrUpdateColumnMutation(getMessageIndexKey(messageId), getBytes(id), current));
- storeIdsMutations.add(getInsertOrUpdateColumnMutation(getBytes(id), new byte[1], current));
- try {
- getCassandraConnection().batch_mutate(KEYSPACE.string(), mutations, consistencyLevel);
- duplicateDetector.add(getMessageIndexKey(messageId));
- } catch (Exception e) {
- queueSize.decrementAndGet();
- log.error("Exception savingMessage", e);
- throw new RuntimeException(e);
- }
- }
-
- public void deleteMessage(ActiveMQDestination destination, MessageId id, AtomicLong queueSize) {
- long column = getStoreId(destination, id);
- long current = timestamp();
- Mutation delete = getDeleteColumnMutation(getBytes(column), current);
- Map<String, Map<String, List<Mutation>>> mutations = map();
- Map<String, List<Mutation>> saveMutation = map();
-
- String cassandraKey = getDestinationKey(destination);
- mutations.put(cassandraKey, saveMutation);
-
- List<Mutation> messageMutations = list();
-
- List<Mutation> indexMutations = list();
- List<Mutation> destinationMutations = list();
- List<Mutation> storeIdsMutations = list();
- saveMutation.put(MESSAGES_FAMILY.string(), messageMutations);
- saveMutation.put(STORE_IDS_IN_USE_FAMILY.string(), storeIdsMutations);
- saveMutation.put(MESSAGE_TO_STORE_ID_FAMILY.string(), indexMutations);
- saveMutation.put(DESTINATIONS_FAMILY.string(), destinationMutations);
- messageMutations.add(delete);
- destinationMutations.add(getInsertOrUpdateColumnMutation(DESTINATION_QUEUE_SIZE_COLUMN.bytes(), getBytes(queueSize.decrementAndGet()), current));
- indexMutations.add(getDeleteColumnMutation(getMessageIndexKey(id), current));
- storeIdsMutations.add(getDeleteColumnMutation(getBytes(column), current));
- try {
- getCassandraConnection().batch_mutate(KEYSPACE.string(), mutations, consistencyLevel);
- log.debug("Deleted Message {}", column);
- } catch (Exception e) {
- queueSize.incrementAndGet();
- log.error("Unable to delete message", e);
- }
- }
-
- public void deleteAllMessages(ActiveMQDestination destination, AtomicLong queueSize) {
- ColumnPath path = new ColumnPath(MESSAGES_FAMILY.string());
- try {
- getCassandraConnection().remove(KEYSPACE.string(), getDestinationKey(destination), path, timestamp(), consistencyLevel);
- queueSize.set(0);
- } catch (Exception e) {
- log.error("Unable to delete all messages", e);
- }
- }
-
- public int getMessageCount(ActiveMQDestination destination) {
- try {
- ColumnOrSuperColumn col = getCassandraConnection().get(KEYSPACE.string(), getDestinationKey(destination), DESTINATION_QUEUE_SIZE_COLUMN_PATH, consistencyLevel);
- byte[] countBytes = col.getColumn().getValue();
- long count = getLong(countBytes);
- if (count > Integer.MAX_VALUE) {
- throw new IllegalStateException("Count Higher than Max int, something wrong");
- } else {
- return Long.valueOf(count).intValue();
- }
- } catch (Exception e) {
- log.error("Error during getMessageCount for :" + getDestinationKey(destination), e);
- discacrdCassandraConnection();
- throw new RuntimeException(e);
- }
- }
-
- public List<byte[]> recoverMessages(ActiveMQDestination destination, AtomicLong batchPoint, int maxReturned) {
- if (log.isDebugEnabled()) {
- log.debug("recoverMessages({},{},{})", new Object[]{getDestinationKey(destination), batchPoint.get(), maxReturned});
- }
- if (maxReturned < 1) {
- throw new IllegalArgumentException("cant get less than one result");
- }
- String key = getDestinationKey(destination);
- byte[] start = batchPoint.get() == -1 ? new byte[0] : getBytes(batchPoint.get());
- byte[] end = new byte[0];
- List<byte[]> messages = list();
- recoverMessagesFromTo(key, start, end, maxReturned, messages, maxReturned);
- return messages;
- }
-
-
- private void recoverMessagesFromTo(String key, byte[] start, byte[] end, int limit, List<byte[]> messages, int messagelimit) {
-
- if (log.isDebugEnabled()) {
- log.debug("recoverMessagesFromTo({},{},{},{},{})", new Object[]{key, safeGetLong(start), safeGetLong(end), limit, messages.size()});
- }
- SlicePredicate predicate = new SlicePredicate();
- SliceRange range = new SliceRange(start, end, false, limit);
- predicate.setSlice_range(range);
-
- List<ColumnOrSuperColumn> cols;
-
- try {
- cols = getCassandraConnection().get_slice(KEYSPACE.string(), key, new ColumnParent(MESSAGES_FAMILY.string()), predicate, consistencyLevel);
- } catch (InvalidRequestException e) {
- log.error("recoverMessagesFromTo({},{},{},{},{})", new Object[]{key, safeGetLong(start), safeGetLong(end), limit, messages.size()});
- log.error("InvalidRequestException", e);
- discacrdCassandraConnection();
- throw new RuntimeException(e);
- } catch (UnavailableException e) {
- log.error("recoverMessagesFromTo({},{},{},{},{})", new Object[]{key, safeGetLong(start), safeGetLong(end), limit, messages.size()});
- log.error("UnavailableException", e);
- discacrdCassandraConnection();
- throw new RuntimeException(e);
- } catch (TimedOutException e) {
- log.error("recoverMessagesFromTo({},{},{},{},{})", new Object[]{key, safeGetLong(start), safeGetLong(end), limit, messages.size()});
- log.error("TimedOutException", e);
- discacrdCassandraConnection();
- throw new RuntimeException(e);
- } catch (TException e) {
- log.error("recoverMessagesFromTo({},{},{},{},{})", new Object[]{key, safeGetLong(start), safeGetLong(end), limit, messages.size()});
- log.error("TException", e);
- discacrdCassandraConnection();
- throw new RuntimeException(e);
- }
-
- for (ColumnOrSuperColumn col : cols) {
- Column c = col.getColumn();
-
- messages.add(c.getValue());
- if (log.isDebugEnabled()) {
- log.debug("recovered message with id: {}", safeGetLong(c.getName()));
- }
- if (messages.size() >= messagelimit) {
- break;
- }
- }
-
-
- }
-
- /*Subscription CF Messages*/
-
- public void addSubscription(ActiveMQDestination destination, SubscriptionInfo subscriptionInfo, long ack) {
- Map<String, Map<String, List<Mutation>>> mutations = map();
- Map<String, List<Mutation>> saveMutation = map();
- List<Mutation> mutationList = list();
- Mutation insert = new Mutation();
- mutationList.add(insert);
- mutations.put(getDestinationKey(destination), saveMutation);
- saveMutation.put(SUBSCRIPTIONS_FAMILY.string(), mutationList);
-
- ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
- insert.setColumn_or_supercolumn(columnOrSuperColumn);
- List<Column> cols = list();
- SuperColumn superColumn = new SuperColumn(getBytes(getSubscriptionSuperColumnName(subscriptionInfo)), cols);
- columnOrSuperColumn.setSuper_column(superColumn);
-
- byte[] selector = nullSafeGetBytes(subscriptionInfo.getSelector());
- byte[] subscribedDestination = getBytes(getDestinationKey(subscriptionInfo.getSubscribedDestination()));
- byte[] lastAck = getBytes(ack);
- long current = timestamp();
- if (subscriptionInfo.getSelector() != null) {
- cols.add(getColumn(SUBSCRIPTIONS_SELECTOR_SUBCOLUMN.bytes(), selector, current));
- }
- cols.add(getColumn(SUBSCRIPTIONS_SUB_DESTINATION_SUBCOLUMN.bytes(), subscribedDestination, current));
- cols.add(getColumn(SUBSCRIPTIONS_LAST_ACK_SUBCOLUMN.bytes(), lastAck, current));
- try {
- getCassandraConnection().batch_mutate(KEYSPACE.string(), mutations, consistencyLevel);
- log.debug("created subscription on {} for client {} subscription {} with selector {} and lastAck {}",
- new Object[]{
- getDestinationKey(destination),
- subscriptionInfo.getClientId(),
- subscriptionInfo.getSubscriptionName(),
- subscriptionInfo.getSelector(),
- ack
- });
- } catch (Exception e) {
- log.error("Exception addingSubscription:", e);
- discacrdCassandraConnection();
- throw new RuntimeException(e);
- }
-
- }
-
- public SubscriptionInfo lookupSubscription(ActiveMQDestination destination, String clientId, String subscriptionName) {
- ColumnPath path = new ColumnPath(SUBSCRIPTIONS_FAMILY.string());
- path.setSuper_column(getBytes(getSubscriptionSuperColumnName(clientId, subscriptionName)));
- try {
- ColumnOrSuperColumn columnOrSuperColumn = getCassandraConnection().get(KEYSPACE.string(), getDestinationKey(destination), path, consistencyLevel);
- log.debug("retrieved supercolumn of {} for client {} subscriptionName {}", new Object[]{getDestinationKey(destination), clientId, subscriptionName});
- SuperColumn superColumn = columnOrSuperColumn.getSuper_column();
- SubscriptionInfo subscriptionInfo = new SubscriptionInfo();
- subscriptionInfo.setClientId(clientId);
- subscriptionInfo.setSubscriptionName(subscriptionName);
- subscriptionInfo.setDestination(destination);
- byte type = destination.isTopic() ? ActiveMQDestination.TOPIC_TYPE : ActiveMQDestination.QUEUE_TYPE;
- for (Column column : superColumn.getColumns()) {
- if (Arrays.equals(SUBSCRIPTIONS_LAST_ACK_SUBCOLUMN.bytes(), column.getName())) {
- //skip
- } else if (Arrays.equals(SUBSCRIPTIONS_SELECTOR_SUBCOLUMN.bytes(), column.getName())) {
- String selector = getString(column.getValue());
- subscriptionInfo.setSelector(selector);
- } else if (Arrays.equals(SUBSCRIPTIONS_SUB_DESTINATION_SUBCOLUMN.bytes(), column.getName())) {
- String name = getString(column.getValue());
- subscriptionInfo.setSubscribedDestination(ActiveMQDestination.createDestination(name, type));
- } else {
- log.error("Recieved unexpected column from Subscription Super Column {}", getString(column.getName()));
- }
- }
-
- return subscriptionInfo;
- } catch (NotFoundException e) {
- log.warn("lookupSubsctription({},{}) found no subscription, returning null", clientId, subscriptionName);
- return null;
- } catch (Exception e) {
- log.error("Exception in lookupSubscription", e);
- discacrdCassandraConnection();
- throw new RuntimeException(e);
- }
-
- }
-
- public SubscriptionInfo[] lookupAllSubscriptions(ActiveMQDestination destination) {
- ColumnParent path = new ColumnParent(SUBSCRIPTIONS_FAMILY.string());
- SlicePredicate slicePredicate = new SlicePredicate();
- SliceRange sliceRange = new SliceRange(new byte[0], new byte[0], false, Integer.MAX_VALUE);
- slicePredicate.setSlice_range(sliceRange);
-
- try {
- List<ColumnOrSuperColumn> coscs = getCassandraConnection().get_slice(KEYSPACE.string(), getDestinationKey(destination), path, slicePredicate, consistencyLevel);
- List<SubscriptionInfo> info = new ArrayList<SubscriptionInfo>(coscs.size());
- for (ColumnOrSuperColumn columnOrSuperColumn : coscs) {
- SuperColumn superColumn = columnOrSuperColumn.getSuper_column();
- SubscriptionInfo subscriptionInfo = new SubscriptionInfo();
- subscriptionInfo.setClientId(getClientIdFromSubscriptionSuperColumnName(superColumn));
- subscriptionInfo.setSubscriptionName(getSubscriptionNameFromSubscriptionSuperColumnName(superColumn));
- subscriptionInfo.setDestination(destination);
- byte type = destination.isTopic() ? ActiveMQDestination.TOPIC_TYPE : ActiveMQDestination.QUEUE_TYPE;
- for (Column column : superColumn.getColumns()) {
- if (Arrays.equals(SUBSCRIPTIONS_LAST_ACK_SUBCOLUMN.bytes(), column.getName())) {
- //skip
- } else if (Arrays.equals(SUBSCRIPTIONS_SELECTOR_SUBCOLUMN.bytes(), column.getName())) {
- String selector = getString(column.getValue());
- subscriptionInfo.setSelector(selector);
- } else if (Arrays.equals(SUBSCRIPTIONS_SUB_DESTINATION_SUBCOLUMN.bytes(), column.getName())) {
- String name = getString(column.getValue());
- subscriptionInfo.setSubscribedDestination(ActiveMQDestination.createDestination(name, type));
- } else {
- log.error("Recieved unexpected column from Subscription Super Column {}", getString(column.getName()));
- }
- }
- info.add(subscriptionInfo);
- }
-
-
- return info.toArray(new SubscriptionInfo[info.size()]);
- } catch (Exception e) {
- log.error("Exception in lookupSubscription", e);
- discacrdCassandraConnection();
- throw new RuntimeException(e);
- }
-
- }
-
- public void acknowledge(ActiveMQDestination destination, String clientId, String subscriptionName, MessageId messageId) {
-
-
- Map<String, Map<String, List<Mutation>>> mutations = map();
- Map<String, List<Mutation>> saveMutation = map();
- List<Mutation> mutationList = list();
- Mutation insert = new Mutation();
- mutationList.add(insert);
- mutations.put(getDestinationKey(destination), saveMutation);
- saveMutation.put(SUBSCRIPTIONS_FAMILY.string(), mutationList);
-
- ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
- insert.setColumn_or_supercolumn(columnOrSuperColumn);
- List<Column> cols = list();
- SuperColumn superColumn = new SuperColumn(getBytes(getSubscriptionSuperColumnName(clientId, subscriptionName)), cols);
- columnOrSuperColumn.setSuper_column(superColumn);
-
- long lastAckStoreId = getStoreId(destination, messageId);
- byte[] lastAck = getBytes(lastAckStoreId);
- long timestamp = timestamp();
- cols.add(getColumn(SUBSCRIPTIONS_LAST_ACK_SUBCOLUMN.bytes(), lastAck, timestamp));
-
- try {
- getCassandraConnection().batch_mutate(KEYSPACE.string(), mutations, consistencyLevel);
- log.debug("Acked {} for client {} sub {}", new Object[]{messageId.getBrokerSequenceId(), clientId, subscriptionName});
- } catch (Exception e) {
- log.error("Exception acking:", e);
- discacrdCassandraConnection();
- throw new RuntimeException(e);
- }
-
- }
-
- public void deleteSubscription(ActiveMQDestination destination, String clientId, String subscriptionName) {
- ColumnPath path = new ColumnPath(SUBSCRIPTIONS_FAMILY.string());
- path.setSuper_column(getBytes(getSubscriptionSuperColumnName(clientId, subscriptionName)));
- try {
- getCassandraConnection().remove(KEYSPACE.string(), getDestinationKey(destination), path, timestamp(), consistencyLevel);
- log.debug("deletedSubscription on {} for client {} subscriptionName {}", new Object[]{getDestinationKey(destination), clientId, subscriptionName});
- } catch (Exception e) {
- log.error("Exception in deleteSubscription", e);
- discacrdCassandraConnection();
- throw new RuntimeException(e);
- }
- }
-
-
- public int getMessageCountFrom(ActiveMQDestination destination, long storeId) {
- SlicePredicate predicate = new SlicePredicate();
- SliceRange range = new SliceRange(getBytes(storeId), new byte[0], false, Integer.MAX_VALUE);
- predicate.setSlice_range(range);
- try {
- List<ColumnOrSuperColumn> coscs = getCassandraConnection().get_slice(KEYSPACE.string(), getDestinationKey(destination), new ColumnParent(STORE_IDS_IN_USE_FAMILY.string()), predicate, consistencyLevel);
- return coscs.size();
- } catch (Exception e) {
- log.error("Exception in getMessageCountFrom {}:{}", getDestinationKey(destination), storeId);
- log.error("Ex:", e);
- throw new RuntimeException(e);
- }
- }
-
- public int getLastAckStoreId(ActiveMQDestination destination, String clientid, String subsriptionName) {
- ColumnPath path = new ColumnPath(SUBSCRIPTIONS_FAMILY.string());
- path.setSuper_column(getBytes(getSubscriptionSuperColumnName(clientid, subsriptionName)));
- path.setColumn(SUBSCRIPTIONS_LAST_ACK_SUBCOLUMN.bytes());
- try {
- ColumnOrSuperColumn cosc = getCassandraConnection().get(KEYSPACE.string(), getDestinationKey(destination), path, consistencyLevel);
- long result = getLong(cosc.getColumn().getValue());
- return Long.valueOf(result).intValue();
- } catch (NotFoundException e) {
- log.debug("LastAckStoreId not found, returning 0");
- return 0;
- } catch (Exception e) {
- log.error("Exception in getLastAckStoreId, {} {} {}", new Object[]{getDestinationKey(destination), clientid, subsriptionName});
- log.error("Ex:", e);
- throw new RuntimeException(e);
- }
- }
-
-
- private static String getSubscriptionSuperColumnName(SubscriptionInfo info) {
- return info.getClientId() + SUBSCRIPTIONS_CLIENT_SUBSCRIPTION_DELIMITER + nullSafeGetSubscriptionName(info);
- }
-
- private static String nullSafeGetSubscriptionName(SubscriptionInfo info) {
- return info.getSubscriptionName() != null ? info.getSubscriptionName() : SUBSCRIPTIONS_DEFAULT_SUBSCRIPTION_NAME;
- }
-
- private static String getSubscriptionSuperColumnName(String clientId, String subscriptionName) {
- return clientId + SUBSCRIPTIONS_CLIENT_SUBSCRIPTION_DELIMITER + (subscriptionName != null ? subscriptionName : SUBSCRIPTIONS_DEFAULT_SUBSCRIPTION_NAME);
- }
-
- public static String getSubscriberId(String clientId, String subscriptionName) {
- return getSubscriptionSuperColumnName(clientId, subscriptionName);
- }
-
- private static String getClientIdFromSubscriptionSuperColumnName(SuperColumn superColumn) {
- String key = getString(superColumn.getName());
- String[] split = key.split(SUBSCRIPTIONS_CLIENT_SUBSCRIPTION_DELIMITER);
- return split[0];
- }
-
- private static String getSubscriptionNameFromSubscriptionSuperColumnName(SuperColumn superColumn) {
- String key = getString(superColumn.getName());
- String[] split = key.split(SUBSCRIPTIONS_CLIENT_SUBSCRIPTION_DELIMITER);
- if (split[1].equals(SUBSCRIPTIONS_DEFAULT_SUBSCRIPTION_NAME)) {
- return null;
- } else {
- return split[1];
- }
- }
-
- /*util*/
-
- private static <K, V> Map<K, V> map() {
- return new HashMap<K, V>();
- }
-
- private static <I> List<I> list() {
- return new ArrayList<I>();
- }
-
- private static <I> Set<I> set() {
- return new HashSet<I>();
- }
-
-
-}
View
50 src/main/java/org/apache/activemq/store/cassandra/CassandraIdentifier.java
@@ -1,50 +0,0 @@
-package org.apache.activemq.store.cassandra;
-
-/**
- *
- */
-public enum CassandraIdentifier {
-
- KEYSPACE("MessageStore"),
-
- BROKER_FAMILY("Broker"),
- BROKER_KEY("Broker"),
- BROKER_DESTINATION_COUNT("destination-count"),
-
-
- DESTINATIONS_FAMILY("Destinations"),
- DESTINATION_IS_TOPIC_COLUMN("isTopic"),
- DESTINATION_MAX_STORE_SEQUENCE_COLUMN("max-store-sequence"),
- DESTINATION_MAX_BROKER_SEQUENCE_COLUMN("max-broker-sequence"),
- DESTINATION_QUEUE_SIZE_COLUMN("queue-size"),
-
-
- MESSAGES_FAMILY("Messages"),
-
- MESSAGE_TO_STORE_ID_FAMILY("MessageIdToStoreId"),
-
- STORE_IDS_IN_USE_FAMILY("StoreIdsInUse"),
-
-
- SUBSCRIPTIONS_FAMILY("Subscriptions"),
- SUBSCRIPTIONS_SELECTOR_SUBCOLUMN("selector"),
- SUBSCRIPTIONS_LAST_ACK_SUBCOLUMN("lastMessageAck"),
- SUBSCRIPTIONS_SUB_DESTINATION_SUBCOLUMN("subscribedDestination");
-
- private byte[] bytes;
- private String string;
-
- CassandraIdentifier(String id) {
- string = id;
- bytes = CassandraUtils.getBytes(string);
- }
-
- public String string() {
- return string;
- }
-
-
- public byte[] bytes() {
- return bytes;
- }
-}
View
6 src/main/java/org/apache/activemq/store/cassandra/CassandraMessageStore.java
@@ -113,7 +113,7 @@ public void start() throws Exception {
int count = getAdapter().getCassandra().getMessageCount(destination);
queueSize.set(count);
if (log.isDebugEnabled()) {
- log.debug("Destination: {} has {} ", CassandraUtils.getDestinationKey(destination), queueSize.get());
+ log.debug("Destination: {} has {} ", CassandraClientUtil.getDestinationKey(destination), queueSize.get());
}
duplicateDetector = getAdapter().getCassandra().getMessageIdFilterFor(destination, queueSize.get());
}
@@ -121,8 +121,8 @@ public void start() throws Exception {
public void stop() throws Exception {
if (log.isDebugEnabled()) {
log.debug("stop()");
- log.debug("Destination: {} has {} ", CassandraUtils.getDestinationKey(destination), queueSize.get());
- log.debug("Store: {} has {} ", CassandraUtils.getDestinationKey(destination), getAdapter().getCassandra().getMessageCount(destination));
+ log.debug("Destination: {} has {} ", CassandraClientUtil.getDestinationKey(destination), queueSize.get());
+ log.debug("Store: {} has {} ", CassandraClientUtil.getDestinationKey(destination), getAdapter().getCassandra().getMessageCount(destination));
}
}
View
11 src/main/java/org/apache/activemq/store/cassandra/CassandraPersistenceAdapter.java
@@ -71,12 +71,12 @@ public WireFormat getWireFormat() {
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
CassandraMessageStore store = queues.get(destination);
if (store == null) {
- cassandra.createDestination(CassandraUtils.getDestinationKey(destination), false, destinationCount);
+ cassandra.createDestination(CassandraClientUtil.getDestinationKey(destination), false, destinationCount);
store = new CassandraMessageStore(this, destination);
try {
store.start();
} catch (Exception e) {
- log.error("Error Starting queue:" + CassandraUtils.getDestinationKey(destination), e);
+ log.error("Error Starting queue:" + CassandraClientUtil.getDestinationKey(destination), e);
throw new IOException(e);
}
queues.putIfAbsent(destination, store);
@@ -88,12 +88,12 @@ public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IO
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
CassandraTopicMessageStore store = topics.get(destination);
if (store == null) {
- cassandra.createDestination(CassandraUtils.getDestinationKey(destination), true, destinationCount);
+ cassandra.createDestination(CassandraClientUtil.getDestinationKey(destination), true, destinationCount);
store = new CassandraTopicMessageStore(this, destination);
try {
store.start();
} catch (Exception e) {
- log.error("Error Starting queue:" + CassandraUtils.getDestinationKey(destination), e);
+ log.error("Error Starting queue:" + CassandraClientUtil.getDestinationKey(destination), e);
throw new IOException(e);
}
@@ -177,6 +177,7 @@ public long size() {
public void start() throws Exception {
//Zookeeper master election
+ cassandra.start();
if (masterElector != null) {
masterElector.setMasterLostHandler(new Runnable() {
@Override
@@ -228,9 +229,11 @@ private void isMaster() throws Exception {
public void stop() throws Exception {
+
if (masterElector != null) {
masterElector.stop();
}
+ cassandra.stop();
}
View
4 src/main/java/org/apache/activemq/store/cassandra/CassandraTopicMessageStore.java
@@ -50,7 +50,7 @@ public void recoverSubscription(String clientId, String subscriptionName, Messag
@Override
public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
- String subcriberId = CassandraClient.getSubscriberId(clientId, subscriptionName);
+ String subcriberId = getAdapter().getCassandra().getSubscriberId(clientId, subscriptionName);
AtomicLong last = subscriberLastMessageMap.get(subcriberId);
if (last == null) {
long lastAcked = getAdapter().getCassandra().getLastAckStoreId(getDestination(), clientId, subscriptionName);
@@ -81,7 +81,7 @@ public void recoverNextMessages(String clientId, String subscriptionName, int ma
@Override
public void resetBatching(String clientId, String subscriptionName) {
- String id = CassandraClient.getSubscriberId(clientId, subscriptionName);
+ String id = getAdapter().getCassandra().getSubscriberId(clientId, subscriptionName);
subscriberLastMessageMap.remove(id);
}
View
164 src/main/java/org/apache/activemq/store/cassandra/CassandraUtils.java
@@ -1,164 +0,0 @@
-package org.apache.activemq.store.cassandra;
-
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.cassandra.thrift.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.UnsupportedEncodingException;
-import java.nio.*;
-import java.util.Collections;
-
-
-public class CassandraUtils {
-
- static Logger log = LoggerFactory.getLogger(CassandraUtils.class);
-
- public static String getString(byte[] bytes) {
- try {
- return new String(bytes, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
-
- }
-
- public static long getLong(byte[] bytes) {
- ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
- LongBuffer lBuffer = byteBuffer.asLongBuffer();
- return lBuffer.get();
- }
-
- public static byte[] getBytes(long num) {
- byte[] bArray = new byte[8];
- ByteBuffer bBuffer = ByteBuffer.wrap(bArray);
- LongBuffer lBuffer = bBuffer.asLongBuffer();
- lBuffer.put(num);
- return bArray;
- }
-
- public static int getInt(byte[] bytes) {
- ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
- IntBuffer iBuffer = byteBuffer.asIntBuffer();
- return iBuffer.get();
- }
-
- public static byte[] getBytes(int num) {
- byte[] bArray = new byte[4];
- ByteBuffer byteBuffer = ByteBuffer.wrap(bArray);
- IntBuffer iBuffer = byteBuffer.asIntBuffer();
- iBuffer.put(num);
- return bArray;
- }
-
-
- public static long safeGetLong(byte[] bytes) {
- if (bytes.length != 8) {
- log.debug("bytes length was {}, not 8, returning -1", bytes.length);
- return -1L;
- } else {
- return getLong(bytes);
- }
- }
-
- public static boolean getBoolean(byte[] bytes) {
- return Boolean.parseBoolean(getString(bytes));
- }
-
-
- public static byte[] getBytes(String string) {
- try {
- return string.getBytes("UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Return empty byte array if string is null or ""
- *
- * @param string
- * @return
- */
- public static byte[] nullSafeGetBytes(String string) {
- if (string == null || "".equals(string)) {
- return new byte[0];
- } else {
- return getBytes(string);
- }
- }
-
-
- private static byte[] getBytes(boolean bool) {
- return getBytes(Boolean.toString(bool));
- }
-
-
- public static long timestamp() {
- return System.currentTimeMillis();
- }
-
- public static Mutation getInsertOrUpdateColumnMutation(byte[] name, byte[] val, Long timestamp) {
- if (timestamp == null) {
- timestamp = timestamp();
- }
- Mutation insert = new Mutation();
- ColumnOrSuperColumn c = new ColumnOrSuperColumn();
- c.setColumn(getColumn(name, val, timestamp));
- insert.setColumn_or_supercolumn(c);
- return insert;
- }
-
- public static Mutation getDeleteColumnMutation(byte[] column, long timestamp) {
- Mutation mutation = new Mutation();
- Deletion deletion = new Deletion(timestamp);
- SlicePredicate predicate = new SlicePredicate();
- predicate.setColumn_names(Collections.singletonList(column));
- deletion.setPredicate(predicate);
- mutation.setDeletion(deletion);
- return mutation;
- }
-
-
-
- public static Mutation getInsertOrUpdateColumnMutation(String name, String val) {
- return getInsertOrUpdateColumnMutation(getBytes(name), getBytes(val), null);
- }
-
- public static Mutation getInsertOrUpdateColumnMutation(String name, long val) {
- return getInsertOrUpdateColumnMutation(getBytes(name), getBytes(val), null);
- }
-
- public static Mutation getInsertOrUpdateColumnMutation(String name, boolean val) {
- return getInsertOrUpdateColumnMutation(getBytes(name), getBytes(Boolean.toString(val)), null);
- }
-
- public static Column getColumn(byte[] name, byte[] val) {
- return new Column(name, val, timestamp());
- }
-
- public static Column getColumn(byte[] name, byte[] val, long timestamp) {
- return new Column(name, val, timestamp);
- }
-
- public static Mutation getInsertOrUpdateSuperColumnMutation(long supername, String name, byte[] val) {
- return getInsertOrUpdateSuperColumnMutation(getBytes(supername), getBytes(name), val);
- }
-
- public static Mutation getInsertOrUpdateSuperColumnMutation(byte[] supername, byte[] name, byte[] val) {
- Mutation mutation = new Mutation();
- ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
- SuperColumn superColumn = new SuperColumn();
- superColumn.setName(supername);
- Column column = getColumn(name, val);
- superColumn.addToColumns(column);
- columnOrSuperColumn.setSuper_column(superColumn);
- mutation.setColumn_or_supercolumn(columnOrSuperColumn);
- return mutation;
- }
-
- public static String getDestinationKey(ActiveMQDestination destination) {
- String key = destination.getQualifiedName();
- return key;
- }
-}
View
262 src/main/scala/org/apache/activemq/store/cassandra/CassandraClient.scala
@@ -1,27 +1,28 @@
-package org.apache.activemq.store.cassandra.scala
+package org.apache.activemq.store.cassandra
import com.shorrockin.cascal.session._
import com.shorrockin.cascal.utils.Conversions._
import collection.jcl.Conversions._
import reflect.BeanProperty
import CassandraClient._
-import CassandraClient.Id._
import org.apache.cassandra.utils.BloomFilter
import grizzled.slf4j.Logger
import org.apache.activemq.store.cassandra.{DestinationMaxIds => Max}
import org.apache.activemq.store.cassandra._
-import org.apache.cassandra.thrift.NotFoundException
import java.util.concurrent.atomic.{AtomicLong, AtomicInteger}
import org.apache.activemq.command.{SubscriptionInfo, MessageId, ActiveMQDestination}
import collection.jcl.{ArrayList, HashSet, Set}
import com.shorrockin.cascal.model.{SuperColumn, StandardKey, Key, Column}
+import collection.mutable.ListBuffer
+import org.apache.cassandra.thrift.{ConsistencyLevel, NotFoundException}
class CassandraClient() {
@BeanProperty var cassandraHost: String = _
- @BeanProperty var cassandraPort: int = _
- @BeanProperty var cassandraTimeout: int = _
+ @BeanProperty var cassandraPort: Int = _
+ @BeanProperty var cassandraTimeout: Int = _
+ @BeanProperty var consistencyLevel: ConsistencyLevel = _
+
- val logger = Logger(this.getClass)
protected var pool: SessionPool = null
@@ -44,7 +45,7 @@ class CassandraClient() {
}
}
- def getDestinationCount(): int = {
+ def getDestinationCount(): Int = {
withSession {
session =>
session.get(KEYSPACE \ BROKER_FAMILY \ BROKER_KEY \ BROKER_DESTINATION_COUNT) match {
@@ -57,20 +58,20 @@ class CassandraClient() {
}
}
- def insertDestinationCount(count: int) = {
+ def insertDestinationCount(count: Int) = {
withSession {
session =>
session.insert(KEYSPACE \ BROKER_FAMILY \ BROKER_KEY \ (BROKER_DESTINATION_COUNT, count))
}
}
- def getMessageIdFilterFor(destination: ActiveMQDestination, size: long): BloomFilter = {
+ def getMessageIdFilterFor(destination: ActiveMQDestination, size: Long): BloomFilter = {
val filterSize = Math.max(size, 10000)
val bloomFilter = BloomFilter.getFilter(filterSize, 0.01d);
var start = ""
val end = ""
- var counter: int = 0
- while (counter < filterSize) {
+ var counter: Int = 0
+ while (counter < size) {
withSession {
session =>
val cols = session.list(KEYSPACE \ MESSAGE_TO_STORE_ID_FAMILY \ destination, RangePredicate(start, end))
@@ -86,7 +87,7 @@ class CassandraClient() {
}
- def createDestination(name: String, isTopic: boolean, destinationCount: AtomicInteger): boolean = {
+ def createDestination(name: String, isTopic: Boolean, destinationCount: AtomicInteger): Boolean = {
withSession {
session =>
session.get(KEYSPACE \ DESTINATIONS_FAMILY \ name \ DESTINATION_IS_TOPIC_COLUMN) match {
@@ -96,7 +97,7 @@ class CassandraClient() {
case None =>
val topic = KEYSPACE \ DESTINATIONS_FAMILY \ name \ (DESTINATION_IS_TOPIC_COLUMN, isTopic)
val maxStore = KEYSPACE \ DESTINATIONS_FAMILY \ name \ (DESTINATION_MAX_STORE_SEQUENCE_COLUMN, 0L)
- val queueSize = KEYSPACE \ DESTINATIONS_FAMILY \ name \ (DESTINATION_QUEUE_SIZE_COLUMN, 0)
+ val queueSize = KEYSPACE \ DESTINATIONS_FAMILY \ name \ (DESTINATION_QUEUE_SIZE_COLUMN, 0L)
val destCount = KEYSPACE \ BROKER_FAMILY \ BROKER_KEY \ (BROKER_DESTINATION_COUNT, destinationCount.incrementAndGet)
try {
session.batch(Insert(topic) :: Insert(maxStore) :: Insert(queueSize) :: Insert(destCount))
@@ -112,12 +113,12 @@ class CassandraClient() {
}
- def getDestinations(): Set[ActiveMQDestination] = {
+ def getDestinations(): java.util.Set[ActiveMQDestination] = {
val destinations = new HashSet[ActiveMQDestination]
withSession {
session =>
session.list(KEYSPACE \ DESTINATIONS_FAMILY, KeyRange("", "", 10000)).foreach {
- case (key, colomns) => {
+ case (key, cols) => {
destinations.add(key.value)
}
}
@@ -130,14 +131,14 @@ class CassandraClient() {
def deleteQueue(destination: ActiveMQDestination, destinationCount: AtomicInteger): Unit = {
withSession {
session =>
- val msgs = KEYSPACE \ MESSAGES_FAMILY \ destination
- val dest = KEYSPACE \ DESTINATIONS_FAMILY \ destination
- val mids = KEYSPACE \ MESSAGE_TO_STORE_ID_FAMILY \ destination
- val sids = KEYSPACE \ STORE_IDS_IN_USE_FAMILY \ destination
- val subs = KEYSPACE \\ SUBSCRIPTIONS_FAMILY \ destination
- val count = KEYSPACE \ BROKER_FAMILY \ BROKER_KEY \ (BROKER_DESTINATION_COUNT, destinationCount.decrementAndGet)
+ session.remove(KEYSPACE \ MESSAGES_FAMILY \ destination)
+ session.remove(KEYSPACE \ DESTINATIONS_FAMILY \ destination)
+ session.remove(KEYSPACE \ MESSAGE_TO_STORE_ID_FAMILY \ destination)
+ session.remove(KEYSPACE \ STORE_IDS_IN_USE_FAMILY \ destination)
+ session.remove(KEYSPACE \\ SUBSCRIPTIONS_FAMILY \ destination)
try {
- session.batch(Delete(msgs) :: Delete(dest) :: Delete(mids) :: Delete(sids) :: Delete(subs) :: Insert(count))
+ val count = KEYSPACE \ BROKER_FAMILY \ BROKER_KEY \ (BROKER_DESTINATION_COUNT, destinationCount.decrementAndGet)
+ session.insert(count)
} catch {
case e: RuntimeException =>
destinationCount.incrementAndGet
@@ -157,8 +158,8 @@ class CassandraClient() {
if (destinations == 0) {
return max;
}
- var storeVal: long = 0
- var broker: long = 0
+ var storeVal: Long = 0
+ var broker: Long = 0
withSession {
session =>
session.list(KEYSPACE \ DESTINATIONS_FAMILY, new KeyRange("", "", 10000), ColumnPredicate(
@@ -182,7 +183,7 @@ class CassandraClient() {
max
}
- def getStoreId(destination: ActiveMQDestination, id: MessageId): long = {
+ def getStoreId(destination: ActiveMQDestination, id: MessageId): Long = {
withSession {
session =>
session.get(KEYSPACE \ MESSAGE_TO_STORE_ID_FAMILY \ destination \ id.toString) match {
@@ -195,20 +196,20 @@ class CassandraClient() {
}
}
- def getMessage(destination: ActiveMQDestination, storeId: long): Array[byte] = {
+ def getMessage(destination: ActiveMQDestination, storeId: Long): Array[Byte] = {
withSession {
session =>
session.get(KEYSPACE \ MESSAGES_FAMILY \ destination \ storeId) match {
case Some(x) =>
x.value
case None =>
- logger.error({"Message Not Found for destination:%s id:%i".format(destination, storeId)})
+ logger.error({"Message Not Found for destination:%s id:%s".format(destination, storeId)})
throw new NotFoundException;
}
}
}
- def saveMessage(destination: ActiveMQDestination, id: long, messageId: MessageId, message: Array[byte], queueSize: AtomicLong, duplicateDetector: BloomFilter): Unit = {
+ def saveMessage(destination: ActiveMQDestination, id: Long, messageId: MessageId, message: Array[Byte], queueSize: AtomicLong, duplicateDetector: BloomFilter): Unit = {
withSession {
session =>
if (duplicateDetector.isPresent(messageId.toString)) {
@@ -223,16 +224,18 @@ class CassandraClient() {
}
logger.debug({"Saving message with id:%d".format(id)});
+ logger.debug({"Saving message with messageId:%s".format(messageId.toString)});
logger.debug({"Saving message with brokerSeq id:%d".format(messageId.getBrokerSequenceId())});
val mesg = KEYSPACE \ MESSAGES_FAMILY \ destination \ (id, message)
- val destQ = KEYSPACE \ DESTINATIONS_FAMILY \ destination \ (DESTINATION_QUEUE_SIZE_COLUMN, queueSize.incrementAndGet)
+ val destQ = KEYSPACE \ DESTINATIONS_FAMILY \ destination \ (DESTINATION_QUEUE_SIZE_COLUMN, bytes(queueSize.incrementAndGet))
val destStore = KEYSPACE \ DESTINATIONS_FAMILY \ destination \ (DESTINATION_MAX_STORE_SEQUENCE_COLUMN, id)
val destBrok = KEYSPACE \ DESTINATIONS_FAMILY \ destination \ (DESTINATION_MAX_BROKER_SEQUENCE_COLUMN, messageId.getBrokerSequenceId)
val idx = KEYSPACE \ MESSAGE_TO_STORE_ID_FAMILY \ destination \ (messageId.toString, id)
val storeId = KEYSPACE \ STORE_IDS_IN_USE_FAMILY \ destination \ (id, "")
try {
session.batch(Insert(mesg) :: Insert(destQ) :: Insert(destStore) :: Insert(destBrok) :: Insert(idx) :: Insert(storeId));
+ duplicateDetector.add(messageId.toString)
} catch {
case e: RuntimeException =>
queueSize.decrementAndGet
@@ -271,41 +274,43 @@ class CassandraClient() {
}
}
- def getMessageCount(destination: ActiveMQDestination): long = {
+ def getMessageCount(destination: ActiveMQDestination): Int = {
withSession {
session =>
session.get(KEYSPACE \ DESTINATIONS_FAMILY \ destination \ DESTINATION_QUEUE_SIZE_COLUMN) match {
case Some(x) =>
- x.value
+ long(x.value).intValue
case None =>
throw new RuntimeException("Count not found for destination" + destination);
}
}
}
- def recoverMessages(destination: ActiveMQDestination, batchPoint: AtomicLong, maxReturned: int): java.util.List[Array[byte]] = {
- var start: String = ""
+ def recoverMessages(destination: ActiveMQDestination, batchPoint: AtomicLong, maxReturned: Int): java.util.List[Array[Byte]] = {
+ logger.debug({"recoverMessages(%s, %s,%s)".format(destination, batchPoint, maxReturned)})
+ var start: Array[Byte] = new Array[Byte](0)
if (batchPoint.get != -1) {
start = batchPoint.get
}
- val end = ""
- val messages = new ArrayList[Array[byte]]
- recoverMessagesFromTo(destination, start, end, maxReturned, messages, maxReturned)
+ val end: Array[Byte] = new Array[Byte](0)
+ val messages = new ArrayList[Array[Byte]]
+ recoverMessagesFromTo(destination, start, end, maxReturned, messages)
messages
}
- private def recoverMessagesFromTo(key: String, start: String, end: String, limit: int, messages: ArrayList[Array[byte]], messagelimit: int): Unit = {
+ private def recoverMessagesFromTo(key: String, start: Array[Byte], end: Array[Byte], limit: Int, messages: ArrayList[Array[Byte]]): Unit = {
+ logger.debug({"recoverMessagesFrom(%s,%s,%s,%s,%s)".format(key, start, end, limit, messages.length)})
withSession {
session =>
val range = RangePredicate(Some(start), Some(end), Order.Ascending, Some(limit))
session.list(KEYSPACE \ MESSAGES_FAMILY \ key, range, Consistency.Quorum).foreach {
col =>
- if (messages.size < messagelimit) messages.add(col.value)
+ if (messages.size < limit) messages.add(col.value)
}
}
}
- def addSubscription(destination: ActiveMQDestination, subscriptionInfo: SubscriptionInfo, ack: long): Unit = {
+ def addSubscription(destination: ActiveMQDestination, subscriptionInfo: SubscriptionInfo, ack: Long): Unit = {
withSession {
session =>
val supercolumnName = subscriptionSupercolumn(subscriptionInfo)
@@ -328,21 +333,123 @@ class CassandraClient() {
subscriptionInfo.setClientId(clientId)
subscriptionInfo.setSubscriptionName(subscriptionName)
subscriptionInfo.setDestination(destination)
- var dtype: Byte = if (destination.isTopic) ActiveMQDestination.TOPIC_TYPE else ActiveMQDestination.QUEUE_TYPE
- session.get(KEYSPACE \\ SUBSCRIPTIONS_FAMILY \ destination \ getSubscriptionSuperColumnName(clientId, subscriptionName))
+ session.get(KEYSPACE \\ SUBSCRIPTIONS_FAMILY \ destination \ getSubscriptionSuperColumnName(clientId, subscriptionName)) match {
+ case Some(seq) => {
+ seq.foreach {
+ column => {
+ string(column.name) match {
+ case SUBSCRIPTIONS_SELECTOR_SUBCOLUMN =>
+ subscriptionInfo.setSelector(column.value)
+ case SUBSCRIPTIONS_SUB_DESTINATION_SUBCOLUMN =>
+ subscriptionInfo.setSubscribedDestination(column.value)
+ case _ => None
+ }
+ }
+ }
+ subscriptionInfo
+ }
+ case None => {
+ logger.warn("lookupSubscription failed to find the subscription")
+ return null
+ }
+ }
+
+ }
+ }
+
+ def lookupAllSubscriptions(destination: ActiveMQDestination): Array[SubscriptionInfo] = {
+ withSession {
+ session =>
+ val subs = new ListBuffer[SubscriptionInfo]
+ session.list(KEYSPACE \\ SUBSCRIPTIONS_FAMILY \ destination, EmptyPredicate, Consistency.Quorum).foreach {
+ case (superCol, columnList) => {
+ var key: String = superCol.value
+ logger.warn(string(columnList.length))
+ var subscriptionInfo = new SubscriptionInfo
+ subscriptionInfo.setClientId(getClientIdFromSubscriptionKey(key))
+ subscriptionInfo.setSubscriptionName(getSubscriptionNameFromSubscriptionKey(key))
+ subscriptionInfo.setDestination(destination)
+ columnList.foreach {
+ column => {
+ string(column.name) match {
+ case SUBSCRIPTIONS_SELECTOR_SUBCOLUMN =>
+ logger.debug("got selector col")
+ subscriptionInfo.setSelector(column.value)
+ case SUBSCRIPTIONS_SUB_DESTINATION_SUBCOLUMN =>
+ logger.debug("got sub dest col")
+ subscriptionInfo.setSubscribedDestination(column.value)
+ case _ => None
+ }
+ }
+ }
+
+ subs + subscriptionInfo
+ }
+ }
+ subs.toArray
+ }
+
+ }
+
+ def acknowledge(destination: ActiveMQDestination, clientId: String, subscriptionName: String, id: MessageId): Unit = {
+ val lastAckStoreId: Long = getStoreId(destination, id);
+ val superCol = KEYSPACE \\ SUBSCRIPTIONS_FAMILY \ destination \ getSubscriptionSuperColumnName(clientId, subscriptionName)
+ val ackCol = superCol \ (SUBSCRIPTIONS_LAST_ACK_SUBCOLUMN, lastAckStoreId)
+ withSession {
+ session =>
+ session.insert(ackCol)
+ }
+ }
+
+ def deleteSubscription(destination: ActiveMQDestination, clientId: String, subscriptionName: String): Unit = {
+ val superCol = KEYSPACE \\ SUBSCRIPTIONS_FAMILY \ destination \ getSubscriptionSuperColumnName(clientId, subscriptionName)
+ withSession {
+ session =>
+ session.remove(superCol)
+ }
+ }
+
+ def getMessageCountFrom(destination: ActiveMQDestination, storeId: Long): Int = {
+ withSession {
+ session =>
+ session.list(KEYSPACE \ STORE_IDS_IN_USE_FAMILY \ destination, RangePredicate(Some(storeId), None, Order.Ascending, None)).size
+ }
+ }
+ def getLastAckStoreId(destination: ActiveMQDestination, clientId: String, subscriptionName: String): Int = {
+ withSession {
+ session =>
+ session.get(KEYSPACE \\ SUBSCRIPTIONS_FAMILY \ destination \ getSubscriptionSuperColumnName(clientId, subscriptionName) \ SUBSCRIPTIONS_LAST_ACK_SUBCOLUMN) match {
+ case Some(col) =>
+ long(col.value).intValue
+ case None => {
+ logger.debug("LastAckStoreId Not found, returning 0");
+ 0
+ }
+
+ }
}
}
+ def getSubscriberId(clientId: String, subscriptionName: String): String = {
+ return getSubscriptionSuperColumnName(clientId, subscriptionName)
+ }
+
}
object CassandraClient {
- implicit def destinationKey(destination: ActiveMQDestination): String = {
+ val logger = Logger(this.getClass)
+
+ implicit def getDestinationKey(destination: ActiveMQDestination): String = {
destination.getQualifiedName
}
implicit def destinationBytes(destination: ActiveMQDestination): Array[Byte] = {
- bytes(destinationKey(destination))
+ bytes(getDestinationKey(destination))
+ }
+
+ implicit def bytesToDest(bytes: Array[Byte]): ActiveMQDestination = {
+ destinationFromKey(string(bytes))
}
implicit def destinationFromKey(key: String): ActiveMQDestination = {
@@ -364,48 +471,69 @@ object CassandraClient {
}
- private def getSubscriberId(clientId: String, subscriptionName: String): String = {
- return getSubscriptionSuperColumnName(clientId, subscriptionName)
+ private def getClientIdFromSubscriptionKey(key: String): String = {
+ var split: Array[String] = key.split(SUBSCRIPTIONS_CLIENT_SUBSCRIPTION_DELIMITER)
+ return split(0)
}
+ private def getSubscriptionNameFromSubscriptionKey(key: String): String = {
+ var split: Array[String] = key.split(SUBSCRIPTIONS_CLIENT_SUBSCRIPTION_DELIMITER)
+ if (split(1).equals(SUBSCRIPTIONS_DEFAULT_SUBSCRIPTION_NAME)) {
+ return null
+ }
+ else {
+ return split(1)
+ }
+ }
+ def safeGetLong(bytes: Array[Byte]): Long = {
+ if (bytes.length != 8) {
+ logger.debug({"bytes length was %d, not 8, returning -1".format(bytes.length)})
+ return -1L
+ }
+ else {
+ return long(bytes)
+ }
+ }
- object Id {
- val KEYSPACE = "MessageStore"
- val BROKER_FAMILY = "Broker"
- val BROKER_KEY = "Broker"
- val BROKER_DESTINATION_COUNT = "destination-count"
+ val KEYSPACE = "MessageStore"
+ val BROKER_FAMILY = "Broker"
+ val BROKER_KEY = "Broker"
+ val BROKER_DESTINATION_COUNT = "destination-count"
- val DESTINATIONS_FAMILY = "Destinations"
- val DESTINATION_IS_TOPIC_COLUMN = "isTopic"
- val DESTINATION_MAX_STORE_SEQUENCE_COLUMN = "max-store-sequence"
- val DESTINATION_MAX_BROKER_SEQUENCE_COLUMN = "max-broker-sequence"
- val DESTINATION_QUEUE_SIZE_COLUMN = "queue-size"
+ val DESTINATIONS_FAMILY = "Destinations"
+ val DESTINATION_IS_TOPIC_COLUMN = "isTopic"
+ val DESTINATION_MAX_STORE_SEQUENCE_COLUMN = "max-store-sequence"
+ val DESTINATION_MAX_BROKER_SEQUENCE_COLUMN = "max-broker-sequence"
+ val DESTINATION_QUEUE_SIZE_COLUMN = "queue-size"
- val MESSAGES_FAMILY = "Messages"
+ val MESSAGES_FAMILY = "Messages"
- val MESSAGE_TO_STORE_ID_FAMILY = "MessageIdToStoreId"
+ val MESSAGE_TO_STORE_ID_FAMILY = "MessageIdToStoreId"
- val STORE_IDS_IN_USE_FAMILY = "StoreIdsInUse"
+ val STORE_IDS_IN_USE_FAMILY = "StoreIdsInUse"
- val SUBSCRIPTIONS_FAMILY = "Subscriptions"
- val SUBSCRIPTIONS_SELECTOR_SUBCOLUMN = "selector"
- val SUBSCRIPTIONS_LAST_ACK_SUBCOLUMN = "lastMessageAck"
- val SUBSCRIPTIONS_SUB_DESTINATION_SUBCOLUMN = "subscribedDestination";
+ val SUBSCRIPTIONS_FAMILY = "Subscriptions"
+ val SUBSCRIPTIONS_SELECTOR_SUBCOLUMN = "selector"
+ val SUBSCRIPTIONS_LAST_ACK_SUBCOLUMN = "lastMessageAck"
+ val SUBSCRIPTIONS_SUB_DESTINATION_SUBCOLUMN = "subscribedDestination";
- /*Subscriptions Column Family Constants*/
+ /*Subscriptions Column Family Constants*/
- val SUBSCRIPTIONS_CLIENT_SUBSCRIPTION_DELIMITER: String = "~~~~~"
- val SUBSCRIPTIONS_DEFAULT_SUBSCRIPTION_NAME: String = "@NOT_SET@"
+ val SUBSCRIPTIONS_CLIENT_SUBSCRIPTION_DELIMITER: String = "~~~~~"
+ val SUBSCRIPTIONS_DEFAULT_SUBSCRIPTION_NAME: String = "@NOT_SET@"
+}
+object CassandraClientUtil {
+ def getDestinationKey(destination: ActiveMQDestination): String = {
+ CassandraClient.getDestinationKey(destination)
}
-
-
+ val MESSAGES_FAMILY = CassandraClient.MESSAGES_FAMILY
}
View
12 src/test/java/org/apache/activemq/store/cassandra/CassandraClientTest.java
@@ -9,7 +9,6 @@
import org.junit.Test;
import java.io.UnsupportedEncodingException;
-import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -32,6 +31,7 @@ public static void createAdapter() throws TTransportException {
client.setCassandraHost("localhost");
client.setCassandraPort(getCassandraPort());
client.setConsistencyLevel(ConsistencyLevel.QUORUM);
+ client.start();
}
@@ -69,8 +69,8 @@ public void messageCrud() throws TTransportException, UnsupportedEncodingExcepti
try {
byte[] rte2 = client.getMessage(queue, 1);
assertFalse(new String(fakeMessage).equals(new String(rte2)));
- } catch (RuntimeException e) {
- assertTrue(e.getCause() instanceof NotFoundException);
+ } catch (Exception e) {
+ assertTrue(e instanceof NotFoundException);
}
assertEquals(0, client.getMessageCount(queue));
}
@@ -121,7 +121,7 @@ public void testQuery() throws TException, UnsupportedEncodingException, TimedOu
client.saveMessage(queue, i, messageId, fakeMessage, count, dups);
log.debug("saved:{}", i);
}
- ColumnParent p = new ColumnParent(CassandraIdentifier.MESSAGES_FAMILY.string());
+ ColumnParent p = new ColumnParent(CassandraClientUtil.MESSAGES_FAMILY());
byte[] start = new byte[0];
byte[] end = new byte[0];
byte[] lastCol = new byte[0];
@@ -131,7 +131,7 @@ public void testQuery() throws TException, UnsupportedEncodingException, TimedOu
predicate.setSlice_range(range);
- List<ColumnOrSuperColumn> orSuperColumns = client.getCassandraConnection().get_slice(CassandraIdentifier.KEYSPACE.string(), CassandraUtils.getDestinationKey(queue), p, predicate, ConsistencyLevel.QUORUM);
+ /* List<ColumnOrSuperColumn> orSuperColumns = client.getCassandraConnection().get_slice(CassandraIdentifier.KEYSPACE.string(), CassandraUtils.getDestinationKey(queue), p, predicate, ConsistencyLevel.QUORUM);
for (ColumnOrSuperColumn orSuperColumn : orSuperColumns) {
logColumnName(orSuperColumn, Long.class);
}
@@ -140,7 +140,7 @@ public void testQuery() throws TException, UnsupportedEncodingException, TimedOu
orSuperColumns = client.getCassandraConnection().get_slice(CassandraIdentifier.KEYSPACE.string(), CassandraUtils.getDestinationKey(queue), p, predicate, ConsistencyLevel.QUORUM);
for (ColumnOrSuperColumn orSuperColumn : orSuperColumns) {
logColumnName(orSuperColumn, Long.class);
- }
+ }*/
}
View
1  src/test/java/org/apache/activemq/store/cassandra/CassandraPersistenceAdapterTest.java
@@ -28,6 +28,7 @@ protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exc
cassandraClient.setCassandraHost("localhost");
cassandraClient.setCassandraPort(EmbeddedServicesTest.getCassandraPort());
cassandraClient.setConsistencyLevel(ConsistencyLevel.QUORUM);
+ cassandraClient.start();
ZooKeeperMasterElector elector = new ZooKeeperMasterElector();
elector.setZookeeperConnectString(EmbeddedServicesTest.getZookeeperConnectString());
adapter.setCassandraClient(cassandraClient);
View
1  src/test/java/org/apache/activemq/store/cassandra/CassandraStoreOrderTest.java
@@ -69,6 +69,7 @@ protected void setPersistentAdapter(BrokerService brokerService) throws Exceptio
cassandraClient.setCassandraHost("localhost");
cassandraClient.setCassandraPort(EmbeddedServicesTest.getCassandraPort());
cassandraClient.setConsistencyLevel(ConsistencyLevel.QUORUM);
+ cassandraClient.start();
ZooKeeperMasterElector masterElector = new ZooKeeperMasterElector();
masterElector.setZookeeperConnectString(EmbeddedServicesTest.getZookeeperConnectString());
cassandraPersistenceAdapter.setCassandraClient(cassandraClient);
View
24 src/test/java/org/apache/activemq/store/cassandra/EmbeddedServicesTest.java
@@ -9,8 +9,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.nio.LongBuffer;
import java.util.Properties;
/**
@@ -119,9 +120,9 @@ private Object getValue(ColumnOrSuperColumn cos, Class valueType) {
private Object get(byte[] bytes, Class type) {
if (type.equals(String.class)) {
- return CassandraUtils.getString(bytes);
+ return getString(bytes);
} else if (type.equals(Long.class)) {
- return CassandraUtils.getLong(bytes);
+ return getLong(bytes);
} else {
return "UnsupportedType for conversion";
}
@@ -131,5 +132,20 @@ public void logColumnNameAndValue(ColumnOrSuperColumn cos, Class nameType, Class
log.debug("column:{} value:{}", getName(cos, nameType), getValue(cos, valueType));
}
+ public static String getString(byte[] bytes) {
+ try {
+ return new String(bytes, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public static long getLong(byte[] bytes) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ LongBuffer lBuffer = byteBuffer.asLongBuffer();
+ return lBuffer.get();
+ }
+
}
View
2  src/test/resources/log4j.properties
@@ -1,6 +1,6 @@
log4j.rootLogger=INFO,stdout
log4j.logger.org.apache.activemq.store.cassandra=INFO
-log4j.logger.org.apache.activemq.store.cassandra.CassandraClient=INFO
+log4j.logger.org.apache.activemq.store.cassandra.CassandraClient=DEBUG
log4j.logger.org.apache.activemq.store.cassandra.CassandraPersistenceAdapter=DEBUG
log4j.logger.org.apache.zookeeper.server.ZooKeeperServerMain=ERROR
Please sign in to comment.
Something went wrong with that request. Please try again.