Skip to content

Commit

Permalink
[PDI-15138] Synchronize after merge step: commit size can't be set fr…
Browse files Browse the repository at this point in the history
…om variable
  • Loading branch information
Luis Martins committed Dec 9, 2018
1 parent f00697e commit 7f74010
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 14 deletions.
Expand Up @@ -867,7 +867,7 @@ public boolean init( StepMetaInterface smi, StepDataInterface sdi ) {
data.releaseSavepoint = false;
}

data.commitSize = Integer.parseInt( environmentSubstitute( "" + meta.getCommitSize() ) );
data.commitSize = Integer.parseInt( environmentSubstitute( meta.getCommitSize() ) );
data.batchMode = data.commitSize > 0 && meta.useBatchUpdate();

// Batch updates are not supported on PostgreSQL (and look-a-likes) together with error handling (PDI-366)
Expand Down Expand Up @@ -897,7 +897,7 @@ public boolean init( StepMetaInterface smi, StepDataInterface sdi ) {
} else {
data.db.connect( getPartitionID() );
}
data.db.setCommit( meta.getCommitSize() );
data.db.setCommit( data.commitSize );

return true;
} catch ( KettleException ke ) {
Expand Down
Expand Up @@ -109,7 +109,7 @@ public class SynchronizeAfterMergeMeta extends BaseStepMeta implements StepMetaI

/** Commit size for inserts/updates */
@Injection( name = "COMMIT_SIZE" )
private int commitSize;
private String commitSize;

@Injection( name = "TABLE_NAME_IN_FIELD" )
private boolean tablenameInField;
Expand Down Expand Up @@ -225,7 +225,7 @@ public void settablenameField( String tablenamefield ) {
/**
* @return Returns the commitSize.
*/
public int getCommitSize() {
public String getCommitSize() {
return commitSize;
}

Expand All @@ -234,6 +234,14 @@ public int getCommitSize() {
* The commitSize to set.
*/
public void setCommitSize( int commitSize ) {
this.commitSize = Integer.toString( commitSize );
}

/**
* @param commitSize
* The commitSize to set.
*/
public void setCommitSize( String commitSize ) {
this.commitSize = commitSize;
}

Expand Down Expand Up @@ -412,13 +420,11 @@ public Object clone() {

private void readData( Node stepnode, List<? extends SharedObjectInterface> databases ) throws KettleXMLException {
try {
String csize;
int nrkeys, nrvalues;
this.databases = databases;
String con = XMLHandler.getTagValue( stepnode, "connection" );
databaseMeta = DatabaseMeta.findDatabase( databases, con );
csize = XMLHandler.getTagValue( stepnode, "commit" );
commitSize = Const.toInt( csize, 0 );
commitSize = XMLHandler.getTagValue( stepnode, "commit" );
schemaName = XMLHandler.getTagValue( stepnode, "lookup", "schema" );
tableName = XMLHandler.getTagValue( stepnode, "lookup", "table" );

Expand Down Expand Up @@ -482,7 +488,7 @@ public void setDefault() {
keyStream = null;
updateLookup = null;
databaseMeta = null;
commitSize = 100;
commitSize = "100";
schemaName = "";
tableName = BaseMessages.getString( PKG, "SynchronizeAfterMergeMeta.DefaultTableName" );
operationOrderField = null;
Expand Down Expand Up @@ -561,7 +567,7 @@ public void readRep( Repository rep, IMetaStore metaStore, ObjectId id_step, Lis
this.databases = databases;
databaseMeta = rep.loadDatabaseMetaFromStepAttribute( id_step, "id_connection", databases );

commitSize = (int) rep.getStepAttributeInteger( id_step, "commit" );
commitSize = rep.getStepAttributeString( id_step, "commit" );
schemaName = rep.getStepAttributeString( id_step, "schema" );
tableName = rep.getStepAttributeString( id_step, "table" );

Expand Down
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2016 - 2017 by Hitachi Vantara : http://www.pentaho.com
* Copyright (C) 2016 - 2018 by Hitachi Vantara : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -97,9 +97,9 @@ public boolean get() {
}
} );

check( "COMMIT_SIZE", new IntGetter() {
check( "COMMIT_SIZE", new StringGetter() {
@Override
public int get() {
public String get() {
return meta.getCommitSize();
}
} );
Expand Down
@@ -0,0 +1,79 @@
/*! ******************************************************************************
*
* Pentaho Data Integration
*
* Copyright (C) 2018 by Hitachi Vantara : http://www.pentaho.com
*
*******************************************************************************
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/

package org.pentaho.di.trans.steps.synchronizeaftermerge;

import org.junit.Test;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.database.MySQLDatabaseMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepDataInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

public class SynchronizeAfterMergeTest {

private static final String STEP_NAME = "Sync";

@Test
public void initWithCommitSizeVariable() {
StepMeta stepMeta = mock( StepMeta.class );
doReturn( STEP_NAME ).when( stepMeta ).getName();
doReturn( 1 ).when( stepMeta ).getCopies();

SynchronizeAfterMergeMeta smi = mock( SynchronizeAfterMergeMeta.class );
SynchronizeAfterMergeData sdi = mock( SynchronizeAfterMergeData.class );

DatabaseMeta dbMeta = mock( DatabaseMeta.class );
doReturn( mock( MySQLDatabaseMeta.class ) ).when( dbMeta ).getDatabaseInterface();

doReturn( dbMeta ).when( smi ).getDatabaseMeta();
doReturn( "${commit.size}" ).when( smi ).getCommitSize();

TransMeta transMeta = mock( TransMeta.class );
doReturn( "1" ).when( transMeta ).getVariable( Const.INTERNAL_VARIABLE_SLAVE_SERVER_NUMBER );
doReturn( "2" ).when( transMeta ).getVariable( Const.INTERNAL_VARIABLE_CLUSTER_SIZE );
doReturn( "Y" ).when( transMeta ).getVariable( Const.INTERNAL_VARIABLE_CLUSTER_MASTER );
doReturn( stepMeta ).when( transMeta ).findStep( STEP_NAME );

SynchronizeAfterMerge step = mock( SynchronizeAfterMerge.class );
doCallRealMethod().when( step ).setTransMeta( any( TransMeta.class ) );
doCallRealMethod().when( step ).setStepMeta( any( StepMeta.class ) );
doCallRealMethod().when( step ).init( any( StepMetaInterface.class ), any( StepDataInterface.class ) );
doReturn( stepMeta ).when( step ).getStepMeta();
doReturn( transMeta ).when( step ).getTransMeta();
doReturn( "120" ).when( step ).environmentSubstitute( "${commit.size}" );

step.setTransMeta( transMeta );
step.setStepMeta( stepMeta );
step.init( smi, sdi );

assertEquals( 120, sdi.commitSize );
}
}
Expand Up @@ -335,6 +335,7 @@ public void widgetSelected( SelectionEvent e ) {
fdlCommit.top = new FormAttachment( wTable, margin );
fdlCommit.right = new FormAttachment( middle, -margin );
wlCommit.setLayoutData( fdlCommit );

wCommit = new TextVar( transMeta, wGeneralComp, SWT.SINGLE | SWT.LEFT | SWT.BORDER );
props.setLook( wCommit );
wCommit.addModifyListener( lsMod );
Expand Down Expand Up @@ -1064,7 +1065,7 @@ public void getData() {
logDebug( BaseMessages.getString( PKG, "SynchronizeAfterMergeDialog.Log.GettingKeyInfo" ) );
}

wCommit.setText( "" + input.getCommitSize() );
wCommit.setText( input.getCommitSize() );
wTablenameInField.setSelection( input.istablenameInField() );
if ( input.gettablenameField() != null ) {
wTableField.setText( input.gettablenameField() );
Expand Down Expand Up @@ -1153,7 +1154,7 @@ private void getInfo( SynchronizeAfterMergeMeta inf ) {

inf.allocate( nrkeys, nrfields );

inf.setCommitSize( Const.toInt( wCommit.getText(), 0 ) );
inf.setCommitSize( wCommit.getText() );
inf.settablenameInField( wTablenameInField.getSelection() );
inf.settablenameField( wTableField.getText() );
inf.setUseBatchUpdate( wBatch.getSelection() );
Expand Down

0 comments on commit 7f74010

Please sign in to comment.