Skip to content

Commit

Permalink
[PDI-14913]- Corrected and fixed logic of exposing Kettle data over a…
Browse files Browse the repository at this point in the history
… web service using "passing output to servlet" checkbox. Affected steps : Json Output, Text File Output and XML Output.
  • Loading branch information
ArtsiomYurhevich committed May 19, 2017
1 parent d139dca commit 6fd565d
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 8 deletions.
12 changes: 11 additions & 1 deletion engine/src/org/pentaho/di/trans/step/StepMetaInterface.java
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -718,4 +718,14 @@ public String exportResources( VariableSpace space, Map<String, ResourceDefiniti
public default boolean cleanAfterHopFromRemove() {
return false;
}

/**
* True if the step passes it's result data straight to the servlet output. See exposing Kettle data over a web service
* <a href="http://wiki.pentaho.com/display/EAI/PDI+data+over+web+services">http://wiki.pentaho.com/display/EAI/PDI+data+over+web+services</a>
*
* @return True if the step passes it's result data straight to the servlet output, false otherwise
*/
default boolean passDataToServletOutput() {
return false;
}
}
Expand Up @@ -1313,4 +1313,12 @@ protected void saveSourceRep( Repository rep, ObjectId id_transformation, Object
throws KettleException {
rep.saveStepAttribute( id_transformation, id_step, "file_name", fileName );
}

/**
* {@inheritDoc}
*/
@Override
public boolean passDataToServletOutput() {
return servletOutput;
}
}
28 changes: 23 additions & 5 deletions engine/src/org/pentaho/di/www/RunTransServlet.java
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -31,6 +31,7 @@
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.google.common.annotations.VisibleForTesting;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.util.Utils;
import org.pentaho.di.core.exception.KettleException;
Expand Down Expand Up @@ -247,15 +248,12 @@ public void doGet( HttpServletRequest request, HttpServletResponse response ) th
String message = "Transformation '" + trans.getName() + "' was added to the list with id " + carteObjectId;
logBasic( message );

//
try {
// Execute the transformation...
//
trans.execute( null );

WebResult webResult = new WebResult( WebResult.STRING_OK, "Transformation started", carteObjectId );
out.println( webResult.getXML() );
out.flush();
finishProcessing( trans, out );

} catch ( Exception executionException ) {
String logging = KettleLogStore.getAppender().getBuffer( trans.getLogChannelId(), false ).toString();
Expand Down Expand Up @@ -299,6 +297,26 @@ private TransMeta loadTrans( Repository repository, String transformationName )
}
}


/**
If the transformation has at least one step in a transformation,
which writes it's data straight to a servlet output
we should wait transformation's termination.
Otherwise the servlet's response lifecycle may come to an end and
the response will be closed by container while
the transformation will be still trying writing data into it.
*/
@VisibleForTesting
void finishProcessing( Trans trans, PrintWriter out ) {
if ( trans.getSteps().stream().anyMatch( step -> step.meta.passDataToServletOutput() ) ) {
trans.waitUntilFinished();
} else {
WebResult webResult = new WebResult( WebResult.STRING_OK, "Transformation started", trans.getContainerObjectId() );
out.println( webResult.getXML() );
out.flush();
}
}

public String toString() {
return "Run Transformation";
}
Expand Down
114 changes: 114 additions & 0 deletions engine/test-src/org/pentaho/di/www/RunTransServletTest.java
@@ -0,0 +1,114 @@
/*! ******************************************************************************
*
* Pentaho Data Integration
*
* Copyright (C) 2017-2017 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/
package org.pentaho.di.www;


import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.runners.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMetaDataCombi;
import org.pentaho.di.trans.step.StepMetaInterface;

import java.io.ByteArrayOutputStream;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.mock;


@RunWith( MockitoJUnitRunner.class )
public class RunTransServletTest {

@Mock
TransMeta transMeta;
@Mock
Trans trans;

List<StepMetaDataCombi> stepList;
ByteArrayOutputStream outData;
PrintWriter out;
String lineSeparator = System.getProperty( "line.separator" );

String transId = "testId";
String expectedOutputIfNoServletOutputSteps = "<webresult>" + lineSeparator
+ " <result>OK</result>" + lineSeparator
+ " <message>Transformation started</message>" + lineSeparator
+ " <id>testId</id>" + lineSeparator
+ "</webresult>" + lineSeparator + lineSeparator;


RunTransServlet runTransServlet;

@Before
public void setup() throws Exception {
runTransServlet = new RunTransServlet();
outData = new ByteArrayOutputStream();
out = new PrintWriter( outData );
stepList = new ArrayList<>();
for ( int i = 0; i < 5; i++ ) {
StepMetaDataCombi stepMetaDataCombi = new StepMetaDataCombi();
StepMetaInterface stepMeta = mock( StepMetaInterface.class );
when( stepMeta.passDataToServletOutput() ).thenReturn( false );
stepMetaDataCombi.meta = stepMeta;
stepList.add( stepMetaDataCombi );
}
when( trans.getSteps() ).thenReturn( stepList );
when( trans.getContainerObjectId() ).thenReturn( transId );
}

@Test
public void testFinishProcessingTransWithoutServletOutputSteps() throws Exception {
runTransServlet.finishProcessing( trans, out );
assertEquals( expectedOutputIfNoServletOutputSteps, outData.toString() );
}

@Test
public void testFinishProcessingTransWithServletOutputSteps() throws Exception {
StepMetaDataCombi stepMetaDataCombi = new StepMetaDataCombi();
StepMetaInterface stepMeta = mock( StepMetaInterface.class );
when( stepMeta.passDataToServletOutput() ).thenReturn( true );
stepMetaDataCombi.meta = stepMeta;
stepList.add( stepMetaDataCombi );
doAnswer( new Answer<Void>() {
@Override
public Void answer( InvocationOnMock invocation ) throws Throwable {
Thread.currentThread().sleep( 2000 );
return null;
}
} ).when( trans ).waitUntilFinished();
runTransServlet.finishProcessing( trans, out );
assertTrue( outData.toString().isEmpty() );
}


}
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -725,4 +725,12 @@ public void setCompatibilityMode( boolean compatibilityMode ) {
public StepMetaInjectionInterface getStepMetaInjectionInterface() {
return new JsonOutputMetaInjection( this );
}

/**
* {@inheritDoc}
*/
@Override
public boolean passDataToServletOutput() {
return servletOutput;
}
}
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2017 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -856,4 +856,11 @@ public String exportResources( VariableSpace space, Map<String, ResourceDefiniti
}
}

/**
* {@inheritDoc}
*/
@Override
public boolean passDataToServletOutput() {
return servletOutput;
}
}

0 comments on commit 6fd565d

Please sign in to comment.