Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Variable Length ID Support #86

Open
wants to merge 1 commit into from

1 participant

@herriojr

See Issue #36

Manually merged changes I had with old internal copy of camus since the layout of the project had changed significantly. This allows id or a variable length byte array to be used as the identifier, which is then treated as a hex string internal to the system.

The encoder reads directly from the config passed in its constructor for the value of id.variable.length which defaults to false for backwards compatibility. If found and true, will allow variable length strings to be used as the id instead of an integer.

Jonathan Herriott Merged changes I had with old internal copy of camus since the layout of
the project had changed significantly.  This allows id or a variable
length byte array to be used as the identifier, which is then treated as
a hex string internal to the system.
b623af5
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jul 15, 2014
  1. Merged changes I had with old internal copy of camus since the layout of

    Jonathan Herriott authored
    the project had changed significantly.  This allows id or a variable
    length byte array to be used as the identifier, which is then treated as
    a hex string internal to the system.
This page is out of date. Refresh to see the latest.
View
29 camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/KafkaAvroMessageDecoder.java
@@ -11,6 +11,7 @@
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;
+import org.apache.commons.codec.binary.Hex;
import com.linkedin.camus.coders.CamusWrapper;
import com.linkedin.camus.coders.MessageDecoder;
@@ -50,7 +51,8 @@ public void init(Properties props, String topicName) {
private int start;
private int length;
private Schema targetSchema;
- private static final byte MAGIC_BYTE = 0x0;
+ private static final byte MAGIC_BYTE_V0 = 0x0;
+ private static final byte MAGIC_BYTE_V1 = 0x1;
private final SchemaRegistry<Schema> registry;
private final String topicName;
private byte[] payload;
@@ -84,15 +86,30 @@ public Schema getTargetSchema() {
}
private ByteBuffer getByteBuffer(byte[] payload) {
- ByteBuffer buffer = ByteBuffer.wrap(payload);
- if (buffer.get() != MAGIC_BYTE)
- throw new IllegalArgumentException("Unknown magic byte!");
- return buffer;
+ return ByteBuffer.wrap(payload);
+
}
public MessageDecoderHelper invoke() {
buffer = getByteBuffer(payload);
- String id = Integer.toString(buffer.getInt());
+ byte magicByte = buffer.get();
+ if (magicByte != MAGIC_BYTE_V0 || magicByte != MAGIC_BYTE_V1)
+ throw new IllegalArgumentException("Unknown magic byte!");
+
+ String id;
+ if (magicByte == MAGIC_BYTE_V0) {
+ // This is for backwards compatibility where the id was only represented
+ // by an integer.
+ id = Integer.toString(buffer.getInt());
+ } else {
+ // The new method is to get the length of the ID, then read that length
+ // from the buffer into a Hex String.
+ int idLength = buffer.getInt();
+ byte[] dst = new byte[idLength];
+ buffer.get(dst);
+ id = Hex.encodeHexString(dst);
+ }
+
schema = registry.getSchemaByID(topicName, id);
if (schema == null)
throw new IllegalStateException("Unknown schema id: " + id);
View
20 camus-kafka-coders/src/main/java/com/linkedin/camus/etl/kafka/coders/KafkaAvroMessageEncoder.java
@@ -19,6 +19,8 @@
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
+import org.apache.commons.codec.DecoderException;
+import org.apache.commons.codec.binary.Hex;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
@@ -30,18 +32,21 @@
public class KafkaAvroMessageEncoder extends MessageEncoder<IndexedRecord, byte[]> {
public static final String KAFKA_MESSAGE_CODER_SCHEMA_REGISTRY_CLASS = "kafka.message.coder.schema.registry.class";
- private static final byte MAGIC_BYTE = 0x0;
+ private static final byte MAGIC_BYTE_V0 = 0x0;
+ private static final byte MAGIC_BYTE_V1 = 0x1;
+
private static final Logger logger = Logger.getLogger(KafkaAvroMessageEncoder.class);
private SchemaRegistry<Schema> client;
private final Map<Schema, String> cache = Collections
.synchronizedMap(new HashMap<Schema, String>());
private final EncoderFactory encoderFactory = EncoderFactory.get();
+ private final byte magicByte;
@SuppressWarnings("unchecked")
public KafkaAvroMessageEncoder(String topicName, Configuration conf) {
this.topicName = topicName;
-
+ this.magicByte = conf.getBoolean("id.variable.length", false) ? MAGIC_BYTE_V1 : MAGIC_BYTE_V0;
}
@Override
@@ -61,7 +66,7 @@ public void init(Properties props, String topicName) {
try {
ByteArrayOutputStream out = new ByteArrayOutputStream();
- out.write(MAGIC_BYTE);
+ out.write(magicByte);
Schema schema = record.getSchema();
@@ -78,7 +83,12 @@ public void init(Properties props, String topicName) {
id = cache.get(schema);
}
- out.write(ByteBuffer.allocate(4).putInt(Integer.parseInt(id)).array());
+ if (magicByte == MAGIC_BYTE_V0) {
+ out.write(ByteBuffer.allocate(4).putInt(Integer.parseInt(id)).array());
+ } else {
+ out.write(ByteBuffer.allocate(4).putInt(id.length()).array());
+ out.write(Hex.decodeHex(id.toCharArray()));
+ }
BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null);
DatumWriter<IndexedRecord> writer;
@@ -94,6 +104,8 @@ public void init(Properties props, String topicName) {
//return new Message(out.toByteArray());
} catch (IOException e) {
throw new MessageEncoderException(e);
+ } catch (DecoderException e) {
+ throw new MessageEncoderException(e);
}
}
}
Something went wrong with that request. Please try again.