Skip to content

Commit

Permalink
[PDI-13545] - Records are lost when you copy from a mapping to a step…
Browse files Browse the repository at this point in the history
… with multiple copies
  • Loading branch information
Andrey Khayrutdinov committed Mar 11, 2015
1 parent 1b6421e commit 1cbd593
Show file tree
Hide file tree
Showing 6 changed files with 806 additions and 27 deletions.
64 changes: 37 additions & 27 deletions engine/src/org/pentaho/di/trans/steps/mapping/Mapping.java
Expand Up @@ -31,6 +31,7 @@
import java.util.Map.Entry;
import java.util.Set;

import com.google.common.annotations.VisibleForTesting;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.Result;
import org.pentaho.di.core.RowSet;
Expand Down Expand Up @@ -476,34 +477,9 @@ public void prepareMappingExecution() throws KettleException {
mappingOutputSource = mappingOutputSteps[0];
}

// To what step in this transformation are we writing to?
// To what steps in this transformation are we writing to?
//
StepInterface[] targetSteps;
if ( !Const.isEmpty( outputDefinition.getOutputStepname() ) ) {
// If we have a target step specification for the output of the mapping,
// we need to send it over there...
//
StepInterface target = getTrans().findRunThread( outputDefinition.getOutputStepname() );
if ( target == null ) {
throw new KettleException( BaseMessages.getString( PKG, "MappingDialog.Exception.StepNameNotFound",
outputDefinition.getOutputStepname() ) );
}
targetSteps = new StepInterface[] { target, };
} else {
// No target step is specified.
// See if we can find the next steps in the transformation..
//
List<StepMeta> nextSteps = getTransMeta().findNextSteps( getStepMeta() );

// Let's send the data to all the next steps we find...
// The origin is the mapping output step
// The target is all the next steps after this mapping step.
//
targetSteps = new StepInterface[nextSteps.size()];
for ( int s = 0; s < targetSteps.length; s++ ) {
targetSteps[s] = getTrans().findRunThread( nextSteps.get( s ).getName() );
}
}
StepInterface[] targetSteps = pickupTargetStepsFor( outputDefinition );

// Now tell the mapping output step where to look...
// Also explain the mapping output steps how to rename the values back...
Expand All @@ -523,6 +499,40 @@ public void prepareMappingExecution() throws KettleException {
getTrans().getActiveSubtransformations().put( getStepname(), getData().getMappingTrans() );
}

@VisibleForTesting StepInterface[] pickupTargetStepsFor( MappingIODefinition outputDefinition )
throws KettleException {
List<StepInterface> result;
if ( !Const.isEmpty( outputDefinition.getOutputStepname() ) ) {
// If we have a target step specification for the output of the mapping,
// we need to send it over there...
//
result = getTrans().findStepInterfaces( outputDefinition.getOutputStepname() );
if ( Const.isEmpty( result ) ) {
throw new KettleException( BaseMessages.getString( PKG, "MappingDialog.Exception.StepNameNotFound",
outputDefinition.getOutputStepname() ) );
}
} else {
// No target step is specified.
// See if we can find the next steps in the transformation..
//
List<StepMeta> nextSteps = getTransMeta().findNextSteps( getStepMeta() );

// Let's send the data to all the next steps we find...
// The origin is the mapping output step
// The target is all the next steps after this mapping step.
//
result = new ArrayList<StepInterface>();
for ( StepMeta nextStep : nextSteps ) {
// need to take into the account different copies of the step
List<StepInterface> copies = getTrans().findStepInterfaces( nextStep.getName() );
if ( copies != null ) {
result.addAll( copies );
}
}
}
return result.toArray( new StepInterface[ result.size() ] );
}

