Permalink
Browse files

rename packages -> add a flume.sink

  • Loading branch information...
1 parent d666bd9 commit 8bece3e01f88f5ae8e04282ce46a1fcff0c155bb @akkumar akkumar committed Jan 9, 2012
@@ -1,4 +1,4 @@
-package org.apache.cassandra.plugins;
+package org.apache.cassandra.plugins.flume.sink;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -1,4 +1,4 @@
-package org.apache.cassandra.plugins;
+package org.apache.cassandra.plugins.flume.sink;
import java.io.IOException;
import java.util.Calendar;
@@ -20,7 +20,7 @@
/**
* Uses Cassandra as a Logsandra compatible sink for syslog messages.
- *
+ *
* When the sink receives an event, it does the following:
*
* 1. Inserts the entry in to the "entries" column family with
@@ -32,25 +32,25 @@
* can find Logsandra at http://github.com/jbohman/logsandra
*/
public class LogsandraSyslogSink extends EventSink.Base {
-
+
private static final String KEYSPACE = "logsandra";
private static final String ENTRIES = "entries";
private static final String BY_DATE = "by_date";
-
+
// Column names
private static final byte[] IDENT_NAME = "ident".getBytes();
private static final byte[] SOURCE_NAME = "source".getBytes();
private static final byte[] DATE_NAME = "date".getBytes();
private static final byte[] ENTRY_NAME = "entry".getBytes();
-
+
private CassandraClient cClient;
private static final UUIDGenerator uuidGen = UUIDGenerator.getInstance();
private static final long MILLI_TO_MICRO = 1000; // 1ms = 1000us
public LogsandraSyslogSink(String[] servers) {
-
+
this.cClient = new CassandraClient(KEYSPACE, servers);
}
@@ -61,21 +61,21 @@ public void open() throws IOException {
/**
* Writes the message to Cassandra in a Logsandra compatible way.
- * @throws InterruptedException
+ * @throws InterruptedException
*/
@Override
public void append(Event event) throws IOException, InterruptedException {
long timestamp = System.currentTimeMillis() * MILLI_TO_MICRO;
UUID uuid = uuidGen.generateTimeBasedUUID();
-
+
String date = getDate();
String host = event.getHost();
String facility = SyslogConsts.FACILITY[event.get(SyslogConsts.SYSLOG_FACILITY)[0]];
String severity = SyslogConsts.SEVERITY[event.get(SyslogConsts.SYSLOG_SEVERITY)[0]].toString();
-
- StringBuffer eventInfo = new StringBuffer();
+
+ StringBuilder eventInfo = new StringBuilder();
eventInfo.append(date);
eventInfo.append(' ');
eventInfo.append(host);
@@ -84,47 +84,47 @@ public void append(Event event) throws IOException, InterruptedException {
eventInfo.append(' ');
eventInfo.append(severity);
eventInfo.append(' ');
-
+
byte[] eventBody = event.getBody();
byte[] eventInfoBytes = eventInfo.toString().getBytes();
byte[] finalBytes = new byte[eventInfoBytes.length + eventBody.length];
for(int i = 0; i < eventInfoBytes.length; i++) {
finalBytes[i] = eventInfoBytes[i];
}
-
+
for(int i = 0; i < eventBody.length; i++) {
finalBytes[i + eventInfoBytes.length] = eventBody[i];
}
-
+
Column identColumn = new Column();
- identColumn.setName(IDENT_NAME);
- identColumn.setValue("ident".getBytes());
- identColumn.setTimestamp(timestamp);
-
+ identColumn.setName(IDENT_NAME);
+ identColumn.setValue("ident".getBytes());
+ identColumn.setTimestamp(timestamp);
+
Column sourceColumn = new Column();
- sourceColumn.setName(SOURCE_NAME);
- sourceColumn.setValue(host.getBytes());
- sourceColumn.setTimestamp(timestamp);
-
+ sourceColumn.setName(SOURCE_NAME);
+ sourceColumn.setValue(host.getBytes());
+ sourceColumn.setTimestamp(timestamp);
+
Column dateColumn = new Column();
- dateColumn.setName(DATE_NAME);
- dateColumn.setValue(date.getBytes());
- dateColumn.setTimestamp(timestamp);
-
+ dateColumn.setName(DATE_NAME);
+ dateColumn.setValue(date.getBytes());
+ dateColumn.setTimestamp(timestamp);
+
Column entryColumn = new Column();
- entryColumn.setName(ENTRY_NAME);
- entryColumn.setValue(finalBytes);
- entryColumn.setTimestamp(timestamp);
-
+ entryColumn.setName(ENTRY_NAME);
+ entryColumn.setValue(finalBytes);
+ entryColumn.setTimestamp(timestamp);
+
Column[] entryColumns = {identColumn, sourceColumn, dateColumn, entryColumn};
-
+
Long time = System.currentTimeMillis() * MILLI_TO_MICRO;
Column timeColumn = new Column();
- timeColumn.setName(toBytes(time));
- timeColumn.setValue(uuid.toString().getBytes());
- timeColumn.setTimestamp(timestamp);
-
+ timeColumn.setName(toBytes(time));
+ timeColumn.setValue(uuid.toString().getBytes());
+ timeColumn.setTimestamp(timestamp);
+
Column[] byDateColumns = {timeColumn};
// Insert the entry
@@ -139,7 +139,7 @@ public void append(Event event) throws IOException, InterruptedException {
this.cClient.insert(
SyslogConsts.SEVERITY[event.get(SyslogConsts.SYSLOG_SEVERITY)[0]].toString().getBytes(),
BY_DATE, byDateColumns, ConsistencyLevel.QUORUM);
-
+
super.append(event);
}
@@ -170,7 +170,7 @@ private String getDate() {
int minute = cal.get(Calendar.MINUTE);
int second = cal.get(Calendar.SECOND);
- StringBuffer buff = new StringBuffer();
+ StringBuilder buff = new StringBuilder();
buff.append(year);
if(month < 10)
buff.append('0');
@@ -1,4 +1,4 @@
-package org.apache.cassandra.plugins;
+package org.apache.cassandra.plugins.flume.sink;
import java.io.IOException;
import java.util.Calendar;
@@ -20,7 +20,7 @@
/**
* Allows Cassandra to be used as a sink, primarily for log messages.
- *
+ *
* When the Cassandra sink receives an event, it does the following:
*
* 1. Creates a column where the name is a type 1 UUID (timestamp based) and the
@@ -30,10 +30,10 @@
* SimpleCassandraSink primarily targets log storage right now.
*/
public class SimpleCassandraSink extends EventSink.Base {
-
+
private String dataColumnFamily;
private String indexColumnFamily;
-
+
private CassandraClient cClient;
private static final UUIDGenerator uuidGen = UUIDGenerator.getInstance();
@@ -44,7 +44,7 @@ public SimpleCassandraSink(String keyspace, String dataColumnFamily,
String indexColumnFamily, String[] servers) {
this.dataColumnFamily = dataColumnFamily;
this.indexColumnFamily = indexColumnFamily;
-
+
this.cClient = new CassandraClient(keyspace, servers);
}
@@ -58,7 +58,7 @@ public void open() throws IOException {
* The key is the current date (YYYYMMDD) and the column
* name is a type 1 UUID, which includes a time stamp
* component.
- * @throws InterruptedException
+ * @throws InterruptedException
*/
@Override
public void append(Event event) throws IOException, InterruptedException {
@@ -68,15 +68,15 @@ public void append(Event event) throws IOException, InterruptedException {
// Make the index column
UUID uuid = uuidGen.generateTimeBasedUUID();
Column indexColumn = new Column();
- indexColumn.setName(uuid.toByteArray());
- indexColumn.setValue(new byte[0]);
- indexColumn.setTimestamp(timestamp);
+ indexColumn.setName(uuid.toByteArray());
+ indexColumn.setValue(new byte[0]);
+ indexColumn.setTimestamp(timestamp);
// Make the data column
Column dataColumn = new Column();
- dataColumn.setName("data".getBytes());
- dataColumn.setValue(event.getBody());
- dataColumn.setTimestamp(timestamp);
+ dataColumn.setName("data".getBytes());
+ dataColumn.setValue(event.getBody());
+ dataColumn.setTimestamp(timestamp);
// Insert the index
this.cClient.insert(this.getKey(), this.indexColumnFamily, new Column[] {indexColumn}, ConsistencyLevel.QUORUM);
@@ -96,7 +96,7 @@ public void append(Event event) throws IOException, InterruptedException {
int year = cal.get(Calendar.YEAR);
int hour = cal.get(Calendar.HOUR_OF_DAY);
- StringBuffer buff = new StringBuffer();
+ StringBuilder buff = new StringBuilder();
buff.append(year);
if(month < 10)
buff.append('0');

0 comments on commit 8bece3e

Please sign in to comment.