Skip to content

Commit

Permalink
[PDI-17019] PDI 8.0 server run configuration settings are ignored
Browse files Browse the repository at this point in the history
- The new fields for storing parameters have been added into AbstractMeta and JobMeta classes
- The updating of executionConfiguration have been added into SpoonJobDelegate and SpoonTransformationDelegate classes
- The schedule request generating has been fixed
  • Loading branch information
vasilii-komarov committed Feb 14, 2018
1 parent 0be5af8 commit 4177e68
Show file tree
Hide file tree
Showing 8 changed files with 395 additions and 63 deletions.
29 changes: 28 additions & 1 deletion engine/src/main/java/org/pentaho/di/base/AbstractMeta.java
Expand Up @@ -2,7 +2,7 @@
* *
* Pentaho Data Integration * Pentaho Data Integration
* *
* Copyright (C) 2002-2017 by Hitachi Vantara : http://www.pentaho.com * Copyright (C) 2002-2018 by Hitachi Vantara : http://www.pentaho.com
* *
******************************************************************************* *******************************************************************************
* *
Expand Down Expand Up @@ -198,6 +198,8 @@ public abstract class AbstractMeta implements ChangedFlagInterface, UndoInterfac


protected int undo_position; protected int undo_position;


protected RunOptions runOptions = new RunOptions();

private boolean showDialog = true; private boolean showDialog = true;
private boolean alwaysShowRunOptions = true; private boolean alwaysShowRunOptions = true;


Expand Down Expand Up @@ -2090,4 +2092,29 @@ public Boolean getVersioningEnabled() {
return this.versioningEnabled; return this.versioningEnabled;
} }


private class RunOptions {
boolean clearingLog;
boolean safeModeEnabled;

RunOptions() {
clearingLog = true;
safeModeEnabled = false;
}
}

public boolean isClearingLog() {
return runOptions.clearingLog;
}

public void setClearingLog( boolean clearingLog ) {
this.runOptions.clearingLog = clearingLog;
}

public boolean isSafeModeEnabled() {
return runOptions.safeModeEnabled;
}

public void setSafeModeEnabled( boolean safeModeEnabled ) {
this.runOptions.safeModeEnabled = safeModeEnabled;
}
} }
20 changes: 20 additions & 0 deletions engine/src/main/java/org/pentaho/di/job/JobMeta.java
Expand Up @@ -147,6 +147,10 @@ public class JobMeta extends AbstractMeta


protected List<LogTableInterface> extraLogTables; protected List<LogTableInterface> extraLogTables;


protected String startCopyName;

protected boolean expandingRemoteJob;

/** The log channel interface. */ /** The log channel interface. */
protected LogChannelInterface log; protected LogChannelInterface log;


Expand Down Expand Up @@ -2821,4 +2825,20 @@ public NamedClusterEmbedManager getNamedClusterEmbedManager( ) {
} }
return namedClusterEmbedManager; return namedClusterEmbedManager;
} }

public String getStartCopyName() {
return startCopyName;
}

public void setStartCopyName( String startCopyName ) {
this.startCopyName = startCopyName;
}

public boolean isExpandingRemoteJob() {
return expandingRemoteJob;
}

public void setExpandingRemoteJob( boolean expandingRemoteJob ) {
this.expandingRemoteJob = expandingRemoteJob;
}
} }
@@ -1,26 +1,24 @@
/* /*! ******************************************************************************
* *****************************************************************************
* *
* Pentaho Data Integration * Pentaho Data Integration
* *
* Copyright (C) 2017 by Hitachi Vantara : http://www.pentaho.com * Copyright (C) 2002-2018 by Hitachi Vantara : http://www.pentaho.com
* *
* ******************************************************************************* *******************************************************************************
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* *
* Unless required by applicable law or agreed to in writing, software * http://www.apache.org/licenses/LICENSE-2.0
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* *
* ***************************************************************************** * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* *
*/ ******************************************************************************/


package org.pentaho.di.engine.configuration.impl.pentaho.scheduler; package org.pentaho.di.engine.configuration.impl.pentaho.scheduler;


Expand All @@ -31,6 +29,7 @@
import org.pentaho.di.base.AbstractMeta; import org.pentaho.di.base.AbstractMeta;
import org.pentaho.di.core.parameters.UnknownParamException; import org.pentaho.di.core.parameters.UnknownParamException;
import org.pentaho.di.i18n.BaseMessages; import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.repository.Repository; import org.pentaho.di.repository.Repository;