void initTransFromMeta() throws KettleException {
// Create the transformation from meta-data...
//
Expand Down
@@ -0,0 +1,96 @@
/*! ******************************************************************************
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2015 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/

package org.pentaho.di.trans.steps.mapping;

import org.junit.Before;
import org.junit.Test;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.trans.step.StepDataInterface;
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.steps.StepMockUtil;
import org.pentaho.di.trans.steps.mock.StepMockHelper;

import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.matchers.JUnitMatchers.hasItems;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* @author Andrey Khayrutdinov
*/
public class MappingUnitTest {

private StepMockHelper<MappingMeta, StepDataInterface> mockHelper;
private Mapping mapping;

@Before
public void setUp() throws Exception {
mockHelper = StepMockUtil.getStepMockHelper( MappingMeta.class, "MappingUnitTest" );
mapping =
new Mapping( mockHelper.stepMeta, mockHelper.stepDataInterface, 0, mockHelper.transMeta, mockHelper.trans );
}


@SuppressWarnings( "unchecked" )
@Test
public void pickupTargetStepsFor_OutputIsNotDefined() throws Exception {
StepMeta singleMeta = new StepMeta( "single", null );
StepMeta copiedMeta = new StepMeta( "copied", null );
when( mockHelper.transMeta.findNextSteps( mockHelper.stepMeta ) ).thenReturn( asList( singleMeta, copiedMeta ) );

StepInterface single = mock( StepInterface.class );
when( mockHelper.trans.findStepInterfaces( "single" ) ).thenReturn( singletonList( single ) );

StepInterface copy1 = mock( StepInterface.class );
StepInterface copy2 = mock( StepInterface.class );
when( mockHelper.trans.findStepInterfaces( "copied" ) ).thenReturn( asList( copy1, copy2 ) );

MappingIODefinition definition = new MappingIODefinition( null, null );
StepInterface[] targetSteps = mapping.pickupTargetStepsFor( definition );

assertThat( asList( targetSteps ), hasItems( is( single ), is( copy1 ), is( copy2 ) ) );
}

@SuppressWarnings( "unchecked" )
@Test
public void pickupTargetStepsFor_OutputIsDefined() throws Exception {
StepInterface copy1 = mock( StepInterface.class );
StepInterface copy2 = mock( StepInterface.class );
when( mockHelper.trans.findStepInterfaces( "copied" ) ).thenReturn( asList( copy1, copy2 ) );

MappingIODefinition definition = new MappingIODefinition( null, "copied" );
StepInterface[] targetSteps = mapping.pickupTargetStepsFor( definition );

assertThat( asList( targetSteps ), hasItems( is( copy1 ), is( copy2 ) ) );
}

@Test(expected = KettleException.class)
public void pickupTargetStepsFor_OutputIsDefined_ThrowsExceptionIfFindsNone() throws Exception {
MappingIODefinition definition = new MappingIODefinition( null, "non-existing" );
mapping.pickupTargetStepsFor( definition );
}
}
51 changes: 51 additions & 0 deletions test/org/pentaho/di/trans/steps/mapping/MappingTest.java
Expand Up @@ -250,4 +250,55 @@ public void testMapping_WhenSharingPreviousStepWithAnother() throws Exception {

assertEquals( 0, trans.getErrors() );
}


/**
* This test case relates to PDI-13545. It executes a transformation with a Mapping step that is not configured
* manually with an <tt>outputStepname</tt> property and, therefore, it is set to be a mapping output step from the
* internal transformation.
*
* @throws Exception
*/
public void testMapping_WhenNextStepHasTwoCopies_AndOutputIsNotDefinedExplicitly() throws Exception {
runTransWhenMappingsIsFollowedByCopiedStep(
"testfiles/org/pentaho/di/trans/steps/mapping/pdi-13545/PDI-13545-1.ktr" );
}

/**
* This test case relates to PDI-13545. It executes a transformation with a Mapping step that is configured manually
* with an <tt>outputStepname</tt> property.
*
* @throws Exception
*/
public void testMapping_WhenNextStepHasTwoCopies_AndOutputIsDefinedExplicitly() throws Exception {
runTransWhenMappingsIsFollowedByCopiedStep(
"testfiles/org/pentaho/di/trans/steps/mapping/pdi-13545/PDI-13545-2.ktr" );
}

/**
* This method runs transformations related to PDI-13545.<br/> The scenario is the following: there are two step
* generating data, the latter of which is a Mapping step. They are followed with a Join Rows step, that has two
* copies. The last in a row is a Dummy step, named "Last". Since both generating steps output 3 rows ([10, 20, 30]
* and [1, 2, 3] respectively), the last step must obtain 3*3=9 rows.
*
* @param transPath a path to transformation file
* @throws Exception
*/
private void runTransWhenMappingsIsFollowedByCopiedStep( String transPath ) throws Exception {
KettleEnvironment.init();

TransMeta transMeta = new TransMeta( transPath );
transMeta.setTransformationType( TransMeta.TransformationType.Normal );

Trans trans = new Trans( transMeta );
trans.prepareExecution( null );
trans.startThreads();
trans.waitUntilFinished();

assertEquals( 0, trans.getErrors() );

List<StepInterface> list = trans.findBaseSteps( "Last" );
assertEquals( 1, list.size() );
assertEquals( 9, list.get( 0 ).getLinesRead() );
}
}

0 comments on commit 1cbd593

Please sign in to comment.