Skip to content

Commit

Permalink
Enable plugin for tuple serialization, and provide blowfish encryptio…
Browse files Browse the repository at this point in the history
…n plugin as an example
  • Loading branch information
afeng committed Feb 10, 2013
1 parent 06dcc2a commit 5ea2e6f
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 11 deletions.
9 changes: 7 additions & 2 deletions src/jvm/backtype/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ public class Config extends HashMap<String, Object> {
*/
public static String STORM_LOCAL_HOSTNAME = "storm.local.hostname";

/**
* The serializer class for ListDelegate (tuple payload).
* The default serializer will be ListDelegateSerializer
*/
public static String TOPOLOGY_TUPLE_SERIALIZER = "topology.tuple.serializer";

/**
* Whether or not to use ZeroMQ for messaging in local mode. If this is set
* to false, then Storm will use a pure-Java messaging system. The purpose
Expand Down Expand Up @@ -199,8 +205,7 @@ public class Config extends HashMap<String, Object> {
* whether topologies are allowed to run or not.
*/
public static String NIMBUS_TOPOLOGY_VALIDATOR = "nimbus.topology.validator";



/**
* Storm UI binds to this port.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package backtype.storm.serialization;

import backtype.storm.utils.ListDelegate;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import java.io.IOException;
Expand All @@ -17,7 +18,8 @@ public KryoValuesDeserializer(Map conf) {
}

public List<Object> deserializeFrom(Input input) {
return (List<Object>) _kryo.readObject(input, ArrayList.class);
ListDelegate delegate = (ListDelegate) _kryo.readObject(input, ListDelegate.class);
return delegate.getDelegate();
}

public List<Object> deserialize(byte[] ser) throws IOException {
Expand Down
46 changes: 38 additions & 8 deletions src/jvm/backtype/storm/serialization/SerializationFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.StormTopology;
import backtype.storm.serialization.types.ArrayListSerializer;
import backtype.storm.serialization.types.ListDelegateSerializer;
import backtype.storm.serialization.types.HashMapSerializer;
import backtype.storm.serialization.types.HashSetSerializer;
import backtype.storm.transactional.TransactionAttempt;
Expand Down Expand Up @@ -32,7 +33,25 @@ public static Kryo getKryo(Map conf) {
IKryoFactory kryoFactory = (IKryoFactory) Utils.newInstance((String) conf.get(Config.TOPOLOGY_KRYO_FACTORY));
Kryo k = kryoFactory.getKryo(conf);
k.register(byte[].class);
k.register(ListDelegate.class);

/* tuple payload serializer could be specified via configuration */
String payloadSerializerName = (String)conf.get(Config.TOPOLOGY_TUPLE_SERIALIZER);
if (payloadSerializerName==null)
k.register(ListDelegate.class, new ListDelegateSerializer()); //use default payload serializer
else {
try {
Class serializerClass = Class.forName(payloadSerializerName);
Serializer serializer = resolveSerializerInstance(k, ListDelegate.class, serializerClass, conf);
if (serializer == null)
k.register(ListDelegate.class, new ListDelegateSerializer());
else
k.register(ListDelegate.class, serializer);
} catch (ClassNotFoundException ex ){
LOG.error(ex + " Could not load class in class path: " + payloadSerializerName);
k.register(ListDelegate.class, new ListDelegateSerializer());
}
}

k.register(ArrayList.class, new ArrayListSerializer());
k.register(HashMap.class, new HashMapSerializer());
k.register(HashSet.class, new HashSetSerializer());
Expand Down Expand Up @@ -63,9 +82,8 @@ public static Kryo getKryo(Map conf) {
if(serializerClass == null) {
k.register(klass);
} else {
k.register(klass, resolveSerializerInstance(k, klass, serializerClass));
k.register(klass, resolveSerializerInstance(k, klass, serializerClass, conf));
}

} catch (ClassNotFoundException e) {
if(skipMissing) {
LOG.info("Could not find serialization or class for " + serializerClassName + ". Skipping registration...");
Expand Down Expand Up @@ -139,18 +157,30 @@ private static Map<String, Integer> idify(List<String> names) {
}
}

private static Serializer resolveSerializerInstance(Kryo k, Class superClass, Class<? extends Serializer> serializerClass) {
private static Serializer resolveSerializerInstance(Kryo k, Class superClass, Class<? extends Serializer> serializerClass, Map conf) {
try {
try {
return serializerClass.getConstructor(Kryo.class, Class.class).newInstance(k, superClass);
return serializerClass.getConstructor(Kryo.class, Class.class, Map.class).newInstance(k, superClass, conf);
} catch (Exception ex1) {
try {
return serializerClass.getConstructor(Kryo.class).newInstance(k);
return serializerClass.getConstructor(Kryo.class, Class.class).newInstance(k, superClass);
} catch (Exception ex2) {
try {
return serializerClass.getConstructor(Class.class).newInstance(superClass);
return serializerClass.getConstructor(Kryo.class, Map.class).newInstance(k, conf);
} catch (Exception ex3) {
return serializerClass.newInstance();
try {
return serializerClass.getConstructor(Kryo.class).newInstance(k);
} catch (Exception ex4) {
try {
return serializerClass.getConstructor(Class.class, Map.class).newInstance(superClass, conf);
} catch (Exception ex5) {
try {
return serializerClass.getConstructor(Class.class).newInstance(superClass);
} catch (Exception ex6) {
return serializerClass.newInstance();
}
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
/**
* Copyright (c) 2013 Yahoo! Inc. All Rights Reserved.
*
* Copyrights licensed under the Eclipse Public License.
* See the accompanying LICENSE file for terms.
*/
package backtype.storm.serialization.types;

import com.esotericsoftware.kryo.Kryo;
Expand Down
9 changes: 9 additions & 0 deletions src/jvm/backtype/storm/utils/ListDelegate.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,24 @@

import java.util.Collection;
import java.util.Iterator;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;

public class ListDelegate implements List<Object> {
private List<Object> _delegate;

public ListDelegate() {
_delegate = new ArrayList<Object>();
}

public void setDelegate(List<Object> delegate) {
_delegate = delegate;
}

public List<Object> getDelegate() {
return _delegate;
}

@Override
public int size() {
Expand Down

0 comments on commit 5ea2e6f

Please sign in to comment.