Skip to content

Commit

Permalink
BACKLOG-2041 -- Create encrypted data transfer flow
Browse files Browse the repository at this point in the history
What was done:
1) added initial functionality: key generation, session key encryption \
sharing
2) added rmote step encryption support
3) added unit tests for encoding \ decoding keys and random keys
generation
  • Loading branch information
kolinus committed Feb 19, 2015
1 parent 52f254c commit dfe0dfb
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 14 deletions.
Expand Up @@ -23,10 +23,17 @@
package org.pentaho.di.core.encryption; package org.pentaho.di.core.encryption;


import java.security.Key; import java.security.Key;
import java.security.KeyFactory;
import java.security.KeyPair; import java.security.KeyPair;
import java.security.KeyPairGenerator; import java.security.KeyPairGenerator;
import java.security.NoSuchAlgorithmException;
import java.security.spec.KeySpec;
import java.security.spec.PKCS8EncodedKeySpec;
import java.security.spec.X509EncodedKeySpec;


import javax.crypto.Cipher; import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.spec.SecretKeySpec;


import org.pentaho.di.core.logging.LogChannel; import org.pentaho.di.core.logging.LogChannel;
import org.pentaho.di.core.logging.LoggingObjectInterface; import org.pentaho.di.core.logging.LoggingObjectInterface;
Expand All @@ -36,15 +43,17 @@
public class CertificateGenEncryptUtil { public class CertificateGenEncryptUtil {


public static final int KEY_SIZE = 1024; public static final int KEY_SIZE = 1024;
public static final String ALGORYTHM = "RSA"; public static final String PUBLIC_KEY_ALGORYTHM = "RSA";
public static final String SINGLE_KEY_ALGORYTHM = "AES";
public static final String TRANSMISSION_CIPHER_PARAMS = "RSA/ECB/PKCS1Padding";
private static final LoggingObjectInterface loggingObject = new SimpleLoggingObject( private static final LoggingObjectInterface loggingObject = new SimpleLoggingObject(
"Certificate Encryption Utility", LoggingObjectType.GENERAL, null ); "Certificate Encryption Utility", LoggingObjectType.GENERAL, null );
private static final LogChannel log = new LogChannel( loggingObject ); private static final LogChannel log = new LogChannel( loggingObject );


public static KeyPair generateKeyPair() { public static KeyPair generateKeyPair() {
KeyPair pair = null; KeyPair pair = null;
try { try {
KeyPairGenerator keyPairGen = KeyPairGenerator.getInstance( ALGORYTHM ); KeyPairGenerator keyPairGen = KeyPairGenerator.getInstance( PUBLIC_KEY_ALGORYTHM );
keyPairGen.initialize( KEY_SIZE ); keyPairGen.initialize( KEY_SIZE );
pair = keyPairGen.generateKeyPair(); pair = keyPairGen.generateKeyPair();
} catch ( Exception ex ) { } catch ( Exception ex ) {
Expand All @@ -53,10 +62,52 @@ public static KeyPair generateKeyPair() {
return pair; return pair;
} }


public static Key generateSingleKey() throws NoSuchAlgorithmException {
Key key = KeyGenerator.getInstance( SINGLE_KEY_ALGORYTHM ).generateKey();
return key;
}

public static byte[] encodeKeyForTransmission( Key encodingKey, Key keyToEncode ) throws Exception {
Cipher cipher = Cipher.getInstance( TRANSMISSION_CIPHER_PARAMS );
cipher.init( Cipher.WRAP_MODE, encodingKey );
byte[] encodedKey = cipher.wrap( keyToEncode );
return encodedKey;
}

public static Key decodeTransmittedKey( byte[] sessionKey, byte[] transmittedKey, boolean privateKey )
throws Exception {
KeySpec keySpec = null;
Key keyKey = null;
if ( transmittedKey == null || sessionKey == null ) {
return null;
}
if ( !privateKey ) {
keySpec = new X509EncodedKeySpec( sessionKey );
keyKey = KeyFactory.getInstance( PUBLIC_KEY_ALGORYTHM ).generatePublic( keySpec );
} else {
keySpec = new PKCS8EncodedKeySpec( sessionKey );
keyKey = KeyFactory.getInstance( PUBLIC_KEY_ALGORYTHM ).generatePrivate( keySpec );
}
Cipher keyCipher = Cipher.getInstance( TRANSMISSION_CIPHER_PARAMS );
keyCipher.init( Cipher.UNWRAP_MODE, keyKey );
return keyCipher.unwrap( transmittedKey, SINGLE_KEY_ALGORYTHM, Cipher.SECRET_KEY );
}

public static Cipher initDecryptionCipher( Key unwrappedKey, byte[] unencryptedKey ) throws Exception {
Cipher decryptionCip = Cipher.getInstance( SINGLE_KEY_ALGORYTHM );
if ( unwrappedKey != null ) {
decryptionCip.init( Cipher.ENCRYPT_MODE, unwrappedKey );
} else {
SecretKeySpec sks = new SecretKeySpec( unencryptedKey, SINGLE_KEY_ALGORYTHM );
decryptionCip.init( Cipher.ENCRYPT_MODE, sks );
}
return decryptionCip;
}

public static byte[] encryptUsingKey( byte[] data, Key key ) { public static byte[] encryptUsingKey( byte[] data, Key key ) {
byte[] result = null; byte[] result = null;
try { try {
Cipher cipher = Cipher.getInstance( ALGORYTHM ); Cipher cipher = Cipher.getInstance( PUBLIC_KEY_ALGORYTHM );
cipher.init( Cipher.ENCRYPT_MODE, key ); cipher.init( Cipher.ENCRYPT_MODE, key );
result = cipher.doFinal( data ); result = cipher.doFinal( data );
} catch ( Exception ex ) { } catch ( Exception ex ) {
Expand All @@ -68,7 +119,7 @@ public static byte[] encryptUsingKey( byte[] data, Key key ) {
public static byte[] decryptUsingKey( byte[] data, Key key ) { public static byte[] decryptUsingKey( byte[] data, Key key ) {
byte[] result = null; byte[] result = null;
try { try {
Cipher cipher = Cipher.getInstance( ALGORYTHM ); Cipher cipher = Cipher.getInstance( PUBLIC_KEY_ALGORYTHM );
cipher.init( Cipher.DECRYPT_MODE, key ); cipher.init( Cipher.DECRYPT_MODE, key );
result = cipher.doFinal( data ); result = cipher.doFinal( data );
} catch ( Exception ex ) { } catch ( Exception ex ) {
Expand Down
Expand Up @@ -26,6 +26,8 @@
import java.util.Arrays; import java.util.Arrays;


import org.junit.Test; import org.junit.Test;
import java.security.Key;

import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
Expand Down Expand Up @@ -98,4 +100,32 @@ public void testPrivatePrivate() {
fail(); fail();
} catch ( Exception ex ) { } } catch ( Exception ex ) { }
} }

@Test
public void testRandomSessionKeyGeneration() throws Exception {
Key key = CertificateGenEncryptUtil.generateSingleKey();
Key key1 = CertificateGenEncryptUtil.generateSingleKey();
assertFalse( key.equals( key1 ) );
assertFalse( Arrays.equals( key.getEncoded(), key1.getEncoded() ) );
}

@Test
public void testSessionKeyEncryptionDecryption() throws Exception {
Key key = CertificateGenEncryptUtil.generateSingleKey();
KeyPair kp = CertificateGenEncryptUtil.generateKeyPair();
Key privateKey = kp.getPrivate();
byte[] encryptedKey = CertificateGenEncryptUtil.encodeKeyForTransmission( privateKey, key );
Key key1 = CertificateGenEncryptUtil.decodeTransmittedKey( kp.getPublic().getEncoded(), encryptedKey, false );
assertTrue( key.equals( key1 ) );
}

@Test
public void testSessionKeyEncryptionDecryption2() throws Exception {
Key key = CertificateGenEncryptUtil.generateSingleKey();
KeyPair kp = CertificateGenEncryptUtil.generateKeyPair();
Key privateKey = kp.getPrivate();
byte[] encryptedKey = CertificateGenEncryptUtil.encodeKeyForTransmission( kp.getPublic(), key );
Key key1 = CertificateGenEncryptUtil.decodeTransmittedKey( privateKey.getEncoded(), encryptedKey, true );
assertTrue( key.equals( key1 ) );
}
} }
29 changes: 29 additions & 0 deletions engine/src/org/pentaho/di/trans/TransMeta.java
Expand Up @@ -282,6 +282,9 @@ public class TransMeta extends AbstractMeta implements XMLInterface, Comparator<
/** The log channel interface. */ /** The log channel interface. */
protected LogChannelInterface log; protected LogChannelInterface log;


protected byte[] key;
boolean privateKey;

/** /**
* The TransformationType enum describes the various types of transformations in terms of execution, including Normal, * The TransformationType enum describes the various types of transformations in terms of execution, including Normal,
* Serial Single-Threaded, and Single-Threaded. * Serial Single-Threaded, and Single-Threaded.
Expand Down Expand Up @@ -2423,6 +2426,13 @@ public String getXML( boolean includeSteps, boolean includeDatabase, boolean inc
retval retval
.append( " " ).append( XMLHandler.addTagValue( "modified_date", XMLHandler.date2string( modifiedDate ) ) ); .append( " " ).append( XMLHandler.addTagValue( "modified_date", XMLHandler.date2string( modifiedDate ) ) );


try {
retval.append( " " ).append( XMLHandler.addTagValue( "key", key ) );
} catch ( Exception ex ) {
log.logError( "Unable to decode key", ex );
}
retval.append( " " ).append( XMLHandler.addTagValue( "private", privateKey ) );

retval.append( " " ).append( XMLHandler.closeTag( XML_TAG_INFO ) ).append( Const.CR ); retval.append( " " ).append( XMLHandler.closeTag( XML_TAG_INFO ) ).append( Const.CR );


retval.append( " " ).append( XMLHandler.openTag( XML_TAG_NOTEPADS ) ).append( Const.CR ); retval.append( " " ).append( XMLHandler.openTag( XML_TAG_NOTEPADS ) ).append( Const.CR );
Expand Down Expand Up @@ -3289,6 +3299,9 @@ public void loadXML( Node transnode, String fname, IMetaStore metaStore, Reposit
// //
attributesMap = AttributesUtil.loadAttributes( XMLHandler.getSubNode( transnode, AttributesUtil.XML_TAG ) ); attributesMap = AttributesUtil.loadAttributes( XMLHandler.getSubNode( transnode, AttributesUtil.XML_TAG ) );


key = XMLHandler.stringToBinary( XMLHandler.getTagValue( infonode, "key" ) );
privateKey = "Y".equals( XMLHandler.getTagValue( infonode, "private" ) );

} catch ( KettleXMLException xe ) { } catch ( KettleXMLException xe ) {
throw new KettleXMLException( BaseMessages.getString( throw new KettleXMLException( BaseMessages.getString(
PKG, "TransMeta.Exception.ErrorReadingTransformation" ), xe ); PKG, "TransMeta.Exception.ErrorReadingTransformation" ), xe );
Expand Down Expand Up @@ -3318,6 +3331,22 @@ public void loadXML( Node transnode, String fname, IMetaStore metaStore, Reposit
} }
} }


public byte[] getKey() {
return key;
}

public void setKey( byte[] key ) {
this.key = key;
}

public boolean isPrivateKey() {
return privateKey;
}

public void setPrivateKey( boolean privateKey ) {
this.privateKey = privateKey;
}

/** /**
* Reads the shared objects (steps, connections, etc.). * Reads the shared objects (steps, connections, etc.).
* *
Expand Down
43 changes: 41 additions & 2 deletions engine/src/org/pentaho/di/trans/cluster/TransSplitter.java
Expand Up @@ -22,6 +22,10 @@


package org.pentaho.di.trans.cluster; package org.pentaho.di.trans.cluster;


import java.security.Key;
import java.security.KeyPair;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
Expand All @@ -37,6 +41,7 @@
import org.pentaho.di.cluster.SlaveServer; import org.pentaho.di.cluster.SlaveServer;
import org.pentaho.di.core.Const; import org.pentaho.di.core.Const;
import org.pentaho.di.core.NotePadMeta; import org.pentaho.di.core.NotePadMeta;
import org.pentaho.di.core.encryption.CertificateGenEncryptUtil;
import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.TransLogTable; import org.pentaho.di.core.logging.TransLogTable;
import org.pentaho.di.core.xml.XMLHandler; import org.pentaho.di.core.xml.XMLHandler;
Expand Down Expand Up @@ -285,7 +290,8 @@ public String createPortCacheKey( SlaveServer sourceSlave, String sourceStepName
* the slave server to reference * the slave server to reference
* @return * @return
*/ */
private TransMeta getSlaveTransformation( ClusterSchema clusterSchema, SlaveServer slaveServer ) throws KettleException { private TransMeta getSlaveTransformation( ClusterSchema clusterSchema,
SlaveServer slaveServer ) throws KettleException {
TransMeta slave = slaveTransMap.get( slaveServer ); TransMeta slave = slaveTransMap.get( slaveServer );
if ( slave == null ) { if ( slave == null ) {
slave = getOriginalCopy( true, clusterSchema, slaveServer ); slave = getOriginalCopy( true, clusterSchema, slaveServer );
Expand All @@ -294,7 +300,8 @@ private TransMeta getSlaveTransformation( ClusterSchema clusterSchema, SlaveServ
return slave; return slave;
} }


private TransMeta getOriginalCopy( boolean isSlaveTrans, ClusterSchema clusterSchema, SlaveServer slaveServer ) throws KettleException { private TransMeta getOriginalCopy( boolean isSlaveTrans, ClusterSchema clusterSchema,
SlaveServer slaveServer ) throws KettleException {
TransMeta transMeta = new TransMeta(); TransMeta transMeta = new TransMeta();
transMeta.setSlaveTransformation( true ); transMeta.setSlaveTransformation( true );


Expand Down Expand Up @@ -453,6 +460,18 @@ public void splitOriginalTransformation() throws KettleException {
List<SlaveServer> slaveServers = clusterSchema.getSlaveServers(); List<SlaveServer> slaveServers = clusterSchema.getSlaveServers();
int nrSlavesNodes = clusterSchema.findNrSlaves(); int nrSlavesNodes = clusterSchema.findNrSlaves();


boolean encrypt = false;
byte[] transformationKey = null;
PublicKey pubK = null;
if ( encrypt ) {
KeyPair pair = CertificateGenEncryptUtil.generateKeyPair();
pubK = pair.getPublic();
PrivateKey privK = pair.getPrivate();

Key key1 = CertificateGenEncryptUtil.generateSingleKey();
transformationKey = CertificateGenEncryptUtil.encodeKeyForTransmission( privK, key1 );
}

for ( int r = 0; r < referenceSteps.length; r++ ) { for ( int r = 0; r < referenceSteps.length; r++ ) {
StepMeta referenceStep = referenceSteps[r]; StepMeta referenceStep = referenceSteps[r];


Expand Down Expand Up @@ -570,6 +589,8 @@ public void splitOriginalTransformation() throws KettleException {
masterStepCopyNr, sourceSlaveServer.getName(), masterSlaveServer.getName(), masterStepCopyNr, sourceSlaveServer.getName(), masterSlaveServer.getName(),
socketsBufferSize, compressingSocketStreams, originalTransformation socketsBufferSize, compressingSocketStreams, originalTransformation
.getStepFields( previousStep ) ); .getStepFields( previousStep ) );
remoteMasterStep.setEncryptingStreams( encrypt );
remoteMasterStep.setKey( transformationKey );
masterStep.getRemoteInputSteps().add( remoteMasterStep ); masterStep.getRemoteInputSteps().add( remoteMasterStep );


RemoteStep remoteSlaveStep = RemoteStep remoteSlaveStep =
Expand All @@ -579,6 +600,8 @@ public void splitOriginalTransformation() throws KettleException {
masterStepCopyNr, sourceSlaveServer.getName(), masterSlaveServer.getName(), masterStepCopyNr, sourceSlaveServer.getName(), masterSlaveServer.getName(),
socketsBufferSize, compressingSocketStreams, originalTransformation socketsBufferSize, compressingSocketStreams, originalTransformation
.getStepFields( previousStep ) ); .getStepFields( previousStep ) );
remoteSlaveStep.setEncryptingStreams( encrypt );
remoteSlaveStep.setKey( transformationKey );
slaveStep.getRemoteOutputSteps().add( remoteSlaveStep ); slaveStep.getRemoteOutputSteps().add( remoteSlaveStep );


// OK, create a partition number for the target step in the partition distribution... // OK, create a partition number for the target step in the partition distribution...
Expand Down Expand Up @@ -712,6 +735,8 @@ public void splitOriginalTransformation() throws KettleException {
referenceStep.getName(), targetCopyNr, masterSlaveServer.getName(), targetSlaveServer referenceStep.getName(), targetCopyNr, masterSlaveServer.getName(), targetSlaveServer
.getName(), socketsBufferSize, compressingSocketStreams, originalTransformation .getName(), socketsBufferSize, compressingSocketStreams, originalTransformation
.getStepFields( previousStep ) ); .getStepFields( previousStep ) );
remoteMasterStep.setEncryptingStreams( encrypt );
remoteMasterStep.setKey( transformationKey );
sourceStep.getRemoteOutputSteps().add( remoteMasterStep ); sourceStep.getRemoteOutputSteps().add( remoteMasterStep );


RemoteStep remoteSlaveStep = RemoteStep remoteSlaveStep =
Expand All @@ -721,6 +746,8 @@ public void splitOriginalTransformation() throws KettleException {
referenceStep.getName(), targetCopyNr, masterSlaveServer.getName(), targetSlaveServer referenceStep.getName(), targetCopyNr, masterSlaveServer.getName(), targetSlaveServer
.getName(), socketsBufferSize, compressingSocketStreams, originalTransformation .getName(), socketsBufferSize, compressingSocketStreams, originalTransformation
.getStepFields( previousStep ) ); .getStepFields( previousStep ) );
remoteSlaveStep.setEncryptingStreams( encrypt );
remoteSlaveStep.setKey( transformationKey );
targetStep.getRemoteInputSteps().add( remoteSlaveStep ); targetStep.getRemoteInputSteps().add( remoteSlaveStep );


// OK, create a partition number for the target step in the partition distribution... // OK, create a partition number for the target step in the partition distribution...
Expand Down Expand Up @@ -922,6 +949,8 @@ public void splitOriginalTransformation() throws KettleException {
.getName(), targetCopyNr, targetSlaveServer.getName(), sourceSlaveServer .getName(), targetCopyNr, targetSlaveServer.getName(), sourceSlaveServer
.getName(), socketsBufferSize, compressingSocketStreams, .getName(), socketsBufferSize, compressingSocketStreams,
originalTransformation.getStepFields( previousStep ) ); originalTransformation.getStepFields( previousStep ) );
remoteOutputStep.setEncryptingStreams( encrypt );
remoteOutputStep.setKey( transformationKey );
sourceStep.getRemoteOutputSteps().add( remoteOutputStep ); sourceStep.getRemoteOutputSteps().add( remoteOutputStep );


// OK, so the source step is sending rows out on the reserved ports // OK, so the source step is sending rows out on the reserved ports
Expand All @@ -938,6 +967,8 @@ public void splitOriginalTransformation() throws KettleException {
.getName(), targetCopyNr, sourceSlaveServer.getName(), targetSlaveServer .getName(), targetCopyNr, sourceSlaveServer.getName(), targetSlaveServer
.getName(), socketsBufferSize, compressingSocketStreams, .getName(), socketsBufferSize, compressingSocketStreams,
originalTransformation.getStepFields( previousStep ) ); originalTransformation.getStepFields( previousStep ) );
remoteInputStep.setEncryptingStreams( encrypt );
remoteInputStep.setKey( transformationKey );
targetStep.getRemoteInputSteps().add( remoteInputStep ); targetStep.getRemoteInputSteps().add( remoteInputStep );
} }
// OK, save the partition number for the target step in the partition distribution... // OK, save the partition number for the target step in the partition distribution...
Expand Down Expand Up @@ -1333,9 +1364,17 @@ public void splitOriginalTransformation() throws KettleException {
// //
for ( TransMeta transMeta : slaveTransMap.values() ) { for ( TransMeta transMeta : slaveTransMap.values() ) {
transMeta.setSlaveStepCopyPartitionDistribution( slaveStepCopyPartitionDistribution ); transMeta.setSlaveStepCopyPartitionDistribution( slaveStepCopyPartitionDistribution );
if ( encrypt ) {
transMeta.setKey( pubK.getEncoded() );
transMeta.setPrivateKey( false );
}
transMeta.clearChanged(); transMeta.clearChanged();
} }
masterTransMeta.setSlaveStepCopyPartitionDistribution( slaveStepCopyPartitionDistribution ); masterTransMeta.setSlaveStepCopyPartitionDistribution( slaveStepCopyPartitionDistribution );
if ( encrypt ) {
masterTransMeta.setKey( pubK.getEncoded() );
masterTransMeta.setPrivateKey( !false );
}
masterTransMeta.clearChanged(); masterTransMeta.clearChanged();


// We're absolutely done here... // We're absolutely done here...
Expand Down

0 comments on commit dfe0dfb

Please sign in to comment.