Permalink
Browse files

Merge remote-tracking branch 'anfeng/master'

  • Loading branch information...
2 parents 83af2b8 + 01c4147 commit de923c4df839f4ad1d94dd75f4f30ed546805208 @nathanmarz committed Feb 12, 2013
View
@@ -104,6 +104,7 @@ topology.sleep.spout.wait.strategy.time.ms: 1
topology.error.throttle.interval.secs: 10
topology.max.error.report.per.interval: 5
topology.kryo.factory: "backtype.storm.serialization.DefaultKryoFactory"
+topology.tuple.serializer: "backtype.storm.serialization.types.ListDelegateSerializer"
topology.trident.batch.emit.interval.millis: 500
dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
@@ -65,6 +65,12 @@
public static String STORM_LOCAL_HOSTNAME = "storm.local.hostname";
/**
+ * The serializer class for ListDelegate (tuple payload).
+ * The default serializer will be ListDelegateSerializer
+ */
+ public static String TOPOLOGY_TUPLE_SERIALIZER = "topology.tuple.serializer";
+
+ /**
* Whether or not to use ZeroMQ for messaging in local mode. If this is set
* to false, then Storm will use a pure-Java messaging system. The purpose
* of this flag is to make it easy to run Storm in local mode by eliminating
@@ -199,8 +205,7 @@
* whether topologies are allowed to run or not.
*/
public static String NIMBUS_TOPOLOGY_VALIDATOR = "nimbus.topology.validator";
-
-
+
/**
* Storm UI binds to this port.
*/
@@ -0,0 +1,74 @@
+package backtype.storm.security.serialization;
+
+import java.util.Map;
+import org.apache.commons.codec.binary.Hex;
+import org.apache.log4j.Logger;
+
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.serializers.BlowfishSerializer;
+
+import backtype.storm.serialization.types.ListDelegateSerializer;
+import backtype.storm.utils.ListDelegate;
+import backtype.storm.Config;
+
+/**
+ * Apply Blowfish encrption for tuple communication to bolts
+ */
+public class BlowfishTupleSerializer extends Serializer<ListDelegate> {
+ /**
+ * The secret key (if any) for data encryption by blowfish payload serialization factory (BlowfishSerializationFactory).
+ * You should use in via "storm -c topology.tuple.serializer.blowfish.key=YOURKEY -c topology.tuple.serializer=backtype.storm.security.serialization.BlowfishTupleSerializer jar ...".
+ */
+ public static String SECRET_KEY = "topology.tuple.serializer.blowfish.key";
+ private static final Logger LOG = Logger.getLogger(BlowfishSerializer.class);
+ private BlowfishSerializer _serializer;
+
+ public BlowfishTupleSerializer(Kryo kryo, Map storm_conf) {
+ String encryption_key = null;
+ try {
+ encryption_key = (String)storm_conf.get(SECRET_KEY);
+ LOG.debug("Blowfish serializer being constructed ...");
+ if (encryption_key == null) {
+ LOG.error("Encryption key not specified");
+ throw new RuntimeException("Blowfish encryption key not specified");
+ }
+ byte[] bytes = Hex.decodeHex(encryption_key.toCharArray());
+ _serializer = new BlowfishSerializer(new ListDelegateSerializer(), bytes);
+ } catch (org.apache.commons.codec.DecoderException ex) {
+ LOG.error("Invalid encryption key");
+ throw new RuntimeException("Blowfish encryption key invalid");
+ }
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, ListDelegate object) {
+ _serializer.write(kryo, output, object);
+ }
+
+ @Override
+ public ListDelegate read(Kryo kryo, Input input, Class<ListDelegate> type) {
+ return (ListDelegate)_serializer.read(kryo, input, type);
+ }
+
+ /**
+ * Produce a blowfish key to be used in "Storm jar" command
+ */
+ public static void main(String[] args) {
+ try{
+ KeyGenerator kgen = KeyGenerator.getInstance("Blowfish");
+ SecretKey skey = kgen.generateKey();
+ byte[] raw = skey.getEncoded();
+ String keyString = new String(Hex.encodeHex(raw));
+ System.out.println("storm -c "+SECRET_KEY+"="+keyString+" -c "+Config.TOPOLOGY_TUPLE_SERIALIZER+"="+BlowfishTupleSerializer.class.getName() + " ..." );
+ } catch (Exception ex) {
+ LOG.error(ex.getMessage());
+ ex.printStackTrace();
+ }
+ }
+}
@@ -1,5 +1,6 @@
package backtype.storm.serialization;
+import backtype.storm.utils.ListDelegate;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import java.io.IOException;
@@ -17,7 +18,8 @@ public KryoValuesDeserializer(Map conf) {
}
public List<Object> deserializeFrom(Input input) {
- return (List<Object>) _kryo.readObject(input, ArrayList.class);
+ ListDelegate delegate = (ListDelegate) _kryo.readObject(input, ListDelegate.class);
+ return delegate.getDelegate();
}
public List<Object> deserialize(byte[] ser) throws IOException {
@@ -4,6 +4,7 @@
import backtype.storm.generated.ComponentCommon;
import backtype.storm.generated.StormTopology;
import backtype.storm.serialization.types.ArrayListSerializer;
+import backtype.storm.serialization.types.ListDelegateSerializer;
import backtype.storm.serialization.types.HashMapSerializer;
import backtype.storm.serialization.types.HashSetSerializer;
import backtype.storm.transactional.TransactionAttempt;
@@ -32,7 +33,18 @@ public static Kryo getKryo(Map conf) {
IKryoFactory kryoFactory = (IKryoFactory) Utils.newInstance((String) conf.get(Config.TOPOLOGY_KRYO_FACTORY));
Kryo k = kryoFactory.getKryo(conf);
k.register(byte[].class);
- k.register(ListDelegate.class);
+
+ /* tuple payload serializer is specified via configuration */
+ String payloadSerializerName = (String)conf.get(Config.TOPOLOGY_TUPLE_SERIALIZER);
+ try {
+ Class serializerClass = Class.forName(payloadSerializerName);
+ Serializer serializer = resolveSerializerInstance(k, ListDelegate.class, serializerClass, conf);
+ k.register(ListDelegate.class, serializer);
+ } catch (ClassNotFoundException ex) {
+ LOG.error("Could not load class in class path: " + payloadSerializerName.length(), ex);
+ throw new RuntimeException(ex);
+ }
+
k.register(ArrayList.class, new ArrayListSerializer());
k.register(HashMap.class, new HashMapSerializer());
k.register(HashSet.class, new HashSetSerializer());
@@ -63,9 +75,8 @@ public static Kryo getKryo(Map conf) {
if(serializerClass == null) {
k.register(klass);
} else {
- k.register(klass, resolveSerializerInstance(k, klass, serializerClass));
+ k.register(klass, resolveSerializerInstance(k, klass, serializerClass, conf));
}
-
} catch (ClassNotFoundException e) {
if(skipMissing) {
LOG.info("Could not find serialization or class for " + serializerClassName + ". Skipping registration...");
@@ -139,18 +150,30 @@ public String getStreamName(String component, int stream) {
}
}
- private static Serializer resolveSerializerInstance(Kryo k, Class superClass, Class<? extends Serializer> serializerClass) {
+ private static Serializer resolveSerializerInstance(Kryo k, Class superClass, Class<? extends Serializer> serializerClass, Map conf) {
try {
try {
- return serializerClass.getConstructor(Kryo.class, Class.class).newInstance(k, superClass);
+ return serializerClass.getConstructor(Kryo.class, Class.class, Map.class).newInstance(k, superClass, conf);
} catch (Exception ex1) {
try {
- return serializerClass.getConstructor(Kryo.class).newInstance(k);
+ return serializerClass.getConstructor(Kryo.class, Class.class).newInstance(k, superClass);
} catch (Exception ex2) {
try {
- return serializerClass.getConstructor(Class.class).newInstance(superClass);
+ return serializerClass.getConstructor(Kryo.class, Map.class).newInstance(k, conf);
} catch (Exception ex3) {
- return serializerClass.newInstance();
+ try {
+ return serializerClass.getConstructor(Kryo.class).newInstance(k);
+ } catch (Exception ex4) {
+ try {
+ return serializerClass.getConstructor(Class.class, Map.class).newInstance(superClass, conf);
+ } catch (Exception ex5) {
+ try {
+ return serializerClass.getConstructor(Class.class).newInstance(superClass);
+ } catch (Exception ex6) {
+ return serializerClass.newInstance();
+ }
+ }
+ }
}
}
}
@@ -0,0 +1,15 @@
+package backtype.storm.serialization.types;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.serializers.CollectionSerializer;
+import backtype.storm.utils.ListDelegate;
+import java.util.Collection;
+
+
+public class ListDelegateSerializer extends CollectionSerializer {
+ @Override
+ public Collection create(Kryo kryo, Input input, Class<Collection> type) {
+ return new ListDelegate();
+ }
+}
@@ -2,15 +2,24 @@
import java.util.Collection;
import java.util.Iterator;
+import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
public class ListDelegate implements List<Object> {
private List<Object> _delegate;
+ public ListDelegate() {
+ _delegate = new ArrayList<Object>();
+ }
+
public void setDelegate(List<Object> delegate) {
_delegate = delegate;
}
+
+ public List<Object> getDelegate() {
+ return _delegate;
+ }
@Override
public int size() {

0 comments on commit de923c4

Please sign in to comment.