Permalink
Browse files

Merge remote-tracking branch 'roycamp/7502fa82fa1f382b2808dc980075832…

…eb3a0a1f4'

Conflicts:
	src/java/org/apache/cassandra/plugins/flume/sink/SimpleCassandraSink.java
  • Loading branch information...
2 parents 0da7730 + d4cd56d commit e2f6d14950f214d9356455861d09f81b642d0cce @thobbs committed Jan 13, 2012
View
@@ -86,10 +86,15 @@ http://localhost:35871/flumeconfig.jsp), you will use a sink like:
`simpleCassandraSink("Keyspace1", "FlumeData", "FlumeIndexes", "localhost:9160")`
+For multiple servers, pass each additional `host:port` as an extra argument:
+
+`simpleCassandraSink("Keyspace1", "FlumeData", "FlumeIndexes", "localhost:9160", "hostname2:9160", "hostname3:9160")`
+
If you're new to flume and you want to test that the plugin works, I recommend
using a Source like `asciisynth(20, 100)`. You should see 20 corresponding entries
in each of the column families if you use `list` in cassandra-cli.
+
#### How it Works
When the Cassandra sink receives an event, it does the following:
@@ -105,3 +110,17 @@ This allows you to easily fetch all logs for a slice of time. Simply use
something like get_slice() on the index column family to get the uuids you
want for a particular slice of time, and then multiget the data column
family using those uuids as the keys.
+
+
+Troubleshooting
+---------------
+
+#### "No Cassandra servers available" Exception when trying to write data to a single server setup.
+
+Your keyspace may have defaulted to the `org.apache.cassandra.locator.NetworkTopologyStrategy`
+replication strategy, which can leave Cassandra unable to commit writes successfully. You can check
+this with `describe Keyspace1;` in the cassandra-cli. Try using the following create keyspace
+statement instead:
+
+`create keyspace Keyspace1 with placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy' and strategy_options = {replication_factor:1};`
+
View
22 pom.xml
@@ -53,19 +53,29 @@
<version>0.9.3-CDH3B4</version>
</dependency>
+ <dependency>
+ <groupId>com.eaio.uuid</groupId>
+ <artifactId>uuid</artifactId>
+ <version>3.2</version>
+ </dependency>
- <dependency>
+ <dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.3.1</version>
<scope>test</scope>
- </dependency>
+ </dependency>
</dependencies>
<repositories>
- <repository>
- <id>cdh repository</id>
- <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
- </repository>
+ <repository>
+ <id>cdh repository</id>
+ <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
+ </repository>
+
+ <repository>
+ <id>eaio.com</id>
+ <url>http://eaio.com/maven2</url>
+ </repository>
</repositories>
</project>
@@ -130,7 +130,6 @@ private void batchMutate(
private class ServerSet {
private ArrayList<String> servers;
- private int serverIndex;
private Queue<Pair> dead = new LinkedList<Pair>();
private long retryTime;
@@ -150,7 +149,6 @@ private void batchMutate(
for(int i=0; i < servers.length; i++) {
this.servers.add(servers[i]);
}
- this.serverIndex = 0;
}
/** Gets the next available server. */
@@ -165,7 +163,7 @@ String get() throws NoServersAvailableException {
if(this.servers.isEmpty()) {
throw new NoServersAvailableException();
}
- return this.servers.get(this.serverIndex++);
+ return this.servers.get(0);
}
/**
@@ -61,7 +61,7 @@ public void open() throws IOException {
/**
* Writes the message to Cassandra in a Logsandra compatible way.
- * @throws InterruptedException
+ * @throws InterruptedException
*/
@Override
public void append(Event event) throws IOException, InterruptedException {
@@ -6,12 +6,10 @@
import java.util.Arrays;
import java.util.List;
import java.util.TimeZone;
+import java.util.UUID;
import org.apache.cassandra.thrift.*;
-import org.safehaus.uuid.UUID;
-import org.safehaus.uuid.UUIDGenerator;
-
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
import com.cloudera.flume.core.Event;
@@ -36,8 +34,6 @@
private CassandraClient cClient;
- private static final UUIDGenerator uuidGen = UUIDGenerator.getInstance();
-
private static final long MILLI_TO_MICRO = 1000; // 1ms = 1000us
public SimpleCassandraSink(String keyspace, String dataColumnFamily,
@@ -63,23 +59,26 @@ public void open() throws IOException {
@Override
public void append(Event event) throws IOException, InterruptedException {
- long timestamp = System.currentTimeMillis() * MILLI_TO_MICRO;
+ // Preserve timestamp from when the event was generated
+ long timestamp = event.getTimestamp();
+ if(timestamp == 0)
+ timestamp = System.currentTimeMillis() * MILLI_TO_MICRO;
// Make the index column
- UUID uuid = uuidGen.generateTimeBasedUUID();
+ UUID uuid = TimeUUIDUtils.getTimeUUID(timestamp);
Column indexColumn = new Column();
- indexColumn.setName(uuid.toByteArray());
- indexColumn.setValue(new byte[0]);
- indexColumn.setTimestamp(timestamp);
+ indexColumn.setName(TimeUUIDUtils.asByteArray(uuid));
+ indexColumn.setValue(new byte[0]);
+ indexColumn.setTimestamp(timestamp);
// Make the data column
Column dataColumn = new Column();
- dataColumn.setName("data".getBytes());
- dataColumn.setValue(event.getBody());
- dataColumn.setTimestamp(timestamp);
+ dataColumn.setName("data".getBytes());
+ dataColumn.setValue(event.getBody());
+ dataColumn.setTimestamp(timestamp);
// Insert the index
- this.cClient.insert(this.getKey(), this.indexColumnFamily, new Column[] {indexColumn}, ConsistencyLevel.QUORUM);
+ this.cClient.insert(this.getKey(timestamp), this.indexColumnFamily, new Column[] {indexColumn}, ConsistencyLevel.QUORUM);
// Insert the data (row key is the uuid and there is only one column)
this.cClient.insert(uuid.toString().getBytes(), this.dataColumnFamily, new Column[] {dataColumn}, ConsistencyLevel.QUORUM);
super.append(event);
@@ -89,8 +88,10 @@ public void append(Event event) throws IOException, InterruptedException {
* Returns a key representing the date (in GMT) of the given timestamp,
* which is interpreted as milliseconds. This has the format "YYYYMMDDHH".
*/
- private byte[] getKey() {
+ private byte[] getKey(long timestamp) {
Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("GMT+0"));
+ cal.setTimeInMillis(timestamp);
+
int day = cal.get(Calendar.DAY_OF_MONTH);
int month = cal.get(Calendar.MONTH);
int year = cal.get(Calendar.YEAR);
@@ -0,0 +1,153 @@
+package org.apache.cassandra.plugins.flume.sink;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import com.eaio.uuid.UUIDGen;
+
+
+/**
+* Utility class to generate and convert time-based (type 1) UUIDs.
+*
+* This was copied from the Hector library
+* (me.prettyprint.cassandra.utils.TimeUUIDUtils.java) with minimal changes.
+*
+* Timestamps passed in are treated as milliseconds: they are multiplied by 10000 and
+* offset by NUM_100NS_INTERVALS_SINCE_UUID_EPOCH to get the 100 ns count stored in the UUID.
+*
+* @author Patricio Echague (pechague@gmail.com)
+*
+*/
+public final class TimeUUIDUtils {
+
+ static final long NUM_100NS_INTERVALS_SINCE_UUID_EPOCH = 0x01b21dd213814000L;
+
+ /**
+* Gets a new time uuid whose time field comes from UUIDGen.newTime() and whose
+* clock sequence and node come from UUIDGen.getClockSeqAndNode().
+* It is useful to use in a TimeUUIDType sorted column family.
+* NOTE(review): uniqueness depends on UUIDGen.newTime(); confirm against the eaio uuid docs.
+*
+* @return the time uuid
+*/
+ public static java.util.UUID getUniqueTimeUUIDinMillis() {
+ return new java.util.UUID(UUIDGen.newTime(), UUIDGen.getClockSeqAndNode());
+ }
+
+ /**
+* Gets a new time uuid using {@code ClockResolution#createClock()} as a time generator.
+* It is useful to use in a TimeUUIDType sorted column family.
+* The method below is commented out in this copy (ClockResolution is a Hector type
+* that is not imported here); the comment is kept for reference.
+*
+* @param clock a ClockResolution
+* @return the time uuid
+*/
+ /*
+ public static java.util.UUID getTimeUUID(ClockResolution clock) {
+ return getTimeUUID(clock.createClock());
+ }
+*/
+ /**
+* Gets a new time uuid based on <code>time</code>.
+* NOTE: this algorithm does not resolve duplicates; the time field is derived only
+* from <code>time</code>. The Hector original recommends a ClockResolution-based overload
+* with unique timestamp resolution (e.g. MicrosecondsSyncClockResolution) to avoid
+* duplicates, but that overload is commented out in this copy.
+* It is useful to use in a TimeUUIDType sorted column family.
+*
+* @param time the timestamp to encode, in milliseconds (it is multiplied by 10000 to
+* obtain 100 ns intervals)
+* @return the time uuid
+*/
+ public static java.util.UUID getTimeUUID(long time) {
+ return new java.util.UUID(createTime(time), UUIDGen.getClockSeqAndNode());
+ }
+
+ private static long createTime(long currentTime) { // builds the most significant 64 bits of the UUID
+ long time;
+
+ // convert milliseconds to 100 ns intervals and add the UUID-epoch offset
+ long timeToUse = (currentTime * 10000) + NUM_100NS_INTERVALS_SINCE_UUID_EPOCH;
+
+ // time low: the low 32 bits of the timestamp end up in the top 32 bits of the result
+ time = timeToUse << 32;
+
+ // time mid: bits 32-47 of the timestamp move down to bits 16-31
+ time |= (timeToUse & 0xFFFF00000000L) >> 16;
+
+ // time hi and version: bits 48-59 of the timestamp in the low 12 bits, version nibble above them
+ time |= 0x1000 | ((timeToUse >> 48) & 0x0FFF); // version 1
+ return time;
+ }
+
+
+ /**
+* Returns an instance of uuid. Useful for when you read out of cassandra
+* you are getting a byte[] that needs to be converted into a TimeUUID.
+* Delegates to {@link #uuid(byte[], int)} with an offset of 0.
+*
+* @param uuid the 16-byte uuid representation
+* @return the java.util.UUID
+*/
+ public static java.util.UUID toUUID(byte[] uuid) {
+ return uuid(uuid, 0);
+ }
+
+ /**
+* Retrieves the time as long based on the byte[] representation of a UUID.
+* Converts the bytes with {@link #toUUID(byte[])} and delegates to
+* {@link #getTimeFromUUID(UUID)}. java.util.UUID#timestamp() only supports
+* version 1 UUIDs and throws UnsupportedOperationException otherwise.
+*
+* @param uuid byte[] uuid representation
+* @return a long representing the time, in milliseconds
+*/
+ public static long getTimeFromUUID(byte[] uuid) {
+ return getTimeFromUUID(TimeUUIDUtils.toUUID(uuid));
+ }
+
+ public static long getTimeFromUUID(UUID uuid) { // inverse of createTime: 100 ns intervals back to milliseconds
+ return (uuid.timestamp() - NUM_100NS_INTERVALS_SINCE_UUID_EPOCH) / 10000;
+ }
+
+ /**
+* As byte array.
+* This method is often used in conjunction with {@link #getTimeUUID(long)}.
+*
+* Bytes 0-7 hold the most significant bits and bytes 8-15 the least significant
+* bits, highest byte first. In the second loop the shift distance 8 * (7 - i) is
+* negative; Java uses only the low six bits of a long shift distance, so it behaves
+* like 8 * (15 - i) and the loop still walks lsb from its highest byte to its lowest.
+*
+* @param uuid the uuid
+*
+* @return the byte[] (always 16 bytes long)
+*/
+ public static byte[] asByteArray(java.util.UUID uuid) {
+ long msb = uuid.getMostSignificantBits();
+ long lsb = uuid.getLeastSignificantBits();
+ byte[] buffer = new byte[16];
+
+ for (int i = 0; i < 8; i++) {
+ buffer[i] = (byte) (msb >>> 8 * (7 - i));
+ }
+
+ for (int i = 8; i < 16; i++) {
+ buffer[i] = (byte) (lsb >>> 8 * (7 - i));
+ }
+
+ return buffer;
+ }
+
+ /**
+* Converts a java.util.UUID into a ByteBuffer.
+* @param uuid a java.util.UUID, may be null
+* @return a ByteBuffer representation of the param UUID, or null if uuid is null
+*/
+ public static ByteBuffer asByteBuffer(java.util.UUID uuid) {
+ if (uuid == null) {
+ return null;
+ }
+
+ return ByteBuffer.wrap(asByteArray(uuid));
+ }
+
+
+ public static UUID uuid(byte[] uuid, int offset) { // reads the 16 bytes at offset as two longs: msb, then lsb
+ ByteBuffer bb = ByteBuffer.wrap(uuid, offset, 16);
+ return new UUID(bb.getLong(), bb.getLong());
+ }
+
+ /**
+* Converts a ByteBuffer containing a UUID into a java.util.UUID.
+* The buffer is sliced first, so the two longs are read from the caller's current
+* position without advancing the caller's buffer.
+* @param bb a ByteBuffer containing a UUID
+* @return a java.util.UUID
+*/
+ public static UUID uuid(ByteBuffer bb) {
+ bb = bb.slice();
+ return new UUID(bb.getLong(), bb.getLong());
+ }
+
+}

0 comments on commit e2f6d14

Please sign in to comment.