Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
An Apache Camel component for integration with Apache Cassandra
branch: master

This branch is even with ticktock:master

Fetching latest commit…

Cannot retrieve the latest commit at this time

Failed to load latest commit information.
src
.gitignore
LICENSE
README.md
pom.xml

README.md

camel-cassandra An Apache Camel component for integrating with Apache Cassandra

Cassandra Component


The Cassandra component allows you to store and retrieve data to/from Apache Cassandra.

Maven users will need to add the following dependency to their pom.xml for this component:

    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-cassandra</artifactId>
        <version>x.x.x</version>
        <!-- use the same version as your Camel core version -->
    </dependency>

Sending to the endpoint


You can store data in cassandra by sending a message to a Cassandra producer endpoint. The destination keyspace, column family, key, supercolumn (if any), and column to store data in can be determined either statically or dynamically, based upon the endpoint URI used to configure the producer.

When keyspace, column family, key, supercolumn and column are not statically configured via the URI, user supplied (or default) Camel Expressions are used to extract the appropriate values from the Exchange.

The default extractors use a Camel Header expression to extract the headers defined below. The default value to store in Cassandra is the body of the message.

Receiving from the endpoint


The CassandraConsumer is a scheduled polling endpoint and will return data based upon the URI used to configure it. For instance, if keyspace, columnfamily, and key are specified, the consumer will send a message for every column for that key in the columnfamily, also deleting the column after sending.

If only a keyspace and columnfamily are specified, the Consumer will do a "key-first" polling of the columnfamily returning and deleting each column for a given key before moving on to the next key.

Most sensible permutations are handled by default. To customize the Polling behavior, you can either specify an instance of org.apache.camel.component.cassandra.CassandraPollingStrategy, which will be executed by an instance of org.apache.camel.component.cassandra.StrategyBasedCassandraPolling, or to completely customize polling, you can specify an instance of org.apache.camel.component.cassandra.CassandraPolling.

Headers


By default, the CassandraProducer extracts information unspecified in the url from the following headers, and also by default, the CassandraConsumer populates the following headers in messages it sends.

Header Type Description
camel-cassandra-keyspace String The keyspace in which to store the data
camel-cassandra-columnFamily String The column family in which to store the data
camel-cassandra-supercolumn String The supercolumn in which to store the data
camel-cassandra-column String The column in which to store the data
camel-cassandra-key String The key with which to store the data

The out message of the exchange will have all these headers set to the values used to store the data, as well as the value of returned by the valueExtractor Expression, set as the body.

The default extractors can be overridden by specifying an Expression to use in the Camel Registry (Spring,JNDI, etc..) You can simply register a bean of the correct name, or override the name of the bean to use via options in the URI.

