Skip to content

Commit

Permalink
[BACKLOG-18540] SSL support for the websocket daemon
Browse files Browse the repository at this point in the history
  • Loading branch information
Francisco Luís Brinó Câmara committed Aug 29, 2017
1 parent bf95a0a commit 7b6f417
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 9 deletions.
Expand Up @@ -72,6 +72,7 @@ public void execute( RunConfiguration runConfiguration, TransExecutionConfigurat

variableSpace.setVariable( "engine", null );
variableSpace.setVariable( "engine.remote", null );
variableSpace.setVariable( "engine.protocol", null );
variableSpace.setVariable( "engine.host", null );
variableSpace.setVariable( "engine.port", null );
}
Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.pentaho.di.trans.TransExecutionConfiguration;

import java.io.IOException;
import java.net.URI;
import java.util.Dictionary;

/**
Expand All @@ -49,6 +50,7 @@ public class SparkRunConfigurationExecutor implements RunConfigurationExecutor {
public static String CONFIG_KEY = "org.apache.aries.rsa.discovery.zookeeper";
public static String JAAS_CAPABILITY_ID = "pentaho-kerberos-jaas";
public static String AEL_SECURITY_CAPABILITY_ID = "ael-security";
public static String DEFAULT_PROTOCOL = "ws";
public static String DEFAULT_HOST = "127.0.0.1";
public static String DEFAULT_PORT = "2181";

Expand Down Expand Up @@ -91,17 +93,20 @@ public SparkRunConfigurationExecutor( ConfigurationAdmin configurationAdmin ) {
if ( capabilityManager.getCapabilityById( PENTAHO_SERVER_CAPABILITY_ID ) == null ) {
SparkRunConfiguration sparkRunConfiguration = (SparkRunConfiguration) runConfiguration;

String[] parts = Const.NVL( sparkRunConfiguration.getUrl(), "" ).split( ":" );
String host = parts[ 0 ];
String port = parts.length > 1 ? parts[ 1 ] : DEFAULT_PORT;
String runConfigURL = Const.NVL( sparkRunConfiguration.getUrl(), "" );
URI uri = URI.create( runConfigURL.trim() );
String protocol = uri.getScheme();
String host = uri.getHost();
String port = uri.getPort() == -1 ? null : String.valueOf( uri.getPort() );
boolean version2 = false;

//default for now is AEL Engine RSA
String version = variableSpace.getVariable( "KETTLE_AEL_PDI_DAEMON_VERSION", "1.0" );
if ( Const.toDouble( version, 1 ) >= 2 ) {
// Variables for Websocket spark engine version
variableSpace.setVariable( "engine.protocol", Const.NVL( protocol, DEFAULT_PROTOCOL ) );
variableSpace.setVariable( "engine.host", Const.NVL( host, DEFAULT_HOST ) );
variableSpace.setVariable( "engine.port", Const.NVL( port, DEFAULT_HOST ) );
variableSpace.setVariable( "engine.port", Const.NVL( port, DEFAULT_PORT ) );
version2 = true;
}

Expand All @@ -114,9 +119,9 @@ public SparkRunConfigurationExecutor( ConfigurationAdmin configurationAdmin ) {
properties.put( "zookeeper.host", Const.NVL( host, DEFAULT_HOST ) );
properties.put( "zookeeper.port", Const.NVL( port, DEFAULT_PORT ) );
//just remove version2 variables values
variableSpace.setVariable( "engine.protocol", null );
variableSpace.setVariable( "engine.host", null );
variableSpace.setVariable( "engine.port", null );

} else {
properties.remove( "zookeeper.host" );
properties.remove( "zookeeper.port" );
Expand Down
Expand Up @@ -83,7 +83,7 @@ public void setup() throws Exception {
public void testExecute() {
SparkRunConfiguration sparkRunConfiguration = new SparkRunConfiguration();
sparkRunConfiguration.setName( "Spark Configuration" );
sparkRunConfiguration.setUrl( "127.0.0.2:8121" );
sparkRunConfiguration.setUrl( "ws://127.0.0.2:8121" );

TransExecutionConfiguration transExecutionConfiguration = new TransExecutionConfiguration();

Expand Down Expand Up @@ -120,7 +120,7 @@ public void testExecuteNoPort() {
public void testWebSocketVersionExecute() {
SparkRunConfiguration sparkRunConfiguration = new SparkRunConfiguration();
sparkRunConfiguration.setName( "Spark Configuration" );
sparkRunConfiguration.setUrl( "127.0.0.2:8121" );
sparkRunConfiguration.setUrl( "http://127.0.0.2:8121" );
doReturn( "2.0" ).when( variableSpace ).getVariable( "KETTLE_AEL_PDI_DAEMON_VERSION", "1.0" );


Expand Down Expand Up @@ -152,10 +152,47 @@ public void testWebSocketVersionExecuteNoPort() {
verify( variableSpace ).setVariable( "engine.remote", "spark" );
verify( properties ).remove( "zookeeper.host" );
verify( properties ).remove( "zookeeper.port" );
verify( variableSpace ).setVariable( "engine.protocol", SparkRunConfigurationExecutor.DEFAULT_PROTOCOL );
verify( variableSpace ).setVariable( "engine.host", SparkRunConfigurationExecutor.DEFAULT_HOST );
verify( variableSpace ).setVariable( "engine.port", SparkRunConfigurationExecutor.DEFAULT_PORT );
}

@Test
public void testWssWebSocketVersionExecute() {
SparkRunConfiguration sparkRunConfiguration = new SparkRunConfiguration();
sparkRunConfiguration.setName( "Spark Configuration" );
sparkRunConfiguration.setUrl( "wss://127.0.0.2:8121" );
doReturn( "2.0" ).when( variableSpace ).getVariable( "KETTLE_AEL_PDI_DAEMON_VERSION", "1.0" );


TransExecutionConfiguration transExecutionConfiguration = new TransExecutionConfiguration();

sparkRunConfigurationExecutor
.execute( sparkRunConfiguration, transExecutionConfiguration, abstractMeta, variableSpace );

verify( variableSpace ).setVariable( "engine.protocol", "wss" );
verify( variableSpace ).setVariable( "engine.host", "127.0.0.2" );
verify( variableSpace ).setVariable( "engine.port", "8121" );
}

@Test
public void testUrlWssWebSocketVersionExecute() {
SparkRunConfiguration sparkRunConfiguration = new SparkRunConfiguration();
sparkRunConfiguration.setName( "Spark Configuration" );
sparkRunConfiguration.setUrl( " ws://127.0.0.2:8121 " );
doReturn( "2.0" ).when( variableSpace ).getVariable( "KETTLE_AEL_PDI_DAEMON_VERSION", "1.0" );


TransExecutionConfiguration transExecutionConfiguration = new TransExecutionConfiguration();

sparkRunConfigurationExecutor
.execute( sparkRunConfiguration, transExecutionConfiguration, abstractMeta, variableSpace );

verify( variableSpace ).setVariable( "engine.protocol", "ws" );
verify( variableSpace ).setVariable( "engine.host", "127.0.0.2" );
verify( variableSpace ).setVariable( "engine.port", "8121" );
}

@Test
public void testExecuteWithAelSecurityInstalled() {
ICapability aelSecurityCapability = mock( ICapability.class );
Expand Down
Expand Up @@ -5024,10 +5024,11 @@ private Trans createTrans() throws KettleException {
//default for now is AEL Engine RSA
String version = variables.getVariable( "KETTLE_AEL_PDI_DAEMON_VERSION", "1.0" );
if ( Const.toDouble( version, 1 ) >= 2 ) {
String protocol = transMeta.getVariable( "engine.protocol" );
String host = transMeta.getVariable( "engine.host" );
String port = transMeta.getVariable( "engine.port" );
//TODO: we have
boolean ssl = false;
//default value for ssl for now false
boolean ssl = "https".equalsIgnoreCase( protocol ) || "wss".equalsIgnoreCase( protocol );
return new TransWebSocketEngineAdapter( transMeta, host, port, ssl );
} else {
return PluginRegistry.getInstance().getPlugins( EnginePluginType.class ).stream()
Expand Down

0 comments on commit 7b6f417

Please sign in to comment.