forked from nathanmarz/storm
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Enable plugin for tuple serialization, and provide blowfish encryptio…
…n plugin as an example
- Loading branch information
afeng
committed
Feb 10, 2013
1 parent
4859f0d
commit 06dcc2a
Showing
2 changed files
with
95 additions
and
0 deletions.
There are no files selected for viewing
80 changes: 80 additions & 0 deletions
80
src/jvm/backtype/storm/security/serialization/BlowfishTupleSerializer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
/** | ||
* Copyright (c) 2013 Yahoo! Inc. All Rights Reserved. | ||
* | ||
* Copyrights licensed under the Eclipse Public License. | ||
* See the accompanying LICENSE file for terms. | ||
*/ | ||
package backtype.storm.security.serialization; | ||
|
||
import java.util.Map; | ||
import org.apache.commons.codec.binary.Hex; | ||
import org.apache.log4j.Logger; | ||
|
||
import javax.crypto.KeyGenerator; | ||
import javax.crypto.SecretKey; | ||
|
||
import com.esotericsoftware.kryo.Kryo; | ||
import com.esotericsoftware.kryo.io.Input; | ||
import com.esotericsoftware.kryo.io.Output; | ||
import com.esotericsoftware.kryo.Serializer; | ||
import com.esotericsoftware.kryo.serializers.BlowfishSerializer; | ||
|
||
import backtype.storm.serialization.types.ListDelegateSerializer; | ||
import backtype.storm.utils.ListDelegate; | ||
import backtype.storm.Config; | ||
|
||
/** | ||
* Apply Blowfish encrption for tuple communication to bolts | ||
*/ | ||
public class BlowfishTupleSerializer extends Serializer<ListDelegate> { | ||
/** | ||
* The secret key (if any) for data encryption by blowfish payload serialization factory (BlowfishSerializationFactory). | ||
* You should use in via "storm -c topology.tuple.serializer.blowfish.key=YOURKEY -c topology.tuple.serializer=backtype.storm.security.serialization.BlowfishTupleSerializer jar ...". | ||
*/ | ||
public static String SECRET_KEY = "topology.tuple.serializer.blowfish.key"; | ||
private static final Logger LOG = Logger.getLogger(BlowfishSerializer.class); | ||
private BlowfishSerializer _serializer; | ||
|
||
public BlowfishTupleSerializer(Kryo kryo, Map storm_conf) { | ||
String encryption_key = null; | ||
try { | ||
encryption_key = (String)storm_conf.get(SECRET_KEY); | ||
LOG.debug("Blowfish serializer being constructed ..."); | ||
if (encryption_key == null) { | ||
LOG.error("Encryption key not specified"); | ||
throw new RuntimeException("Blowfish encryption key not specified"); | ||
} | ||
byte[] bytes = Hex.decodeHex(encryption_key.toCharArray()); | ||
_serializer = new BlowfishSerializer(new ListDelegateSerializer(), bytes); | ||
} catch (org.apache.commons.codec.DecoderException ex) { | ||
LOG.error("Invalid encryption key"); | ||
throw new RuntimeException("Blowfish encryption key invalid"); | ||
} | ||
} | ||
|
||
@Override | ||
public void write(Kryo kryo, Output output, ListDelegate object) { | ||
_serializer.write(kryo, output, object); | ||
} | ||
|
||
@Override | ||
public ListDelegate read(Kryo kryo, Input input, Class<ListDelegate> type) { | ||
return (ListDelegate)_serializer.read(kryo, input, type); | ||
} | ||
|
||
/** | ||
* Produce a blowfish key to be used in "Storm jar" command | ||
*/ | ||
public static void main(String[] args) { | ||
try{ | ||
KeyGenerator kgen = KeyGenerator.getInstance("Blowfish"); | ||
SecretKey skey = kgen.generateKey(); | ||
byte[] raw = skey.getEncoded(); | ||
String keyString = new String(Hex.encodeHex(raw)); | ||
System.out.println("storm -c "+SECRET_KEY+"="+keyString+" -c "+Config.TOPOLOGY_TUPLE_SERIALIZER+"="+BlowfishTupleSerializer.class.getName() + " ..." ); | ||
} catch (Exception ex) { | ||
LOG.error(ex.getMessage()); | ||
ex.printStackTrace(); | ||
} | ||
} | ||
} |
15 changes: 15 additions & 0 deletions
15
src/jvm/backtype/storm/serialization/types/ListDelegateSerializer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
package backtype.storm.serialization.types; | ||
|
||
import com.esotericsoftware.kryo.Kryo; | ||
import com.esotericsoftware.kryo.io.Input; | ||
import com.esotericsoftware.kryo.serializers.CollectionSerializer; | ||
import backtype.storm.utils.ListDelegate; | ||
import java.util.Collection; | ||
|
||
|
||
public class ListDelegateSerializer extends CollectionSerializer { | ||
@Override | ||
public Collection create(Kryo kryo, Input input, Class<Collection> type) { | ||
return new ListDelegate(); | ||
} | ||
} |