Skip to content

Commit

Permalink
updated serialization tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Marz committed Nov 19, 2011
1 parent 3539864 commit c91da28
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 77 deletions.
Expand Up @@ -4,5 +4,5 @@
import java.io.IOException;

public interface ITupleDeserializer {
Tuple deserialize(byte[] ser) throws IOException;
Tuple deserialize(byte[] ser);
}
3 changes: 1 addition & 2 deletions src/jvm/backtype/storm/serialization/ITupleSerializer.java
@@ -1,10 +1,9 @@
package backtype.storm.serialization;

import backtype.storm.tuple.Tuple;
import java.io.IOException;


public interface ITupleSerializer {
byte[] serialize(Tuple tuple) throws IOException;
byte[] serialize(Tuple tuple);
long crc32(Tuple tuple);
}
26 changes: 14 additions & 12 deletions src/jvm/backtype/storm/serialization/KryoTupleDeserializer.java
Expand Up @@ -4,30 +4,32 @@
import backtype.storm.tuple.MessageId;
import backtype.storm.tuple.Tuple;
import backtype.storm.utils.WritableUtils;
import com.esotericsoftware.kryo.ObjectBuffer;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class KryoTupleDeserializer implements ITupleDeserializer {
ObjectBuffer _kryo;
TopologyContext _context;
KryoValuesDeserializer _kryo;

public KryoTupleDeserializer(Map conf, TopologyContext context) {
_kryo = KryoFactory.getKryo(conf);
_kryo = new KryoValuesDeserializer(conf);
_context = context;
}

public Tuple deserialize(byte[] ser) throws IOException {
ByteArrayInputStream bin = new ByteArrayInputStream(ser);
DataInputStream in = new DataInputStream(bin);
int taskId = WritableUtils.readVInt(in);
int streamId = WritableUtils.readVInt(in);
MessageId id = MessageId.deserialize(in);
List<Object> values = (List<Object>) _kryo.readObject(in, ArrayList.class);
return new Tuple(_context, values, taskId, streamId, id);
/**
 * Rebuilds a {@link Tuple} from its serialized bytes.
 *
 * <p>Reads, in order: the task id (vint), the stream id (vint), the
 * {@link MessageId}, and finally the list of values.
 *
 * @param ser the serialized tuple bytes
 * @return a tuple bound to this deserializer's topology context
 * @throws RuntimeException wrapping any {@link IOException} raised while reading
 */
public Tuple deserialize(byte[] ser) {
    final ByteArrayInputStream rawInput = new ByteArrayInputStream(ser);
    final DataInputStream headerInput = new DataInputStream(rawInput);
    try {
        final int sourceTask = WritableUtils.readVInt(headerInput);
        final int sourceStream = WritableUtils.readVInt(headerInput);
        final MessageId messageId = MessageId.deserialize(headerInput);
        // The values are read from the underlying byte stream, which continues
        // from where the header reads above left off.
        final List<Object> tupleValues = _kryo.deserializeFrom(rawInput);
        return new Tuple(_context, tupleValues, sourceTask, sourceStream, messageId);
    } catch (IOException ioe) {
        throw new RuntimeException(ioe);
    }
}
}
42 changes: 21 additions & 21 deletions src/jvm/backtype/storm/serialization/KryoTupleSerializer.java
Expand Up @@ -2,9 +2,7 @@

import backtype.storm.tuple.Tuple;
import backtype.storm.utils.CRC32OutputStream;
import backtype.storm.utils.ListDelegate;
import backtype.storm.utils.WritableUtils;
import com.esotericsoftware.kryo.ObjectBuffer;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
Expand All @@ -13,34 +11,36 @@
public class KryoTupleSerializer implements ITupleSerializer {
ByteArrayOutputStream _outputter;
DataOutputStream _dataOutputter;
ObjectBuffer _kryo;
ListDelegate _delegate;
KryoValuesSerializer _kryo;

public KryoTupleSerializer(Map conf) {
_outputter = new ByteArrayOutputStream();
_dataOutputter = new DataOutputStream(_outputter);
_kryo = KryoFactory.getKryo(conf);
_delegate = new ListDelegate();
_kryo = new KryoValuesSerializer(conf);
}

public byte[] serialize(Tuple tuple) throws IOException {
_outputter.reset();
WritableUtils.writeVInt(_dataOutputter, tuple.getSourceTask());
WritableUtils.writeVInt(_dataOutputter, tuple.getSourceStreamId());
tuple.getMessageId().serialize(_dataOutputter);
// this ensures that list of values is always written the same way, regardless
// of whether it's a java collection or one of clojure's persistent collections
// (which have different serializers)
// Doing this lets us deserialize as ArrayList and avoid writing the class here
_delegate.setDelegate(tuple.getValues());
_kryo.writeObject(_outputter, _delegate);
return _outputter.toByteArray();
/**
 * Encodes a tuple into a byte array.
 *
 * <p>Writes, in order: the source task (vint), the source stream id (vint),
 * the message id, and then the tuple's values. The shared output buffer is
 * reset at the start of every call.
 *
 * @param tuple the tuple to encode
 * @return a copy of the bytes written for this tuple
 * @throws RuntimeException wrapping any {@link IOException} raised while writing
 */
public byte[] serialize(Tuple tuple) {
    try {
        // Discard whatever a previous call left in the reusable buffer.
        _outputter.reset();

        // Header fields go through the DataOutputStream wrapper.
        WritableUtils.writeVInt(_dataOutputter, tuple.getSourceTask());
        WritableUtils.writeVInt(_dataOutputter, tuple.getSourceStreamId());
        tuple.getMessageId().serialize(_dataOutputter);

        // Values are appended to the same underlying buffer.
        _kryo.serializeInto(tuple.getValues(), _outputter);

        final byte[] encoded = _outputter.toByteArray();
        return encoded;
    } catch (IOException ioe) {
        throw new RuntimeException(ioe);
    }
}

public long crc32(Tuple tuple) {
CRC32OutputStream hasher = new CRC32OutputStream();
_kryo.writeObject(hasher, tuple.getValues());
return hasher.getValue();
try {
CRC32OutputStream hasher = new CRC32OutputStream();
_kryo.serializeInto(tuple.getValues(), hasher);
return hasher.getValue();
} catch (IOException e) {
throw new RuntimeException(e);
}

}


Expand Down
7 changes: 7 additions & 0 deletions src/jvm/backtype/storm/testing/TestSerObject.java
Expand Up @@ -10,4 +10,11 @@ public TestSerObject(int f1, int f2) {
this.f1 = f1;
this.f2 = f2;
}

/**
 * Compares this object with another by the values of {@code f1} and {@code f2}.
 *
 * @param o the object to compare against; may be {@code null} or of any type
 * @return {@code true} only when {@code o} is a {@code TestSerObject} whose
 *         {@code f1} and {@code f2} both equal this object's fields
 */
// NOTE(review): no hashCode() override is visible in this view; if the class
// does not define one, add it so equal objects produce equal hash codes.
@Override
public boolean equals(Object o) {
    if (this == o) {
        return true;
    }
    // null and foreign types must compare unequal rather than raise
    // NullPointerException / ClassCastException from an unchecked cast.
    if (!(o instanceof TestSerObject)) {
        return false;
    }
    TestSerObject other = (TestSerObject) o;
    return f1 == other.f1 && f2 == other.f2;
}

}
87 changes: 46 additions & 41 deletions test/clj/backtype/storm/serialization_test.clj
Expand Up @@ -2,49 +2,54 @@
(:use [clojure test])
(:import [java.io ByteArrayOutputStream DataOutputStream
ByteArrayInputStream DataInputStream])
(:import [backtype.storm.serialization KryoTupleSerializer KryoTupleDeserializer])
(:import [backtype.storm.serialization KryoTupleSerializer KryoTupleDeserializer
KryoValuesSerializer KryoValuesDeserializer])
(:import [backtype.storm.testing TestSerObject])
(:use [backtype.storm util config])
)


;; (defn mk-conf [extra]
;; (merge (read-default-config) extra))
;;
;; (deftest test-java-serialization
;; (letlocals
;;
;; ))
;;
;; (defn serialize [vals]
;; (let [serializer (ValuesSerializer. (mk-conf {}))
;; bos (ByteArrayOutputStream.)
;; os (DataOutputStream. bos)]
;; (.serializeInto serializer vals os)
;; (.toByteArray bos)
;; ))
;;
;; (defn deserialize [bytes]
;; (let [deserializer (ValuesDeserializer. (mk-conf {}))
;; bin (ByteArrayInputStream. bytes)
;; in (DataInputStream. bin)]
;; (.deserializeFrom deserializer in)
;; ))
;;
;; (defn roundtrip [vals]
;; (deserialize (serialize vals)))
;;
;; (defn mk-string [size]
;; (let [builder (StringBuilder.)]
;; (doseq [i (range size)]
;; (.append builder "a"))
;; (.toString builder)))
;;
;; (defn is-roundtrip [vals]
;; (is (= vals (roundtrip vals))))
;;
;; (deftest test-string-serialization
;; (is-roundtrip ["a" "bb" "cde"])
;; (is-roundtrip [(mk-string (* 64 1024))])
;; (is-roundtrip [(mk-string (* 1024 1024))])
;; )
(defn mk-conf
  "Returns the default config with the entries of `extra` merged on top."
  [extra]
  (-> (read-default-config)
      (merge extra)))

(defn serialize
  "Serializes `vals` with a KryoValuesSerializer built from `conf` (merged
  over the default config) and returns the resulting byte array."
  [vals conf]
  (let [out (ByteArrayOutputStream.)]
    (-> (KryoValuesSerializer. (mk-conf conf))
        (.serializeInto vals out))
    (.toByteArray out)))

