Initial Git Repo

0 parents commit 1d5b3427d6a1980cad4e85ca4ec84dae592d6f3a @ticktock committed Jun 2, 2010
Showing with 3,743 additions and 0 deletions.
  1. +6 −0 .gitignore
  2. +201 −0 LICENSE
  3. +105 −0 README.markdown
  4. +264 −0 pom.xml
  5. +730 −0 src/main/java/org/apache/activemq/store/cassandra/CassandraClient.java
  6. +50 −0 src/main/java/org/apache/activemq/store/cassandra/CassandraIdentifier.java
  7. +127 −0 src/main/java/org/apache/activemq/store/cassandra/CassandraMessageStore.java
  8. +249 −0 src/main/java/org/apache/activemq/store/cassandra/CassandraPersistenceAdapter.java
  9. +68 −0 src/main/java/org/apache/activemq/store/cassandra/CassandraPersistenceAdapterFactory.java
  10. +114 −0 src/main/java/org/apache/activemq/store/cassandra/CassandraTopicMessageStore.java
  11. +164 −0 src/main/java/org/apache/activemq/store/cassandra/CassandraUtils.java
  12. +33 −0 src/main/java/org/apache/activemq/store/cassandra/DestinationMaxIds.java
  13. +17 −0 src/main/java/org/apache/activemq/store/cassandra/MasterElector.java
  14. +136 −0 src/main/java/org/apache/activemq/store/cassandra/ZooKeeperMasterElector.java
  15. +112 −0 src/main/resources/keyspace.xml
  16. +142 −0 src/test/java/org/apache/activemq/store/cassandra/CassandraClientTest.java
  17. +79 −0 src/test/java/org/apache/activemq/store/cassandra/CassandraServiceDataCleaner.java
  18. +79 −0 src/test/java/org/apache/activemq/store/cassandra/CassandraStoreOrderTest.java
  19. +29 −0 src/test/java/org/apache/activemq/store/cassandra/EmbeddedCassandraService.java
  20. +135 −0 src/test/java/org/apache/activemq/store/cassandra/EmbeddedServicesTest.java
  21. +36 −0 src/test/java/org/apache/activemq/store/cassandra/EmbeddedZookeeperService.java
  22. +126 −0 src/test/java/org/apache/activemq/store/cassandra/SendAndReceiveTest.java
  23. +37 −0 src/test/java/org/apache/activemq/store/cassandra/SpringBrokerTest.java
  24. +173 −0 src/test/java/org/apache/activemq/store/cassandra/TopicTest.java
  25. +51 −0 src/test/java/org/apache/activemq/store/cassandra/ZooKeeperMasterElectorTest.java
  26. +62 −0 src/test/resources/broker.xml
  27. +13 −0 src/test/resources/log4j.properties
  28. +401 −0 src/test/resources/storage-conf.xml
  29. +4 −0 src/test/resources/test.properties
6 .gitignore
@@ -0,0 +1,6 @@
+target
+qsandra.ipr
+qsandra.iws
+qsandra.iml
+.project
+.classpath
201 LICENSE
@@ -0,0 +1,201 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
105 README.markdown
@@ -0,0 +1,105 @@
+#qsandra - A Cassandra-backed PersistenceAdapter for ActiveMQ
+
+##Intro
+While this adapter can be used against any existing Cassandra installation, the goal is to provide an ActiveMQ broker cluster
+that is available across multiple datacenters and can tolerate the loss of a datacenter with no impact on availability
+(like the existing ActiveMQ pure master-slave deployed across datacenters), without having to bring the broker cluster down
+and copy data files around to restore a failed master (as with the existing ActiveMQ JDBC or Shared Filesystem master-slave),
+and with message state replicated to multiple datacenters without expensive database or storage software and hardware.
+
+##Running
+###Configuration
+####ActiveMQ
+To configure ActiveMQ to use Cassandra for message persistence, you need a few pieces of information.
+
+You need the host and port of whatever is load balancing your Cassandra cluster.
+If you are running more than one broker instance and using ZooKeeper for master election, you also need your ZooKeeper connect string.
+
+Here is an example Spring config:
+
+
+ <broker:broker useJmx="true" persistent="true">
+
+ <broker:persistenceAdapter>
+ <ref bean="adapter"/>
+ </broker:persistenceAdapter>
+
+ <broker:transportConnectors>
+ <broker:transportConnector name="tcp"
+ uri="tcp://messages.example.com:60001"/>
+ </broker:transportConnectors>
+
+ </broker:broker>
+
+ <bean id="adapter" class="org.apache.activemq.store.cassandra.CassandraPersistenceAdapter">
+ <property name="cassandraClient" ref="cassandraClient"/>
+ <property name="masterElector" ref="masterElector"/>
+ </bean>
+
+ <bean id="cassandraClient" class="org.apache.activemq.store.cassandra.CassandraClient">
+ <property name="cassandraHost" value="cassandra.example.com"/>
+ <property name="cassandraPort" value="9160"/>
+ </bean>
+
+ <bean id="masterElector" class="org.apache.activemq.store.cassandra.ZooKeeperMasterElector">
+ <property name="zookeeperConnectString" value="zookeeper.datacenter1.example.com:9260,zookeeper.datacenter2.example.com:9260,zookeeper.datacenter3.example.com:9260"/>
+ </bean>
+
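+If you embed the broker rather than configure it in Spring, the same wiring can be done in plain Java. The sketch below is illustrative only:
+it assumes the setter names implied by the bean properties above (setCassandraClient, setMasterElector, setZookeeperConnectString) and a
+standard ActiveMQ BrokerService; the class name and endpoints are placeholders.
+
+    import org.apache.activemq.broker.BrokerService;
+    import org.apache.activemq.store.cassandra.CassandraClient;
+    import org.apache.activemq.store.cassandra.CassandraPersistenceAdapter;
+    import org.apache.activemq.store.cassandra.ZooKeeperMasterElector;
+
+    public class EmbeddedQsandraBroker {
+        public static void main(String[] args) throws Exception {
+            // Client pointing at whatever load balances your Cassandra cluster
+            CassandraClient client = new CassandraClient();
+            client.setCassandraHost("cassandra.example.com");
+            client.setCassandraPort(9160);
+
+            // Master election via ZooKeeper; omit this (and setMasterElector below) for a
+            // single-broker setup, in which case the broker assumes it is master
+            ZooKeeperMasterElector elector = new ZooKeeperMasterElector();
+            elector.setZookeeperConnectString("zookeeper.datacenter1.example.com:9260");
+
+            CassandraPersistenceAdapter adapter = new CassandraPersistenceAdapter();
+            adapter.setCassandraClient(client);
+            adapter.setMasterElector(elector);
+
+            BrokerService broker = new BrokerService();
+            broker.setPersistenceAdapter(adapter);
+            broker.addConnector("tcp://0.0.0.0:60001");
+            broker.start();
+        }
+    }
+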
+####Cassandra
+The keyspace defined [here](qsandra/blob/master/src/main/resources/keyspace.xml) needs to be deployed into your Cassandra cluster, *after you modify
+the ReplicaPlacementStrategy, ReplicationFactor, and EndPointSnitch appropriately for your use case*.
+
+####ZooKeeper
+If you are using the ZooKeeperMasterElector, a persistent node will be created at /qsandra/, and ephemeral, sequential nodes will be created
+under this node. If you have multiple broker clusters using the same ZooKeeper ensemble, use appropriate chroot suffixes in the connect strings
+to partition the master election.
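+
+For example, two broker clusters sharing the same ensemble might use connect strings like the following (the hostnames and chroot paths are illustrative):
+
+    zookeeper1.example.com:9260,zookeeper2.example.com:9260,zookeeper3.example.com:9260/qsandra-cluster-a
+    zookeeper1.example.com:9260,zookeeper2.example.com:9260,zookeeper3.example.com:9260/qsandra-cluster-b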
+
+##Use cases
+All of the use cases assume a familiarity with, or at least a willingness to learn, the techniques for running an Apache Cassandra cluster,
+and, where there is more than a single ActiveMQ broker (when using ActiveMQ failover://), Apache ZooKeeper, which is used to elect the
+master broker. (You can also write your own master election if you don't want to use ZooKeeper.) Most of the work here is setting up Cassandra
+and ZooKeeper for appropriate replication and availability.
+
+
+###Multi Datacenter HA ActiveMQ Broker Cluster
+Due to the QUORUM mechanics of both Cassandra and ZooKeeper, there must be at least 3 datacenters in this configuration to tolerate the loss of a
+datacenter without impacting the availability of ActiveMQ. Each datacenter needs at least one ZooKeeper instance and at least one Cassandra instance.
+
+So let's say we have 3 datacenters, with 1 ZooKeeper node, 2 Cassandra nodes and 2 ActiveMQ brokers per datacenter.
+
+Use the DatacenterShardStrategy for replica placement, set the ReplicationFactor to 6, and set the ReplicationFactor for each datacenter to 2.
+
+With a QUORUM ConsistencyLevel, reads and writes succeed when 4 of the 6 replicas (ReplicationFactor / 2 + 1) respond successfully.
+This means that if a datacenter is network partitioned or lost, the cluster keeps running: any 2 of the 6 nodes can be unavailable.
+
+Similarly with ZooKeeper, any 1 node of the 3 can be unavailable.
+
+If the master broker is in the datacenter that dies or is partitioned, one of the brokers in another datacenter will be elected master,
+and clients should fail over (using a failover:// brokerURL).
+
+
+###Single Datacenter, Single Broker instance
+This is a much simpler configuration. If a MasterElector is not set on the PersistenceAdapter, the broker will assume it is master.
+
+
+
+##Building
+Since there isn't a clean Maven distribution of Cassandra at this point, you need to do some initial setup before you can build.
+Download Cassandra and unpack it such that the Cassandra directory is a sibling of the qsandra directory, for example:
+
+    /path/to/projects/qsandra/
+    /path/to/projects/apache-cassandra-0.6.1/
+
+All the Cassandra dependencies are declared as system dependencies in the pom file, so you need to unpack the Cassandra distribution as stated above
+or set the ${cassandra.home} Maven property while building to point to where you unpacked it (mvn -Dcassandra.home=/different/path ...).
+
+If you are on a non-Windows platform, you should be able to run mvn clean install at this point.
+
+##Building on Windows
+Due to some funky behavior in Maven and Cassandra on Windows, you will need to set another Maven property to get a successful build and test run on Windows.
+Set the ${cassandra.data} Maven property to the path where you want to store Cassandra data, *using unescaped forward slashes in the path and with a trailing slash*.
+
+Like so: mvn -Dcassandra.data=C:/path/to/projects/qsandra/target/ ...
264 pom.xml
@@ -0,0 +1,264 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>qsandra</groupId>
+ <artifactId>qsandra</artifactId>
+ <version>0.1-SNAPSHOT</version>
+
+ <properties>
+ <activemq.version>5.3.1</activemq.version>
+ <cassandra.version>0.6.1</cassandra.version>
+ <thrift.version>r917130</thrift.version>
+ <antlr.version>3.1.3</antlr.version>
+ <common.cli.version>1.1</common.cli.version>
+ <common.collections.version>3.2.1</common.collections.version>
+ <common.lang.version>2.4</common.lang.version>
+ <hadoop.version>0.20.1</hadoop.version>
+ <google.collections.version>1.0</google.collections.version>
+ <ivy.version>2.1.0</ivy.version>
+ <jackson.version>1.4.0</jackson.version>
+ <jline.version>0.9.94</jline.version>
+ <json.simple.version>1.1</json.simple.version>
+ <log4j.version>1.2.14</log4j.version>
+ <slf4j.version>1.5.8</slf4j.version>
+ <zookeeper.version>3.3.0</zookeeper.version>
+ <projectBaseUri>${project.baseUri}</projectBaseUri>
+ </properties>
+
+ <profiles>
+ <profile>
+ <id>default</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <properties>
+ <cassandra.home>${project.basedir}${file.separator}..${file.separator}apache-cassandra-${cassandra.version}</cassandra.home>
+ <cassandra.data>${project.build.directory}${file.separator}</cassandra.data>
+ <zookeeper.data>${project.build.directory}${file.separator}</zookeeper.data>
+ <cassandra.conf>${project.build.testOutputDirectory}${file.separator}</cassandra.conf>
+ <thrift.port>9160</thrift.port>
+ <zookeeper.port>9970</zookeeper.port>
+ </properties>
+ </profile>
+ <profile>
+ <id>windows</id>
+ <properties>
+ <!--
+ annoyingly, for some reason Cassandra init blows up when running EmbeddedCassandraService
+ under Windows and Maven, so you need to define a path with forward slashes
+ to your build directory, including the trailing slash
+ -->
+ <cassandra.data>C:/cygwin1.7/home/ticktock/projects/qsandra/target/</cassandra.data>
+ </properties>
+ </profile>
+ </profiles>
+
+ <build>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ <filtering>true</filtering>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-resources-plugin</artifactId>
+ <configuration>
+ <encoding>UTF-8</encoding>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <forkMode>pertest</forkMode>
+ <useFile>false</useFile>
+ <excludes>
+ <!--Base test-->
+ <exclude>**/EmbeddedServicesTest.java</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-core</artifactId>
+ <version>${activemq.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-core</artifactId>
+ <version>${activemq.version}</version>
+ <scope>test</scope>
+ <classifier>tests</classifier>
+ </dependency>
+ <!-- xbean is required for ActiveMQ broker configuration in the spring xml file -->
+ <dependency>
+ <groupId>org.apache.xbean</groupId>
+ <artifactId>xbean-spring</artifactId>
+ <version>3.5</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>${zookeeper.version}</version>
+ </dependency>
+
+
+ <!--Cassandra-->
+ <dependency>
+ <groupId>org.apache.cassandra</groupId>
+ <artifactId>cassandra</artifactId>
+ <version>${cassandra.version}</version>
+ <scope>system</scope>
+ <systemPath>${cassandra.home}/lib/apache-cassandra-${cassandra.version}.jar</systemPath>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>thrift</artifactId>
+ <version>${thrift.version}</version>
+ <scope>system</scope>
+ <systemPath>${cassandra.home}/lib/libthrift-${thrift.version}.jar</systemPath>
+ </dependency>
+ <!--Cassandra Testing-->
+ <dependency>
+ <groupId>antlr</groupId>
+ <artifactId>antlr</artifactId>
+ <version>${antlr.version}</version>
+ <scope>system</scope>
+ <systemPath>${cassandra.home}/lib/antlr-${antlr.version}.jar</systemPath>
+ </dependency>
+ <dependency>
+ <groupId>com.reardencommerce.kernel</groupId>
+ <artifactId>collections</artifactId>
+ <version>${cassandra.version}</version>
+ <scope>system</scope>
+ <systemPath>${cassandra.home}/lib/clhm-production.jar</systemPath>
+ </dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ <version>${common.cli.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ <version>${common.collections.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <version>${common.lang.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.collections</groupId>
+ <artifactId>google-collections</artifactId>
+ <version>${google.collections.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>system</scope>
+ <systemPath>${cassandra.home}/lib/hadoop-core-${hadoop.version}.jar</systemPath>
+ </dependency>
+ <dependency>
+ <groupId>org.cliffc</groupId>
+ <artifactId>high-scale-lib</artifactId>
+ <version>${cassandra.version}</version>
+ <scope>system</scope>
+ <systemPath>${cassandra.home}/lib/high-scale-lib.jar</systemPath>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.ivy</groupId>
+ <artifactId>ivy</artifactId>
+ <version>${ivy.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-core-asl</artifactId>
+ <version>${jackson.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ <version>${jackson.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>net.sf</groupId>
+ <artifactId>jline</artifactId>
+ <version>${jline.version}</version>
+ <scope>system</scope>
+ <systemPath>${cassandra.home}/lib/jline-${jline.version}.jar</systemPath>
+ </dependency>
+ <dependency>
+ <groupId>com.googlecode.json-simple</groupId>
+ <artifactId>json-simple</artifactId>
+ <version>${json.simple.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${slf4j.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.5</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+</project>
730 src/main/java/org/apache/activemq/store/cassandra/CassandraClient.java
@@ -0,0 +1,730 @@
+package org.apache.activemq.store.cassandra;
+
+import org.apache.activemq.command.*;
+import org.apache.cassandra.thrift.*;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.activemq.store.cassandra.CassandraIdentifier.*;
+import static org.apache.cassandra.thrift.ConsistencyLevel.*;
+
+/**
+ *
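+ * Thrift-based client for the "MessageStore" keyspace: translates message store operations
+ * (saving, deleting and recovering messages, destinations, subscriptions and acks) into
+ * Cassandra gets, slice queries and batch mutations, using one connection per thread.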
+ */
+public class CassandraClient extends CassandraUtils {
+
+
+ public static final ColumnPath DESTINATION_QUEUE_SIZE_COLUMN_PATH = new ColumnPath(DESTINATIONS_FAMILY.string());
+ public static final ColumnPath BROKER_DESTINATION_COUNT_COLUMN_PATH = new ColumnPath(BROKER_FAMILY.string());
+
+ static {
+ DESTINATION_QUEUE_SIZE_COLUMN_PATH.setColumn(DESTINATION_QUEUE_SIZE_COLUMN.bytes());
+ BROKER_DESTINATION_COUNT_COLUMN_PATH.setColumn(BROKER_DESTINATION_COUNT.bytes());
+ }
+
+ /*Subscriptions Column Family Constants*/
+ public static final String SUBSCRIPTIONS_CLIENT_SUBSCRIPTION_DELIMITER = "~~~~~";
+ public static final String SUBSCRIPTIONS_DEFAULT_SUBSCRIPTION_NAME = "@NOT_SET@";
+
+ public static final byte[] ZERO = getBytes(0L);
+
+
+ private static ThreadLocal<Cassandra.Client> cassandraClients = new ThreadLocal<Cassandra.Client>();
+ private static ThreadLocal<TTransport> cassandraTransports = new ThreadLocal<TTransport>();
+ private static Logger log = LoggerFactory.getLogger(CassandraClient.class);
+
+ private String cassandraHost;
+ private int cassandraPort;
+
+ private ConsistencyLevel consistencyLevel = QUORUM;
+
+
+ public ConsistencyLevel getConsistencyLevel() {
+ return consistencyLevel;
+ }
+
+ public void setConsistencyLevel(ConsistencyLevel consistencyLevel) {
+ if (!EnumSet.of(QUORUM, DCQUORUM, DCQUORUMSYNC).contains(consistencyLevel)) {
+ throw new IllegalArgumentException("Only QUORUM or DCQUORUM or DCQUORUMSYNC are supported consistency levels.");
+ }
+ this.consistencyLevel = consistencyLevel;
+ }
+
+ public String getCassandraHost() {
+ return cassandraHost;
+ }
+
+ public void setCassandraHost(String cassandraHost) {
+ this.cassandraHost = cassandraHost;
+ }
+
+ public int getCassandraPort() {
+ return cassandraPort;
+ }
+
+ public void setCassandraPort(int cassandraPort) {
+ this.cassandraPort = cassandraPort;
+ }
+
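+ // Returns the calling thread's cached Thrift connection, opening a new TSocket when no
+ // connection exists yet or the cached transport has been closed.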
+ Cassandra.Client getCassandraConnection() {
+ Cassandra.Client client = cassandraClients.get();
+ TTransport tr = cassandraTransports.get();
+ if (client == null || !tr.isOpen()) {
+ tr = new TSocket(cassandraHost, cassandraPort);
+ TProtocol proto = new TBinaryProtocol(tr);
+ client = new Cassandra.Client(proto);
+ try {
+ tr.open();
+ } catch (TTransportException e) {
+ log.error("Unable to open transport", e);
+ throw new RuntimeException(e);
+ }
+ cassandraTransports.set(tr);
+ cassandraClients.set(client);
+ }
+ return client;
+ }
+
+ void discacrdCassandraConnection() {
+ cassandraClients.remove();
+ cassandraTransports.remove();
+ }
+
+ /*Broker CF Methods*/
+
+
+ public int getDestinationCount() {
+ try {
+ ColumnOrSuperColumn cosc = getCassandraConnection().get(KEYSPACE.string(), BROKER_KEY.string(), BROKER_DESTINATION_COUNT_COLUMN_PATH, consistencyLevel);
+ return getInt(cosc.getColumn().getValue());
+ } catch (NotFoundException e) {
+ log.warn("Broker Destination Count not found, inserting 0");
+ } catch (Exception e) {
+ log.error("Exception in getDestinationCount", e);
+ throw new RuntimeException(e);
+ }
+
+ try {
+ insertDestinationCount(0);
+ return 0;
+ } catch (Exception e) {
+ log.error("Exception in getDestinationCount while inserting 0", e);
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public void insertDestinationCount(int count) throws TException, TimedOutException, InvalidRequestException, UnavailableException {
+ getCassandraConnection().insert(KEYSPACE.string(), BROKER_KEY.string(), BROKER_DESTINATION_COUNT_COLUMN_PATH, getBytes(count), timestamp(), consistencyLevel);
+ }
+
+
+ /*Destination CF Methods*/
+
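+ // Creates the destination row if it does not already exist, initializing its columns and
+ // incrementing the broker-wide destination count in the same batch mutation;
+ // returns false if the destination already exists.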
+ public boolean createDestination(String name, boolean isTopic, AtomicInteger destinationCount) {
+ ColumnPath columnPath = new ColumnPath(DESTINATIONS_FAMILY.string());
+ columnPath.setColumn(DESTINATION_IS_TOPIC_COLUMN.bytes());
+ try {
+ getCassandraConnection().get(KEYSPACE.string(), name, columnPath, consistencyLevel);
+ log.info("Destination {} Exists", name);
+ return false;
+ } catch (NotFoundException e) {
+ log.warn("Destination {} not found, Creating, topic:{} ", name, isTopic);
+ try {
+ Map<String, Map<String, List<Mutation>>> mutations = map();
+ Map<String, List<Mutation>> mutation = map();
+ List<Mutation> destinationMutations = list();
+ destinationMutations.add(getInsertOrUpdateColumnMutation(DESTINATION_IS_TOPIC_COLUMN.string(), isTopic));
+ destinationMutations.add(getInsertOrUpdateColumnMutation(DESTINATION_MAX_STORE_SEQUENCE_COLUMN.string(), 0L));
+ mutation.put(DESTINATIONS_FAMILY.string(), destinationMutations);
+ mutations.put(name, mutation);
+ destinationMutations.add(getInsertOrUpdateColumnMutation(DESTINATION_QUEUE_SIZE_COLUMN.bytes(), ZERO, timestamp()));
+ Map<String, List<Mutation>> brokerMutation = map();
+ List<Mutation> brokerMutations = list();
+ brokerMutations.add(getInsertOrUpdateColumnMutation(BROKER_DESTINATION_COUNT.bytes(), getBytes(destinationCount.incrementAndGet()), timestamp()));
+ brokerMutation.put(BROKER_FAMILY.string(), brokerMutations);
+ mutations.put(BROKER_KEY.string(), brokerMutation);
+
+ getCassandraConnection().batch_mutate(KEYSPACE.string(), mutations, consistencyLevel);
+
+
+ return true;
+ } catch (Exception e2) {
+ destinationCount.decrementAndGet();
+ throw new RuntimeException(e2);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ public Set<ActiveMQDestination> getDestinations() {
+ SlicePredicate predicate = new SlicePredicate();
+ predicate.setSlice_range(new SliceRange(new byte[0], new byte[0], false, Integer.MAX_VALUE));
+ ColumnParent parent = new ColumnParent(DESTINATIONS_FAMILY.string());
+ KeyRange keyRange = new KeyRange();
+ keyRange.setStart_key(getString(new byte[0]));
+ keyRange.setEnd_key(getString(new byte[0]));
+ try {
+ List<KeySlice> slices = getCassandraConnection().get_range_slices(KEYSPACE.string(), parent, predicate, keyRange, consistencyLevel);
+ List<String> keys = new ArrayList<String>();
+ for (KeySlice keySlice : slices) {
+ keys.add(keySlice.getKey());
+ }
+ SlicePredicate topicPredicate = new SlicePredicate();
+ topicPredicate.setColumn_names(Collections.singletonList(getBytes("isTopic")));
+ Map<String, List<ColumnOrSuperColumn>> result = getCassandraConnection().multiget_slice(KEYSPACE.string(), keys, parent, topicPredicate, consistencyLevel);
+
+ Set<ActiveMQDestination> destinations = set();
+ for (Map.Entry<String, List<ColumnOrSuperColumn>> stringListEntry : result.entrySet()) {
+ if (stringListEntry.getValue().size() == 1) {
+ boolean isTopic = getBoolean(stringListEntry.getValue().get(0).getColumn().getValue());
+ if (isTopic) {
+ destinations.add(ActiveMQDestination.createDestination(stringListEntry.getKey(), ActiveMQDestination.TOPIC_TYPE));
+ } else {
+ destinations.add(ActiveMQDestination.createDestination(stringListEntry.getKey(), ActiveMQDestination.QUEUE_TYPE));
+ }
+ }
+ }
+ return destinations;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ public void deleteQueue(ActiveMQDestination destination, AtomicInteger destinationCount) {
+ try {
+ String key = getDestinationKey(destination);
+ getCassandraConnection().remove(KEYSPACE.string(), key, new ColumnPath(MESSAGES_FAMILY.string()), timestamp(), consistencyLevel);
+ getCassandraConnection().remove(KEYSPACE.string(), key, new ColumnPath(DESTINATIONS_FAMILY.string()), timestamp(), consistencyLevel);
+ getCassandraConnection().remove(KEYSPACE.string(), key, new ColumnPath(MESSAGE_TO_STORE_ID_FAMILY.string()), timestamp(), consistencyLevel);
+ getCassandraConnection().remove(KEYSPACE.string(), key, new ColumnPath(STORE_IDS_IN_USE_FAMILY.string()), timestamp(), consistencyLevel);
+ getCassandraConnection().remove(KEYSPACE.string(), key, new ColumnPath(SUBSCRIPTIONS_FAMILY.string()), timestamp(), consistencyLevel);
+ Map<String, Map<String, List<Mutation>>> mutations = map();
+ Map<String, List<Mutation>> mutation = map();
+ List<Mutation> brokerMutations = list();
+ mutations.put(BROKER_KEY.string(), mutation);
+ mutation.put(BROKER_FAMILY.string(), brokerMutations);
+ brokerMutations.add(getInsertOrUpdateColumnMutation(BROKER_DESTINATION_COUNT.bytes(), getBytes(destinationCount.decrementAndGet()), timestamp()));
+ getCassandraConnection().batch_mutate(KEYSPACE.string(), mutations, consistencyLevel);
+ } catch (Exception e) {
+ destinationCount.incrementAndGet();
+ log.error("Exception in deleteQueue", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void deleteTopic(ActiveMQTopic destination, AtomicInteger destinationCount) {
+ deleteQueue(destination, destinationCount);
+ }
+
+
+ public DestinationMaxIds getMaxStoreId() {
+
+ int destinations = getDestinations().size();
+ if (destinations == 0) {
+ return new DestinationMaxIds(null, 0, 0);
+ }
+ ColumnParent columnParent = new ColumnParent(DESTINATIONS_FAMILY.string());
+ SlicePredicate slicePredicate = new SlicePredicate();
+ slicePredicate.setColumn_names(Collections.singletonList(DESTINATION_MAX_STORE_SEQUENCE_COLUMN.bytes()));
+ KeyRange keyRange = new KeyRange();
+ keyRange.setStart_key("");
+ keyRange.setEnd_key("");
+ keyRange.setCount(destinations);
+ try {
+ List<KeySlice> cols = getCassandraConnection().get_range_slices(KEYSPACE.string(), columnParent, slicePredicate, keyRange, consistencyLevel);
+ DestinationMaxIds max = new DestinationMaxIds(null, 0, 0);
+ long storeVal = 0;
+ long broker = 0;
+ for (KeySlice col : cols) {
+ String key = col.getKey();
+ for (ColumnOrSuperColumn columnOrSuperColumn : col.getColumns()) {
+ if (Arrays.equals(columnOrSuperColumn.getColumn().getName(), DESTINATION_MAX_STORE_SEQUENCE_COLUMN.bytes())) {
+ storeVal = getLong(columnOrSuperColumn.getColumn().getValue());
+ } else if (Arrays.equals(columnOrSuperColumn.getColumn().getName(), DESTINATION_MAX_BROKER_SEQUENCE_COLUMN.bytes())) {
+ broker = getLong(columnOrSuperColumn.getColumn().getValue());
+ }
+ }
+ if (storeVal > max.getMaxStoreId()) {
+ max = new DestinationMaxIds(ActiveMQDestination.createDestination(key, ActiveMQDestination.QUEUE_TYPE), storeVal, broker);
+ }
+ }
+
+
+ return max;
+ } catch (Exception e) {
+ log.error("Error getting Max Store ID", e);
+ throw new RuntimeException(e);
+ }
+
+ }
+
+
+ private byte[] getMessageIndexKey(MessageId id) {
+ return getBytes(getMessageIndexKeyString(id));
+ }
+
+ private String getMessageIndexKeyString(MessageId id) {
+ return new StringBuilder(id.getProducerId().toString()).append("->").append(id.getProducerSequenceId()).toString();
+ }
+
+ public long getStoreId(ActiveMQDestination destination, MessageId identity) {
+ ColumnPath path = new ColumnPath(MESSAGE_TO_STORE_ID_FAMILY.string());
+ path.setColumn(getMessageIndexKey(identity));
+ String key = getDestinationKey(destination);
+ try {
+ ColumnOrSuperColumn cosc = getCassandraConnection().get(KEYSPACE.string(), key, path, consistencyLevel);
+ return getLong(cosc.getColumn().getValue());
+ } catch (Exception e) {
+ log.error("Exception in getStoreId", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ /*Messages CF Methods*/
+
+ public byte[] getMessage(ActiveMQDestination destination, long storeId) {
+ ColumnPath path = new ColumnPath(MESSAGES_FAMILY.string());
+ path.setColumn(getBytes(storeId));
+ try {
+ ColumnOrSuperColumn cosc = getCassandraConnection().get(KEYSPACE.string(), getDestinationKey(destination), path, consistencyLevel);
+ byte[] messageBytes = cosc.getColumn().getValue();
+ if (messageBytes.length == 0) {
+ throw new NotFoundException();
+ }
+ return messageBytes;
+ } catch (NotFoundException e) {
+ log.error("Message Not Found");
+ throw new RuntimeException(e);
+ } catch (Exception e) {
+ log.error("Exception getting message", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+
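+ // Writes the message body plus the destination metadata, the message-id index and the
+ // store-ids-in-use marker in a single batch_mutate; the in-memory queue size is
+ // incremented up front and rolled back if the mutation fails.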
+ public void saveMessage(ActiveMQDestination destination, long id, MessageId messageId, byte[] messageBytes, AtomicLong queueSize) {
+ Map<String, Map<String, List<Mutation>>> mutations = map();
+ Map<String, List<Mutation>> saveMutation = map();
+
+ String cassandraKey = getDestinationKey(destination);
+ mutations.put(cassandraKey, saveMutation);
+
+ List<Mutation> messageMutations = list();
+ List<Mutation> destinationMutations = list();
+ List<Mutation> indexMutations = list();
+ List<Mutation> storeIdsMutations = list();
+ saveMutation.put(MESSAGES_FAMILY.string(), messageMutations);
+ saveMutation.put(DESTINATIONS_FAMILY.string(), destinationMutations);
+ saveMutation.put(MESSAGE_TO_STORE_ID_FAMILY.string(), indexMutations);
+ saveMutation.put(STORE_IDS_IN_USE_FAMILY.string(), storeIdsMutations);
+ long colName = id;
+ log.debug("Saving message with id:{}", colName);
+ log.debug("Saving message with brokerSeq id:{}", messageId.getBrokerSequenceId());
+ long current = timestamp();
+ messageMutations.add(getInsertOrUpdateColumnMutation(getBytes(colName), messageBytes, current));
+ destinationMutations.add(getInsertOrUpdateColumnMutation(DESTINATION_QUEUE_SIZE_COLUMN.bytes(), getBytes(queueSize.incrementAndGet()), current));
+
+ //Timestamp is ID so max will always be there
+ destinationMutations.add(getInsertOrUpdateColumnMutation(DESTINATION_MAX_STORE_SEQUENCE_COLUMN.bytes(), getBytes(id), current));
+ destinationMutations.add(getInsertOrUpdateColumnMutation(DESTINATION_MAX_BROKER_SEQUENCE_COLUMN.bytes(), getBytes(messageId.getBrokerSequenceId()), current));
+ //TODO what is the right timestamp here
+ indexMutations.add(getInsertOrUpdateColumnMutation(getMessageIndexKey(messageId), getBytes(id), current));
+ storeIdsMutations.add(getInsertOrUpdateColumnMutation(getBytes(colName), new byte[1], current));
+ try {
+ getCassandraConnection().batch_mutate(KEYSPACE.string(), mutations, consistencyLevel);
+ } catch (Exception e) {
+ queueSize.decrementAndGet();
+ log.error("Exception savingMessage", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void deleteMessage(ActiveMQDestination destination, MessageId id, AtomicLong queueSize) {
+ long column = getStoreId(destination, id);
+ long current = timestamp();
+ Mutation delete = getDeleteColumnMutation(getBytes(column), current);
+ Map<String, Map<String, List<Mutation>>> mutations = map();
+ Map<String, List<Mutation>> saveMutation = map();
+
+ String cassandraKey = getDestinationKey(destination);
+ mutations.put(cassandraKey, saveMutation);
+
+ List<Mutation> messageMutations = list();
+
+ List<Mutation> indexMutations = list();
+ List<Mutation> destinationMutations = list();
+ List<Mutation> storeIdsMutations = list();
+ saveMutation.put(MESSAGES_FAMILY.string(), messageMutations);
+ saveMutation.put(STORE_IDS_IN_USE_FAMILY.string(), storeIdsMutations);
+ saveMutation.put(MESSAGE_TO_STORE_ID_FAMILY.string(), indexMutations);
+ saveMutation.put(DESTINATIONS_FAMILY.string(), destinationMutations);
+ messageMutations.add(delete);
+ destinationMutations.add(getInsertOrUpdateColumnMutation(DESTINATION_QUEUE_SIZE_COLUMN.bytes(), getBytes(queueSize.decrementAndGet()), current));
+ indexMutations.add(getDeleteColumnMutation(getMessageIndexKey(id), current));
+ storeIdsMutations.add(getDeleteColumnMutation(getBytes(column), current));
+ try {
+ getCassandraConnection().batch_mutate(KEYSPACE.string(), mutations, consistencyLevel);
+ log.debug("Deleted Message {}", column);
+ } catch (Exception e) {
+ queueSize.incrementAndGet();
+ log.error("Unable to delete message", e);
+ }
+ }
+
+ public void deleteAllMessages(ActiveMQDestination destination, AtomicLong queueSize) {
+ ColumnPath path = new ColumnPath(MESSAGES_FAMILY.string());
+ try {
+ getCassandraConnection().remove(KEYSPACE.string(), getDestinationKey(destination), path, timestamp(), consistencyLevel);
+ queueSize.set(0);
+ } catch (Exception e) {
+ log.error("Unable to delete all messages", e);
+ }
+ }
+
+ public int getMessageCount(ActiveMQDestination destination) {
+ try {
+ ColumnOrSuperColumn col = getCassandraConnection().get(KEYSPACE.string(), getDestinationKey(destination), DESTINATION_QUEUE_SIZE_COLUMN_PATH, consistencyLevel);
+ byte[] countBytes = col.getColumn().getValue();
+ long count = getLong(countBytes);
+ if (count > Integer.MAX_VALUE) {
+ throw new IllegalStateException("Count higher than Integer.MAX_VALUE, something is wrong");
+ } else {
+ return Long.valueOf(count).intValue();
+ }
+ } catch (Exception e) {
+ log.error("Error during getMessageCount for :" + getDestinationKey(destination), e);
+ discacrdCassandraConnection();
+ throw new RuntimeException(e);
+ }
+ }
+
+ public List<byte[]> recoverMessages(ActiveMQDestination destination, AtomicLong batchPoint, int maxReturned) {
+ if (log.isDebugEnabled()) {
+ log.debug("recoverMessages({},{},{})", new Object[]{getDestinationKey(destination), batchPoint.get(), maxReturned});
+ }
+ if (maxReturned < 1) {
+ throw new IllegalArgumentException("can't get less than one result");
+ }
+ String key = getDestinationKey(destination);
+ byte[] start = batchPoint.get() == -1 ? new byte[0] : getBytes(batchPoint.get());
+ byte[] end = new byte[0];
+ List<byte[]> messages = list();
+ recoverMessagesFromTo(key, start, end, maxReturned, messages, maxReturned);
+ return messages;
+ }
+
+
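+ // Reads a slice of up to 'limit' message columns starting at 'start' and appends their
+ // values to 'messages', stopping once 'messagelimit' messages have been collected.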
+ private void recoverMessagesFromTo(String key, byte[] start, byte[] end, int limit, List<byte[]> messages, int messagelimit) {
+
+ if (log.isDebugEnabled()) {
+ log.debug("recoverMessagesFromTo({},{},{},{},{})", new Object[]{key, safeGetLong(start), safeGetLong(end), limit, messages.size()});
+ }
+ SlicePredicate predicate = new SlicePredicate();
+ SliceRange range = new SliceRange(start, end, false, limit);
+ predicate.setSlice_range(range);
+
+ List<ColumnOrSuperColumn> cols = null;
+
+ try {
+ cols = getCassandraConnection().get_slice(KEYSPACE.string(), key, new ColumnParent(MESSAGES_FAMILY.string()), predicate, consistencyLevel);
+ } catch (InvalidRequestException e) {
+ log.error("recoverMessagesFromTo({},{},{},{},{})", new Object[]{key, safeGetLong(start), safeGetLong(end), limit, messages.size()});
+ log.error("InvalidRequestException", e);
+ discacrdCassandraConnection();
+ throw new RuntimeException(e);
+ } catch (UnavailableException e) {
+ log.error("recoverMessagesFromTo({},{},{},{},{})", new Object[]{key, safeGetLong(start), safeGetLong(end), limit, messages.size()});
+ log.error("UnavailableException", e);
+ discacrdCassandraConnection();
+ throw new RuntimeException(e);
+ } catch (TimedOutException e) {
+ log.error("recoverMessagesFromTo({},{},{},{},{})", new Object[]{key, safeGetLong(start), safeGetLong(end), limit, messages.size()});
+ log.error("TimedOutException", e);
+ discacrdCassandraConnection();
+ throw new RuntimeException(e);
+ } catch (TException e) {
+ log.error("recoverMessagesFromTo({},{},{},{},{})", new Object[]{key, safeGetLong(start), safeGetLong(end), limit, messages.size()});
+ log.error("TException", e);
+ discacrdCassandraConnection();
+ throw new RuntimeException(e);
+ }
+
+ for (ColumnOrSuperColumn col : cols) {
+ Column c = col.getColumn();
+
+ messages.add(c.getValue());
+ if (log.isDebugEnabled()) {
+ log.debug("recovered message with id: {}", safeGetLong(c.getName()));
+ }
+ if (messages.size() >= messagelimit) {
+ break;
+ }
+ }
+
+
+ }
+
+ /*Subscription CF Messages*/
+
+ public void addSubscription(ActiveMQDestination destination, SubscriptionInfo subscriptionInfo, long ack) {
+ Map<String, Map<String, List<Mutation>>> mutations = map();
+ Map<String, List<Mutation>> saveMutation = map();
+ List<Mutation> mutationList = list();
+ Mutation insert = new Mutation();
+ mutationList.add(insert);
+ mutations.put(getDestinationKey(destination), saveMutation);
+ saveMutation.put(SUBSCRIPTIONS_FAMILY.string(), mutationList);
+
+ ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
+ insert.setColumn_or_supercolumn(columnOrSuperColumn);
+ List<Column> cols = list();
+ SuperColumn superColumn = new SuperColumn(getBytes(getSubscriptionSuperColumnName(subscriptionInfo)), cols);
+ columnOrSuperColumn.setSuper_column(superColumn);
+
+ byte[] selector = nullSafeGetBytes(subscriptionInfo.getSelector());
+ byte[] subscribedDestination = getBytes(getDestinationKey(subscriptionInfo.getSubscribedDestination()));
+ byte[] lastAck = getBytes(ack);
+ long current = timestamp();
+ if (subscriptionInfo.getSelector() != null) {
+ cols.add(getColumn(SUBSCRIPTIONS_SELECTOR_SUBCOLUMN.bytes(), selector, current));
+ }
+ cols.add(getColumn(SUBSCRIPTIONS_SUB_DESTINATION_SUBCOLUMN.bytes(), subscribedDestination, current));
+ cols.add(getColumn(SUBSCRIPTIONS_LAST_ACK_SUBCOLUMN.bytes(), lastAck, current));
+ try {
+ getCassandraConnection().batch_mutate(KEYSPACE.string(), mutations, consistencyLevel);
+ log.debug("created subscription on {} for client {} subscription {} with selector {} and lastAck {}",
+ new Object[]{
+ getDestinationKey(destination),
+ subscriptionInfo.getClientId(),
+ subscriptionInfo.getSubscriptionName(),
+ subscriptionInfo.getSelector(),
+ ack
+ });
+ } catch (Exception e) {
+ log.error("Exception addingSubscription:", e);
+ discacrdCassandraConnection();
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public SubscriptionInfo lookupSubscription(ActiveMQDestination destination, String clientId, String subscriptionName) {
+ ColumnPath path = new ColumnPath(SUBSCRIPTIONS_FAMILY.string());
+ path.setSuper_column(getBytes(getSubscriptionSuperColumnName(clientId, subscriptionName)));
+ try {
+ ColumnOrSuperColumn columnOrSuperColumn = getCassandraConnection().get(KEYSPACE.string(), getDestinationKey(destination), path, consistencyLevel);
+ log.debug("retrieved supercolumn of {} for client {} subscriptionName {}", new Object[]{getDestinationKey(destination), clientId, subscriptionName});
+ SuperColumn superColumn = columnOrSuperColumn.getSuper_column();
+ SubscriptionInfo subscriptionInfo = new SubscriptionInfo();
+ subscriptionInfo.setClientId(clientId);
+ subscriptionInfo.setSubscriptionName(subscriptionName);
+ subscriptionInfo.setDestination(destination);
+ byte type = destination.isTopic() ? ActiveMQDestination.TOPIC_TYPE : ActiveMQDestination.QUEUE_TYPE;
+ for (Column column : superColumn.getColumns()) {
+ if (Arrays.equals(SUBSCRIPTIONS_LAST_ACK_SUBCOLUMN.bytes(), column.getName())) {
+ //skip
+ } else if (Arrays.equals(SUBSCRIPTIONS_SELECTOR_SUBCOLUMN.bytes(), column.getName())) {
+ String selector = getString(column.getValue());
+ subscriptionInfo.setSelector(selector);
+ } else if (Arrays.equals(SUBSCRIPTIONS_SUB_DESTINATION_SUBCOLUMN.bytes(), column.getName())) {
+ String name = getString(column.getValue());
+ subscriptionInfo.setSubscribedDestination(ActiveMQDestination.createDestination(name, type));
+ } else {
+ log.error("Recieved unexpected column from Subscription Super Column {}", getString(column.getName()));
+ }
+ }
+
+ return subscriptionInfo;
+ } catch (NotFoundException e) {
+ log.warn("lookupSubsctription({},{}) found no subscription, returning null", clientId, subscriptionName);
+ return null;
+ } catch (Exception e) {
+ log.error("Exception in lookupSubscription", e);
+ discacrdCassandraConnection();
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public SubscriptionInfo[] lookupAllSubscriptions(ActiveMQDestination destination) {
+ ColumnParent path = new ColumnParent(SUBSCRIPTIONS_FAMILY.string());
+ SlicePredicate slicePredicate = new SlicePredicate();
+ SliceRange sliceRange = new SliceRange(new byte[0], new byte[0], false, Integer.MAX_VALUE);
+ slicePredicate.setSlice_range(sliceRange);
+
+ try {
+ List<ColumnOrSuperColumn> coscs = getCassandraConnection().get_slice(KEYSPACE.string(), getDestinationKey(destination), path, slicePredicate, consistencyLevel);
+ List<SubscriptionInfo> info = new ArrayList<SubscriptionInfo>(coscs.size());
+ for (ColumnOrSuperColumn columnOrSuperColumn : coscs) {
+ SuperColumn superColumn = columnOrSuperColumn.getSuper_column();
+ SubscriptionInfo subscriptionInfo = new SubscriptionInfo();
+ subscriptionInfo.setClientId(getClientIdFromSubscriptionSuperColumnName(superColumn));
+ subscriptionInfo.setSubscriptionName(getSubscriptionNameFromSubscriptionSuperColumnName(superColumn));
+ subscriptionInfo.setDestination(destination);
+ byte type = destination.isTopic() ? ActiveMQDestination.TOPIC_TYPE : ActiveMQDestination.QUEUE_TYPE;
+ for (Column column : superColumn.getColumns()) {
+ if (Arrays.equals(SUBSCRIPTIONS_LAST_ACK_SUBCOLUMN.bytes(), column.getName())) {
+ //skip
+ } else if (Arrays.equals(SUBSCRIPTIONS_SELECTOR_SUBCOLUMN.bytes(), column.getName())) {
+ String selector = getString(column.getValue());
+ subscriptionInfo.setSelector(selector);
+ } else if (Arrays.equals(SUBSCRIPTIONS_SUB_DESTINATION_SUBCOLUMN.bytes(), column.getName())) {
+ String name = getString(column.getValue());
+ subscriptionInfo.setSubscribedDestination(ActiveMQDestination.createDestination(name, type));
+ } else {
+ log.error("Recieved unexpected column from Subscription Super Column {}", getString(column.getName()));
+ }
+ }
+ info.add(subscriptionInfo);
+ }
+
+
+ return info.toArray(new SubscriptionInfo[0]);
+ } catch (Exception e) {
+ log.error("Exception in lookupSubscription", e);
+ discacrdCassandraConnection();
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public void acknowledge(ActiveMQDestination destination, String clientId, String subscriptionName, MessageId messageId) {
+
+
+ Map<String, Map<String, List<Mutation>>> mutations = map();
+ Map<String, List<Mutation>> saveMutation = map();
+ List<Mutation> mutationList = list();
+ Mutation insert = new Mutation();
+ mutationList.add(insert);
+ mutations.put(getDestinationKey(destination), saveMutation);
+ saveMutation.put(SUBSCRIPTIONS_FAMILY.string(), mutationList);
+
+ ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
+ insert.setColumn_or_supercolumn(columnOrSuperColumn);
+ List<Column> cols = list();
+ SuperColumn superColumn = new SuperColumn(getBytes(getSubscriptionSuperColumnName(clientId, subscriptionName)), cols);
+ columnOrSuperColumn.setSuper_column(superColumn);
+
+ long lastAckStoreId = getStoreId(destination, messageId);
+ byte[] lastAck = getBytes(lastAckStoreId);
+ long timestamp = timestamp();
+ cols.add(getColumn(SUBSCRIPTIONS_LAST_ACK_SUBCOLUMN.bytes(), lastAck, timestamp));
+
+ try {
+ getCassandraConnection().batch_mutate(KEYSPACE.string(), mutations, consistencyLevel);
+ log.debug("Acked {} for client {} sub {}", new Object[]{messageId.getBrokerSequenceId(), clientId, subscriptionName});
+ } catch (Exception e) {
+ log.error("Exception acking:", e);
+ discacrdCassandraConnection();
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public void deleteSubscription(ActiveMQDestination destination, String clientId, String subscriptionName) {
+ ColumnPath path = new ColumnPath(SUBSCRIPTIONS_FAMILY.string());
+ path.setSuper_column(getBytes(getSubscriptionSuperColumnName(clientId, subscriptionName)));
+ try {
+ getCassandraConnection().remove(KEYSPACE.string(), getDestinationKey(destination), path, timestamp(), consistencyLevel);
+ log.debug("deletedSubscription on {} for client {} subscriptionName {}", new Object[]{getDestinationKey(destination), clientId, subscriptionName});
+ } catch (Exception e) {
+ log.error("Exception in deleteSubscription", e);
+ discacrdCassandraConnection();
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ public int getMessageCountFrom(ActiveMQDestination destination, long storeId) {
+ SlicePredicate predicate = new SlicePredicate();
+ SliceRange range = new SliceRange(getBytes(storeId), new byte[0], false, Integer.MAX_VALUE);
+ predicate.setSlice_range(range);
+ try {
+ List<ColumnOrSuperColumn> coscs = getCassandraConnection().get_slice(KEYSPACE.string(), getDestinationKey(destination), new ColumnParent(STORE_IDS_IN_USE_FAMILY.string()), predicate, consistencyLevel);
+ return coscs.size();
+ } catch (Exception e) {
+ log.error("Exception in getMessageCountFrom {}:{}", getDestinationKey(destination), storeId);
+ log.error("Ex:", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public int getLastAckStoreId(ActiveMQDestination destination, String clientId, String subscriptionName) {
+ ColumnPath path = new ColumnPath(SUBSCRIPTIONS_FAMILY.string());
+ path.setSuper_column(getBytes(getSubscriptionSuperColumnName(clientId, subscriptionName)));
+ path.setColumn(SUBSCRIPTIONS_LAST_ACK_SUBCOLUMN.bytes());
+ try {
+ ColumnOrSuperColumn cosc = getCassandraConnection().get(KEYSPACE.string(), getDestinationKey(destination), path, consistencyLevel);
+ long result = getLong(cosc.getColumn().getValue());
+ return Long.valueOf(result).intValue();
+ } catch (NotFoundException e) {
+ log.debug("LastAckStoreId not found, returning 0");
+ return 0;
+ } catch (Exception e) {
+ log.error("Exception in getLastAckStoreId, {} {} {}", new Object[]{getDestinationKey(destination), clientid, subsriptionName});
+ log.error("Ex:", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+
+ private static String getSubscriptionSuperColumnName(SubscriptionInfo info) {
+ return info.getClientId() + SUBSCRIPTIONS_CLIENT_SUBSCRIPTION_DELIMITER + nullSafeGetSubscriptionName(info);
+ }
+
+ private static String nullSafeGetSubscriptionName(SubscriptionInfo info) {
+ return info.getSubscriptionName() != null ? info.getSubscriptionName() : SUBSCRIPTIONS_DEFAULT_SUBSCRIPTION_NAME;
+ }
+
+ private static String getSubscriptionSuperColumnName(String clientId, String subscriptionName) {
+ return clientId + SUBSCRIPTIONS_CLIENT_SUBSCRIPTION_DELIMITER + (subscriptionName != null ? subscriptionName : SUBSCRIPTIONS_DEFAULT_SUBSCRIPTION_NAME);
+ }
+
+ public static String getSubscriberId(String clientId, String subscriptionName) {
+ return getSubscriptionSuperColumnName(clientId, subscriptionName);
+ }
+
+ private static String getClientIdFromSubscriptionSuperColumnName(SuperColumn superColumn) {
+ String key = getString(superColumn.getName());
+ String[] split = key.split(SUBSCRIPTIONS_CLIENT_SUBSCRIPTION_DELIMITER);
+ return split[0];
+ }
+
+ private static String getSubscriptionNameFromSubscriptionSuperColumnName(SuperColumn superColumn) {
+ String key = getString(superColumn.getName());
+ String[] split = key.split(SUBSCRIPTIONS_CLIENT_SUBSCRIPTION_DELIMITER);
+ if (split[1].equals(SUBSCRIPTIONS_DEFAULT_SUBSCRIPTION_NAME)) {
+ return null;
+ } else {
+ return split[1];
+ }
+ }
+
+ /*util*/
+
+ private static <K, V> Map<K, V> map() {
+ return new HashMap<K, V>();
+ }
+
+ private static <I> List<I> list() {
+ return new ArrayList<I>();
+ }
+
+ private static <I> Set<I> set() {
+ return new HashSet<I>();
+ }
+
+
+}
50 src/main/java/org/apache/activemq/store/cassandra/CassandraIdentifier.java
@@ -0,0 +1,50 @@
+package org.apache.activemq.store.cassandra;
+
+/**
+ * Column family, row key, and column names used by the qsandra message store,
+ * cached as both strings and UTF-8 bytes.
+ */
+public enum CassandraIdentifier {
+
+ KEYSPACE("MessageStore"),
+
+ BROKER_FAMILY("Broker"),
+ BROKER_KEY("Broker"),
+ BROKER_DESTINATION_COUNT("destination-count"),
+
+
+ DESTINATIONS_FAMILY("Destinations"),
+ DESTINATION_IS_TOPIC_COLUMN("isTopic"),
+ DESTINATION_MAX_STORE_SEQUENCE_COLUMN("max-store-sequence"),
+ DESTINATION_MAX_BROKER_SEQUENCE_COLUMN("max-broker-sequence"),
+ DESTINATION_QUEUE_SIZE_COLUMN("queue-size"),
+
+
+ MESSAGES_FAMILY("Messages"),
+
+ MESSAGE_TO_STORE_ID_FAMILY("MessageIdToStoreId"),
+
+ STORE_IDS_IN_USE_FAMILY("StoreIdsInUse"),
+
+
+ SUBSCRIPTIONS_FAMILY("Subscriptions"),
+ SUBSCRIPTIONS_SELECTOR_SUBCOLUMN("selector"),
+ SUBSCRIPTIONS_LAST_ACK_SUBCOLUMN("lastMessageAck"),
+ SUBSCRIPTIONS_SUB_DESTINATION_SUBCOLUMN("subscribedDestination");
+
+ private byte[] bytes;
+ private String string;
+
+ CassandraIdentifier(String id) {
+ string = id;
+ bytes = CassandraUtils.getBytes(string);
+ }
+
+ public String string() {
+ return string;
+ }
+
+
+ public byte[] bytes() {
+ return bytes;
+ }
+}
127 src/main/java/org/apache/activemq/store/cassandra/CassandraMessageStore.java
@@ -0,0 +1,127 @@
+package org.apache.activemq.store.cassandra;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.*;
+import org.apache.activemq.store.AbstractMessageStore;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.usage.MemoryUsage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Queue MessageStore backed by Cassandra; persistence is delegated to the
+ * CassandraClient held by the CassandraPersistenceAdapter.
+ */
+public class CassandraMessageStore extends AbstractMessageStore {
+
+ Logger log = LoggerFactory.getLogger(CassandraMessageStore.class);
+ private CassandraPersistenceAdapter adapter;
+
+ private MemoryUsage memoryUsage;
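+ //batched-recovery cursor: the next store sequence id to recover; -1 means recover from the beginning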
+ protected AtomicLong lastStoreSequenceId = new AtomicLong(-1);
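+ //in-memory message count for this destination; seeded from the store in start() and kept in sync by the CassandraClient on save/delete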
+ private AtomicLong queueSize = new AtomicLong(0);
+
+
+ public CassandraMessageStore(CassandraPersistenceAdapter adapter, ActiveMQDestination destination) {
+ super(destination);
+ this.adapter = adapter;
+ }
+
+ public void addMessage(ConnectionContext context, Message message) throws IOException {
+ getAdapter().getCassandra().saveMessage(destination, getAdapter().getStoreSequenceGenerator().incrementAndGet(), message.getMessageId(), getAdapter().marshall(message), queueSize);
+ }
+
+ public Message getMessage(MessageId identity) throws IOException {
+ long id = getAdapter().getCassandra().getStoreId(destination, identity);
+ byte[] messageBytes = getAdapter().getCassandra().getMessage(destination, id);
+ return getAdapter().unmarshall(messageBytes);
+
+ }
+
+ public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
+ MessageId id = ack.getLastMessageId();
+ getAdapter().getCassandra().deleteMessage(destination, id, queueSize);
+ }
+
+ public void removeAllMessages(ConnectionContext context) throws IOException {
+ getAdapter().getCassandra().deleteAllMessages(destination, queueSize);
+ }
+
+
+ public void recover(MessageRecoveryListener container) throws Exception {
+ List<byte[]> messages = getAdapter().getCassandra().recoverMessages(destination, new AtomicLong(-1), Integer.MAX_VALUE);
+ for (byte[] message : messages) {
+ if (!container.recoverMessage(getAdapter().unmarshall(message))) {
+ break;
+ }
+ }
+ }
+
+ public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
+ List<byte[]> messages = getAdapter().getCassandra().recoverMessages(destination, lastStoreSequenceId, maxReturned);
+ Message lastRecovered = null;
+ for (int i = 0; i < messages.size(); i++) {
+ Message real = getAdapter().unmarshall(messages.get(i));
+ if (listener.recoverMessage(real)) {
+ lastRecovered = real;
+ if (log.isDebugEnabled()) {
+ log.debug("recovered message with BrokerSequence:{}", real.getMessageId().getBrokerSequenceId());
+ }
+
+ } else {
+ log.debug("stopped recovery");
+ break;
+ }
+ }
+ if (lastRecovered != null) {
+ long lastStore = getAdapter().getCassandra().getStoreId(destination, lastRecovered.getMessageId());
+ lastStoreSequenceId.set(lastStore + 1);
+ }
+ }
+
+ public void setBatch(MessageId messageId) throws Exception {
+ long storeId = getAdapter().getCassandra().getStoreId(destination, messageId);
+ lastStoreSequenceId.set(storeId + 1);
+ log.debug("setBatch {}", lastStoreSequenceId.get());
+ }
+
+ public void resetBatching() {
+ lastStoreSequenceId.set(-1);
+ log.debug("resetBatch {}", lastStoreSequenceId.get());
+ }
+
+ public ActiveMQDestination getDestination() {
+ return destination;
+ }
+
+
+ public int getMessageCount() throws IOException {
+ return queueSize.intValue();
+ }
+
+
+ public void start() throws Exception {
+ log.debug("start()");
+ int count = getAdapter().getCassandra().getMessageCount(destination);
+ queueSize.set(count);
+ if (log.isDebugEnabled()) {
+ log.debug("Destination: {} has {} ", CassandraUtils.getDestinationKey(destination), queueSize.get());
+ }
+ }
+
+ public void stop() throws Exception {
+ if (log.isDebugEnabled()) {
+ log.debug("stop()");
+ log.debug("Destination: {} has {} ", CassandraUtils.getDestinationKey(destination), queueSize.get());
+ log.debug("Store: {} has {} ", CassandraUtils.getDestinationKey(destination), getAdapter().getCassandra().getMessageCount(destination));
+ }
+ }
+
+
+ protected CassandraPersistenceAdapter getAdapter() {
+ return adapter;
+ }
+}
249 src/main/java/org/apache/activemq/store/cassandra/CassandraPersistenceAdapter.java
@@ -0,0 +1,249 @@
+package org.apache.activemq.store.cassandra;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.BrokerServiceAware;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.TopicMessageStore;
+import org.apache.activemq.store.TransactionStore;
+import org.apache.activemq.store.memory.MemoryTransactionStore;
+import org.apache.activemq.usage.SystemUsage;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.wireformat.WireFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * PersistenceAdapter that stores messages in Cassandra, optionally deferring
+ * startup to a MasterElector so that only one broker acts as master at a time.
+ */
+public class CassandraPersistenceAdapter implements PersistenceAdapter, BrokerServiceAware {
+
+ private Logger log = LoggerFactory.getLogger(CassandraPersistenceAdapter.class);
+
+ private MemoryTransactionStore transactionStore;
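+ //store sequence id generator for newly persisted messages; seeded from the max store id in Cassandra (see getLastMessageBrokerSequenceId)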
+ private AtomicLong sequenceGenerator = new AtomicLong(0);
+ private AtomicInteger destinationCount;
+ private WireFormat wireFormat = new OpenWireFormat();
+ private CassandraClient cassandra;
+ private ConcurrentMap<ActiveMQQueue, CassandraMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, CassandraMessageStore>();
+ private ConcurrentMap<ActiveMQTopic, CassandraTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, CassandraTopicMessageStore>();
+ private BrokerService brokerService;
+ private MasterElector masterElector;
+
+ public MasterElector getMasterElector() {
+ return masterElector;
+ }
+
+ public void setMasterElector(MasterElector masterElector) {
+ this.masterElector = masterElector;
+ }
+
+ public void setCassandraClient(CassandraClient cassandraClient) {
+ this.cassandra = cassandraClient;
+ }
+
+ public CassandraClient getCassandra() {
+ return cassandra;
+ }
+
+ public WireFormat getWireFormat() {
+ return wireFormat;
+ }
+
+ public Set<ActiveMQDestination> getDestinations() {
+ return cassandra.getDestinations();
+ }
+
+ public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
+ CassandraMessageStore store = queues.get(destination);
+ if (store == null) {
+ cassandra.createDestination(CassandraUtils.getDestinationKey(destination), false, destinationCount);
+ store = new CassandraMessageStore(this, destination);
+ try {
+ store.start();
+ } catch (Exception e) {
+ log.error("Error Starting queue:" + CassandraUtils.getDestinationKey(destination), e);
+ throw new IOException(e);
+ }
+ queues.putIfAbsent(destination, store);
+ store = queues.get(destination);
+ }
+ return transactionStore.proxy(store);
+ }
+
+ public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
+ CassandraTopicMessageStore store = topics.get(destination);
+ if (store == null) {
+ cassandra.createDestination(CassandraUtils.getDestinationKey(destination), true, destinationCount);
+ store = new CassandraTopicMessageStore(this, destination);
+ try {
+ store.start();
+ } catch (Exception e) {
+ log.error("Error Starting queue:" + CassandraUtils.getDestinationKey(destination), e);
+ throw new IOException(e);
+ }
+
+ topics.putIfAbsent(destination, store);
+ store = topics.get(destination);
+ }
+ return transactionStore.proxy(store);
+ }
+
+ public void removeQueueMessageStore(ActiveMQQueue destination) {
+ log.warn("removeQueueMessageStore for {}", destination.getQualifiedName());
+ cassandra.deleteQueue(destination, destinationCount);
+ }
+
+ public void removeTopicMessageStore(ActiveMQTopic destination) {
+ log.warn("removeTopicMessageStore for {}", destination.getQualifiedName());
+ cassandra.deleteTopic(destination, destinationCount);
+ }
+
+ public TransactionStore createTransactionStore() throws IOException {
+ if (transactionStore == null) {
+ transactionStore = new MemoryTransactionStore(this);
+ }
+ return this.transactionStore;
+ }
+
+ public void beginTransaction(ConnectionContext context) throws IOException {
+ log.debug("beginTransaction");
+ }
+
+ public void commitTransaction(ConnectionContext context) throws IOException {
+ log.debug("commitTransaction");
+ }
+
+ public void rollbackTransaction(ConnectionContext context) throws IOException {
+ log.debug("rollbackTransaction");
+ }
+
+ public long getLastMessageBrokerSequenceId() throws IOException {
+ DestinationMaxIds max = cassandra.getMaxStoreId();
+ sequenceGenerator.set(max.getMaxStoreId());
+ long brokerSeq = max.getMaxBrokerSeq();
+ log.debug("getLastSequence: store {}, broker {}", sequenceGenerator.get(), brokerSeq);
+ return brokerSeq;
+ }
+
+ public AtomicLong getStoreSequenceGenerator() {
+ return sequenceGenerator;
+ }
+
+ public void deleteAllMessages() throws IOException {
+ Set<ActiveMQDestination> destinations = getDestinations();
+ for (ActiveMQDestination destination : destinations) {
+ if (destination instanceof ActiveMQTopic) {
+ removeTopicMessageStore((ActiveMQTopic) destination);
+ } else {
+ removeQueueMessageStore((ActiveMQQueue) destination);
+ }
+ }
+ }
+
+ public void setUsageManager(SystemUsage usageManager) {
+
+ }
+
+ public void setBrokerName(String brokerName) {
+
+ }
+
+ public void setDirectory(File dir) {
+
+ }
+
+ public void checkpoint(boolean sync) throws IOException {
+
+ }
+
+ public long size() {
+ return 0;
+ }
+
+ public void start() throws Exception {
+ //Zookeeper master election
+ if (masterElector != null) {
+ masterElector.setMasterLostHandler(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ log.warn("Lost Master Status, stopping broker");
+ brokerService.stop();
+ brokerService.waitUntilStopped();
+ /*
+ It would be good if we could simply restart the brokerService and
+ have it restart the master elector and wait until it is master again,
+ but the BrokerService puts an ErrorBroker in place such that
+ once you stop the broker service, you can't force a restart:
+
+ org.apache.activemq.broker.BrokerStoppedException:
+ Broker has been stopped:
+ org.apache.activemq.broker.BrokerService$3@1d840cd
+ at org.apache.activemq.broker.ErrorBroker.nowMasterBroker(ErrorBroker.java:297)
+ at org.apache.activemq.broker.MutableBrokerFilter.nowMasterBroker(MutableBrokerFilter.java:307)
+
+ See JIRAs AMQ-2245 and AMQ-2503; the following line is therefore commented out:
+ */
+ //brokerService.start(true);
+ /*
+ Assume this broker is running under a Java service wrapper that will restart the whole process.
+ */
+ } catch (Exception e) {
+ log.error("Exception Stopping Broker when Master Status Lost!", e);
+ }
+ }
+ });
+
+ masterElector.start();
+ log.debug("Master Elector started, waiting to be master");
+ masterElector.waitTillMaster();
+ becomeMaster();
+ } else {
+ becomeMaster();
+ }
+
+ }
+
+ private void becomeMaster() throws Exception {
+ log.info("This Broker is now Master");
+ int count = cassandra.getDestinationCount();
+ destinationCount = new AtomicInteger(count);
+ brokerService.getBroker().nowMasterBroker();
+ }
+
+
+ public void stop() throws Exception {
+ if (masterElector != null) {
+ masterElector.stop();
+ }
+ }
+
+
+ byte[] marshall(Message message) throws IOException {
+ ByteSequence byteSequence = getWireFormat().marshal(message);
+ return byteSequence.getData();
+ }
+
+ Message unmarshall(byte[] message) throws IOException {
+ ByteSequence byteSequence = new ByteSequence(message);
+ return (Message) getWireFormat().unmarshal(byteSequence);
+ }
+
+ @Override
+ public void setBrokerService(BrokerService brokerService) {
+ this.brokerService = brokerService;
+ }
+}
68 src/main/java/org/apache/activemq/store/cassandra/CassandraPersistenceAdapterFactory.java
@@ -0,0 +1,68 @@
+package org.apache.activemq.store.cassandra;
+
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.PersistenceAdapterFactory;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * PersistenceAdapterFactory that builds a CassandraPersistenceAdapter from the
+ * configured Cassandra host/port, consistency level, and ZooKeeper connect string.
+ */
+public class CassandraPersistenceAdapterFactory implements PersistenceAdapterFactory {
+
+ private Logger log = LoggerFactory.getLogger(CassandraPersistenceAdapterFactory.class);
+ private String cassandraHost;
+ private int cassandraPort;
+ private Cassandra.Client cassandra;
+ private String zookeeperConnectString;
+ private ConsistencyLevel consistencyLevel;
+
+
+ public PersistenceAdapter createPersistenceAdapter() throws IOException {
+ CassandraPersistenceAdapter adapter = new CassandraPersistenceAdapter();
+ CassandraClient client = new CassandraClient();
+ client.setCassandraHost(cassandraHost);
+ client.setCassandraPort(cassandraPort);
+ client.setConsistencyLevel(consistencyLevel);
+ adapter.setCassandraClient(client);
+ ZooKeeperMasterElector zookeeperMasterElector = new ZooKeeperMasterElector();
+ zookeeperMasterElector.setZookeeperConnectString(zookeeperConnectString);
+ adapter.setMasterElector(zookeeperMasterElector);
+ return adapter;
+ }
+
+ public int getCassandraPort() {
+ return cassandraPort;
+ }
+
+ public void setCassandraPort(int cassandraPort) {
+ this.cassandraPort = cassandraPort;
+ }
+
+ public String getCassandraHost() {
+ return cassandraHost;
+ }
+
+ public void setCassandraHost(String cassandraHost) {
+ this.cassandraHost = cassandraHost;
+ }
+
+ public String getZookeeperConnectString() {
+ return zookeeperConnectString;
+ }
+
+ public void setZookeeperConnectString(String zookeeperConnectString) {
+ this.zookeeperConnectString = zookeeperConnectString;
+ }
+
+ public ConsistencyLevel getConsistencyLevel() {
+ return consistencyLevel;
+ }
+
+ public void setConsistencyLevel(ConsistencyLevel consistencyLevel) {
+ this.consistencyLevel = consistencyLevel;
+ }
+}
114 src/main/java/org/apache/activemq/store/cassandra/CassandraTopicMessageStore.java
@@ -0,0 +1,114 @@
+package org.apache.activemq.store.cassandra;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.*;
+import org.apache.activemq.store.MessageRecoveryListener;
+import org.apache.activemq.store.TopicMessageStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * TopicMessageStore that layers durable-subscription tracking (last-ack store ids,
+ * per-subscriber recovery cursors) on top of CassandraMessageStore.
+ */
+public class CassandraTopicMessageStore extends CassandraMessageStore implements TopicMessageStore {
+
+ private Logger log = LoggerFactory.getLogger(CassandraTopicMessageStore.class);
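+ //per-subscriber recovery cursor: the next store sequence id to recover, keyed by subscriberId (clientId + subscriptionName)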
+ private Map<String, AtomicLong> subscriberLastMessageMap = new ConcurrentHashMap<String, AtomicLong>();
+
+ public CassandraTopicMessageStore(CassandraPersistenceAdapter cassandra, ActiveMQTopic destination) {
+ super(cassandra, destination);
+ }
+
+
+ @Override
+ public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
+ getAdapter().getCassandra().acknowledge(getDestination(), clientId, subscriptionName, messageId);
+ }
+
+ @Override
+ public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
+ getAdapter().getCassandra().deleteSubscription(getDestination(), clientId, subscriptionName);
+ }
+
+ @Override
+ public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
+ long lastAcked = getAdapter().getCassandra().getLastAckStoreId(getDestination(), clientId, subscriptionName);
+ AtomicLong last = new AtomicLong(lastAcked + 1);
+ List<byte[]> messages = getAdapter().getCassandra().recoverMessages(getDestination(), last, Integer.MAX_VALUE);
+ for (byte[] message : messages) {
+ if (!listener.recoverMessage(getAdapter().unmarshall(message))) {
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, MessageRecoveryListener listener) throws Exception {
+ String subscriberId = CassandraClient.getSubscriberId(clientId, subscriptionName);
+ AtomicLong last = subscriberLastMessageMap.get(subscriberId);
+ if (last == null) {
+ long lastAcked = getAdapter().getCassandra().getLastAckStoreId(getDestination(), clientId, subscriptionName);
+ last = new AtomicLong(lastAcked + 1);
+ subscriberLastMessageMap.put(subscriberId, last);
+ }
+ List<byte[]> messages = getAdapter().getCassandra().recoverMessages(getDestination(), last, maxReturned);
+ Message lastRecovered = null;
+ for (int i = 0; i < messages.size(); i++) {
+ Message real = getAdapter().unmarshall(messages.get(i));
+ if (listener.hasSpace()) {
+ listener.recoverMessage(real);
+ lastRecovered = real;
+ if (log.isDebugEnabled()) {
+ log.debug("recovered message with BrokerSequence:{} for {}", real.getMessageId().getBrokerSequenceId(), subscriberId);
+ }
+
+ } else {
+ log.debug("stopped recovery");
+ break;
+ }
+ }
+ if (lastRecovered != null) {
+ long lastStore = getAdapter().getCassandra().getStoreId(getDestination(), lastRecovered.getMessageId());
+ last.set(lastStore + 1);
+ }
+ }
+
+ @Override
+ public void resetBatching(String clientId, String subscriptionName) {
+ String id = CassandraClient.getSubscriberId(clientId, subscriptionName);
+ subscriberLastMessageMap.remove(id);
+ }
+
+ @Override
+ public int getMessageCount(String clientId, String subscriberName) throws IOException {
+ long storeId = getAdapter().getCassandra().getLastAckStoreId(getDestination(), clientId, subscriberName);
+ return getAdapter().getCassandra().getMessageCountFrom(getDestination(), storeId);
+ }
+
+ @Override
+ public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
+ return getAdapter().getCassandra().lookupSubscription(getDestination(), clientId, subscriptionName);
+ }
+
+ @Override
+ public SubscriptionInfo[] getAllSubscriptions() throws IOException {
+ return getAdapter().getCassandra().lookupAllSubscriptions(getDestination());
+ }
+
+ @Override
+ public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
+ long lastAck = -1;
+ if (!retroactive) {
+ lastAck = getAdapter().getLastMessageBrokerSequenceId();
+ }
+ getAdapter().getCassandra().addSubscription(getDestination(), subscriptionInfo, lastAck);
+ }
+
+
+}
164 src/main/java/org/apache/activemq/store/cassandra/CassandraUtils.java
@@ -0,0 +1,164 @@
+package org.apache.activemq.store.cassandra;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.cassandra.thrift.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.*;
+import java.util.Collections;
+
+
+public class CassandraUtils {
+
+ static Logger log = LoggerFactory.getLogger(CassandraUtils.class);
+
+ public static String getString(byte[] bytes) {
+ try {
+ return new String(bytes, "UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+
+ public static long getLong(byte[] bytes) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ LongBuffer lBuffer = byteBuffer.asLongBuffer();
+ return lBuffer.get();
+ }
+
+ public static byte[] getBytes(long num) {
+ byte[] bArray = new byte[8];
+ ByteBuffer bBuffer = ByteBuffer.wrap(bArray);
+ LongBuffer lBuffer = bBuffer.asLongBuffer();
+ lBuffer.put(num);
+ return bArray;
+ }
+
+ public static int getInt(byte[] bytes) {
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ IntBuffer iBuffer = byteBuffer.asIntBuffer();
+ return iBuffer.get();
+ }
+
+ public static byte[] getBytes(int num) {
+ byte[] bArray = new byte[4];
+ ByteBuffer byteBuffer = ByteBuffer.wrap(bArray);
+ IntBuffer iBuffer = byteBuffer.asIntBuffer();
+ iBuffer.put(num);
+ return bArray;
+ }
+
+
+ public static long safeGetLong(byte[] bytes) {
+ if (bytes.length != 8) {
+ log.debug("bytes length was {}, not 8, returning -1", bytes.length);
+ return -1L;
+ } else {
+ return getLong(bytes);
+ }
+ }
+
+ public static boolean getBoolean(byte[] bytes) {
+ return Boolean.parseBoolean(getString(bytes));
+ }
+
+
+ public static byte[] getBytes(String string) {
+ try {
+ return string.getBytes("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Returns an empty byte array if the string is null or "".
+ *
+ * @param string the string to encode, may be null or empty
+ * @return the UTF-8 bytes of the string, or an empty array if it is null or empty
+ */
+ public static byte[] nullSafeGetBytes(String string) {
+ if (string == null || "".equals(string)) {
+ return new byte[0];
+ } else {
+ return getBytes(string);
+ }
+ }
+
+
+ private static byte[] getBytes(boolean bool) {
+ return getBytes(Boolean.toString(bool));
+ }
+
+
+ public static long timestamp() {
+ return System.currentTimeMillis();
+ }
+
+ public static Mutation getInsertOrUpdateColumnMutation(byte[] name, byte[] val, Long timestamp) {
+ if (timestamp == null) {
+ timestamp = timestamp();
+ }
+ Mutation insert = new Mutation();
+ ColumnOrSuperColumn c = new ColumnOrSuperColumn();
+ c.setColumn(getColumn(name, val, timestamp));
+ insert.setColumn_or_supercolumn(c);
+ return insert;
+ }
+
+ public static Mutation getDeleteColumnMutation(byte[] column, long timestamp) {
+ Mutation mutation = new Mutation();
+ Deletion deletion = new Deletion(timestamp);
+ SlicePredicate predicate = new SlicePredicate();
+ predicate.setColumn_names(Collections.singletonList(column));
+ deletion.setPredicate(predicate);
+ mutation.setDeletion(deletion);
+ return mutation;
+ }
+
+
+
+ public static Mutation getInsertOrUpdateColumnMutation(String name, String val) {
+ return getInsertOrUpdateColumnMutation(getBytes(name), getBytes(val), null);
+ }
+
+ public static Mutation getInsertOrUpdateColumnMutation(String name, long val) {
+ return getInsertOrUpdateColumnMutation(getBytes(name), getBytes(val), null);
+ }
+
+ public static Mutation getInsertOrUpdateColumnMutation(String name, boolean val) {
+ return getInsertOrUpdateColumnMutation(getBytes(name), getBytes(val), null);
+ }
+
+ public static Column getColumn(byte[] name, byte[] val) {
+ return new Column(name, val, timestamp());
+ }
+
+ public static Column getColumn(byte[] name, byte[] val, long timestamp) {
+ return new Column(name, val, timestamp);
+ }
+
+ public static Mutation getInsertOrUpdateSuperColumnMutation(long supername, String name, byte[] val) {
+ return getInsertOrUpdateSuperColumnMutation(getBytes(supername), getBytes(name), val);
+ }
+
+ public static Mutation getInsertOrUpdateSuperColumnMutation(byte[] supername, byte[] name, byte[] val) {
+ Mutation mutation = new Mutation();
+ ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
+ SuperColumn superColumn = new SuperColumn();
+ superColumn.setName(supername);
+ Column column = getColumn(name, val);
+ superColumn.addToColumns(column);
+ columnOrSuperColumn.setSuper_column(superColumn);
+ mutation.setColumn_or_supercolumn(columnOrSuperColumn);
+ return mutation;
+ }
+
+ public static String getDestinationKey(ActiveMQDestination destination) {
+ return destination.getQualifiedName();
+ }
+}
33 src/main/java/org/apache/activemq/store/cassandra/DestinationMaxIds.java
@@ -0,0 +1,33 @@
+package org.apache.activemq.store.cassandra;
+
+import org.apache.activemq.command.ActiveMQDestination;
+
+/**
+ * Value object pairing a destination with the maximum store id and maximum
+ * broker sequence id recorded for it.
+ */
+public class DestinationMaxIds {
+
+ private ActiveMQDestination destination;
+ private long maxStoreId;
+ private long maxBrokerSeq;
+
+ public DestinationMaxIds(ActiveMQDestination destination, long maxStoreId, long maxBrokerSeq) {
+ this.destination = destination;
+ this.maxStoreId = maxStoreId;
+ this.maxBrokerSeq = maxBrokerSeq;
+ }
+
+ public ActiveMQDestination getDestination() {
+ return destination;
+ }
+
+ public long getMaxStoreId() {
+ return maxStoreId;
+ }
+
+ public long getMaxBrokerSeq() {
+ return maxBrokerSeq;
+ }
+}
17 src/main/java/org/apache/activemq/store/cassandra/MasterElector.java
@@ -0,0 +1,17 @@
+package org.apache.activemq.store.cassandra;
+
+/**
+ * Elects a single master broker. The persistence adapter blocks in waitTillMaster()
+ * until this node wins the election; the master-lost handler is invoked if mastership is lost.
+ */
+public interface MasterElector {
+
+ void waitTillMaster();
+
+ void setMasterLostHandler(Runnable handler);
+
+ void start();
+
+ void stop();
+
+ boolean isMaster();
+}
136 src/main/java/org/apache/activemq/store/cassandra/ZooKeeperMasterElector.java
@@ -0,0 +1,136 @@
+package org.apache.activemq.store.cassandra;
+
+import org.apache.zookeeper.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * MasterElector backed by ZooKeeper: each broker creates an EPHEMERAL_SEQUENTIAL
+ * znode under /qsandra, and the broker owning the lowest-sorted znode is master.
+ * Losing the ZooKeeper session triggers the master-lost handler.
+ */
+public class ZooKeeperMasterElector implements MasterElector, Watcher {
+
+
+ private static Logger log = LoggerFactory.getLogger(ZooKeeperMasterElector.class);
+ private ZooKeeper zk;
+ private String path;
+ private List<String> paths;
+ private ReentrantLock lock = new ReentrantLock();
+ private Condition condition = lock.newCondition();
+ private Runnable lostMasterHandler;
+ private AtomicBoolean isMaster = new AtomicBoolean(false);
+ private String connectString;
+
+ @Override
+ public boolean isMaster() {
+ return isMaster.get();
+ }
+
+ @Override
+ public void waitTillMaster() {
+ while (!isMaster.get()) {
+ lock.lock();
+ try {
+ if (path.equals(paths.get(0))) {
+ log.warn("Path {} is the first path, this node is master", path);
+ isMaster.set(true);
+ return;
+ } else {
+ log.warn("Path {} is not the first path, waiting...", path);
+ condition.await();
+ }
+ } catch (InterruptedException e) {
+ log.error("InterruptedException while waiting on new paths", e);
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ @Override
+ public void setMasterLostHandler(Runnable handler) {
+ lostMasterHandler = handler;
+ }
+
+ private void lostConnection() {
+ log.warn("Lost Zookeeper Connection");
+ if (isMaster.compareAndSet(true, false)) {
+ log.error("Lost Zookeeper Connection and acting master, running lostMasterhandler");
+ if (lostMasterHandler != null) {
+ lostMasterHandler.run();
+ } else {
+ log.error("Lost Zookeeper Connection and acting master, but lostMasterhandler is NULL!!");
+ }
+ }
+ }
+
+ private void setPaths(List<String> newPaths) {
+ lock.lock();
+ try {
+ Collections.sort(newPaths);
+ paths = newPaths;
+ for (String newPath : newPaths) {
+ log.info("AllPaths:{}", newPath);
+ }
+ condition.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void start() {
+ try {
+ if (connectString == null) {
+ throw new IllegalStateException("connectString cannot be null");
+ }
+ zk = new ZooKeeper(connectString, 10000, this);
+ if (zk.exists("/qsandra", false) == null) {
+ zk.create("/qsandra", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+ path = zk.create("/qsandra/broker_", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+ path = path.substring("/qsandra/".length());
+ log.info("Path:{}", path);
+ setPaths(zk.getChildren("/qsandra", true));
+
+ } catch (Exception e) {
+ log.error("Exception in start", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+
+ log.debug("ZKEvent {} {} {} {}", new Object[]{event.getType().toString(), event.getPath(), event.getState().toString(), event.getWrapper()});
+ if (Event.KeeperState.Disconnected.equals(event.getState()) || Event.KeeperState.Expired.equals(event.getState())) {
+ lostConnection();
+ } else if (("/qsandra").equals(event.getPath()) && event.getType().equals(Event.EventType.NodeChildrenChanged)) {
+ try {
+ setPaths(zk.getChildren("/qsandra", true));
+ } catch (KeeperException e) {
+ log.error("KeeperException while getting updated children", e);
+ } catch (InterruptedException e) {
+ log.error("InterruptedException while getting updated children", e);
+ } catch (Exception e) {
+ log.error("Exception while processing event", e);
+ }
+ }
+
+ }
+
+ public void stop() {
+ try {
+ zk.close();
+ } catch (InterruptedException e) {
+ log.error("Exception in stop", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void setZookeeperConnectString(String connect) {
+ connectString = connect;
+ }
+}
112 src/main/resources/keyspace.xml
@@ -0,0 +1,112 @@
+<Keyspace Name="MessageStore">
+ <!--
+ This is just the Keyspace for the qsandra message store.
+ You would need to configure the ReplicaPlacementStrategy, ReplicationFactor, and EndpointSnitch
+ as appropriate for your use case, for example as sketched below.
+ -->
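+ <!--
+ Illustrative example only (not part of this file): in the Cassandra 0.6
+ storage-conf.xml format, these settings are typically declared alongside the
+ column families, for instance:
+
+ <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
+ <ReplicationFactor>1</ReplicationFactor>
+ <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
+
+ Adjust the strategy, replication factor, and snitch for your own cluster.
+ -->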
+
+
+ <!--
+ row key: Broker(just 1 row per keyspace) {
+ destination-count: long
+ }
+ -->
+ <ColumnFamily Name="Broker" CompareWith="BytesType"/>
+
+
+ <!--
+ row key: destinationName
+ Destinations: {
+ test.topic : {
+ isTopic: true
+ max-store-sequence: long
+ max-broker-sequence: long
+ queue-size: long
+ } ,
+ test.destination : {
+ isTopic: false
+ lastSequence: long
+ max-store-sequence: long
+ max-broker-sequence: long
+ queue-size: long
+ }
+ }
+ -->
+ <ColumnFamily Name="Destinations" CompareWith="BytesType"/>
+