Bug fix, timestamp fix, documentation update #4

Merged
merged 11 commits into from

2 participants

@roycamp

Fixed serverIndex Null Pointer bug
Preserve flume timestamp if it exists
Updated documentation

roycamp added some commits
@roycamp roycamp Removing serverIndex as this.servers.get(this.serverIndex++) causes Array Index Out of Bounds when more than one server is supplied. This increment functionality isn't needed since server failures result in the server being removed from the server list and re-added after the retry period.
6730479
@roycamp roycamp Added example usage for multiple servers.
Added Troubleshooting section.
2f18a36
@roycamp roycamp Formatting fixes 8655add
@roycamp roycamp Preserve timestamp from when the event was generated if it exists. 607f809
@roycamp roycamp Fix key generation to use timestamp of event; still handles empty/missing time.
7502fa8
@thobbs thobbs commented on the diff
...ava/org/apache/cassandra/plugins/CassandraClient.java
@@ -165,7 +163,7 @@ String get() throws NoServersAvailableException {
if(this.servers.isEmpty()) {
throw new NoServersAvailableException();
}
- return this.servers.get(this.serverIndex++);
+ return this.servers.get(0);
@thobbs Owner
thobbs added a note

So, just incrementing serverIndex without making sure it hasn't gone off the end of the list was definitely broken, but we do want to cycle through the servers in our list. If we always use 0, that won't happen.

I can make the change, but we probably just need to take the index modulo serverList.size().

@roycamp
roycamp added a note

My understanding is that ServerSet.get() is only called from CassandraClient.open(). CassandraClient.open() is only called during startup and when a Cassandra exception is thrown, which results in ServerSet.markDead(). markDead() removes the server from the servers array, causing the remaining servers to shift up in the array. When the markDead() timeout expires, the server is added onto the end of the servers array. This is how the cycling of servers is done. Is there a scenario where open() is called that I am missing?

@thobbs Owner
thobbs added a note

Ah, you're totally correct. Thanks.
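
To make the cycling argument in this thread concrete, here is a minimal sketch of a server set that always hands out the head of the live list, drops a server on markDead(), and appends it again once the retry period has passed. The class name `ServerSetSketch`, the explicit `nowMs` parameters, and the revival check inside get() are assumptions made for illustration; the plugin's actual ServerSet is only partly visible in this diff and may differ.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical stand-in for the plugin's ServerSet; not the real class. */
class ServerSetSketch {
    /** A dead server plus the time (ms) at which it may be retried. */
    private static final class DeadServer {
        final String host;
        final long reviveAtMs;

        DeadServer(String host, long reviveAtMs) {
            this.host = host;
            this.reviveAtMs = reviveAtMs;
        }
    }

    private final List<String> servers = new ArrayList<>();
    private final Queue<DeadServer> dead = new ArrayDeque<>();
    private final long retryMs;

    ServerSetSketch(long retryMs, String... hosts) {
        this.retryMs = retryMs;
        for (String host : hosts) {
            servers.add(host);
        }
    }

    /** Revives servers whose retry period has passed, then returns the head of the live list. */
    String get(long nowMs) {
        while (!dead.isEmpty() && dead.peek().reviveAtMs <= nowMs) {
            servers.add(dead.poll().host); // revived servers go to the END of the list
        }
        if (servers.isEmpty()) {
            throw new IllegalStateException("no servers available");
        }
        return servers.get(0); // no index to run off the end of the list
    }

    /** Removes a failed server; the remaining servers shift up by one position. */
    void markDead(String host, long nowMs) {
        if (servers.remove(host)) {
            dead.add(new DeadServer(host, nowMs + retryMs));
        }
    }

    public static void main(String[] args) {
        ServerSetSketch set = new ServerSetSketch(1000, "a:9160", "b:9160", "c:9160");
        System.out.println(set.get(0));    // a:9160
        set.markDead("a:9160", 0);
        System.out.println(set.get(1));    // b:9160
        set.markDead("b:9160", 1);
        System.out.println(set.get(2));    // c:9160
    }
}
```

Because a failed server leaves the front of the list and comes back at the end, repeated failures rotate through every host without a separate index.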

@thobbs thobbs commented on the diff
...org/apache/cassandra/plugins/SimpleCassandraSink.java
@@ -63,7 +63,10 @@ public void open() throws IOException {
@Override
public void append(Event event) throws IOException, InterruptedException {
- long timestamp = System.currentTimeMillis() * MILLI_TO_MICRO;
+ // Preserve timestamp from when the event was generated
+ long timestamp = event.getTimestamp();
@thobbs Owner
thobbs added a note

The main problem with doing this is that the TimeUUID we generate just below this won't use the same timestamp, so columns in the index rows won't necessarily match up with the time range in the row key.

It's possible to create a TimeUUID given a timestamp, but we would need to swap out UUID libraries and use something like this: https://github.com/rantav/hector/blob/master/core/src/main/java/me/prettyprint/cassandra/utils/TimeUUIDUtils.java

@roycamp
roycamp added a note

Ah, good catch. I only accounted for the row key not the column TimeUUID. I will commit a fix.
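
As an illustration of the point raised above, the sketch below builds a version 1 (time-based) UUID from a caller-supplied millisecond timestamp using only `java.util.UUID`, so the embedded time can match the time used for the row key. The class name `TimeUuidSketch` and the fixed `clockSeqAndNode` value are placeholders chosen for this example; the code merged in this pull request takes that value from the eaio `UUIDGen` class instead.

```java
import java.util.UUID;

/** Illustrative only: packs a millisecond timestamp into a version 1 UUID. */
class TimeUuidSketch {
    // Number of 100 ns intervals between the UUID epoch (1582-10-15) and the Unix epoch.
    static final long UUID_EPOCH_OFFSET = 0x01b21dd213814000L;

    // Placeholder clock sequence and node; the top bits select the IETF variant.
    static final long PLACEHOLDER_CLOCK_SEQ_AND_NODE = 0x8000000000000001L;

    static UUID fromMillis(long millis, long clockSeqAndNode) {
        long ts = millis * 10_000L + UUID_EPOCH_OFFSET;   // 100 ns units since UUID epoch
        long msb = (ts << 32)                             // time_low  -> bits 32..63
                 | ((ts & 0xFFFF00000000L) >>> 16)        // time_mid  -> bits 16..31
                 | 0x1000L                                // version 1 -> bits 12..15
                 | ((ts >>> 48) & 0x0FFFL);               // time_hi   -> bits 0..11
        return new UUID(msb, clockSeqAndNode);
    }

    /** Recovers the millisecond timestamp stored in a version 1 UUID. */
    static long toMillis(UUID uuid) {
        return (uuid.timestamp() - UUID_EPOCH_OFFSET) / 10_000L;
    }

    public static void main(String[] args) {
        long eventMillis = 1326153600000L;
        UUID uuid = fromMillis(eventMillis, PLACEHOLDER_CLOCK_SEQ_AND_NODE);
        System.out.println(uuid.version());               // 1
        System.out.println(toMillis(uuid) == eventMillis); // true
    }
}
```

Two events with the same millisecond timestamp and the same clock sequence and node produce the same UUID under this scheme, so it does not resolve duplicates on its own.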

@roycamp

I merged the most recent updates (maven support) -- hopefully did this right.
Added TimeUUID generation support for column names based on the event timestamp.

@roycamp

Do I need to include the UUID jar (from http://johannburkard.de/software/uuid/) in the lib folder or will the maven dependency handle it?

@thobbs
Owner

If the build were set up to make a single uber-jar, then we could forgo adding it to lib (and remove the requirement for the user to add multiple jars to the classpath). I'll merge this in and then see if I can make that happen.

Thanks!

@thobbs thobbs merged commit d4cd56d into from
Commits on Dec 14, 2011
  1. @roycamp

    Removing serverIndex as this.servers.get(this.serverIndex++) causes Array Index Out of Bounds when more than one server is supplied. This increment functionality isn't needed since server failures result in the server being removed from the server list and re-added after the retry period.

    roycamp authored
  2. @roycamp

    Added example usage for multiple servers.

    roycamp authored
    Added Troubleshooting section.
  3. @roycamp

    Formatting fixes

    roycamp authored
Commits on Dec 21, 2011
  1. @roycamp
Commits on Jan 4, 2012
  1. @roycamp
Commits on Jan 9, 2012
  1. Merge remote branch 'original/master'

    Roy Camp authored
  2. @roycamp
Commits on Jan 10, 2012
  1. Added TimeUUIDUtils from Hector with minor modifications. Updated Sink to generate column TimeUUID from incoming timestamp instead of current time.

    Roy Camp authored
  2. Convert java.util.uuid to byte array

    Roy Camp authored
19 README.mkd
@@ -86,10 +86,15 @@ http://localhost:35871/flumeconfig.jsp), you will use a sink like:
`simpleCassandraSink("Keyspace1", "FlumeData", "FlumeIndexes", "localhost:9160")`
+For multiple servers use:
+
+`simpleCassandraSink("Keyspace1", "FlumeData", "FlumeIndexes", "localhost:9160", "hostname2:9160", "hostname3:9160")`
+
If you're new to flume and you want to test that the plugin works, I recommend
using a Source like `asciisynth(20, 100)`. You should see 20 corresponding entries
in each of the column families if you use `list` in cassandra-cli.
+
#### How it Works
When the Cassandra sink receives an event, it does the following:
@@ -105,3 +110,17 @@ This allows you to easily fetch all logs for a slice of time. Simply use
something like get_slice() on the index column family to get the uuids you
want for a particular slice of time, and then multiget the data column
family using those uuids as the keys.
+
+
+Troubleshooting
+---------------
+
+#### "No Cassandra servers available" Exception when trying to write data to a single server setup.
+
+Your keyspace may have defaulted to the `org.apache.cassandra.locator.NetworkTopologyStrategy`
+replication strategy resulting in Cassandra being unable to successfully commit. You can check
+using `describe Keyspace1;` in the cassandra-cli. Try using the following create keyspace
+statement instead:
+
+`create keyspace Keyspace1 with placement_strategy = 'org.apache.cassandra.locator.SimpleStrategy' and strategy_options = {replication_factor:1};`
+
21 pom.xml
@@ -53,7 +53,13 @@
<version>0.9.3-CDH3B4</version>
</dependency>
-
+ <dependency>
+ <groupId>com.eaio.uuid</groupId>
+ <artifactId>uuid</artifactId>
+ <version>3.2</version>
+ </dependency>
+
+
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
@@ -63,9 +69,14 @@
</dependencies>
<repositories>
- <repository>
- <id>cdh repository</id>
- <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
- </repository>
+ <repository>
+ <id>cdh repository</id>
+ <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
+ </repository>
+
+ <repository>
+ <id>eaio.com</id>
+ <url>http://eaio.com/maven2</url>
+ </repository>
</repositories>
</project>
4 src/java/org/apache/cassandra/plugins/CassandraClient.java
@@ -130,7 +130,6 @@ private void batchMutate(
private class ServerSet {
private ArrayList<String> servers;
- private int serverIndex;
private Queue<Pair> dead = new LinkedList<Pair>();
private long retryTime;
@@ -150,7 +149,6 @@ private void batchMutate(
for(int i=0; i < servers.length; i++) {
this.servers.add(servers[i]);
}
- this.serverIndex = 0;
}
/** Gets the next available server. */
@@ -165,7 +163,7 @@ String get() throws NoServersAvailableException {
if(this.servers.isEmpty()) {
throw new NoServersAvailableException();
}
- return this.servers.get(this.serverIndex++);
+ return this.servers.get(0);
}
/**
21 src/java/org/apache/cassandra/plugins/SimpleCassandraSink.java
@@ -6,12 +6,10 @@
import java.util.Arrays;
import java.util.List;
import java.util.TimeZone;
+import java.util.UUID;
import org.apache.cassandra.thrift.*;
-import org.safehaus.uuid.UUID;
-import org.safehaus.uuid.UUIDGenerator;
-
import com.cloudera.flume.conf.Context;
import com.cloudera.flume.conf.SinkFactory.SinkBuilder;
import com.cloudera.flume.core.Event;
@@ -36,8 +34,6 @@
private CassandraClient cClient;
- private static final UUIDGenerator uuidGen = UUIDGenerator.getInstance();
-
private static final long MILLI_TO_MICRO = 1000; // 1ms = 1000us
public SimpleCassandraSink(String keyspace, String dataColumnFamily,
@@ -63,12 +59,15 @@ public void open() throws IOException {
@Override
public void append(Event event) throws IOException, InterruptedException {
- long timestamp = System.currentTimeMillis() * MILLI_TO_MICRO;
+ // Preserve timestamp from when the event was generated
+ long timestamp = event.getTimestamp();
+ if(timestamp == 0)
+ timestamp = System.currentTimeMillis() * MILLI_TO_MICRO;
// Make the index column
- UUID uuid = uuidGen.generateTimeBasedUUID();
+ UUID uuid = TimeUUIDUtils.getTimeUUID(timestamp);
Column indexColumn = new Column();
- indexColumn.setName(uuid.toByteArray());
+ indexColumn.setName(TimeUUIDUtils.asByteArray(uuid));
indexColumn.setValue(new byte[0]);
indexColumn.setTimestamp(timestamp);
@@ -79,7 +78,7 @@ public void append(Event event) throws IOException, InterruptedException {
dataColumn.setTimestamp(timestamp);
// Insert the index
- this.cClient.insert(this.getKey(), this.indexColumnFamily, new Column[] {indexColumn}, ConsistencyLevel.QUORUM);
+ this.cClient.insert(this.getKey(timestamp), this.indexColumnFamily, new Column[] {indexColumn}, ConsistencyLevel.QUORUM);
// Insert the data (row key is the uuid and there is only one column)
this.cClient.insert(uuid.toString().getBytes(), this.dataColumnFamily, new Column[] {dataColumn}, ConsistencyLevel.QUORUM);
super.append(event);
@@ -89,8 +88,10 @@ public void append(Event event) throws IOException, InterruptedException {
* Returns a String representing the current date to be used as
* a key. This has the format "YYYYMMDDHH".
*/
- private byte[] getKey() {
+ private byte[] getKey(long timestamp) {
Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("GMT+0"));
+ cal.setTimeInMillis(timestamp);
+
int day = cal.get(Calendar.DAY_OF_MONTH);
int month = cal.get(Calendar.MONTH);
int year = cal.get(Calendar.YEAR);
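
The hour-bucket row key described in the comment above ("YYYYMMDDHH", in GMT) can be sketched with `SimpleDateFormat`, assuming the timestamp is in milliseconds since the Unix epoch, which is the unit `Calendar.setTimeInMillis` expects. The class and method names here are hypothetical, and this is not the plugin's implementation, whose remaining lines are not shown in this hunk. Two details may be worth checking against the real code: `Calendar.MONTH` is zero-based, so a hand-built key needs `month + 1`, and the fallback path in append() multiplies the current time by MILLI_TO_MICRO, so both paths should agree on the unit.

```java
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

/** Illustrative only: derives a "yyyyMMddHH" row key from an event timestamp. */
class HourKeySketch {
    /** Formats a millisecond timestamp as an hour bucket in GMT. */
    static String hourBucket(long timestampMillis) {
        // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
        SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHH");
        format.setTimeZone(TimeZone.getTimeZone("GMT"));
        return format.format(new Date(timestampMillis));
    }

    /** Same bucket as bytes, suitable for use as a row key. */
    static byte[] hourKey(long timestampMillis) {
        return hourBucket(timestampMillis).getBytes(StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        System.out.println(hourBucket(1326153600000L)); // 2012011000
    }
}
```

All events whose timestamps fall within the same GMT hour map to the same index row, which is what lets a time-range query touch only a handful of rows.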
153 src/java/org/apache/cassandra/plugins/TimeUUIDUtils.java
@@ -0,0 +1,153 @@
+package org.apache.cassandra.plugins;
+
+import java.nio.ByteBuffer;
+import java.util.UUID;
+
+import com.eaio.uuid.UUIDGen;
+
+
+/**
+* This was copied from the Hector library and has minimal changes.
+* me.prettyprint.cassandra.utils.TimeUUIDUtils.java
+*
+* Utility class to generate TimeUUID (type 1)
+*
+* @author Patricio Echague (pechague@gmail.com)
+*
+*/
+public final class TimeUUIDUtils {
+
+ static final long NUM_100NS_INTERVALS_SINCE_UUID_EPOCH = 0x01b21dd213814000L;
+
+ /**
+* Gets a new and unique time uuid in milliseconds. It is useful to use in a TimeUUIDType sorted column family.
+*
+* @return the time uuid
+*/
+ public static java.util.UUID getUniqueTimeUUIDinMillis() {
+ return new java.util.UUID(UUIDGen.newTime(), UUIDGen.getClockSeqAndNode());
+ }
+
+ /**
+* Gets a new time uuid using {@link ClockResolution#createClock()} as a time generator.
+* It is useful to use in a TimeUUIDType sorted column family.
+*
+* @param clock a ClockResolution
+* @return the time uuid
+*/
+ /*
+ public static java.util.UUID getTimeUUID(ClockResolution clock) {
+ return getTimeUUID(clock.createClock());
+ }
+*/
+ /**
+* Gets a new time uuid based on <code>time</code>.
+* NOTE: this algorithm does not resolve duplicates. To avoid duplicates use
+* {@link getTimeUUID(ClockResolution clock)} with an implementation that provides unique timestamp resolution, like
+* {@link MicrosecondsSyncClockResolution}
+* It is useful to use in a TimeUUIDType sorted column family.
+*
+* @param time the timestamp in milliseconds since the Unix epoch
+* @return the time uuid
+*/
+ public static java.util.UUID getTimeUUID(long time) {
+ return new java.util.UUID(createTime(time), UUIDGen.getClockSeqAndNode());
+ }
+
+ private static long createTime(long currentTime) {
+ long time;
+
+ // UTC time
+ long timeToUse = (currentTime * 10000) + NUM_100NS_INTERVALS_SINCE_UUID_EPOCH;
+
+ // time low
+ time = timeToUse << 32;
+
+ // time mid
+ time |= (timeToUse & 0xFFFF00000000L) >> 16;
+
+ // time hi and version
+ time |= 0x1000 | ((timeToUse >> 48) & 0x0FFF); // version 1
+ return time;
+ }
+
+
+ /**
+* Returns an instance of uuid. Useful for when you read out of cassandra
+* you are getting a byte[] that needs to be converted into a TimeUUID.
+*
+* @param uuid the uuid
+* @return the java.util.uuid
+*/
+ public static java.util.UUID toUUID(byte[] uuid) {
+ return uuid(uuid, 0);
+ }
+
+ /**
+* Retrieves the time as long based on the byte[] representation of a UUID.
+*
+* @param uuid byte[] uuid representation
+* @return a long representing the time
+*/
+ public static long getTimeFromUUID(byte[] uuid) {
+ return getTimeFromUUID(TimeUUIDUtils.toUUID(uuid));
+ }
+
+ public static long getTimeFromUUID(UUID uuid) {
+ return (uuid.timestamp() - NUM_100NS_INTERVALS_SINCE_UUID_EPOCH) / 10000;
+ }
+
+ /**
+* As byte array.
+* This method is often used in conjunction with {@link #getTimeUUID(long)}
+*
+* @param uuid the uuid
+*
+* @return the byte[]
+*/
+ public static byte[] asByteArray(java.util.UUID uuid) {
+ long msb = uuid.getMostSignificantBits();
+ long lsb = uuid.getLeastSignificantBits();
+ byte[] buffer = new byte[16];
+
+ for (int i = 0; i < 8; i++) {
+ buffer[i] = (byte) (msb >>> 8 * (7 - i));
+ }
+
+ for (int i = 8; i < 16; i++) {
+ buffer[i] = (byte) (lsb >>> 8 * (7 - i));
+ }
+
+ return buffer;
+ }
+
+ /**
+* Converts a java.util.UUID into a ByteBuffer.
+* @param uuid a java.util.UUID
+* @return a ByteBuffer representation of the param UUID
+*/
+ public static ByteBuffer asByteBuffer(java.util.UUID uuid) {
+ if (uuid == null) {
+ return null;
+ }
+
+ return ByteBuffer.wrap(asByteArray(uuid));
+ }
+
+
+ public static UUID uuid(byte[] uuid, int offset) {
+ ByteBuffer bb = ByteBuffer.wrap(uuid, offset, 16);
+ return new UUID(bb.getLong(), bb.getLong());
+ }
+
+ /**
+* Converts a ByteBuffer containing a UUID into a java.util.UUID
+* @param bb a ByteBuffer containing a UUID
+* @return a java.util.UUID
+*/
+ public static UUID uuid(ByteBuffer bb) {
+ bb = bb.slice();
+ return new UUID(bb.getLong(), bb.getLong());
+ }
+
+}
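
The second loop in asByteArray shifts by `8 * (7 - i)` for `i` from 8 to 15, which is a negative distance. Java uses only the low six bits of the shift distance for a `long`, so those shifts are effectively 56 down to 0 and the loop still emits the least significant bits in big-endian order. The sketch below, with a hypothetical class name, reproduces both loops and compares the result with a plain `ByteBuffer` encoding as a cross-check.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.UUID;

/** Illustrative only: two equivalent ways to turn a UUID into 16 big-endian bytes. */
class UuidBytesSketch {
    /** ByteBuffer version: most significant long first, then least significant. */
    static byte[] toBytes(UUID uuid) {
        return ByteBuffer.allocate(16)
                .putLong(uuid.getMostSignificantBits())
                .putLong(uuid.getLeastSignificantBits())
                .array();
    }

    /** Same loops as asByteArray in the diff above, relying on shift-distance masking. */
    static byte[] toBytesWithShifts(UUID uuid) {
        long msb = uuid.getMostSignificantBits();
        long lsb = uuid.getLeastSignificantBits();
        byte[] buffer = new byte[16];
        for (int i = 0; i < 8; i++) {
            buffer[i] = (byte) (msb >>> 8 * (7 - i));
        }
        for (int i = 8; i < 16; i++) {
            // 8 * (7 - i) is negative here; only its low six bits are used (56, 48, ..., 0).
            buffer[i] = (byte) (lsb >>> 8 * (7 - i));
        }
        return buffer;
    }

    /** Inverse of toBytes. */
    static UUID fromBytes(byte[] bytes) {
        ByteBuffer bb = ByteBuffer.wrap(bytes, 0, 16);
        return new UUID(bb.getLong(), bb.getLong());
    }

    public static void main(String[] args) {
        UUID uuid = UUID.fromString("f81d4fae-7dec-11d0-a765-00a0c91e6bf6");
        System.out.println(Arrays.equals(toBytes(uuid), toBytesWithShifts(uuid))); // true
        System.out.println(fromBytes(toBytes(uuid)).equals(uuid));                 // true
    }
}
```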