Permalink
Browse files

Merge pull request #1 from swillis11/master

Updated plugin to work with the latest Flume version.
  • Loading branch information...
2 parents dbcd8da + 3b03d98 commit 72beeff69d0763ef845a1802a82cec4532426a82 @thobbs committed Aug 8, 2011
Showing with 22,238 additions and 11,519 deletions.
  1. +5 −3 src/java/org/apache/cassandra/plugins/CassandraClient.java
  2. +27 −6 src/java/org/apache/cassandra/plugins/LogsandraSyslogSink.java
  3. +10 −3 src/java/org/apache/cassandra/plugins/SimpleCassandraSink.java
  4. +104 −93 src/java/org/apache/cassandra/thrift/AuthenticationException.java
  5. +99 −69 src/java/org/apache/cassandra/thrift/AuthenticationRequest.java
  6. +104 −93 src/java/org/apache/cassandra/thrift/AuthorizationException.java
  7. +15,713 −8,735 src/java/org/apache/cassandra/thrift/Cassandra.java
  8. +940 −361 src/java/org/apache/cassandra/thrift/CfDef.java
  9. +194 −158 src/java/org/apache/cassandra/thrift/Column.java
  10. +148 −125 src/java/org/apache/cassandra/thrift/ColumnDef.java
  11. +276 −71 src/java/org/apache/cassandra/thrift/ColumnOrSuperColumn.java
  12. +106 −85 src/java/org/apache/cassandra/thrift/ColumnParent.java
  13. +143 −112 src/java/org/apache/cassandra/thrift/ColumnPath.java
  14. +47 −0 src/java/org/apache/cassandra/thrift/Compression.java
  15. +58 −25 src/java/org/apache/cassandra/thrift/ConsistencyLevel.java
  16. +2 −1 src/java/org/apache/cassandra/thrift/Constants.java
  17. +426 −0 src/java/org/apache/cassandra/thrift/CounterColumn.java
  18. +463 −0 src/java/org/apache/cassandra/thrift/CounterSuperColumn.java
  19. +557 −0 src/java/org/apache/cassandra/thrift/CqlResult.java
  20. +47 −0 src/java/org/apache/cassandra/thrift/CqlResultType.java
  21. +466 −0 src/java/org/apache/cassandra/thrift/CqlRow.java
  22. +145 −121 src/java/org/apache/cassandra/thrift/Deletion.java
  23. +145 −116 src/java/org/apache/cassandra/thrift/IndexClause.java
  24. +147 −124 src/java/org/apache/cassandra/thrift/IndexExpression.java
  25. +20 −12 src/java/org/apache/cassandra/thrift/IndexOperator.java
  26. +8 −8 src/java/org/apache/cassandra/thrift/IndexType.java
  27. +104 −93 src/java/org/apache/cassandra/thrift/InvalidRequestException.java
  28. +112 −88 src/java/org/apache/cassandra/thrift/KeyCount.java
  29. +187 −143 src/java/org/apache/cassandra/thrift/KeyRange.java
  30. +120 −99 src/java/org/apache/cassandra/thrift/KeySlice.java
  31. +327 −146 src/java/org/apache/cassandra/thrift/KsDef.java
  32. +92 −76 src/java/org/apache/cassandra/thrift/Mutation.java
  33. +48 −42 src/java/org/apache/cassandra/thrift/NotFoundException.java
  34. +227 −0 src/java/org/apache/cassandra/thrift/SchemaDisagreementException.java
  35. +111 −95 src/java/org/apache/cassandra/thrift/SlicePredicate.java
  36. +174 −134 src/java/org/apache/cassandra/thrift/SliceRange.java
  37. +118 −97 src/java/org/apache/cassandra/thrift/SuperColumn.java
  38. +48 −42 src/java/org/apache/cassandra/thrift/TimedOutException.java
  39. +122 −101 src/java/org/apache/cassandra/thrift/TokenRange.java
  40. +48 −42 src/java/org/apache/cassandra/thrift/UnavailableException.java