import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
Expand Down Expand Up @@ -135,14 +134,69 @@ StringEntity buildSchedulerRequestEntity( AbstractMeta meta )
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append( "<jobScheduleRequest>\n" ); sb.append( "<jobScheduleRequest>\n" );
sb.append( "<inputFile>" ).append( filename ).append( "</inputFile>\n" ); sb.append( "<inputFile>" ).append( filename ).append( "</inputFile>\n" );
sb.append( "<pdiParameters>\n" );
for ( String param : meta.listParameters() ) { // Set the log level
sb.append( "<entry>\n" ); if ( meta.getLogLevel() != null ) {
sb.append( "<key>" ).append( param ).append( "</key>\n" ); sb.append( "<jobParameters>\n" );
sb.append( "<value>" ).append( meta.getParameterValue( param ) ).append( "</value>\n" ); sb.append( "<name>" ).append( "logLevel" ).append( "</name>\n" );
sb.append( "</entry>\n" ); sb.append( "<type>" ).append( "string" ).append( "</type>\n" );
sb.append( "<stringValue>" ).append( meta.getLogLevel().getCode() ).append( "</stringValue>\n" );
sb.append( "</jobParameters>\n" );
}

// Set the clearing log param
sb.append( "<jobParameters>\n" );
sb.append( "<name>" ).append( "clearLog" ).append( "</name>\n" );
sb.append( "<type>" ).append( "string" ).append( "</type>\n" );
sb.append( "<stringValue>" ).append( meta.isClearingLog() ).append( "</stringValue>\n" );
sb.append( "</jobParameters>\n" );

// Set the safe mode enabled param
sb.append( "<jobParameters>\n" );
sb.append( "<name>" ).append( "runSafeMode" ).append( "</name>\n" );
sb.append( "<type>" ).append( "string" ).append( "</type>\n" );
sb.append( "<stringValue>" ).append( meta.isSafeModeEnabled() ).append( "</stringValue>\n" );
sb.append( "</jobParameters>\n" );

// Set the gathering metrics param
sb.append( "<jobParameters>\n" );
sb.append( "<name>" ).append( "gatheringMetrics" ).append( "</name>\n" );
sb.append( "<type>" ).append( "string" ).append( "</type>\n" );
sb.append( "<stringValue>" ).append( meta.isGatheringMetrics() ).append( "</stringValue>\n" );
sb.append( "</jobParameters>\n" );

if ( meta instanceof JobMeta ) {
JobMeta jobMeta = (JobMeta) meta;

if ( jobMeta.getStartCopyName() != null ) {
// Set the start step name
sb.append( "<jobParameters>\n" );
sb.append( "<name>" ).append( "startCopyName" ).append( "</name>\n" );
sb.append( "<type>" ).append( "string" ).append( "</type>\n" );
sb.append( "<stringValue>" ).append( jobMeta.getStartCopyName() ).append( "</stringValue>\n" );
sb.append( "</jobParameters>\n" );
}

// Set the expanding remote job param
sb.append( "<jobParameters>\n" );
sb.append( "<name>" ).append( "expandingRemoteJob" ).append( "</name>\n" );
sb.append( "<type>" ).append( "string" ).append( "</type>\n" );
sb.append( "<stringValue>" ).append( jobMeta.isExpandingRemoteJob() ).append( "</stringValue>\n" );
sb.append( "</jobParameters>\n" );
} }
sb.append( "</pdiParameters>\n" );
// Set the PDI parameters
if ( meta.listParameters() != null ) {
sb.append( "<pdiParameters>\n" );
for ( String param : meta.listParameters() ) {
sb.append( "<entry>\n" );
sb.append( "<key>" ).append( param ).append( "</key>\n" );
sb.append( "<value>" ).append( meta.getParameterValue( param ) ).append( "</value>\n" );
sb.append( "</entry>\n" );
}
sb.append( "</pdiParameters>\n" );
}

sb.append( "</jobScheduleRequest>" ); sb.append( "</jobScheduleRequest>" );


