Skip to content

Commit

Permalink
[BACKLOG-7997] - Can not execute partitioned clustered transformation…
Browse files Browse the repository at this point in the history
… - fix NPE at clean-up and prepareExecution

[BACKLOG-7997] - Can not execute partitioned clustered transformation - test
  • Loading branch information
AndreyBurikhin committed May 4, 2016
1 parent 962a58c commit 6d5a6ba
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 10 deletions.
8 changes: 5 additions & 3 deletions engine/src/org/pentaho/di/trans/Trans.java
Expand Up @@ -3,7 +3,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2015 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -1462,8 +1462,10 @@ protected void fireTransFinishedListeners() throws KettleException {
badGuys.add( e );
}
}
// Signal for the the waitUntilFinished blocker...
transFinishedBlockingQueue.add( new Object() );
if ( transFinishedBlockingQueue != null ) {
// Signal for the the waitUntilFinished blocker...
transFinishedBlockingQueue.add( new Object() );
}
if ( !badGuys.isEmpty() ) {
// FIFO
throw new KettleException( badGuys.get( 0 ) );
Expand Down
17 changes: 13 additions & 4 deletions engine/src/org/pentaho/di/trans/step/BaseStep.java
Expand Up @@ -3,7 +3,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2013 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -781,10 +781,19 @@ public void cleanup() {
}
}

for ( RemoteStep remoteStep : getRemoteInputSteps() ) {
remoteStep.cleanup();
List<RemoteStep> remoteInputSteps = getRemoteInputSteps();
if ( remoteInputSteps != null ) {
cleanupRemoteSteps( remoteInputSteps );
}

List<RemoteStep> remoteOutputSteps = getRemoteOutputSteps();
if ( remoteOutputSteps != null ) {
cleanupRemoteSteps( remoteOutputSteps );
}
for ( RemoteStep remoteStep : getRemoteOutputSteps() ) {
}

static void cleanupRemoteSteps( List<RemoteStep> remoteSteps ) {
for ( RemoteStep remoteStep : remoteSteps ) {
remoteStep.cleanup();
}
}
Expand Down
30 changes: 28 additions & 2 deletions engine/test-src/org/pentaho/di/trans/TransTest.java
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2015 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand All @@ -26,14 +26,19 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -285,6 +290,27 @@ public void testRecordsCleanUpMethodIsCalled() throws Exception {
verify( mockedDataBase ).cleanupLogRecords( stepLogTable );
}

@Test
public void testFireTransFinishedListeners() throws Exception {
Trans trans = new Trans();
TransListener mockListener = mock( TransListener.class );
trans.setTransListeners( Collections.singletonList( mockListener ) );

trans.fireTransFinishedListeners();

verify( mockListener ).transFinished( trans );
}

@Test( expected = KettleException.class )
public void testFireTransFinishedListenersExceprionOnTransFinished() throws Exception {
Trans trans = new Trans();
TransListener mockListener = mock( TransListener.class );
doThrow( KettleException.class ).when( mockListener ).transFinished( trans );
trans.setTransListeners( Collections.singletonList( mockListener ) );

trans.fireTransFinishedListeners();
}

private void startThreads( Runnable one, Runnable two, CountDownLatch start ) throws InterruptedException {
Thread th = new Thread( one );
Thread tt = new Thread( two );
Expand Down
55 changes: 54 additions & 1 deletion engine/test-src/org/pentaho/di/trans/step/BaseStepTest.java
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2015 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand All @@ -26,9 +26,12 @@
import static org.mockito.Mockito.*;
import static org.junit.Assert.*;

import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -57,6 +60,7 @@
import org.pentaho.di.core.row.value.ValueMetaString;
import org.pentaho.di.trans.BasePartitioner;
import org.pentaho.di.trans.steps.mock.StepMockHelper;
import org.pentaho.di.www.SocketRepository;

public class BaseStepTest {
private StepMockHelper<StepMetaInterface, StepDataInterface> mockHelper;
Expand Down Expand Up @@ -312,4 +316,53 @@ public void testBuildLog() throws KettleValueException {
assertEquals( ValueMetaInterface.TYPE_DATE, result.getValueMeta( 8 ).getType() );
assertEquals( endDate, result.getDate( 8, Calendar.getInstance().getTime() ) );
}

@Test
public void testCleanupRemoteSteps() {
RemoteStep remoteStepMock = mock( RemoteStep.class );
BaseStep.cleanupRemoteSteps( Collections.singletonList( remoteStepMock ) );
verify( remoteStepMock ).cleanup();
}

@Test
public void testCleanup() throws IOException {
when( mockHelper.logChannelInterfaceFactory.create( any(), any( LoggingObjectInterface.class ) ) ).thenReturn(
mockHelper.logChannelInterface );
BaseStep baseStep =
new BaseStep( mockHelper.stepMeta, mockHelper.stepDataInterface, 0, mockHelper.transMeta, mockHelper.trans );
ServerSocket serverSocketMock = mock( ServerSocket.class );
doReturn( 0 ).when( serverSocketMock ).getLocalPort();
baseStep.setServerSockets( Collections.singletonList( serverSocketMock ) );
SocketRepository socketRepositoryMock = mock( SocketRepository.class );
baseStep.setSocketRepository( socketRepositoryMock );

baseStep.cleanup();

verify( socketRepositoryMock ).releaseSocket( 0 );
}

@Test
public void testCleanupWithInexistentRemoteSteps() throws IOException {
when( mockHelper.logChannelInterfaceFactory.create( any(), any( LoggingObjectInterface.class ) ) ).thenReturn(
mockHelper.logChannelInterface );
BaseStep baseStep =
spy( new BaseStep( mockHelper.stepMeta, mockHelper.stepDataInterface, 0, mockHelper.transMeta,
mockHelper.trans ) );
ServerSocket serverSocketMock = mock( ServerSocket.class );
doReturn( 0 ).when( serverSocketMock ).getLocalPort();
baseStep.setServerSockets( Collections.singletonList( serverSocketMock ) );
SocketRepository socketRepositoryMock = mock( SocketRepository.class );
baseStep.setSocketRepository( socketRepositoryMock );
RemoteStep inputStep = mock( RemoteStep.class );
doReturn( Collections.singletonList( inputStep ) ).when( baseStep ).getRemoteInputSteps();
RemoteStep outputStep = mock( RemoteStep.class );
doReturn( Collections.singletonList( outputStep ) ).when( baseStep ).getRemoteOutputSteps();

baseStep.cleanup();

verify( inputStep ).cleanup();
verify( outputStep ).cleanup();
verify( socketRepositoryMock ).releaseSocket( 0 );
}

}

0 comments on commit 6d5a6ba

Please sign in to comment.