@@ -1,6 +1,7 @@
package org.apache.cassandra.plugins;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
@@ -83,8 +84,9 @@ public void close() throws IOException {
}
/** Inserts columns into a column family in a given row. */
- public void insert(byte[] key, String columnFamily, Column[] columns, ConsistencyLevel consistencyLevel) throws IOException
+ public void insert(byte[] keyarray, String columnFamily, Column[] columns, ConsistencyLevel consistencyLevel) throws IOException
{
+ ByteBuffer key = ByteBuffer.wrap(keyarray);
List<Mutation> mutationList = new ArrayList<Mutation>();
for(int i = 0; i < columns.length; i++) {
Mutation mutation = new Mutation();
@@ -95,15 +97,15 @@ public void insert(byte[] key, String columnFamily, Column[] columns, Consistenc
}
Map<String, List<Mutation>> innerMutationMap = new HashMap<String, List<Mutation>>();
innerMutationMap.put(columnFamily, mutationList);
- Map<byte[], Map<String, List<Mutation>>> mutationMap = new HashMap<byte[], Map<String, List<Mutation>>>();
+ Map<ByteBuffer, Map<String, List<Mutation>>> mutationMap = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
mutationMap.put(key, innerMutationMap);
batchMutate(mutationMap, consistencyLevel);
}
/** Attempts to perform a batch mutation and retries upon failure. */
private void batchMutate(
- Map<byte[], Map<String, List<Mutation>>> mutationMap,
+ Map<ByteBuffer, Map<String, List<Mutation>>> mutationMap,
ConsistencyLevel consistencyLevel)
throws IOException
{
@@ -61,9 +61,10 @@ public void open() throws IOException {
/**
* Writes the message to Cassandra in a Logsandra compatible way.
+ * @throws InterruptedException
*/
@Override
- public void append(Event event) throws IOException {
+ public void append(Event event) throws IOException, InterruptedException {
long timestamp = System.currentTimeMillis() * MILLI_TO_MICRO;
@@ -96,14 +97,34 @@ public void append(Event event) throws IOException {
finalBytes[i + eventInfoBytes.length] = eventBody[i];
}
- Column identColumn = new Column(IDENT_NAME, "ident".getBytes(), timestamp);
- Column sourceColumn = new Column(SOURCE_NAME, host.getBytes(), timestamp);
- Column dateColumn = new Column(DATE_NAME, date.getBytes(), timestamp);
- Column entryColumn = new Column(ENTRY_NAME, finalBytes, timestamp);
+ Column identColumn = new Column();
+ identColumn.setName(IDENT_NAME);
+ identColumn.setValue("ident".getBytes());
+ identColumn.setTimestamp(timestamp);
+
+ Column sourceColumn = new Column();
+ sourceColumn.setName(SOURCE_NAME);
+ sourceColumn.setValue(host.getBytes());
+ sourceColumn.setTimestamp(timestamp);
+
+ Column dateColumn = new Column();
+ dateColumn.setName(DATE_NAME);
+ dateColumn.setValue(date.getBytes());
+ dateColumn.setTimestamp(timestamp);
+
+ Column entryColumn = new Column();
+ entryColumn.setName(ENTRY_NAME);
+ entryColumn.setValue(finalBytes);
+ entryColumn.setTimestamp(timestamp);
+
Column[] entryColumns = {identColumn, sourceColumn, dateColumn, entryColumn};
Long time = System.currentTimeMillis() * MILLI_TO_MICRO;
- Column timeColumn = new Column(toBytes(time), uuid.toString().getBytes(), timestamp);
+ Column timeColumn = new Column();
+ timeColumn.setName(toBytes(time));
+ timeColumn.setValue(uuid.toString().getBytes());
+ timeColumn.setTimestamp(timestamp);
+
Column[] byDateColumns = {timeColumn};
// Insert the entry
@@ -58,18 +58,25 @@ public void open() throws IOException {
* The key is the current date (YYYYMMDD) and the column
* name is a type 1 UUID, which includes a time stamp
* component.
+ * @throws InterruptedException
*/
@Override
- public void append(Event event) throws IOException {
+ public void append(Event event) throws IOException, InterruptedException {
long timestamp = System.currentTimeMillis() * MILLI_TO_MICRO;
// Make the index column
UUID uuid = uuidGen.generateTimeBasedUUID();
- Column indexColumn = new Column(uuid.toByteArray(), new byte[0], timestamp);
+ Column indexColumn = new Column();
+ indexColumn.setName(uuid.toByteArray());
+ indexColumn.setValue(new byte[0]);
+ indexColumn.setTimestamp(timestamp);
// Make the data column
- Column dataColumn = new Column("data".getBytes(), event.getBody(), timestamp);
+ Column dataColumn = new Column();
+ dataColumn.setName("data".getBytes());
+ dataColumn.setValue(event.getBody());
+ dataColumn.setTimestamp(timestamp);
// Insert the index
this.cClient.insert(this.getKey(), this.indexColumnFamily, new Column[] {indexColumn}, ConsistencyLevel.QUORUM);
Oops, something went wrong.

0 comments on commit 72beeff

Please sign in to comment.