(defn deserialize
  "Reads values back out of `bytes` with a KryoValuesDeserializer built from
  `conf` (merged over the default config)."
  [bytes conf]
  (-> (KryoValuesDeserializer. (mk-conf conf))
      (.deserializeFrom (ByteArrayInputStream. bytes))))

(defn roundtrip
  "Serializes `vals` and deserializes the result, using `conf` for both
  directions. With one argument, `conf` defaults to an empty map."
  ([vals]
     (roundtrip vals {}))
  ([vals conf]
     (-> vals
         (serialize conf)
         (deserialize conf))))

(deftest test-java-serialization
  (letlocals
    (bind obj (TestSerObject. 1 2))
    ;; With the class mapped to nil in TOPOLOGY-KRYO-REGISTER and the Java
    ;; serialization fallback switched off, the round trip must throw.
    (is (thrown? Exception
      (roundtrip [obj] {TOPOLOGY-KRYO-REGISTER {"backtype.storm.testing.TestSerObject" nil}
                        TOPOLOGY-FALL-BACK-ON-JAVA-SERIALIZATION false})))
    ;; With the fallback enabled the object must survive the round trip.
    ;; The comparison is wrapped in `is` so that it is actually asserted;
    ;; a bare `(= ...)` form would be evaluated and its result discarded.
    (is (= [obj] (roundtrip [obj] {TOPOLOGY-FALL-BACK-ON-JAVA-SERIALIZATION true})))
    ))

(defn mk-string
  "Builds a string made of the letter \"a\" repeated once per element of
  (range size)."
  [size]
  (let [sb (reduce (fn [^StringBuilder acc _] (.append acc "a"))
                   (StringBuilder.)
                   (range size))]
    (str sb)))

(defn is-roundtrip
  "Asserts that `vals` compares equal to itself after a serialize/deserialize
  round trip with the default config."
  [vals]
  (is (= vals
         (roundtrip vals))))

(deftest test-string-serialization
  ;; A few short strings, then single strings of 64 * 1024 and 1024 * 1024
  ;; characters.
  (doseq [vals [["a" "bb" "cde"]
                [(mk-string (* 64 1024))]
                [(mk-string (* 1024 1024))]]]
    (is-roundtrip vals)))

0 comments on commit c91da28

Please sign in to comment.