Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

install assembly and verification of the keysapce

  • Loading branch information...
commit f809f4c434a35936c68fc9e0e0e31385d153d3b2 1 parent fcbd1d3
@ticktock authored
View
20 pom.xml
@@ -4,7 +4,7 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
- <groupId>qsandra</groupId>
+ <groupId>ticktock</groupId>
<artifactId>qsandra</artifactId>
<version>1.1-SNAPSHOT</version>
@@ -165,6 +165,23 @@
</excludes>
</configuration>
</plugin>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptors>
+ <descriptor>src/main/assembly/assembly.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <executions>
+ <execution>
+ <id>install</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
@@ -298,6 +315,7 @@
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
View
24 src/main/assembly/assembly.xml
@@ -0,0 +1,24 @@
+<assembly>
+ <id>install</id>
+ <baseDirectory>qsandra-install</baseDirectory>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <formats>
+ <format>zip</format>
+ <format>tar.gz</format>
+ </formats>
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>lib/optional</outputDirectory>
+ <excludes>
+ <exclude>junit*</exclude>
+ <exclude>log4j*</exclude>
+ <exclude>commons-pool*</exclude>
+ <exclude>jline*</exclude>
+ <exclude>jms*</exclude>
+ <exclude>jmx*</exclude>
+ <exclude>activation*</exclude>
+ <exclude>mail*</exclude>
+ </excludes>
+ </dependencySet>
+ </dependencySets>
+</assembly>
View
66 src/main/scala/org/apache/activemq/store/cassandra/CassandraClient.scala
@@ -8,20 +8,18 @@ import CassandraClient._
import org.apache.cassandra.utils.BloomFilter
import grizzled.slf4j.Logger
import org.apache.activemq.store.cassandra.{DestinationMaxIds => Max}
-import org.apache.activemq.store.cassandra._
import java.util.concurrent.atomic.{AtomicLong, AtomicInteger}
import org.apache.activemq.command.{SubscriptionInfo, MessageId, ActiveMQDestination}
import collection.jcl.{ArrayList, HashSet, Set}
-import com.shorrockin.cascal.model.{SuperColumn, StandardKey, Key, Column}
-import collection.mutable.ListBuffer
-import org.apache.cassandra.thrift.{ConsistencyLevel, NotFoundException}
+import org.apache.cassandra.thrift.{NotFoundException}
+import java.lang.String
+import collection.mutable.{HashMap, ListBuffer}
class CassandraClient() {
-
-
@BeanProperty var cassandraHost: String = _
@BeanProperty var cassandraPort: Int = _
@BeanProperty var cassandraTimeout: Int = _
+ @BeanProperty var verifyKeyspace: Boolean = true;
protected var pool: SessionPool = null
@@ -30,6 +28,9 @@ class CassandraClient() {
val params = new PoolParams(20, ExhaustionPolicy.Fail, 500L, 6, 2)
var hosts = Host(cassandraHost, cassandraPort, cassandraTimeout) :: Nil
pool = new SessionPool(hosts, params, Consistency.Quorum)
+ if (verifyKeyspace) {
+ verifyMessageStoreKeyspace
+ }
}
def stop() = {
@@ -37,7 +38,7 @@ class CassandraClient() {
}
protected def withSession[E](block: Session => E): E = {
- pool.borrow{session => block(session)}
+ pool.borrow {session => block(session)}
}
def getDestinationCount(): Int = {
@@ -430,6 +431,53 @@ class CassandraClient() {
return getSubscriptionSuperColumnName(clientId, subscriptionName)
}
+ def verifyMessageStoreKeyspace(): Unit = {
+ withSession {
+ session =>
+
+ val keyspace = convertMap(session.client.describe_keyspace(KEYSPACE))
+
+ val stdCols: List[String] = MESSAGES_FAMILY :: STORE_IDS_IN_USE_FAMILY :: DESTINATIONS_FAMILY :: BROKER_FAMILY :: MESSAGE_TO_STORE_ID_FAMILY :: Nil
+ val superCols: List[String] = SUBSCRIPTIONS_FAMILY :: Nil
+ val allCols: List[String] = stdCols ++ superCols
+
+
+ allCols.foreach {
+ family =>
+ keyspace.get(family) match {
+ case Some(x) => None
+ case None => throw new RuntimeException("ColumnFamily: %s missing from keyspace".format(family))
+ }
+ }
+
+ stdCols.foreach {
+ family =>
+ keyspace.get(family) match {
+ case Some(map) => convertMap(map).get(DESCRIBE_CF_TYPE) match {
+ case Some(colType) => colType match {
+ case DESCRIBE_CF_TYPE_STANDARD => None
+ case _ => throw new RuntimeException("Type of the ColumnFamily was not expected to be:%s".format(colType))
+ }
+ case None => throw new RuntimeException("Type wasnt part of the column description")
+ }
+ }
+ }
+
+ superCols.foreach {
+ family =>
+ keyspace.get(family) match {
+ case Some(map) => convertMap(map).get(DESCRIBE_CF_TYPE) match {
+ case Some(colType) => colType match {
+ case DESCRIBE_CF_TYPE_SUPER => None
+ case _ => throw new RuntimeException("Type of the ColumnFamily was not expected to be:%s".format(colType))
+ }
+ case None => throw new RuntimeException("Type wasnt part of the column description")
+ }
+ }
+ }
+ }
+ }
+
}
object CassandraClient {
@@ -517,6 +565,10 @@ object CassandraClient {
val SUBSCRIPTIONS_SUB_DESTINATION_SUBCOLUMN = "subscribedDestination";
+ val DESCRIBE_CF_TYPE = "Type"
+ val DESCRIBE_CF_TYPE_STANDARD = "Standard"
+ val DESCRIBE_CF_TYPE_SUPER = "Super"
+
/*Subscriptions Column Family Constants*/
Please sign in to comment.
Something went wrong with that request. Please try again.