Skip to content

Commit

Permalink
[PDI-16066] Transformation Executor Step does not Inherit Parameters (#…
Browse files Browse the repository at this point in the history
…4664)

-fixed a broken option "Inherit all variables from the transformation"
-extracted a common behavior for activation params in one methods for children of StepWithMappingMeta
-overwriting parameters and the next logic
When the child parameter does not exist in the parent parameters, keep it.
When the child parameter does exist in the parent parameters, overwrite the child parameter by the parent parameter.
All other parent parameters need to get copied into the child parameters  (when the 'Inherit all variables from the transformation?' option is checked)
  • Loading branch information
AliaksandrShuhayeu authored and Ben Morrise committed Dec 12, 2017
1 parent a87b51f commit 07d7d71
Show file tree
Hide file tree
Showing 10 changed files with 249 additions and 214 deletions.
84 changes: 81 additions & 3 deletions engine/src/main/java/org/pentaho/di/trans/StepWithMappingMeta.java
Expand Up @@ -22,10 +22,13 @@

package org.pentaho.di.trans;

import org.apache.commons.lang.ArrayUtils;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.ObjectLocationSpecificationMethod;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.LogChannel;
import org.pentaho.di.core.parameters.NamedParams;
import org.pentaho.di.core.parameters.UnknownParamException;
import org.pentaho.di.core.util.CurrentDirectoryResolver;
import org.pentaho.di.core.util.Utils;
import org.pentaho.di.core.variables.VariableSpace;
Expand All @@ -38,7 +41,12 @@
import org.pentaho.di.resource.ResourceNamingInterface;
import org.pentaho.di.trans.step.BaseStepMeta;
import org.pentaho.metastore.api.IMetaStore;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
* This class is supposed to use in steps where the mapping to sub transformations takes place
Expand Down Expand Up @@ -66,8 +74,10 @@ public static synchronized TransMeta loadMappingMeta( StepWithMappingMeta execut
TransMeta mappingTransMeta = null;

CurrentDirectoryResolver r = new CurrentDirectoryResolver();
// send parentVariables = null we don't need it here for resolving resolveCurrentDirectory.
// Otherwise we destroy child variables and the option "Inherit all variables from the transformation" is enabled always.
VariableSpace tmpSpace =
r.resolveCurrentDirectory( executorMeta.getSpecificationMethod(), space, rep, executorMeta.getParentStepMeta(),
r.resolveCurrentDirectory( executorMeta.getSpecificationMethod(), null, rep, executorMeta.getParentStepMeta(),
executorMeta.getFileName() );

switch ( executorMeta.getSpecificationMethod() ) {
Expand Down Expand Up @@ -158,10 +168,17 @@ public static synchronized TransMeta loadMappingMeta( StepWithMappingMeta execut
default:
break;
}
if ( mappingTransMeta == null ) { //skip warning
return null;
}

// Pass some important information to the mapping transformation metadata:
// When the child parameter does exist in the parent parameters, overwrite the child parameter by the
// parent parameter.
replaceVariableValues( mappingTransMeta, space );
if ( share ) {
mappingTransMeta.copyVariablesFrom( space );
// All other parent parameters need to get copied into the child parameters (when the 'Inherit all
// variables from the transformation?' option is checked)
addMissingVariables( mappingTransMeta, space );
}
mappingTransMeta.setRepository( rep );
mappingTransMeta.setMetaStore( metaStore );
Expand All @@ -170,6 +187,43 @@ public static synchronized TransMeta loadMappingMeta( StepWithMappingMeta execut
return mappingTransMeta;
}

public static void activateParams( VariableSpace childVariableSpace, NamedParams childNamedParams, VariableSpace parent, String[] listParameters,
String[] mappingVariables, String[] inputFields ) {
Map<String, String> parameters = new HashMap<>();
Set<String> subTransParameters = new HashSet<>( Arrays.asList( listParameters ) );

for ( int i = 0; i < mappingVariables.length; i++ ) {
parameters.put( mappingVariables[ i ], parent.environmentSubstitute( inputFields[ i ] ) );
}

for ( String variableName : parent.listVariables() ) {
// When the child parameter does exist in the parent parameters, overwrite the child parameter by the
// parent parameter.
if ( parameters.containsKey( variableName ) ) {
parameters.put( variableName, parent.getVariable( variableName ) );
} else if ( ArrayUtils.contains( listParameters, variableName ) ) {
// there is a definition only in Transformation properties - params tab
parameters.put( variableName, parent.getVariable( variableName ) );
}
}

for ( Map.Entry<String, String> entry : parameters.entrySet() ) {
String key = entry.getKey();
String value = Const.NVL( entry.getValue(), "" );
if ( subTransParameters.contains( key ) ) {
try {
childNamedParams.setParameterValue( key, Const.NVL( entry.getValue(), "" ) );
} catch ( UnknownParamException e ) {
// this is explicitly checked for up front
}
} else {
childVariableSpace.setVariable( key, value );
}
}

childNamedParams.activateParameters();
}

/**
* @return the specificationMethod
*/
Expand Down Expand Up @@ -288,4 +342,28 @@ public String exportResources( VariableSpace space, Map<String, ResourceDefiniti
throw new KettleException( BaseMessages.getString( PKG, "StepWithMappingMeta.Exception.UnableToLoadTrans", fileName ) );
}
}

private static void addMissingVariables( TransMeta source, VariableSpace target ) {
if ( target == null ) {
return;
}
String[] variableNames = target.listVariables();
for ( String variable : variableNames ) {
if ( source.getVariable( variable ) == null ) {
source.setVariable( variable, target.getVariable( variable ) );
}
}
}

private static void replaceVariableValues( TransMeta childTransMeta, VariableSpace replaceBy ) {
if ( replaceBy == null ) {
return;
}
String[] variableNames = replaceBy.listVariables();
for ( String variableName : variableNames ) {
if ( childTransMeta.getVariable( variableName ) != null ) {
childTransMeta.setVariable( variableName, replaceBy.getVariable( variableName ) );
}
}
}
}
Expand Up @@ -23,13 +23,7 @@
package org.pentaho.di.trans.steps.mapping;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import com.google.common.annotations.VisibleForTesting;
import org.pentaho.di.core.Const;
Expand All @@ -41,6 +35,7 @@
import org.pentaho.di.core.logging.TransLogTable;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.trans.SingleThreadedTransExecutor;
import org.pentaho.di.trans.StepWithMappingMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.TransMeta.TransformationType;
Expand Down Expand Up @@ -280,39 +275,6 @@ public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws
}
}

