Permalink
Browse files

allowing column and supercolumn to be extracted as any type we can co…

…nvert to Array[Byte] instead of just string
  • Loading branch information...
1 parent 29e5bd6 commit bdb709231bc046c6331f53a5af010add9846b4a5 @ticktock committed Oct 19, 2010
@@ -1,17 +1,16 @@
package org.apache.camel.component.cassandra
import org.apache.camel.impl.DefaultProducer
-import org.apache.camel.model.language.{ExpressionDefinition, HeaderExpression}
import CassandraProducer._
import org.apache.camel.builder.ExpressionBuilder
import grizzled.slf4j.Logger
import com.shorrockin.cascal.session._
import com.shorrockin.cascal.utils.Conversions._
-import org.apache.camel.{Message, Expression, Exchange}
+import org.apache.camel.{Message, Exchange}
import org.apache.camel.component.cassandra.CassandraComponent._
import org.apache.camel.spi.DataFormat
import java.util.{UUID, Date}
-import java.io.{OutputStream, ByteArrayOutputStream}
+import java.io.ByteArrayOutputStream
import reflect.BeanProperty
import collection.JavaConversions._
import com.shorrockin.cascal.model.Keyspace
@@ -47,29 +46,35 @@ class CassandraProducer(val endpoint: CassandraEndpoint) extends DefaultProducer
session.batch(inserts)
}
- if(exchanges.size == 1 && exchanges.get(0).equals(exchange)){
+ if (exchanges.size == 1 && exchanges.get(0).equals(exchange)) {
//leave it alone
} else {
exchange.getOut.setBody(exchanges)
exchange.getOut.setHeader(batchSizeHeader, exchanges.size)
}
-
+
}
def createInsertAndSetOutFor(exchange: Exchange): Insert = {
var keyspace: String = endpoint.keyspace.getOrElse(keyspaceExtractor.evaluate(exchange, classOf[String]))
var columnfamily: String = endpoint.columnFamily.getOrElse(columnFamilyExtractor.evaluate(exchange, classOf[String]))
- var supercolumn: Option[String] = endpoint.superColumn match {
+ var supercolumn: Option[Array[Byte]] = endpoint.superColumn match {
case Some(sc) => Some(sc)
case None => {
- superColumnExtractor.evaluate(exchange, classOf[String]) match {
+ superColumnExtractor.evaluate(exchange, classOf[Any]) match {
case null => None
- case str => Some(str)
+ case str => coerceToByteArray(str)
}
}
}
- var column: String = endpoint.column.getOrElse(columnExtractor.evaluate(exchange, classOf[String]))
+
+
+ var column: Array[Byte] = null
+ column = endpoint.column match {
+ case Some(col) => col
+ case None => coerceToByteArray(columnExtractor.evaluate(exchange, classOf[Any])).get
+ }
var key = endpoint.key.getOrElse(keyExtractor.evaluate(exchange, classOf[String]))
var valueExtract: Any = valueExtractor.evaluate(exchange, classOf[Any])
var value: Array[Byte] = new Array[Byte](0)
@@ -81,22 +86,17 @@ class CassandraProducer(val endpoint: CassandraEndpoint) extends DefaultProducer
value = buffer.toByteArray
}
case None => {
- if (valueExtract.isInstanceOf[String]) value = bytes(valueExtract.asInstanceOf[String])
- else if (valueExtract.isInstanceOf[Array[Byte]]) value = valueExtract.asInstanceOf[Array[Byte]]
- else if (valueExtract.isInstanceOf[Long]) value = bytes(valueExtract.asInstanceOf[Long])
- else if (valueExtract.isInstanceOf[Date]) value = bytes(valueExtract.asInstanceOf[Date])
- else if (valueExtract.isInstanceOf[Float]) value = bytes(valueExtract.asInstanceOf[Float])
- else if (valueExtract.isInstanceOf[Double]) value = bytes(valueExtract.asInstanceOf[Double])
- else if (valueExtract.isInstanceOf[Int]) value = bytes(valueExtract.asInstanceOf[Int])
- else if (valueExtract.isInstanceOf[UUID]) value = bytes(valueExtract.asInstanceOf[UUID])
- //Else try to convert to byte[] with the valueExtractor, will throw NoSupportedConversion in worst case
- else value = valueExtractor.evaluate(exchange, classOf[Array[Byte]])
+ value = coerceToByteArray(valueExtract).getOrElse {
+ //Else try to convert to byte[] with the valueExtractor, will throw NoSupportedConversion in worst case
+ valueExtractor.evaluate(exchange, classOf[Array[Byte]])
+ }
+
}
}
val out: Message = exchange.getOut
- var insert:Insert = null
+ var insert: Insert = null
supercolumn match {
case Some(sc) => {
log.debug("Inserting sueprcolumn keyspace:%s columnFamily:%s key:%s supercolumn:%s column:%s".format(keyspace, columnfamily, key, sc, column))
@@ -118,6 +118,21 @@ class CassandraProducer(val endpoint: CassandraEndpoint) extends DefaultProducer
}
+ private def coerceToByteArray(valueExtract: Any): Option[Array[Byte]] = {
+ var value: Array[Byte] = null
+ if(valueExtract.isInstanceOf[Array[Byte]]) value = valueExtract.asInstanceOf[Array[Byte]]
+ else if (valueExtract.isInstanceOf[String]) value = bytes(valueExtract.asInstanceOf[String])
+ else if (valueExtract.isInstanceOf[Array[Byte]]) value = valueExtract.asInstanceOf[Array[Byte]]
+ else if (valueExtract.isInstanceOf[Long]) value = bytes(valueExtract.asInstanceOf[Long])
+ else if (valueExtract.isInstanceOf[Date]) value = bytes(valueExtract.asInstanceOf[Date])
+ else if (valueExtract.isInstanceOf[Float]) value = bytes(valueExtract.asInstanceOf[Float])
+ else if (valueExtract.isInstanceOf[Double]) value = bytes(valueExtract.asInstanceOf[Double])
+ else if (valueExtract.isInstanceOf[Int]) value = bytes(valueExtract.asInstanceOf[Int])
+ else if (valueExtract.isInstanceOf[UUID]) value = bytes(valueExtract.asInstanceOf[UUID])
+
+ Option(value)
+ }
+
private def overrideDefaultExtractors(): Unit = {
endpoint.keyspaceExtractor match {
case Some(ext) => keyspaceExtractor = ext
@@ -121,7 +121,7 @@ class ProducerSuite extends FunSuite with CassandraSuite with ShouldMatchers {
out.getHeader(columnFamilyHeader, classOf[String]) should be("superStringCols")
out.getHeader(columnHeader, classOf[String]) should be("testcolumn")
out.getHeader(keyHeader, classOf[String]) should be("theSupercolKey")
- out.getHeader(superColumnHeader) should be("superduper")
+ out.getHeader(superColumnHeader, classOf[String]) should be("superduper")
out.getBody(classOf[String]) should be("TEST123")
withSession {
@@ -156,7 +156,7 @@ class ProducerSuite extends FunSuite with CassandraSuite with ShouldMatchers {
out.getHeader(columnFamilyHeader, classOf[String]) should be("superStringCols")
out.getHeader(columnHeader, classOf[String]) should be("testcolumn")
out.getHeader(keyHeader, classOf[String]) should be("testUrlKey")
- out.getHeader(superColumnHeader) should be("superduper")
+ out.getHeader(superColumnHeader, classOf[String]) should be("superduper")
out.getBody(classOf[String]) should be("TEST123")
withSession {
@@ -248,7 +248,7 @@ class ProducerSuite extends FunSuite with CassandraSuite with ShouldMatchers {
out.getHeader(columnFamilyHeader, classOf[String]) should be("superStringCols")
out.getHeader(columnHeader, classOf[String]) should be("testcolumn")
out.getHeader(keyHeader, classOf[String]) should be("theSuperExtractorKey")
- out.getHeader(superColumnHeader) should be("superduper")
+ out.getHeader(superColumnHeader,classOf[String]) should be("superduper")
out.getBody(classOf[String]) should be("TEST123")
withSession {

0 comments on commit bdb7092

Please sign in to comment.