Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Enable plugin for tuple serialization, and provide blowfish encryptio…

…n plugin as an example
  • Loading branch information...
commit 5ea2e6fc5d94e24f3de05a27c66238db999d9a2e 1 parent 06dcc2a
afeng authored
View
9 src/jvm/backtype/storm/Config.java
@@ -65,6 +65,12 @@
public static String STORM_LOCAL_HOSTNAME = "storm.local.hostname";
/**
+ * The serializer class for ListDelegate (tuple payload).
+ * The default serializer will be ListDelegateSerializer
+ */
+ public static String TOPOLOGY_TUPLE_SERIALIZER = "topology.tuple.serializer";
+
+ /**
* Whether or not to use ZeroMQ for messaging in local mode. If this is set
* to false, then Storm will use a pure-Java messaging system. The purpose
* of this flag is to make it easy to run Storm in local mode by eliminating
@@ -199,8 +205,7 @@
* whether topologies are allowed to run or not.
*/
public static String NIMBUS_TOPOLOGY_VALIDATOR = "nimbus.topology.validator";
-
-
+
/**
* Storm UI binds to this port.
*/
View
4 src/jvm/backtype/storm/serialization/KryoValuesDeserializer.java
@@ -1,5 +1,6 @@
package backtype.storm.serialization;
+import backtype.storm.utils.ListDelegate;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import java.io.IOException;
@@ -17,7 +18,8 @@ public KryoValuesDeserializer(Map conf) {
}
public List<Object> deserializeFrom(Input input) {
- return (List<Object>) _kryo.readObject(input, ArrayList.class);
+ ListDelegate delegate = (ListDelegate) _kryo.readObject(input, ListDelegate.class);
+ return delegate.getDelegate();
}
public List<Object> deserialize(byte[] ser) throws IOException {
View
46 src/jvm/backtype/storm/serialization/SerializationFactory.java
@@ -4,6 +4,7 @@
import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.StormTopology;
import backtype.storm.serialization.types.ArrayListSerializer;
+import backtype.storm.serialization.types.ListDelegateSerializer;
import backtype.storm.serialization.types.HashMapSerializer;
import backtype.storm.serialization.types.HashSetSerializer;
import backtype.storm.transactional.TransactionAttempt;
@@ -32,7 +33,25 @@ public static Kryo getKryo(Map conf) {
IKryoFactory kryoFactory = (IKryoFactory) Utils.newInstance((String) conf.get(Config.TOPOLOGY_KRYO_FACTORY));
Kryo k = kryoFactory.getKryo(conf);
k.register(byte[].class);
- k.register(ListDelegate.class);
+
+ /* tuple payload serializer could be specified via configuration */
+ String payloadSerializerName = (String)conf.get(Config.TOPOLOGY_TUPLE_SERIALIZER);
+ if (payloadSerializerName==null)
+ k.register(ListDelegate.class, new ListDelegateSerializer()); //use default payload serializer
+ else {
+ try {
+ Class serializerClass = Class.forName(payloadSerializerName);
+ Serializer serializer = resolveSerializerInstance(k, ListDelegate.class, serializerClass, conf);
+ if (serializer == null)
+ k.register(ListDelegate.class, new ListDelegateSerializer());
+ else
+ k.register(ListDelegate.class, serializer);
+ } catch (ClassNotFoundException ex ){
+ LOG.error(ex + " Could not load class in class path: " + payloadSerializerName);
+ k.register(ListDelegate.class, new ListDelegateSerializer());
+ }
+ }
+
k.register(ArrayList.class, new ArrayListSerializer());
k.register(HashMap.class, new HashMapSerializer());
k.register(HashSet.class, new HashSetSerializer());
@@ -63,9 +82,8 @@ public static Kryo getKryo(Map conf) {
if(serializerClass == null) {
k.register(klass);
} else {
- k.register(klass, resolveSerializerInstance(k, klass, serializerClass));
+ k.register(klass, resolveSerializerInstance(k, klass, serializerClass, conf));
}
-
} catch (ClassNotFoundException e) {
if(skipMissing) {
LOG.info("Could not find serialization or class for " + serializerClassName + ". Skipping registration...");
@@ -139,18 +157,30 @@ public String getStreamName(String component, int stream) {
}
}
- private static Serializer resolveSerializerInstance(Kryo k, Class superClass, Class<? extends Serializer> serializerClass) {
+ private static Serializer resolveSerializerInstance(Kryo k, Class superClass, Class<? extends Serializer> serializerClass, Map conf) {
try {
try {
- return serializerClass.getConstructor(Kryo.class, Class.class).newInstance(k, superClass);
+ return serializerClass.getConstructor(Kryo.class, Class.class, Map.class).newInstance(k, superClass, conf);
} catch (Exception ex1) {
try {
- return serializerClass.getConstructor(Kryo.class).newInstance(k);
+ return serializerClass.getConstructor(Kryo.class, Class.class).newInstance(k, superClass);
} catch (Exception ex2) {
try {
- return serializerClass.getConstructor(Class.class).newInstance(superClass);
+ return serializerClass.getConstructor(Kryo.class, Map.class).newInstance(k, conf);
} catch (Exception ex3) {
- return serializerClass.newInstance();
+ try {
+ return serializerClass.getConstructor(Kryo.class).newInstance(k);
+ } catch (Exception ex4) {
+ try {
+ return serializerClass.getConstructor(Class.class, Map.class).newInstance(superClass, conf);
+ } catch (Exception ex5) {
+ try {
+ return serializerClass.getConstructor(Class.class).newInstance(superClass);
+ } catch (Exception ex6) {
+ return serializerClass.newInstance();
+ }
+ }
+ }
}
}
}
View
6 src/jvm/backtype/storm/serialization/types/ListDelegateSerializer.java
@@ -1,3 +1,9 @@
+/**
+ * Copyright (c) 2013 Yahoo! Inc. All Rights Reserved.
+ *
+ * Copyrights licensed under the Eclipse Public License.
+ * See the accompanying LICENSE file for terms.
+ */
package backtype.storm.serialization.types;
import com.esotericsoftware.kryo.Kryo;
View
9 src/jvm/backtype/storm/utils/ListDelegate.java
@@ -2,15 +2,24 @@
import java.util.Collection;
import java.util.Iterator;
+import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
public class ListDelegate implements List<Object> {
private List<Object> _delegate;
+ public ListDelegate() {
+ _delegate = new ArrayList<Object>();
+ }
+
public void setDelegate(List<Object> delegate) {
_delegate = delegate;
}
+
+ public List<Object> getDelegate() {
+ return _delegate;
+ }
@Override
public int size() {
Please sign in to comment.
Something went wrong with that request. Please try again.