Skip to content

Commit

Permalink
[BACKLOG-15685] As a Chris, I want to be able to disable steps when r…
Browse files Browse the repository at this point in the history
…unning on Spark

Remove steps following inactive input.
  • Loading branch information
pavel-sakun committed Apr 28, 2017
1 parent 2f4a54a commit 75d600d
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 10 deletions.
Expand Up @@ -116,7 +116,7 @@ private static Operation getOp( org.pentaho.di.engine.model.Transformation trans
private static TransMeta cleanupDisabledHops( TransMeta transMeta ) { private static TransMeta cleanupDisabledHops( TransMeta transMeta ) {
TransMeta copyTransMeta = (TransMeta) transMeta.clone(); TransMeta copyTransMeta = (TransMeta) transMeta.clone();


removeUnusedInputs( copyTransMeta ); removeDisabledInputs( copyTransMeta );


removeInactivePaths( copyTransMeta, null ); removeInactivePaths( copyTransMeta, null );


Expand Down Expand Up @@ -163,14 +163,17 @@ private static void removeInactivePaths( TransMeta trans, List<StepMeta> steps )
* Removes input steps having only disabled output hops so they will not be executed. * Removes input steps having only disabled output hops so they will not be executed.
* @param transMeta transMeta to process * @param transMeta transMeta to process
*/ */
private static void removeUnusedInputs( TransMeta transMeta ) { private static void removeDisabledInputs( TransMeta transMeta ) {
List<StepMeta> unusedInputs = findHops( transMeta, hop -> !hop.isEnabled() ).stream() List<StepMeta> unusedInputs = findHops( transMeta, hop -> !hop.isEnabled() ).stream()
.map( hop -> hop.getFromStep() ) .map( hop -> hop.getFromStep() )
.filter( step -> isUnusedInput( transMeta, step ) ) .filter( step -> isUnusedInput( transMeta, step ) )
.collect( Collectors.toList() ); .collect( Collectors.toList() );
for ( StepMeta unusedInput : unusedInputs ) { for ( StepMeta unusedInput : unusedInputs ) {
transMeta.findAllTransHopFrom( unusedInput ).forEach( transMeta::removeTransHop ); List<TransHopMeta> outHops = transMeta.findAllTransHopFrom( unusedInput );
List<StepMeta> subsequentSteps = outHops.stream().map( hop -> hop.getToStep() ).collect( Collectors.toList() );
outHops.forEach( transMeta::removeTransHop );
transMeta.getSteps().remove( unusedInput ); transMeta.getSteps().remove( unusedInput );
removeInactivePaths( transMeta, subsequentSteps );
} }
} }


Expand Down
Expand Up @@ -171,25 +171,29 @@ public void testRemovingDisabledInputSteps() {
trans.addStep( inputToBeRemoved ); trans.addStep( inputToBeRemoved );
StepMeta inputToStay = new StepMeta( "InputToStay", stepMetaInterface ); StepMeta inputToStay = new StepMeta( "InputToStay", stepMetaInterface );
trans.addStep( inputToStay ); trans.addStep( inputToStay );
StepMeta inputReceiver = new StepMeta( "InputReceiver", stepMetaInterface ); StepMeta inputReceiver1 = new StepMeta( "InputReceiver1", stepMetaInterface );
trans.addStep( inputReceiver ); trans.addStep( inputReceiver1 );

StepMeta inputReceiver2 = new StepMeta( "InputReceiver2", stepMetaInterface );
TransHopMeta hop1 = new TransHopMeta( inputToBeRemoved, inputReceiver, false ); trans.addStep( inputReceiver2 );
TransHopMeta hop2 = new TransHopMeta( inputToStay, inputReceiver );
TransHopMeta hop1 = new TransHopMeta( inputToBeRemoved, inputReceiver1, false );
TransHopMeta hop2 = new TransHopMeta( inputToStay, inputReceiver1 );
TransHopMeta hop3 = new TransHopMeta( inputToBeRemoved, inputReceiver2, false );
trans.addTransHop( hop1 ); trans.addTransHop( hop1 );
trans.addTransHop( hop2 ); trans.addTransHop( hop2 );
trans.addTransHop( hop3 );


Transformation transformation = TransMetaConverter.convert( trans ); Transformation transformation = TransMetaConverter.convert( trans );


List<String> List<String>
steps = steps =
transformation.getOperations().stream().map( op -> op.getId() ).collect( Collectors.toList() ); transformation.getOperations().stream().map( op -> op.getId() ).collect( Collectors.toList() );
assertThat( "Only 2 ops should exist", steps.size(), is( 2 ) ); assertThat( "Only 2 ops should exist", steps.size(), is( 2 ) );
assertThat( steps, hasItems( "InputToStay", "InputReceiver" ) ); assertThat( steps, hasItems( "InputToStay", "InputReceiver1" ) );


List<String> hops = transformation.getHops().stream().map( hop -> hop.getId() ).collect( Collectors.toList() ); List<String> hops = transformation.getHops().stream().map( hop -> hop.getId() ).collect( Collectors.toList() );
assertThat( "Only 1 hop should exist", hops.size(), is( 1 ) ); assertThat( "Only 1 hop should exist", hops.size(), is( 1 ) );
assertThat( hops, hasItems( "InputToStay -> InputReceiver" ) ); assertThat( hops, hasItems( "InputToStay -> InputReceiver1" ) );


} }


Expand Down

0 comments on commit 75d600d

Please sign in to comment.