Skip to content

Commit

Permalink
[PDI-8927] Trans.activeSubtransformations not threadsafe
Browse files Browse the repository at this point in the history
- Changing HM to CHM for synchronization
- Separating add/remove/get methods, encapsulating map-implementation
- Test written
- Concurrent tests from kettle's core project are moved to engine, as they are not running on CI
- Checkstyle applied
  • Loading branch information
Nikolaichuk Ivan committed Jul 12, 2016
1 parent 1629906 commit f43d152
Show file tree
Hide file tree
Showing 16 changed files with 193 additions and 26 deletions.
22 changes: 19 additions & 3 deletions engine/src/org/pentaho/di/trans/Trans.java
Expand Up @@ -430,7 +430,7 @@ public Trans() {
stepPerformanceSnapshotSeqNr = new AtomicInteger( 0 );
lastWrittenStepPerformanceSequenceNr = 0;

activeSubtransformations = new HashMap<String, Trans>();
activeSubtransformations = new ConcurrentHashMap<>();
activeSubjobs = new HashMap<String, Job>();

resultRows = new ArrayList<RowMetaAndData>();
Expand Down Expand Up @@ -5136,14 +5136,30 @@ public List<LoggingHierarchy> getLoggingHierarchy() {
}

/**
* Gets the active sub-transformations.
* Use:
* {@link #addActiveSubTransformation(String, Trans),
* {@link #getActiveSubTransformation(String)},
* {@link #removeActiveSubTransformation(String)}
*
* @return a map (by name) of the active sub-transformations
* instead
*/
@Deprecated
public Map<String, Trans> getActiveSubtransformations() {
return activeSubtransformations;
}

public void addActiveSubTransformation( final String subTransName, Trans subTrans ) {
activeSubtransformations.put( subTransName, subTrans );
}

public Trans removeActiveSubTransformation( final String subTransName ) {
return activeSubtransformations.remove( subTransName );
}

public Trans getActiveSubTransformation( final String subTransName ) {
return activeSubtransformations.get( subTransName );
}

/**
* Gets the active sub-jobs.
*
Expand Down
6 changes: 3 additions & 3 deletions engine/src/org/pentaho/di/trans/steps/mapping/Mapping.java
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2013 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -496,7 +496,7 @@ public void prepareMappingExecution() throws KettleException {
// Finally, add the mapping transformation to the active sub-transformations
// map in the parent transformation
//
getTrans().getActiveSubtransformations().put( getStepname(), getData().getMappingTrans() );
getTrans().addActiveSubTransformation( getStepname(), getData().getMappingTrans() );
}

@VisibleForTesting StepInterface[] pickupTargetStepsFor( MappingIODefinition outputDefinition )
Expand Down Expand Up @@ -623,7 +623,7 @@ public void dispose( StepMetaInterface smi, StepDataInterface sdi ) {

// Remove it from the list of active sub-transformations...
//
getTrans().getActiveSubtransformations().remove( getStepname() );
getTrans().removeActiveSubTransformation( getStepname() );

// See if there was an error in the sub-transformation, in that case, flag error etc.
if ( getData().getMappingTrans().getErrors() > 0 ) {
Expand Down
Expand Up @@ -149,7 +149,7 @@ public void transStopped( Trans parentTrans ) {
// Finally, add the mapping transformation to the active sub-transformations
// map in the parent transformation
//
getTrans().getActiveSubtransformations().put( getStepname(), injectTrans );
getTrans().addActiveSubTransformation( getStepname(), injectTrans );

if ( !Const.isEmpty( meta.getSourceStepName() ) ) {
StepInterface stepInterface = injectTrans.getStepInterface( meta.getSourceStepName(), 0 );
Expand Down
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2013 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -254,7 +254,7 @@ public void prepareMappingExecution() throws KettleException {
// Finally, add the mapping transformation to the active sub-transformations
// map in the parent transformation
//
getTrans().getActiveSubtransformations().put( getStepname(), simpleMappingData.mappingTrans );
getTrans().addActiveSubTransformation( getStepname(), simpleMappingData.mappingTrans );
}

void initServletConfig() {
Expand Down Expand Up @@ -313,7 +313,7 @@ public void dispose( StepMetaInterface smi, StepDataInterface sdi ) {

// Remove it from the list of active sub-transformations...
//
getTrans().getActiveSubtransformations().remove( getStepname() );
getTrans().removeActiveSubTransformation( getStepname() );
}

// See if there was an error in the sub-transformation, in that case, flag error etc.
Expand Down
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2013 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -266,7 +266,7 @@ public void rowWrittenEvent( RowMetaInterface rowMeta, Object[] row ) throws Ket

// Add the mapping transformation to the active sub-transformations map in the parent transformation
//
getTrans().getActiveSubtransformations().put( getStepname(), singleThreaderData.mappingTrans );
getTrans().addActiveSubTransformation( getStepname(), singleThreaderData.mappingTrans );
}

void initServletConfig() {
Expand Down
Expand Up @@ -196,7 +196,7 @@ private void executeTransformation() throws KettleException {
passParametersToTrans();

// keep track for drill down in Spoon...
getTrans().getActiveSubtransformations().put( getStepname(), executorTrans );
getTrans().addActiveSubTransformation( getStepname(), executorTrans );

Result result = new Result();
result.setRows( transExecutorData.groupBuffer );
Expand Down
@@ -0,0 +1,151 @@
/*! ******************************************************************************
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/

package org.pentaho.di.concurrency;

import org.junit.Test;
import org.pentaho.di.trans.Trans;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* In this test we add new elements to shared transformation concurrently
* and get added elements from this transformation concurrently.
*
* When working with {@link java.util.HashMap} with default loadFactor this test will fail
* when HashMap will try to rearrange it's elements (it will happen when number of elements in map will be equal to
* capacity/loadFactor).
*
* Map will be in inconsistent state, because in the same time, when rearrange happens other threads will be adding
* new elements to map.
* This will lead to unpredictable result of executing {@link java.util.HashMap#size()} method (as a result there
* would be an error in {@link Getter#call()} ).
*
*/
public class ActiveSubTransformationsConcurrencyTest {
private static final int NUMBER_OF_GETTERS = 10;
private static final int NUMBER_OF_CREATES = 10;
private static final int NUMBER_OF_CREATE_CYCLES = 20;
private static final int INITIAL_NUMBER_OF_TRANS = 100;


private static final String TRANS_NAME = "transformation";
private final Object lock = new Object();

@Test
public void getAndCreateConcurrently() throws Exception {
AtomicBoolean condition = new AtomicBoolean( true );
Trans trans = new Trans();
createSubTransformations( trans );

List<Getter> getters = generateGetters( trans, condition );
List<Creator> creators = generateCreators( trans, condition );

ConcurrencyTestRunner.runAndCheckNoExceptionRaised( creators, getters, condition );
}

private void createSubTransformations( Trans trans ) {
for ( int i = 0; i < INITIAL_NUMBER_OF_TRANS; i++ ) {
trans.addActiveSubTransformation( createTransName( i ), new Trans() );
}
}

private List<Getter> generateGetters( Trans trans, AtomicBoolean condition ) {
List<Getter> getters = new ArrayList<>();
for ( int i = 0; i < NUMBER_OF_GETTERS; i++ ) {
getters.add( new Getter( trans, condition ) );
}

return getters;
}

private List<Creator> generateCreators( Trans trans, AtomicBoolean condition ) {
List<Creator> creators = new ArrayList<Creator>();
for ( int i = 0; i < NUMBER_OF_CREATES; i++ ) {
creators.add( new Creator( trans, condition ) );
}

return creators;
}


private class Getter extends StopOnErrorCallable<Object> {
private final Trans trans;
private final Random random;

Getter( Trans trans, AtomicBoolean condition ) {
super( condition );
this.trans = trans;
random = new Random();
}

@Override
Object doCall() throws Exception {
while ( condition.get() ) {
final String activeSubTransName = createTransName( random.nextInt( INITIAL_NUMBER_OF_TRANS ) );
Trans subTrans = trans.getActiveSubTransformation( activeSubTransName );

if ( subTrans == null ) {
throw new IllegalStateException(
String.format(
"Returned transformation must not be null. Transformation name = %s",
activeSubTransName ) );
}
}

return null;
}
}

private class Creator extends StopOnErrorCallable<Object> {
private final Trans trans;
private final Random random;

Creator( Trans trans, AtomicBoolean condition ) {
super( condition );
this.trans = trans;
random = new Random();
}

@Override
Object doCall() throws Exception {
for ( int i = 0; i < NUMBER_OF_CREATE_CYCLES; i++ ) {
synchronized ( lock ) {
String transName = createTransName( randomInt( INITIAL_NUMBER_OF_TRANS, Integer.MAX_VALUE ) );
trans.addActiveSubTransformation( transName, new Trans() );
}
}
return null;
}

private int randomInt( int min, int max ) {
return random.nextInt( max - min ) + min;
}
}

private String createTransName( int id ) {
return TRANS_NAME + " - " + id;
}
}
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2015 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -47,7 +47,7 @@
* <p/>
* There is a special condition flag, shared among all actors. Each of them must stop when it has found out the flag has
* been cleared. Also, in most cases it makes sense to clear the flag after any exception has raised (see {@linkplain
* StopOnErrorCallable}, because any actor can face with it in concurrent environment.
* StopOnErrorCallable}, because any actor can face with it in concurrency environment.
* <p/>
* The runner stores results of all actors, though in most cases this information is needless - what is important that
* is the fact the execution has completed with no errors.
Expand Down Expand Up @@ -223,5 +223,3 @@ Map<Callable<? extends M>, ExecutionResult<M>> getMonitoredResults() {
return Collections.unmodifiableMap( monitoredResults );
}
}


Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2015 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2015 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2015 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2015 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2015 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2014 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -139,7 +139,9 @@ public void testStepShouldProcessError_WhenMappingTransHasError() throws KettleE
smp.dispose( stepMockHelper.processRowsStepMetaInterface, simpleMpData );
verify( stepMockHelper.trans, times( 1 ) ).isFinished();
verify( stepMockHelper.trans, never() ).waitUntilFinished();
verify( stepMockHelper.trans, never() ).getActiveSubtransformations();
verify( stepMockHelper.trans, never() ).addActiveSubTransformation( anyString(), any( Trans.class ) );
verify( stepMockHelper.trans, never() ).removeActiveSubTransformation( anyString() );
verify( stepMockHelper.trans, never() ).getActiveSubTransformation( anyString() );
verify( stepMockHelper.trans, times( 1 ) ).getErrors();
assertTrue( "The step contains the errors", smp.getErrors() == errorCount );

Expand Down
4 changes: 2 additions & 2 deletions ui/src/org/pentaho/di/ui/spoon/trans/TransGraph.java
Expand Up @@ -4326,7 +4326,7 @@ public void openMapping( StepMeta stepMeta, int index ) {
*/
private void attachActiveTrans( TransGraph transGraph, StepMeta stepMeta ) {
if ( trans != null && transGraph != null ) {
Trans subTransformation = trans.getActiveSubtransformations().get( stepMeta.getName() );
Trans subTransformation = trans.getActiveSubTransformation( stepMeta.getName() );
transGraph.setTrans( subTransformation );
if ( !transGraph.isExecutionResultsPaneVisible() ) {
transGraph.showExecutionResults();
Expand All @@ -4343,7 +4343,7 @@ private void attachActiveTrans( TransGraph transGraph, StepMeta stepMeta ) {
*/
private Trans getActiveSubtransformation( TransGraph transGraph, StepMeta stepMeta ) {
if ( trans != null && transGraph != null ) {
return trans.getActiveSubtransformations().get( stepMeta.getName() );
return trans.getActiveSubTransformation( stepMeta.getName() );
}
return null;
}
Expand Down

0 comments on commit f43d152

Please sign in to comment.