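- Download flume-cassandra-plugin-X.Y.tar.gz from the Downloads section.
- Extract the tarball, for example with tar xzf flume-cassandra-plugin-X.Y.tar.gz.
- Set $FLUME_CLASSPATH, in every terminal that will run the Flume master or a node, so that it includes the jars from the extracted plugin.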
- Modify flume-site.xml (you may start out by copying flume-site.xml.template and removing the body of the file) to include:
    <configuration>
      <property>
        <name>flume.plugin.classes</name>
        <value>org.apache.cassandra.plugins.SimpleCassandraSink,org.apache.cassandra.plugins.LogsandraSyslogSink</value>
        <description>Comma separated list of plugin classes</description>
      </property>
    </configuration>
- Start the flume master and a node. The node should log something like:
    2011-08-07 21:29:54,793 [main] INFO conf.SinkFactoryImpl: Found sink builder simpleCassandraSink in org.apache.cassandra.plugins.SimpleCassandraSink
    ...
    2011-08-07 21:29:54,793 [main] INFO conf.SinkFactoryImpl: Found sink builder logsandraSyslogSink in org.apache.cassandra.plugins.LogsandraSyslogSink
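If the node does not log both sink builders, one thing worth checking is the flume.plugin.classes value. The following is a minimal Python sketch, not part of the plugin, that parses a flume-site.xml body and lists the configured plugin classes. The helper name plugin_classes and the inline SITE_XML string are made up for illustration; in practice you would read your own flume-site.xml.

```python
import xml.etree.ElementTree as ET

# Hypothetical inline copy of the flume-site.xml body shown above.
SITE_XML = """<configuration>
  <property>
    <name>flume.plugin.classes</name>
    <value>org.apache.cassandra.plugins.SimpleCassandraSink,org.apache.cassandra.plugins.LogsandraSyslogSink</value>
    <description>Comma separated list of plugin classes</description>
  </property>
</configuration>"""


def plugin_classes(xml_text):
    """Return the class names listed in flume.plugin.classes, or [] if unset."""
    root = ET.fromstring(xml_text)
    for prop in root.iter("property"):
        if prop.findtext("name") == "flume.plugin.classes":
            value = prop.findtext("value") or ""
            # The value is a comma separated list; drop stray whitespace.
            return [name.strip() for name in value.split(",") if name.strip()]
    return []


classes = plugin_classes(SITE_XML)
for name in classes:
    print(name)
```

Both sink classes should appear in the output; a missing or misspelled entry would explain a sink builder that never shows up in the node log.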
This plugin primarily targets log storage right now.
There are two sinks available for use: the SimpleCassandraSink and the LogsandraSyslogSink.
Simple Cassandra Sink
The Simple Cassandra Sink requires four arguments for its constructor:
- A keyspace (String). For example, 'Keyspace1'.
- A column family name (String) for storing data in.
- A column family name (String) for storing indexes in.
- A list of Cassandra server hostname:port combinations (Strings).
The keyspace and both column families must already exist in Cassandra before the sink is used. The index column family should use a TimeUUIDType comparator. The data storage column family can use BytesType.
For example, in cassandra-cli you might create them like:
    [default@unknown] connect localhost/9160;
    Connected to: "Test Cluster" on localhost/9160
    [default@unknown] create keyspace Keyspace1;
    23d30bd0-c16b-11e0-0000-242d50cf1ffd
    Waiting for schema agreement...
    ... schemas agree across the cluster
    [default@unknown] use Keyspace1;
    Authenticated to keyspace: Keyspace1
    [default@Keyspace1] create column family FlumeData;
    2fcefa70-c16b-11e0-0000-242d50cf1ffd
    Waiting for schema agreement...
    ... schemas agree across the cluster
    [default@Keyspace1] create column family FlumeIndexes with comparator = 'TimeUUIDType';
    4f1d8130-c16b-11e0-0000-242d50cf1ffd
    Waiting for schema agreement...
    ... schemas agree across the cluster
    [default@Keyspace1]
When creating this sink with the web UI (which you can access by default at http://localhost:35871/flumeconfig.jsp), specify the sink like this:
simpleCassandraSink("Keyspace1", "FlumeData", "FlumeIndexes", "localhost:9160")
If you're new to Flume and want to test that the plugin works, I recommend using a source like asciisynth(20, 100). You should then see 20 corresponding entries in each of the column families when you run list in cassandra-cli.
How it Works
When the Cassandra sink receives an event, it does the following:
- In the index column family:
  a. Creates a column where the name is a type 1 UUID (timestamp based) and the value is empty.
  b. Inserts it into row "YYYYMMDDHH" (the current date and hour) in the given column family.
- In the data column family:
  a. Creates a column where the name is 'data' and the value is the flume event body.
  b. Inserts it into a row with a key that is the same uuid from step 1.
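The two writes can be sketched in Python with plain dictionaries standing in for the column families. This is only an illustration of the data layout, not the plugin's code: the names index_cf, data_cf, hour_bucket and store_event are made up, and using UTC for the "current date and hour" is an assumption.

```python
import uuid
from collections import defaultdict
from datetime import datetime, timezone

# In-memory stand-ins for the two column families:
# row key -> {column name: column value}.
index_cf = defaultdict(dict)
data_cf = defaultdict(dict)


def hour_bucket(now):
    """Row key for the index column family, formatted as YYYYMMDDHH."""
    return now.strftime("%Y%m%d%H")


def store_event(body, now=None):
    """Mimic the two writes made for one event (hypothetical helper)."""
    # Assumption: the hour bucket is taken in UTC unless a time is passed in.
    now = now or datetime.now(timezone.utc)
    # Type 1 (timestamp based) UUID. Note that uuid1() always uses the
    # real clock, independent of the `now` argument used for the row key.
    event_id = uuid.uuid1()
    # Index column family: column name = UUID, empty value, row = hour bucket.
    index_cf[hour_bucket(now)][event_id] = b""
    # Data column family: row key = same UUID, one column named 'data'.
    data_cf[event_id]["data"] = body
    return event_id


event_id = store_event(b"hello flume", datetime(2011, 8, 7, 21, 29))
print(sorted(index_cf))  # ['2011080721']
print(data_cf[event_id]["data"])
```

Each event therefore costs one column in a wide, hour-sized index row plus one small data row, which is what makes time-range reads cheap.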
This allows you to easily fetch all logs for a slice of time. Simply use something like get_slice() on the index column family to get the uuids you want for a particular slice of time, and then multiget the data column family using those uuids as the keys.
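As a rough sketch of that read path, the Python below models both column families as dictionaries and fetches all logs for one hour. The functions index_slice and multiget_data are hypothetical in-memory stand-ins for a slice on the index row and a multiget on the data column family; they are not the Cassandra client API, and a real client call takes different arguments.

```python
import uuid

# In-memory stand-ins: row key -> {column name: column value}.
index_cf = {"2011080721": {}}
data_cf = {}

# Populate three events using the layout described above.
for body in (b"first", b"second", b"third"):
    event_id = uuid.uuid1()
    index_cf["2011080721"][event_id] = b""
    data_cf[event_id] = {"data": body}


def index_slice(row_key):
    """Column names of one index row, ordered by UUID timestamp,
    roughly as a TimeUUIDType comparator would order them."""
    return sorted(index_cf.get(row_key, {}), key=lambda u: u.time)


def multiget_data(keys):
    """Fetch several data rows at once, skipping keys that do not exist."""
    return {key: data_cf[key] for key in keys if key in data_cf}


def logs_for_hour(row_key):
    """Event bodies for one YYYYMMDDHH bucket, in index order."""
    ids = index_slice(row_key)
    rows = multiget_data(ids)
    return [rows[i]["data"] for i in ids if i in rows]


for body in logs_for_hour("2011080721"):
    print(body)
```

A narrower time window within the hour would restrict the slice by UUID timestamp instead of reading the whole row; a window spanning several hours would repeat the lookup for each hour bucket.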