Skip to content

Commit

Permalink
[BACKLOG-21184] Add Safe Stop functionality to the Abort Step (#5035)
Browse files Browse the repository at this point in the history
  • Loading branch information
DFieldFL authored and Kurtis Walker committed Mar 13, 2018
1 parent 0330c51 commit 98dfe95
Show file tree
Hide file tree
Showing 19 changed files with 523 additions and 151 deletions.
25 changes: 24 additions & 1 deletion core/src/main/java/org/pentaho/di/core/Result.java
Expand Up @@ -2,7 +2,7 @@
* *
* Pentaho Data Integration * Pentaho Data Integration
* *
* Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * Copyright (C) 2002-2018 by Hitachi Vantara : http://www.pentaho.com
* *
******************************************************************************* *******************************************************************************
* *
Expand Down Expand Up @@ -128,6 +128,11 @@ public class Result implements Cloneable {
/** The log text. */ /** The log text. */
private String logText; private String logText;


/**
* safe stop.
*/
private boolean safeStop;

/** /**
* Instantiates a new Result object, setting default values for all members * Instantiates a new Result object, setting default values for all members
*/ */
Expand Down Expand Up @@ -858,4 +863,22 @@ public String getLogText() {
public void setLogText( String logText ) { public void setLogText( String logText ) {
this.logText = logText; this.logText = logText;
} }

/**
* Sets flag for safe stopping a transformation
*
* @return the safe stop flag
*/
public boolean isSafeStop() {
return safeStop;
}

/**
* Returns the flag for safe stopping a transformation
*
* @param safeStop the safe stop flag
*/
public void setSafeStop( boolean safeStop ) {
this.safeStop = safeStop;
}
} }
7 changes: 6 additions & 1 deletion engine/src/main/java/org/pentaho/di/trans/Trans.java
Expand Up @@ -1885,7 +1885,8 @@ public void safeStop() {
if ( steps == null ) { if ( steps == null ) {
return; return;
} }
steps.stream().filter( combi -> combi.step.getInputRowSets().isEmpty() ) steps.stream()
.filter( combi -> combi.step.getInputRowSets().isEmpty() )
.forEach( combi -> stopStep( combi, true ) ); .forEach( combi -> stopStep( combi, true ) );


notifyStoppedListeners(); notifyStoppedListeners();
Expand Down Expand Up @@ -2610,6 +2611,10 @@ public Result getResult() {
result.setNrErrors( result.getNrErrors() + sid.step.getErrors() ); result.setNrErrors( result.getNrErrors() + sid.step.getErrors() );
result.getResultFiles().putAll( step.getResultFiles() ); result.getResultFiles().putAll( step.getResultFiles() );


if ( step.isSafeStopped() ) {
result.setSafeStop( step.isSafeStopped() );
}

if ( step.getStepname().equals( transLogTable.getSubjectString( TransLogTable.ID.LINES_READ ) ) ) { if ( step.getStepname().equals( transLogTable.getSubjectString( TransLogTable.ID.LINES_READ ) ) ) {
result.setNrLinesRead( result.getNrLinesRead() + step.getLinesRead() ); result.setNrLinesRead( result.getNrLinesRead() + step.getLinesRead() );
} }
Expand Down
6 changes: 6 additions & 0 deletions engine/src/main/java/org/pentaho/di/trans/step/BaseStep.java
Expand Up @@ -2961,6 +2961,12 @@ public void setStopped( boolean stopped ) {
public void setSafeStopped( boolean stopped ) { public void setSafeStopped( boolean stopped ) {
this.safeStopped.set( stopped ); this.safeStopped.set( stopped );
} }

@Override
public boolean isSafeStopped() {
return safeStopped.get();
}

/* /*
* (non-Javadoc) * (non-Javadoc)
* *
Expand Down
12 changes: 10 additions & 2 deletions engine/src/main/java/org/pentaho/di/trans/step/StepInterface.java
Expand Up @@ -150,7 +150,15 @@ public interface StepInterface extends VariableSpace, HasLogChannelInterface {
* @param stopped * @param stopped
* true if the step needs to be safe stopped * true if the step needs to be safe stopped
*/ */
public void setSafeStopped( boolean stopped ); default void setSafeStopped( boolean stopped ) {
}

/**
* @return true if step is safe stopped.
*/
default boolean isSafeStopped() {
return false;
}


/** /**
* @return True if the step is paused * @return True if the step is paused
Expand Down Expand Up @@ -397,7 +405,7 @@ public interface StepInterface extends VariableSpace, HasLogChannelInterface {
public void setPartitioned( boolean partitioned ); public void setPartitioned( boolean partitioned );


/** /**
* @param partitioningMethodNone * @param partitioningMethod
* The repartitioning method * The repartitioning method
*/ */
public void setRepartitioning( int partitioningMethod ); public void setRepartitioning( int partitioningMethod );
Expand Down
Expand Up @@ -115,7 +115,13 @@ remarks, getTransMeta(), stepMeta.getParentStepMeta(),


source.open(); source.open();


bufferStream().forEach( result -> putRows( result.getRows() ) ); bufferStream().forEach( result -> {
if ( result.isSafeStop() ) {
getTrans().safeStop();
}

putRows( result.getRows() );
} );
super.setOutputDone(); super.setOutputDone();
source.close(); source.close();
return false; return false;
Expand Down
22 changes: 22 additions & 0 deletions engine/src/test/java/org/pentaho/di/trans/TransTest.java
Expand Up @@ -23,7 +23,9 @@
package org.pentaho.di.trans; package org.pentaho.di.trans;


import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.doThrow;
Expand All @@ -50,6 +52,7 @@
import org.mockito.Mockito; import org.mockito.Mockito;
import org.pentaho.di.core.KettleEnvironment; import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.ProgressMonitorListener; import org.pentaho.di.core.ProgressMonitorListener;
import org.pentaho.di.core.Result;
import org.pentaho.di.core.database.Database; import org.pentaho.di.core.database.Database;
import org.pentaho.di.core.database.DatabaseMeta; import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.exception.KettleException;
Expand All @@ -61,6 +64,7 @@
import org.pentaho.di.junit.rules.RestorePDIEngineEnvironment; import org.pentaho.di.junit.rules.RestorePDIEngineEnvironment;
import org.pentaho.di.repository.Repository; import org.pentaho.di.repository.Repository;
import org.pentaho.di.repository.RepositoryDirectoryInterface; import org.pentaho.di.repository.RepositoryDirectoryInterface;
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMetaDataCombi; import org.pentaho.di.trans.step.StepMetaDataCombi;


public class TransTest { public class TransTest {
Expand Down Expand Up @@ -262,6 +266,24 @@ public void testFinishStatus() throws Exception {
assertEquals( Trans.STRING_FINISHED, trans.getStatus() ); assertEquals( Trans.STRING_FINISHED, trans.getStatus() );
} }


@Test
public void testSafeStop() {
StepInterface step = mock( StepInterface.class );
when( step.isSafeStopped() ).thenReturn( false );
when( step.getStepname() ).thenReturn( "stepName" );

StepMetaDataCombi stepMetaDataCombi = new StepMetaDataCombi();
stepMetaDataCombi.step = step;

trans.setSteps( Collections.singletonList( stepMetaDataCombi ) );
Result result = trans.getResult();
assertFalse( result.isSafeStop() );

when( step.isSafeStopped() ).thenReturn( true );
result = trans.getResult();
assertTrue( result.isSafeStop() );
}

private void startThreads( Runnable one, Runnable two, CountDownLatch start ) throws InterruptedException { private void startThreads( Runnable one, Runnable two, CountDownLatch start ) throws InterruptedException {
Thread th = new Thread( one ); Thread th = new Thread( one );
Thread tt = new Thread( two ); Thread tt = new Thread( two );
Expand Down
@@ -0,0 +1,115 @@
/*! ******************************************************************************
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2018 by Hitachi Vantara : http://www.pentaho.com
*
*******************************************************************************
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/

package org.pentaho.di.trans.streaming.common;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import org.pentaho.di.core.Result;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.KettleLogStore;
import org.pentaho.di.core.logging.LogChannelInterface;
import org.pentaho.di.core.logging.LogChannelInterfaceFactory;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepDataInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaDataCombi;
import org.pentaho.di.trans.streaming.api.StreamSource;
import org.pentaho.di.trans.streaming.api.StreamWindow;

import java.util.Arrays;
import java.util.Collections;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@RunWith( MockitoJUnitRunner.class )
public class BaseStreamStepTest {
StepMeta stepMeta;
TransMeta transMeta;
Trans trans;
BaseStreamStep baseStreamStep;

@Mock BaseStreamStepMeta meta;
@Mock StepDataInterface stepData;
@Mock StreamSource streamSource;
@Mock StreamWindow streamWindow;
@Mock LogChannelInterfaceFactory logChannelFactory;
@Mock LogChannelInterface logChannel;


@Before
public void setUp() throws KettleException {
KettleLogStore.setLogChannelInterfaceFactory( logChannelFactory );
when( logChannelFactory.create( any(), any() ) ).thenReturn( logChannel );

stepMeta = new StepMeta( "BaseStreamStep", meta );

transMeta = new TransMeta();
transMeta.addStep( stepMeta );
trans = new Trans( transMeta );

baseStreamStep = new BaseStreamStep( stepMeta, stepData, 1, transMeta, trans );
baseStreamStep.source = streamSource;
baseStreamStep.window = streamWindow;

StepMetaDataCombi stepMetaDataCombi = new StepMetaDataCombi();
stepMetaDataCombi.step = baseStreamStep;
stepMetaDataCombi.data = stepData;
stepMetaDataCombi.stepMeta = stepMeta;
stepMetaDataCombi.meta = meta;

trans.prepareExecution( new String[ 0 ] );
trans.getSteps().add( stepMetaDataCombi );
}

@Test
public void testStop() throws KettleException {
Result result = new Result();
result.setSafeStop( false );
result.setRows( Collections.emptyList() );
when( streamWindow.buffer( any() ) ).thenReturn( Arrays.asList( result ) );

baseStreamStep.processRow( meta, stepData );
assertFalse( baseStreamStep.isSafeStopped() );
verify( streamSource ).close();
}

@Test
public void testSafeStop() throws KettleException {
Result result = new Result();
result.setSafeStop( true );
when( streamWindow.buffer( any() ) ).thenReturn( Arrays.asList( result ) );

baseStreamStep.processRow( meta, stepData );
assertTrue( baseStreamStep.isSafeStopped() );
verify( streamSource, times( 2 ) ).close();
}
}
Expand Up @@ -2,7 +2,7 @@
* *
* Pentaho Data Integration * Pentaho Data Integration
* *
* Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * Copyright (C) 2002-2018 by Hitachi Vantara : http://www.pentaho.com
* *
******************************************************************************* *******************************************************************************
* *
Expand Down Expand Up @@ -92,10 +92,15 @@ public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws
} else { } else {
logError( message ); logError( message );
} }
if ( meta.isAbortWithError() ) { if ( meta.isSafeStop() ) {
setErrors( 1 ); getTrans().safeStop();
} else {
if ( meta.isAbortWithError() ) {
setErrors( 1 );
}

stopAll();
} }
stopAll();
} else { } else {
// seen a row but not yet reached the threshold // seen a row but not yet reached the threshold
if ( meta.isAlwaysLogRows() ) { if ( meta.isAlwaysLogRows() ) {
Expand Down

0 comments on commit 98dfe95

Please sign in to comment.