Skip to content

Commit

Permalink
Updated for thrift 0.6.x: changed byte[]'s to ByteBuffers.
Browse files Browse the repository at this point in the history
  • Loading branch information
swillis committed Aug 3, 2011
1 parent ccfbd61 commit 95e8bc2
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 12 deletions.
8 changes: 5 additions & 3 deletions src/java/org/apache/cassandra/plugins/CassandraClient.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.apache.cassandra.plugins;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
Expand Down Expand Up @@ -83,8 +84,9 @@ public void close() throws IOException {
}

/** Inserts columns into a column family in a given row. */
public void insert(byte[] key, String columnFamily, Column[] columns, ConsistencyLevel consistencyLevel) throws IOException
public void insert(byte[] keyarray, String columnFamily, Column[] columns, ConsistencyLevel consistencyLevel) throws IOException
{
ByteBuffer key = ByteBuffer.wrap(keyarray);
List<Mutation> mutationList = new ArrayList<Mutation>();
for(int i = 0; i < columns.length; i++) {
Mutation mutation = new Mutation();
Expand All @@ -95,15 +97,15 @@ public void insert(byte[] key, String columnFamily, Column[] columns, Consistenc
}
Map<String, List<Mutation>> innerMutationMap = new HashMap<String, List<Mutation>>();
innerMutationMap.put(columnFamily, mutationList);
Map<byte[], Map<String, List<Mutation>>> mutationMap = new HashMap<byte[], Map<String, List<Mutation>>>();
Map<ByteBuffer, Map<String, List<Mutation>>> mutationMap = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();
mutationMap.put(key, innerMutationMap);

batchMutate(mutationMap, consistencyLevel);
}

/** Attempts to perform a batch mutation and retries upon failure. */
private void batchMutate(
Map<byte[], Map<String, List<Mutation>>> mutationMap,
Map<ByteBuffer, Map<String, List<Mutation>>> mutationMap,
ConsistencyLevel consistencyLevel)
throws IOException
{
Expand Down
33 changes: 27 additions & 6 deletions src/java/org/apache/cassandra/plugins/LogsandraSyslogSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,10 @@ public void open() throws IOException {

/**
* Writes the message to Cassandra in a Logsandra compatible way.
* @throws InterruptedException
*/
@Override
public void append(Event event) throws IOException {
public void append(Event event) throws IOException, InterruptedException {

long timestamp = System.currentTimeMillis() * MILLI_TO_MICRO;

Expand Down Expand Up @@ -96,14 +97,34 @@ public void append(Event event) throws IOException {
finalBytes[i + eventInfoBytes.length] = eventBody[i];
}

Column identColumn = new Column(IDENT_NAME, "ident".getBytes(), timestamp);
Column sourceColumn = new Column(SOURCE_NAME, host.getBytes(), timestamp);
Column dateColumn = new Column(DATE_NAME, date.getBytes(), timestamp);
Column entryColumn = new Column(ENTRY_NAME, finalBytes, timestamp);
Column identColumn = new Column();
identColumn.setName(IDENT_NAME);
identColumn.setValue("ident".getBytes());
identColumn.setTimestamp(timestamp);

Column sourceColumn = new Column();
sourceColumn.setName(SOURCE_NAME);
sourceColumn.setValue(host.getBytes());
sourceColumn.setTimestamp(timestamp);

Column dateColumn = new Column();
dateColumn.setName(DATE_NAME);
dateColumn.setValue(date.getBytes());
dateColumn.setTimestamp(timestamp);

Column entryColumn = new Column();
entryColumn.setName(ENTRY_NAME);
entryColumn.setValue(finalBytes);
entryColumn.setTimestamp(timestamp);

Column[] entryColumns = {identColumn, sourceColumn, dateColumn, entryColumn};

Long time = System.currentTimeMillis() * MILLI_TO_MICRO;
Column timeColumn = new Column(toBytes(time), uuid.toString().getBytes(), timestamp);
Column timeColumn = new Column();
timeColumn.setName(toBytes(time));
timeColumn.setValue(uuid.toString().getBytes());
timeColumn.setTimestamp(timestamp);

Column[] byDateColumns = {timeColumn};

// Insert the entry
Expand Down
13 changes: 10 additions & 3 deletions src/java/org/apache/cassandra/plugins/SimpleCassandraSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,25 @@ public void open() throws IOException {
* The key is the current date (YYYYMMDD) and the column
* name is a type 1 UUID, which includes a time stamp
* component.
* @throws InterruptedException
*/
@Override
public void append(Event event) throws IOException {
public void append(Event event) throws IOException, InterruptedException {

long timestamp = System.currentTimeMillis() * MILLI_TO_MICRO;

// Make the index column
UUID uuid = uuidGen.generateTimeBasedUUID();
Column indexColumn = new Column(uuid.toByteArray(), new byte[0], timestamp);
Column indexColumn = new Column();
indexColumn.setName(uuid.toByteArray());
indexColumn.setValue(new byte[0]);
indexColumn.setTimestamp(timestamp);

// Make the data column
Column dataColumn = new Column("data".getBytes(), event.getBody(), timestamp);
Column dataColumn = new Column();
dataColumn.setName("data".getBytes());
dataColumn.setValue(event.getBody());
dataColumn.setTimestamp(timestamp);

// Insert the index
this.cClient.insert(this.getKey(), this.indexColumnFamily, new Column[] {indexColumn}, ConsistencyLevel.QUORUM);
Expand Down

0 comments on commit 95e8bc2

Please sign in to comment.