Permalink
Browse files

Merge pull request #5 from akkumar/flumepkg

rename packages -> add a flume.sink
  • Loading branch information...
2 parents d666bd9 + 4f5708d commit 0da77303933e9b776f1cdc0322b25a97823cfb5a @thobbs committed Jan 9, 2012
View
@@ -24,7 +24,7 @@ flume-site.xml.template and removing the body of the file) to include:
<configuration>
<property>
<name>flume.plugin.classes</name>
- <value>org.apache.cassandra.plugins.SimpleCassandraSink,org.apache.cassandra.plugins.LogsandraSyslogSink</value>
+ <value>org.apache.cassandra.plugins.flume.sink.SimpleCassandraSink,org.apache.cassandra.plugins.flume.sink.LogsandraSyslogSink</value>
<description>Comma separated list of plugin classes</description>
</property>
</configuration>
@@ -33,9 +33,9 @@ flume-site.xml.template and removing the body of the file) to include:
5. Start the flume master and a node. The node should log something like:
~~~~~~
-2011-08-07 21:29:54,793 [main] INFO conf.SinkFactoryImpl: Found sink builder simpleCassandraSink in org.apache.cassandra.plugins.SimpleCassandraSink
+2011-08-07 21:29:54,793 [main] INFO conf.SinkFactoryImpl: Found sink builder simpleCassandraSink in org.apache.cassandra.plugins.flume.sink.SimpleCassandraSink
...
-2011-08-07 21:29:54,793 [main] INFO conf.SinkFactoryImpl: Found sink builder logsandraSyslogSink in org.apache.cassandra.plugins.LogsandraSyslogSink
+2011-08-07 21:29:54,793 [main] INFO conf.SinkFactoryImpl: Found sink builder logsandraSyslogSink in org.apache.cassandra.plugins.flume.sink.LogsandraSyslogSink
~~~~~~
Usage
@@ -78,10 +78,10 @@ Waiting for schema agreement...
4f1d8130-c16b-11e0-0000-242d50cf1ffd
Waiting for schema agreement...
... schemas agree across the cluster
-[default@Keyspace1]
+[default@Keyspace1]
~~~~~~
-When creating this sink with web UI (which you can access by default at
+When creating this sink with web UI (which you can access by default at
http://localhost:35871/flumeconfig.jsp), you will use a sink like:
`simpleCassandraSink("Keyspace1", "FlumeData", "FlumeIndexes", "localhost:9160")`
@@ -1,4 +1,4 @@
-package org.apache.cassandra.plugins;
+package org.apache.cassandra.plugins.flume.sink;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -1,4 +1,4 @@
-package org.apache.cassandra.plugins;
+package org.apache.cassandra.plugins.flume.sink;
import java.io.IOException;
import java.util.Calendar;
@@ -20,7 +20,7 @@
/**
* Uses Cassandra as a Logsandra compatible sink for syslog messages.
- *
+ *
* When the sink receives an event, it does the following:
*
* 1. Inserts the entry in to the "entries" column family with
@@ -32,25 +32,25 @@
* can find Logsandra at http://github.com/jbohman/logsandra
*/
public class LogsandraSyslogSink extends EventSink.Base {
-
+
private static final String KEYSPACE = "logsandra";
private static final String ENTRIES = "entries";
private static final String BY_DATE = "by_date";
-
+
// Column names
private static final byte[] IDENT_NAME = "ident".getBytes();
private static final byte[] SOURCE_NAME = "source".getBytes();
private static final byte[] DATE_NAME = "date".getBytes();
private static final byte[] ENTRY_NAME = "entry".getBytes();
-
+
private CassandraClient cClient;
private static final UUIDGenerator uuidGen = UUIDGenerator.getInstance();
private static final long MILLI_TO_MICRO = 1000; // 1ms = 1000us
public LogsandraSyslogSink(String[] servers) {
-
+
this.cClient = new CassandraClient(KEYSPACE, servers);
}
@@ -61,21 +61,21 @@ public void open() throws IOException {
/**
* Writes the message to Cassandra in a Logsandra compatible way.
- * @throws InterruptedException
+ * @throws InterruptedException
*/
@Override
public void append(Event event) throws IOException, InterruptedException {
long timestamp = System.currentTimeMillis() * MILLI_TO_MICRO;
UUID uuid = uuidGen.generateTimeBasedUUID();
-
+
String date = getDate();
String host = event.getHost();
String facility = SyslogConsts.FACILITY[event.get(SyslogConsts.SYSLOG_FACILITY)[0]];
String severity = SyslogConsts.SEVERITY[event.get(SyslogConsts.SYSLOG_SEVERITY)[0]].toString();
-
- StringBuffer eventInfo = new StringBuffer();
+
+ StringBuilder eventInfo = new StringBuilder();
eventInfo.append(date);
eventInfo.append(' ');
eventInfo.append(host);
@@ -84,47 +84,47 @@ public void append(Event event) throws IOException, InterruptedException {
eventInfo.append(' ');
eventInfo.append(severity);
eventInfo.append(' ');
-
+
byte[] eventBody = event.getBody();
byte[] eventInfoBytes = eventInfo.toString().getBytes();
byte[] finalBytes = new byte[eventInfoBytes.length + eventBody.length];
for(int i = 0; i < eventInfoBytes.length; i++) {
finalBytes[i] = eventInfoBytes[i];
}
-
+
for(int i = 0; i < eventBody.length; i++) {
finalBytes[i + eventInfoBytes.length] = eventBody[i];
}
-
+
Column identColumn = new Column();
- identColumn.setName(IDENT_NAME);
- identColumn.setValue("ident".getBytes());
- identColumn.setTimestamp(timestamp);
-
+ identColumn.setName(IDENT_NAME);
+ identColumn.setValue("ident".getBytes());
+ identColumn.setTimestamp(timestamp);
+
Column sourceColumn = new Column();
- sourceColumn.setName(SOURCE_NAME);
- sourceColumn.setValue(host.getBytes());
- sourceColumn.setTimestamp(timestamp);
-
+ sourceColumn.setName(SOURCE_NAME);
+ sourceColumn.setValue(host.getBytes());
+ sourceColumn.setTimestamp(timestamp);
+
Column dateColumn = new Column();
- dateColumn.setName(DATE_NAME);
- dateColumn.setValue(date.getBytes());
- dateColumn.setTimestamp(timestamp);
-
+ dateColumn.setName(DATE_NAME);
+ dateColumn.setValue(date.getBytes());
+ dateColumn.setTimestamp(timestamp);
+
Column entryColumn = new Column();
- entryColumn.setName(ENTRY_NAME);
- entryColumn.setValue(finalBytes);
- entryColumn.setTimestamp(timestamp);
-
+ entryColumn.setName(ENTRY_NAME);
+ entryColumn.setValue(finalBytes);
+ entryColumn.setTimestamp(timestamp);
+
Column[] entryColumns = {identColumn, sourceColumn, dateColumn, entryColumn};
-
+
Long time = System.currentTimeMillis() * MILLI_TO_MICRO;
Column timeColumn = new Column();
- timeColumn.setName(toBytes(time));
- timeColumn.setValue(uuid.toString().getBytes());
- timeColumn.setTimestamp(timestamp);
-
+ timeColumn.setName(toBytes(time));
+ timeColumn.setValue(uuid.toString().getBytes());
+ timeColumn.setTimestamp(timestamp);
+
Column[] byDateColumns = {timeColumn};
// Insert the entry
@@ -139,7 +139,7 @@ public void append(Event event) throws IOException, InterruptedException {
this.cClient.insert(
SyslogConsts.SEVERITY[event.get(SyslogConsts.SYSLOG_SEVERITY)[0]].toString().getBytes(),
BY_DATE, byDateColumns, ConsistencyLevel.QUORUM);
-
+
super.append(event);
}
@@ -170,7 +170,7 @@ private String getDate() {
int minute = cal.get(Calendar.MINUTE);
int second = cal.get(Calendar.SECOND);
- StringBuffer buff = new StringBuffer();
+ StringBuilder buff = new StringBuilder();
buff.append(year);
if(month < 10)
buff.append('0');
@@ -1,4 +1,4 @@
-package org.apache.cassandra.plugins;
+package org.apache.cassandra.plugins.flume.sink;
import java.io.IOException;
import java.util.Calendar;
@@ -20,7 +20,7 @@
/**
* Allows Cassandra to be used as a sink, primarily for log messages.
- *
+ *
* When the Cassandra sink receives an event, it does the following:
*
* 1. Creates a column where the name is a type 1 UUID (timestamp based) and the
@@ -30,10 +30,10 @@
* SimpleCassandraSink primarily targets log storage right now.
*/
public class SimpleCassandraSink extends EventSink.Base {
-
+
private String dataColumnFamily;
private String indexColumnFamily;
-
+
private CassandraClient cClient;
private static final UUIDGenerator uuidGen = UUIDGenerator.getInstance();
@@ -44,7 +44,7 @@ public SimpleCassandraSink(String keyspace, String dataColumnFamily,
String indexColumnFamily, String[] servers) {
this.dataColumnFamily = dataColumnFamily;
this.indexColumnFamily = indexColumnFamily;
-
+
this.cClient = new CassandraClient(keyspace, servers);
}
@@ -58,7 +58,7 @@ public void open() throws IOException {
* The key is the current date (YYYYMMDD) and the column
* name is a type 1 UUID, which includes a time stamp
* component.
- * @throws InterruptedException
+ * @throws InterruptedException
*/
@Override
public void append(Event event) throws IOException, InterruptedException {
@@ -68,15 +68,15 @@ public void append(Event event) throws IOException, InterruptedException {
// Make the index column
UUID uuid = uuidGen.generateTimeBasedUUID();
Column indexColumn = new Column();
- indexColumn.setName(uuid.toByteArray());
- indexColumn.setValue(new byte[0]);
- indexColumn.setTimestamp(timestamp);
+ indexColumn.setName(uuid.toByteArray());
+ indexColumn.setValue(new byte[0]);
+ indexColumn.setTimestamp(timestamp);
// Make the data column
Column dataColumn = new Column();
- dataColumn.setName("data".getBytes());
- dataColumn.setValue(event.getBody());
- dataColumn.setTimestamp(timestamp);
+ dataColumn.setName("data".getBytes());
+ dataColumn.setValue(event.getBody());
+ dataColumn.setTimestamp(timestamp);
// Insert the index
this.cClient.insert(this.getKey(), this.indexColumnFamily, new Column[] {indexColumn}, ConsistencyLevel.QUORUM);
@@ -96,7 +96,7 @@ public void append(Event event) throws IOException, InterruptedException {
int year = cal.get(Calendar.YEAR);
int hour = cal.get(Calendar.HOUR_OF_DAY);
- StringBuffer buff = new StringBuffer();
+ StringBuilder buff = new StringBuilder();
buff.append(year);
if(month < 10)
buff.append('0');

0 comments on commit 0da7730

Please sign in to comment.