return new StringEntity( sb.toString() ); return new StringEntity( sb.toString() );
Expand Down
@@ -1,22 +1,104 @@
/*! ******************************************************************************
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2018 by Hitachi Vantara : http://www.pentaho.com
*
*******************************************************************************
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/

package org.pentaho.di.engine.configuration.impl.pentaho.scheduler; package org.pentaho.di.engine.configuration.impl.pentaho.scheduler;


import org.apache.http.entity.StringEntity;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.pentaho.di.base.AbstractMeta; import org.pentaho.di.base.AbstractMeta;
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.core.parameters.UnknownParamException; import org.pentaho.di.core.parameters.UnknownParamException;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.repository.RepositoryDirectoryInterface; import org.pentaho.di.repository.RepositoryDirectoryInterface;


import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;


import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doCallRealMethod; import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doReturn;


public class SchedulerRequestTest { public class SchedulerRequestTest {
private static final String EMPTY_STRING = ""; // input file
private static final String TEST_PARAM_NAME = "paramName"; private static final String TEST_REPOSITORY_DIRECTORY = "/home/admin";
private static final String[] ARRAY_WITH_PARAM_NAME = new String[]{TEST_PARAM_NAME}; private static final String TEST_JOB_NAME = "jobName";
private static final String JOB_EXTENSION = "kjb";

// job parameters
private static final String STRING_PARAM_TYPE = "string";
private static final String LOG_LEVEL_PARAM_NAME = "logLevel";
private static final String TEST_LOG_LEVEL_PARAM_VALUE = "Rowlevel";
private static final String CLEAR_LOG_PARAM_NAME = "clearLog";
private static final String TEST_CLEAR_LOG_PARAM_VALUE = "true";
private static final String RUN_SAFE_MODE_PARAM_NAME = "runSafeMode";
private static final String TEST_RUN_SAFE_MODE_PARAM_VALUE = "false";
private static final String GATHERING_METRICS_PARAM_NAME = "gatheringMetrics";
private static final String TEST_GATHERING_METRICS_PARAM_VALUE = "false";
private static final String START_COPY_NAME_PARAM_NAME = "startCopyName";
private static final String TEST_START_COPY_NAME_PARAM_VALUE = "stepName";
private static final String EXPANDING_REMOTE_JOB_PARAM_NAME = "expandingRemoteJob";
private static final String TEST_EXPANDING_REMOTE_JOB_PARAM_VALUE = "false";

// pdi parameters
private static final String TEST_PDI_PARAM_NAME = "paramName";
private static final String TEST_PDI_PARAM_VALUE = "paramValue";
private static final String[] ARRAY_WITH_TEST_PDI_PARAM_NAME = new String[]{TEST_PDI_PARAM_NAME};

private static final String REFERENCE_TEST_REQUEST = String.format( "<jobScheduleRequest>\n"
+ "<inputFile>%s/%s.%s</inputFile>\n"
+ "<jobParameters>\n"
+ "<name>%s</name>\n" + "<type>%s</type>\n" + "<stringValue>%s</stringValue>\n"
+ "</jobParameters>\n"
+ "<jobParameters>\n"
+ "<name>%s</name>\n" + "<type>%s</type>\n" + "<stringValue>%s</stringValue>\n"
+ "</jobParameters>\n"
+ "<jobParameters>\n"
+ "<name>%s</name>\n" + "<type>%s</type>\n" + "<stringValue>%s</stringValue>\n"
+ "</jobParameters>\n"
+ "<jobParameters>\n"
+ "<name>%s</name>\n" + "<type>%s</type>\n" + "<stringValue>%s</stringValue>\n"
+ "</jobParameters>\n"
+ "<jobParameters>\n"
+ "<name>%s</name>\n" + "<type>%s</type>\n" + "<stringValue>%s</stringValue>\n"
+ "</jobParameters>\n"
+ "<jobParameters>\n"
+ "<name>%s</name>\n" + "<type>%s</type>\n" + "<stringValue>%s</stringValue>\n"
+ "</jobParameters>\n"
+ "<pdiParameters>\n"
+ "<entry>\n"
+ "<key>%s</key>\n" + "<value>%s</value>\n"
+ "</entry>\n"
+ "</pdiParameters>\n"
+ "</jobScheduleRequest>", TEST_REPOSITORY_DIRECTORY, TEST_JOB_NAME, JOB_EXTENSION,
LOG_LEVEL_PARAM_NAME, STRING_PARAM_TYPE, TEST_LOG_LEVEL_PARAM_VALUE,
CLEAR_LOG_PARAM_NAME, STRING_PARAM_TYPE, TEST_CLEAR_LOG_PARAM_VALUE,
RUN_SAFE_MODE_PARAM_NAME, STRING_PARAM_TYPE, TEST_RUN_SAFE_MODE_PARAM_VALUE,
GATHERING_METRICS_PARAM_NAME, STRING_PARAM_TYPE, TEST_GATHERING_METRICS_PARAM_VALUE,
START_COPY_NAME_PARAM_NAME, STRING_PARAM_TYPE, TEST_START_COPY_NAME_PARAM_VALUE,
EXPANDING_REMOTE_JOB_PARAM_NAME, STRING_PARAM_TYPE, TEST_EXPANDING_REMOTE_JOB_PARAM_VALUE,
TEST_PDI_PARAM_NAME, TEST_PDI_PARAM_VALUE );


private SchedulerRequest schedulerRequest; private SchedulerRequest schedulerRequest;


Expand All @@ -28,19 +110,45 @@ public void before() {
@Test @Test
@SuppressWarnings( "ResultOfMethodCallIgnored" ) @SuppressWarnings( "ResultOfMethodCallIgnored" )
public void testBuildSchedulerRequestEntity() throws UnknownParamException, UnsupportedEncodingException { public void testBuildSchedulerRequestEntity() throws UnknownParamException, UnsupportedEncodingException {
AbstractMeta abstractMeta = mock( AbstractMeta.class ); AbstractMeta meta = mock( JobMeta.class );
RepositoryDirectoryInterface repositoryDirectoryInterface = mock( RepositoryDirectoryInterface.class ); RepositoryDirectoryInterface repositoryDirectory = mock( RepositoryDirectoryInterface.class );


doReturn( repositoryDirectoryInterface ).when( abstractMeta ).getRepositoryDirectory(); doReturn( repositoryDirectory ).when( meta ).getRepositoryDirectory();
doReturn( EMPTY_STRING ).when( repositoryDirectoryInterface ).getPath(); doReturn( TEST_REPOSITORY_DIRECTORY ).when( repositoryDirectory ).getPath();
doReturn( EMPTY_STRING ).when( abstractMeta ).getName(); doReturn( TEST_JOB_NAME ).when( meta ).getName();
doReturn( EMPTY_STRING ).when( abstractMeta ).getDefaultExtension(); doReturn( JOB_EXTENSION ).when( meta ).getDefaultExtension();
doReturn( ARRAY_WITH_PARAM_NAME ).when( abstractMeta ).listParameters();


doCallRealMethod().when( schedulerRequest ).buildSchedulerRequestEntity( abstractMeta ); doReturn( LogLevel.getLogLevelForCode( TEST_LOG_LEVEL_PARAM_VALUE ) ).when( meta ).getLogLevel();
schedulerRequest.buildSchedulerRequestEntity( abstractMeta ); doReturn( Boolean.valueOf( TEST_CLEAR_LOG_PARAM_VALUE ) ).when( meta ).isClearingLog();
doReturn( Boolean.valueOf( TEST_RUN_SAFE_MODE_PARAM_VALUE ) ).when( meta ).isSafeModeEnabled();
doReturn( Boolean.valueOf( TEST_GATHERING_METRICS_PARAM_VALUE ) ).when( meta ).isGatheringMetrics();
doReturn( TEST_START_COPY_NAME_PARAM_VALUE ).when( (JobMeta) meta ).getStartCopyName();
doReturn( Boolean.valueOf( TEST_EXPANDING_REMOTE_JOB_PARAM_VALUE ) ).when( (JobMeta) meta ).isExpandingRemoteJob();

doReturn( ARRAY_WITH_TEST_PDI_PARAM_NAME ).when( meta ).listParameters();
doReturn( TEST_PDI_PARAM_VALUE ).when( meta ).getParameterValue( TEST_PDI_PARAM_NAME );

doCallRealMethod().when( schedulerRequest ).buildSchedulerRequestEntity( meta );

assertTrue( compareContentOfStringEntities( schedulerRequest.buildSchedulerRequestEntity( meta ),
new StringEntity( REFERENCE_TEST_REQUEST ) ) );
}


verify( abstractMeta ).listParameters(); private boolean compareContentOfStringEntities( StringEntity entity1, StringEntity entity2 ) {
verify( abstractMeta ).getParameterValue( TEST_PARAM_NAME ); if ( entity1.getContentLength() == entity2.getContentLength() ) {
try ( InputStream stream1 = entity1.getContent();
InputStream stream2 = entity2.getContent() ) {
while ( stream1.available() > 0 ) {
if ( stream1.read() != stream2.read() ) {
return false;
}
}
return true;
} catch ( IOException e ) {
return false;
}
} else {
return false;
}
} }
} }

0 comments on commit 4177e68

Please sign in to comment.