Browse files

Updated for thrift 0.6.x: changed byte[]'s to ByteBuffers.

  • Loading branch information...
1 parent ccfbd61 commit 95e8bc2fae9f7e16657fbc9d2b75b1dddf16b7d3 swillis committed Aug 3, 2011
View
8 src/java/org/apache/cassandra/plugins/CassandraClient.java
@@ -1,6 +1,7 @@
package org.apache.cassandra.plugins;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
@@ -83,8 +84,9 @@ public void close() throws IOException {
}
/** Inserts columns into a column family in a given row. */
- public void insert(byte[] key, String columnFamily, Column[] columns, ConsistencyLevel consistencyLevel) throws IOException
+ public void insert(byte[] keyarray, String columnFamily, Column[] columns, ConsistencyLevel consistencyLevel) throws IOException
{
+ ByteBuffer key = ByteBuffer.wrap(keyarray);
List<Mutation> mutationList = new ArrayList<Mutation>();
for(int i = 0; i < columns.length; i++) {
Mutation mutation = new Mutation();
@@ -95,15 +97,15 @@ public void insert(byte[] key, String columnFamily, Column[] columns, Consistenc
}
Map<String, List<Mutation>> innerMutationMap = new HashMap<String, List<Mutation>>();
innerMutationMap.put(columnFamily, mutationList);
- Map<byte[], Map<String, List<Mutation>>> mutationMap = new HashMap<byte[], Map<String, List<Mutation>>>();
+ Map<ByteBuffer, Map<String, List<Mutation>>> mutationMap = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
mutationMap.put(key, innerMutationMap);
batchMutate(mutationMap, consistencyLevel);
}
/** Attempts to perform a batch mutation and retries upon failure. */
private void batchMutate(
- Map<byte[], Map<String, List<Mutation>>> mutationMap,
+ Map<ByteBuffer, Map<String, List<Mutation>>> mutationMap,
ConsistencyLevel consistencyLevel)
throws IOException
{
View
33 src/java/org/apache/cassandra/plugins/LogsandraSyslogSink.java
@@ -61,9 +61,10 @@ public void open() throws IOException {
/**
* Writes the message to Cassandra in a Logsandra compatible way.
+ * @throws InterruptedException
*/
@Override
- public void append(Event event) throws IOException {
+ public void append(Event event) throws IOException, InterruptedException {
long timestamp = System.currentTimeMillis() * MILLI_TO_MICRO;
@@ -96,14 +97,34 @@ public void append(Event event) throws IOException {
finalBytes[i + eventInfoBytes.length] = eventBody[i];
}
- Column identColumn = new Column(IDENT_NAME, "ident".getBytes(), timestamp);
- Column sourceColumn = new Column(SOURCE_NAME, host.getBytes(), timestamp);
- Column dateColumn = new Column(DATE_NAME, date.getBytes(), timestamp);
- Column entryColumn = new Column(ENTRY_NAME, finalBytes, timestamp);
+ Column identColumn = new Column();
+ identColumn.setName(IDENT_NAME);
+ identColumn.setValue("ident".getBytes());
+ identColumn.setTimestamp(timestamp);
+
+ Column sourceColumn = new Column();
+ sourceColumn.setName(SOURCE_NAME);
+ sourceColumn.setValue(host.getBytes());
+ sourceColumn.setTimestamp(timestamp);
+
+ Column dateColumn = new Column();
+ dateColumn.setName(DATE_NAME);
+ dateColumn.setValue(date.getBytes());
+ dateColumn.setTimestamp(timestamp);
+
+ Column entryColumn = new Column();
+ entryColumn.setName(ENTRY_NAME);
+ entryColumn.setValue(finalBytes);
+ entryColumn.setTimestamp(timestamp);
+
Column[] entryColumns = {identColumn, sourceColumn, dateColumn, entryColumn};
Long time = System.currentTimeMillis() * MILLI_TO_MICRO;
- Column timeColumn = new Column(toBytes(time), uuid.toString().getBytes(), timestamp);
+ Column timeColumn = new Column();
+ timeColumn.setName(toBytes(time));
+ timeColumn.setValue(uuid.toString().getBytes());
+ timeColumn.setTimestamp(timestamp);
+
Column[] byDateColumns = {timeColumn};
// Insert the entry
View
13 src/java/org/apache/cassandra/plugins/SimpleCassandraSink.java
@@ -58,18 +58,25 @@ public void open() throws IOException {
* The key is the current date (YYYYMMDD) and the column
* name is a type 1 UUID, which includes a time stamp
* component.
+ * @throws InterruptedException
*/
@Override
- public void append(Event event) throws IOException {
+ public void append(Event event) throws IOException, InterruptedException {
long timestamp = System.currentTimeMillis() * MILLI_TO_MICRO;
// Make the index column
UUID uuid = uuidGen.generateTimeBasedUUID();
- Column indexColumn = new Column(uuid.toByteArray(), new byte[0], timestamp);
+ Column indexColumn = new Column();
+ indexColumn.setName(uuid.toByteArray());
+ indexColumn.setValue(new byte[0]);
+ indexColumn.setTimestamp(timestamp);
// Make the data column
- Column dataColumn = new Column("data".getBytes(), event.getBody(), timestamp);
+ Column dataColumn = new Column();
+ dataColumn.setName("data".getBytes());
+ dataColumn.setValue(event.getBody());
+ dataColumn.setTimestamp(timestamp);
// Insert the index
this.cClient.insert(this.getKey(), this.indexColumnFamily, new Column[] {indexColumn}, ConsistencyLevel.QUORUM);

0 comments on commit 95e8bc2

Please sign in to comment.