public void setMappingParameters( Trans trans, TransMeta transMeta, MappingParameters mappingParameters )
throws KettleException {
if ( mappingParameters == null ) {
return;
}

Map<String, String> parameters = new HashMap<>();
Set<String> subTransParameters = new HashSet<>( Arrays.asList( transMeta.listParameters() ) );

if ( mappingParameters.isInheritingAllVariables() ) {
// This will include parameters
for ( String variableName : listVariables() ) {
parameters.put( variableName, getVariable( variableName ) );
}
}

String[] mappingVariables = mappingParameters.getVariable();
String[] inputFields = mappingParameters.getInputField();
for ( int i = 0; i < mappingVariables.length; i++ ) {
parameters.put( mappingVariables[i], environmentSubstitute( inputFields[i] ) );
}

for ( Entry<String, String> entry : parameters.entrySet() ) {
String key = entry.getKey();
String value = Const.NVL( entry.getValue(), "" );
if ( subTransParameters.contains( key ) ) {
trans.setParameterValue( key, Const.NVL( entry.getValue(), "" ) );
} else {
trans.setVariable( key, value );
}
}
trans.activateParameters();
}

public void prepareMappingExecution() throws KettleException {
initTransFromMeta();
Expand Down Expand Up @@ -567,7 +529,14 @@ void initTransFromMeta() throws KettleException {

// Set the parameters values in the mapping.
//
setMappingParameters( data.mappingTrans, data.mappingTransMeta, meta.getMappingParameters() );

MappingParameters mappingParameters = meta.getMappingParameters();
if ( mappingParameters != null ) {
StepWithMappingMeta
.activateParams( data.mappingTrans, data.mappingTrans, this, data.mappingTransMeta.listParameters(),
mappingParameters.getVariable(), mappingParameters.getInputField() );
}

}

void initServletConfig() {
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.trans.RowProducer;
import org.pentaho.di.trans.StepWithMappingMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.TransMeta.TransformationType;
Expand All @@ -41,7 +42,6 @@
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;
import org.pentaho.di.trans.steps.TransStepUtil;
import org.pentaho.di.trans.steps.mapping.MappingParameters;
import org.pentaho.di.trans.steps.mapping.MappingValueRename;
import org.pentaho.di.trans.steps.mappinginput.MappingInput;
import org.pentaho.di.trans.steps.mappingoutput.MappingOutput;
Expand Down Expand Up @@ -134,41 +134,6 @@ public void putRow( RowMetaInterface rowMeta, Object[] rowData ) throws KettleSt
}
}