URI format


    cassandra:[//cassandraHost:cassandraPort][/keyspace][/columnFamily][/~key][/!supercolumn][/column]

To successfully store data, all of the values except for supercolumn must be specified or successfully extracted.

If a supercolumn name specified or extracted, then a column in a supercolumn is inserted, otherwise a standard column is inserted.

Here are some example URIs, which would apply default extractors to get the necessary information.

    cassandra://host:port
    (Producer: extract keyspace, columnfamily, supercolumn, column, key, value via default expressions)
    (Consumer: Not Valid)

    cassandra:/keyspace
    (Producer: use host/port configured in the component and extract columnfamily, supercolumn, column,
            key, value via default expressions)
    (Consumer: Not Valid)

    cassandra://host:port/keyspace
    (Producer: extract columnfamily, supercolumn, column, key, value via default expressions)
    (Consumer: Not Valid)

    cassandra:/keyspace/columnFamily
    (Producer: use host/port configured in the component and extract supercolumn, column, key,
            value via default expressions)
    (Consumer: use host/port configured in the component and do a "key-first" consumption of all
            columns for each key in the C.F.)

    cassandra://host:port/keyspace/columnFamily
    (Producer: extract supercolumn, column, key, value via default expressions)
    (Consumer: do a "key-first" consumption of all columns for each key in the C.F.)


    cassandra:/keyspace/columnFamily/column
    (Producer: use host/port configured in the component and extract columnfamily, supercolumn, column,
            key, value via default expressions)
    (Consumer: for each key in the C.F, consume and delete the specified column)

    cassandra:/keyspace/columnFamily/~key
    (Producer: extract column, value via default expressions)
    (Consumer: consume and delete each column in this key in the C.F)


    cassandra:/keyspace/columnFamily/!supercolumn
    (Producer: and extract column, key, value via default expressions)
    (Consumer: "key-first" consumption of each column of each supercolumn of each key in the C.F.)

    cassandra://host:port/keyspace/columnFamily/~key/!supercolumn
    (Producer: extract column, key, value via default expressions)
    (Consumer: consume and delete each column in this key in the supercolumn in the C.F)

Options


Name Default Value Description
keyspaceExtractor keyspaceExtractor The the name of the bean used to lookup the keyspace extractor Expression from the registry. If none is found, the default Expression is used
columnFamilyExtractor columnFamilyExtractor The the name of the bean used to lookup the columnFamily extractor Expression from the registry. If none is found, the default Expression is used
superColumnExtractor superColumnExtractor The the name of the bean used to lookup the superColumn extractor Expression from the registry. If none is found, the default Expression is used
columnExtractor columnExtractor The the name of the bean used to lookup the column extractor Expression from the registry. If none is found, the default Expression is used
keyExtractor keyExtractor The the name of the bean used to lookup the key extractor Expression from the registry. If none is found, the default Expression is used
valueExtractor valueExtractor The the name of the bean used to lookup the value extractor Expression from the registry. If none is found, the default Expression is used
cassandraDataFormat cassandraDataFormat The the name of the bean used to lookup the optional Camel Data Format to use. If found the Data Format is used to marshal the value returned by the value extractor expression before storing, and is used to unmarshall the data in cassandra when reading
cassandraTimeout 3000 Timeout on calls to Cassandra
cassandraConsistency QUORUM The Consistency Level to use when storing/loading data to/from cassandra
cassandraPollingImpl instance of org.apache.camel.component.cassandra.DefaultCassandraPolling instance of CassandraPolling to use to poll cassandra
cassandraPollingStrategy None If you specify an instance of CassndraPollingStrategy, it will be executes with StrategyBasedCassandraPolling
cassandraPollingMaxMessages 1000 The maximum mesages to return per poll
cassandraPollingMaxKeyRange 1000 The maximum mesages to specify in the KeyRange used to call get_range_slices during polling. See Note below.
batchCreator instance of org.apache.camel.component.cassandra.DefaultBatchCreator If the incoming exchange can be broken down into multiple inserts, you can specify an implementation of BatchCreator to create a List of Exchanges that will be inserted in batch

A Note about the Default Polling implementation (before Cassandra 0.7)

Care must be taken when configuring consumer endpoints when no key is specified. When no key is specified, the default polling implementation calls get_range_slices, using a key range with a empty start and end key. If there are more deleted, uncompacted keys in the columnFamily than the maximum keys specified in the key range, no data will be returned, even though there will be data that matches the uri.

Once Cassandra 0.7 is out the plan would be to create a small keyspace via the client that can be used to track keys deleted by the consumer, so that smarter Key Ranges can be passed to get_range_slices.

Example Config


You can either define beans of the proper name to have your own extractors (camel Expression) used, or you can override the name that will be used to look up the extractor.

So for example, you could define your own extractors as follows

    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:camel="http://camel.apache.org/schema/spring"
           xsi:schemaLocation="
              http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
              http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">


         <camel:camelContext id="camel">

                 <camel:route>
                     <camel:from uri="direct:cassandraInbound"/>
                     <camel:to uri="cassandra://localhost:9160/camel-cassandra?columnFamilyExtractor=cfEx&amp;columnExtractor=colEx"/>
                 </camel:route>

         </camel:camelContext>

        <!-- the name of the columnFamilyExtractor and columnExtractor beans are specified above as options in the cassandra uri -->
        <bean id="cfEx" class="org.apache.camel.builder.ExpressionBuilder" factory-method="headerExpression">
            <constructor-arg index="0" type="java.lang.String" value="camel-cassandra-columnFamily"/>
        </bean>

        <bean id="colEx"  class="org.apache.camel.builder.ExpressionBuilder" factory-method="headerExpression">
            <constructor-arg index="0" type="java.lang.String" value="camel-cassandra-column"/>
        </bean>

        <!-- the keyExtractor and valueExtractor beans are found by their default name-->
        <bean id="keyExtractor" class="org.apache.camel.builder.ExpressionBuilder" factory-method="bodyOgnlExpression">
            <constructor-arg index="0" type="java.lang.String" value="getIdentifier()"/>
        </bean>

        <bean id="valueExtractor" class="org.apache.camel.builder.ExpressionBuilder" factory-method="bodyOgnlExpression">
            <constructor-arg index="0" type="java.lang.String" value="getImportantData().asByteArray()"/>
        </bean>
    </beans>
Something went wrong with that request. Please try again.