Permalink
Browse files

first 2.4.0.1-SNAPSHOT refactoring the producer to be capable of batc…

…hInserts, based upon an implementation of BatchCreator plugged into the endpoint.
  • Loading branch information...
1 parent a188cfc commit 29e5bd660c7dbfc70386547b388cf559b035dc73 @ticktock committed Oct 16, 2010
View
@@ -143,6 +143,7 @@ Here are some example URIs, which would apply default extractors to get the nece
<tr><td>cassandraPollingStrategy</td><td>None</td><td>If you specify an instance of CassndraPollingStrategy, it will be executes with StrategyBasedCassandraPolling</td></tr>
<tr><td>cassandraPollingMaxMessages</td><td>1000</td><td>The maximum mesages to return per poll</td></tr>
<tr><td>cassandraPollingMaxKeyRange</td><td>1000</td><td>The maximum mesages to specify in the KeyRange used to call get_range_slices during polling. See Note below.</td></tr>
+ <tr><td>batchCreator</td><td>instance of org.apache.camel.component.cassandra.DefaultBatchCreator</td><td>If the incoming exchange can be broken down into multiple inserts, you can specify an implementation of BatchCreator to create a List of Exchanges that will be inserted in batch</td></tr>
</table>
###A Note about the Default Polling implementation (before Cassandra 0.7)
View
@@ -10,6 +10,7 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>camel-cassandra</artifactId>
+ <version>2.4.0.1-SNAPSHOT</version>
<repositories>
<repository>
@@ -38,6 +39,7 @@
</repositories>
<properties>
+ <camel.version>2.4.0</camel.version>
<activemq.version>5.3.1</activemq.version>
<cassandra.version>0.6.1</cassandra.version>
<thrift.version>917130</thrift.version>
@@ -162,14 +164,19 @@
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
+ <version>${camel.version}</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-spring</artifactId>
+ <version>${camel.version}</version>
+
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-scala</artifactId>
+ <version>${camel.version}</version>
+
</dependency>
<dependency>
@@ -0,0 +1,15 @@
+package org.apache.camel.component.cassandra;
+
+import org.apache.camel.Exchange;
+
+import java.util.List;
+
+/**
+ */
+public interface BatchCreator {
+
+
+ List<Exchange> createBatch(Exchange incoming);
+
+
+}
@@ -102,6 +102,7 @@ object CassandraComponent {
val keyExtractorOption = "keyExtractor"
val valueExtractorOption = "valueExtractor"
val dataFormatOption = "cassandraDataFormat"
+ val batchCreatorOption = "batchCreator"
val cassandraPollingOption = "cassandraPollingImpl"
val cassandraPollingStrategyOption = "cassandraPollingStrategy"
val cassandraPollingMaxMessagesOption = "cassandraPollingMaxMessages"
@@ -111,6 +112,8 @@ object CassandraComponent {
val columnHeader = "camel-cassandra-column"
val superColumnHeader = "camel-cassandra-supercolumn"
val keyHeader = "camel-cassandra-key"
+ val batchSizeHeader = "camel-casandra-batch-size"
+
}
@@ -33,11 +33,14 @@ class CassandraEndpoint(val uri: String, val context: CamelContext, val pool: Se
var pollingStrategy: Option[CassandraPollingStrategy] = None
var pollingMaxMessagesPerPoll: Int = 1000
var pollingMaxKeyRange: Int = 1000
+ var batchCreator: Option[BatchCreator] = None
parseUri
lookupExtractors
lookupDataFormat
lookupPollingOptions
+ lookupBatchCreator
+
private def parseUri(): Unit = {
@@ -156,5 +159,12 @@ class CassandraEndpoint(val uri: String, val context: CamelContext, val pool: Se
}
+ private def lookupBatchCreator():Unit={
+ batchCreator = lookupOptionBean(batchCreatorOption, classOf[BatchCreator]) match {
+ case None => Some(new DefaultBatchCreator)
+ case Some(creator) => Some(creator)
+ }
+ }
+
override def createEndpointUri = uri
}
@@ -15,6 +15,7 @@ import java.io.{OutputStream, ByteArrayOutputStream}
import reflect.BeanProperty
import collection.JavaConversions._
import com.shorrockin.cascal.model.Keyspace
+import collection.JavaConversions
/**
*
@@ -36,70 +37,87 @@ class CassandraProducer(val endpoint: CassandraEndpoint) extends DefaultProducer
var valueExtractor = defaultValueExtractor
overrideDefaultExtractors();
-
-
def process(exchange: Exchange): Unit = {
+ val exchanges = endpoint.batchCreator.get.createBatch(exchange)
endpoint.withSession {
session =>
- var keyspace: String = endpoint.keyspace.getOrElse(keyspaceExtractor.evaluate(exchange, classOf[String]))
- var columnfamily: String = endpoint.columnFamily.getOrElse(columnFamilyExtractor.evaluate(exchange, classOf[String]))
- var supercolumn: Option[String] = endpoint.superColumn match {
- case Some(sc) => Some(sc)
- case None => {
- superColumnExtractor.evaluate(exchange, classOf[String]) match {
- case null => None
- case str => Some(str)
- }
- }
- }
- var column: String = endpoint.column.getOrElse(columnExtractor.evaluate(exchange, classOf[String]))
- var key = endpoint.key.getOrElse(keyExtractor.evaluate(exchange, classOf[String]))
- var valueExtract: Any = valueExtractor.evaluate(exchange, classOf[Any])
- var value: Array[Byte] = new Array[Byte](0)
-
- endpoint.dataFormat match {
- case Some(format: DataFormat) => {
- val buffer = new ByteArrayOutputStream
- format.marshal(exchange, valueExtract, buffer)
- value = buffer.toByteArray
- }
- case None => {
- if (valueExtract.isInstanceOf[String]) value = bytes(valueExtract.asInstanceOf[String])
- else if (valueExtract.isInstanceOf[Array[Byte]]) value = valueExtract.asInstanceOf[Array[Byte]]
- else if (valueExtract.isInstanceOf[Long]) value = bytes(valueExtract.asInstanceOf[Long])
- else if (valueExtract.isInstanceOf[Date]) value = bytes(valueExtract.asInstanceOf[Date])
- else if (valueExtract.isInstanceOf[Float]) value = bytes(valueExtract.asInstanceOf[Float])
- else if (valueExtract.isInstanceOf[Double]) value = bytes(valueExtract.asInstanceOf[Double])
- else if (valueExtract.isInstanceOf[Int]) value = bytes(valueExtract.asInstanceOf[Int])
- else if (valueExtract.isInstanceOf[UUID]) value = bytes(valueExtract.asInstanceOf[UUID])
- //Else try to convert to byte[] with the valueExtractor, will throw NoSupportedConversion in worst case
- else value = valueExtractor.evaluate(exchange, classOf[Array[Byte]])
- }
+ val inserts = JavaConversions.asList(exchanges).map {
+ createInsertAndSetOutFor(_)
}
+ session.batch(inserts)
+ }
+ if(exchanges.size == 1 && exchanges.get(0).equals(exchange)){
+ //leave it alone
+ } else {
+ exchange.getOut.setBody(exchanges)
+ exchange.getOut.setHeader(batchSizeHeader, exchanges.size)
+ }
+
+ }
- val out: Message = exchange.getOut
+ def createInsertAndSetOutFor(exchange: Exchange): Insert = {
- supercolumn match {
- case Some(sc) => {
- log.debug("Inserting sueprcolumn keyspace:%s columnFamily:%s key:%s supercolumn:%s column:%s".format(keyspace, columnfamily, key, sc, column))
- session.insert(Keyspace(keyspace) \\ columnfamily \ key \ sc \ (column, value))
- out.setHeader(superColumnHeader, sc)
- }
- case None => {
- log.debug("Inserting standard column keyspace:%s columnFamily:%s key:%s column:%s".format(keyspace, columnfamily, key, column))
- session.insert(Keyspace(keyspace) \ columnfamily \ key \ (column, value))
- }
+ var keyspace: String = endpoint.keyspace.getOrElse(keyspaceExtractor.evaluate(exchange, classOf[String]))
+ var columnfamily: String = endpoint.columnFamily.getOrElse(columnFamilyExtractor.evaluate(exchange, classOf[String]))
+ var supercolumn: Option[String] = endpoint.superColumn match {
+ case Some(sc) => Some(sc)
+ case None => {
+ superColumnExtractor.evaluate(exchange, classOf[String]) match {
+ case null => None
+ case str => Some(str)
}
+ }
+ }
+ var column: String = endpoint.column.getOrElse(columnExtractor.evaluate(exchange, classOf[String]))
+ var key = endpoint.key.getOrElse(keyExtractor.evaluate(exchange, classOf[String]))
+ var valueExtract: Any = valueExtractor.evaluate(exchange, classOf[Any])
+ var value: Array[Byte] = new Array[Byte](0)
+
+ endpoint.dataFormat match {
+ case Some(format: DataFormat) => {
+ val buffer = new ByteArrayOutputStream
+ format.marshal(exchange, valueExtract, buffer)
+ value = buffer.toByteArray
+ }
+ case None => {
+ if (valueExtract.isInstanceOf[String]) value = bytes(valueExtract.asInstanceOf[String])
+ else if (valueExtract.isInstanceOf[Array[Byte]]) value = valueExtract.asInstanceOf[Array[Byte]]
+ else if (valueExtract.isInstanceOf[Long]) value = bytes(valueExtract.asInstanceOf[Long])
+ else if (valueExtract.isInstanceOf[Date]) value = bytes(valueExtract.asInstanceOf[Date])
+ else if (valueExtract.isInstanceOf[Float]) value = bytes(valueExtract.asInstanceOf[Float])
+ else if (valueExtract.isInstanceOf[Double]) value = bytes(valueExtract.asInstanceOf[Double])
+ else if (valueExtract.isInstanceOf[Int]) value = bytes(valueExtract.asInstanceOf[Int])
+ else if (valueExtract.isInstanceOf[UUID]) value = bytes(valueExtract.asInstanceOf[UUID])
+ //Else try to convert to byte[] with the valueExtractor, will throw NoSupportedConversion in worst case
+ else value = valueExtractor.evaluate(exchange, classOf[Array[Byte]])
+ }
+ }
+
- out.setHeader(keyspaceHeader, keyspace)
- out.setHeader(columnFamilyHeader, columnfamily)
- out.setHeader(columnHeader, column)
- out.setHeader(keyHeader, key)
- out.setBody(value)
+ val out: Message = exchange.getOut
+ var insert:Insert = null
+ supercolumn match {
+ case Some(sc) => {
+ log.debug("Inserting sueprcolumn keyspace:%s columnFamily:%s key:%s supercolumn:%s column:%s".format(keyspace, columnfamily, key, sc, column))
+ insert = Insert(Keyspace(keyspace) \\ columnfamily \ key \ sc \ (column, value))
+ out.setHeader(superColumnHeader, sc)
+ }
+ case None => {
+ log.debug("Inserting standard column keyspace:%s columnFamily:%s key:%s column:%s".format(keyspace, columnfamily, key, column))
+ insert = Insert(Keyspace(keyspace) \ columnfamily \ key \ (column, value))
+ }
}
+
+ out.setHeader(keyspaceHeader, keyspace)
+ out.setHeader(columnFamilyHeader, columnfamily)
+ out.setHeader(columnHeader, column)
+ out.setHeader(keyHeader, key)
+ out.setBody(value)
+ insert
}
+
private def overrideDefaultExtractors(): Unit = {
endpoint.keyspaceExtractor match {
case Some(ext) => keyspaceExtractor = ext
@@ -0,0 +1,11 @@
+package org.apache.camel.component.cassandra
+
+import org.apache.camel.Exchange
+import java.util.{Collections, List}
+
+/**
+ */
+
+class DefaultBatchCreator extends BatchCreator{
+ def createBatch(incoming: Exchange): List[Exchange] = Collections.singletonList(incoming)
+}

0 comments on commit 29e5bd6

Please sign in to comment.