private void setMappingParameters() throws KettleException {
MappingParameters mappingParameters = meta.getMappingParameters();
if ( mappingParameters != null ) {

String[] parameters;
String[] parameterValues;

if ( mappingParameters.isInheritingAllVariables() ) {
// We pass the values for all the parameters from the parent transformation
//
parameters = getData().mappingTransMeta.listParameters();
parameterValues = new String[parameters.length];
for ( int i = 0; i < parameters.length; i++ ) {
parameterValues[i] = getVariable( parameters[i] );
}
} else {
// We pass down the listed variables with the specified values...
//
parameters = mappingParameters.getVariable();
parameterValues = new String[parameters.length];
for ( int i = 0; i < parameters.length; i++ ) {
parameterValues[i] = environmentSubstitute( mappingParameters.getInputField()[i] );
}
}

for ( int i = 0; i < parameters.length; i++ ) {
String value = Const.NVL( parameterValues[i], "" );

getData().mappingTrans.setParameterValue( parameters[i], value );
}

getData().mappingTrans.activateParameters();
}
}

public void prepareMappingExecution() throws KettleException {

SimpleMappingData simpleMappingData = getData();
Expand All @@ -177,8 +142,8 @@ public void prepareMappingExecution() throws KettleException {

// Set the parameters values in the mapping.
//
setMappingParameters();

StepWithMappingMeta.activateParams( simpleMappingData.mappingTrans, simpleMappingData.mappingTrans, this, simpleMappingData.mappingTransMeta.listParameters(),
meta.getMappingParameters().getVariable(), meta.getMappingParameters().getInputField() );
if ( simpleMappingData.mappingTransMeta.getTransformationType() != TransformationType.Normal ) {
simpleMappingData.mappingTrans.getTransMeta().setUsingThreadPriorityManagment( false );
}
Expand Down Expand Up @@ -280,7 +245,7 @@ public boolean init( StepMetaInterface smi, StepDataInterface sdi ) {
//
meta.setRepository( getTransMeta().getRepository() );
simpleMappingData.mappingTransMeta =
SimpleMappingMeta.loadMappingMeta( meta, meta.getRepository(), meta.getMetaStore(), this );
SimpleMappingMeta.loadMappingMeta( meta, meta.getRepository(), meta.getMetaStore(), this, meta.getMappingParameters().isInheritingAllVariables() );
if ( simpleMappingData.mappingTransMeta != null ) { // Do we have a mapping at all?

// OK, now prepare the execution of the mapping.
Expand Down
Expand Up @@ -23,7 +23,6 @@
package org.pentaho.di.trans.steps.simplemapping;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.pentaho.di.core.CheckResult;
Expand All @@ -35,7 +34,6 @@
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.exception.KettleStepException;
import org.pentaho.di.core.exception.KettleXMLException;
import org.pentaho.di.core.parameters.UnknownParamException;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.core.row.ValueMetaInterface;
import org.pentaho.di.core.variables.VariableSpace;
Expand Down Expand Up @@ -279,42 +277,22 @@ public void getFields( RowMetaInterface row, String origin, RowMetaInterface[] i
//
TransMeta mappingTransMeta = null;
try {
mappingTransMeta = loadMappingMeta( this, repository, metaStore, space );
mappingTransMeta =
loadMappingMeta( this, repository, metaStore, space, mappingParameters.isInheritingAllVariables() );
} catch ( KettleException e ) {
throw new KettleStepException( BaseMessages.getString(
PKG, "SimpleMappingMeta.Exception.UnableToLoadMappingTransformation" ), e );
}

// The field structure may depend on the input parameters as well (think of parameter replacements in MDX queries
// for instance)
if ( mappingParameters != null ) {

// See if we need to pass all variables from the parent or not...
//
if ( mappingParameters.isInheritingAllVariables() ) {
mappingTransMeta.copyVariablesFrom( space );
}
if ( mappingParameters != null && mappingTransMeta != null ) {

// Just set the variables in the transformation statically.
// This just means: set a number of variables or parameter values:
//
List<String> subParams = Arrays.asList( mappingTransMeta.listParameters() );

for ( int i = 0; i < mappingParameters.getVariable().length; i++ ) {
String name = mappingParameters.getVariable()[i];
String value = space.environmentSubstitute( mappingParameters.getInputField()[i] );
if ( !Utils.isEmpty( name ) && !Utils.isEmpty( value ) ) {
if ( subParams.contains( name ) ) {
try {
mappingTransMeta.setParameterValue( name, value );
} catch ( UnknownParamException e ) {
// this is explicitly checked for up front
}
}
mappingTransMeta.setVariable( name, value );

}
}
StepWithMappingMeta.activateParams( mappingTransMeta, mappingTransMeta, space, mappingTransMeta.listParameters(),
mappingParameters.getVariable(), mappingParameters.getInputField() );
}

// Keep track of all the fields that need renaming...
Expand Down

0 comments on commit 07d7d71

Please sign in to comment.