Skip to content

Commit

Permalink
[PDI-15086] Salesforce Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Tucker committed Apr 19, 2016
1 parent 9cf47b2 commit 4850f52
Show file tree
Hide file tree
Showing 60 changed files with 3,259 additions and 2,089 deletions.
Expand Up @@ -2,7 +2,7 @@
* *
* Pentaho Data Integration * Pentaho Data Integration
* *
* Copyright (C) 2002-2013 by Pentaho : http://www.pentaho.com * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
* *
******************************************************************************* *******************************************************************************
* *
Expand Down Expand Up @@ -31,7 +31,7 @@
* IBM Corporation - initial API and implementation * IBM Corporation - initial API and implementation
*******************************************************************************/ *******************************************************************************/


package org.pentaho.di.trans.steps.salesforceinput; package org.pentaho.di.trans.steps.salesforce;


import java.io.IOException; import java.io.IOException;
import java.io.StringReader; import java.io.StringReader;
Expand Down
Expand Up @@ -19,7 +19,8 @@
* limitations under the License. * limitations under the License.
* *
******************************************************************************/ ******************************************************************************/
package org.pentaho.di.trans.steps.salesforceinput;
package org.pentaho.di.trans.steps.salesforce;


import java.io.IOException; import java.io.IOException;
import java.io.StringWriter; import java.io.StringWriter;
Expand All @@ -40,8 +41,10 @@
import org.pentaho.di.core.Const; import org.pentaho.di.core.Const;
import org.pentaho.di.core.encryption.Encr; import org.pentaho.di.core.encryption.Encr;
import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.KettleLogStore;
import org.pentaho.di.core.logging.LogChannelInterface; import org.pentaho.di.core.logging.LogChannelInterface;
import org.pentaho.di.i18n.BaseMessages; import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.trans.steps.salesforceinput.SalesforceInputMeta;
import org.w3c.dom.Element; import org.w3c.dom.Element;


import com.sforce.soap.partner.AllOrNoneHeader; import com.sforce.soap.partner.AllOrNoneHeader;
Expand Down Expand Up @@ -77,8 +80,6 @@ public class SalesforceConnection {
private String password; private String password;
private String module; private String module;
private int timeout; private int timeout;
private String condition;

private SoapBindingStub binding; private SoapBindingStub binding;
private LoginResult loginResult; private LoginResult loginResult;
private GetUserInfoResult userInfo; private GetUserInfoResult userInfo;
Expand All @@ -103,7 +104,11 @@ public class SalesforceConnection {
* Construct a new Salesforce Connection * Construct a new Salesforce Connection
*/ */
public SalesforceConnection( LogChannelInterface logInterface, String url, String username, String password ) throws KettleException { public SalesforceConnection( LogChannelInterface logInterface, String url, String username, String password ) throws KettleException {
this.log = logInterface; if ( logInterface == null ) {
this.log = KettleLogStore.getLogChannelInterfaceFactory().create( this );
} else {
this.log = logInterface;
}
this.url = url; this.url = url;
setUsername( username ); setUsername( username );
setPassword( password ); setPassword( password );
Expand All @@ -115,7 +120,6 @@ public SalesforceConnection( LogChannelInterface logInterface, String url, Strin
this.sql = null; this.sql = null;
this.serverTimestamp = null; this.serverTimestamp = null;
this.qr = null; this.qr = null;
this.condition = null;
this.startDate = null; this.startDate = null;
this.endDate = null; this.endDate = null;
this.sObjects = null; this.sObjects = null;
Expand All @@ -124,7 +128,7 @@ public SalesforceConnection( LogChannelInterface logInterface, String url, Strin
this.queryResultSize = 0; this.queryResultSize = 0;
this.recordsCount = 0; this.recordsCount = 0;
setUsingCompression( false ); setUsingCompression( false );
rollbackAllChangesOnError( false ); setRollbackAllChangesOnError( false );


// check target URL // check target URL
if ( Const.isEmpty( getURL() ) ) { if ( Const.isEmpty( getURL() ) ) {
Expand All @@ -145,15 +149,33 @@ public boolean isRollbackAllChangesOnError() {
return this.rollbackAllChangesOnError; return this.rollbackAllChangesOnError;
} }


/**
*
* @see #isRollbackAllChangesOnError(boolean)
*/
@Deprecated
public void rollbackAllChangesOnError( boolean value ) { public void rollbackAllChangesOnError( boolean value ) {
setRollbackAllChangesOnError( value );
}

public void setRollbackAllChangesOnError( boolean value ) {
this.rollbackAllChangesOnError = value; this.rollbackAllChangesOnError = value;
} }


public boolean isQueryAll() { public boolean isQueryAll() {
return this.queryAll; return this.queryAll;
} }


/**
*
* @see #setQueryAll(boolean)
*/
@Deprecated
public void queryAll( boolean value ) { public void queryAll( boolean value ) {
setQueryAll( value );
}

public void setQueryAll( boolean value ) {
this.queryAll = value; this.queryAll = value;
} }


Expand All @@ -169,20 +191,12 @@ public void setCalendar( int recordsFilter, GregorianCalendar startDate, Gregori
} }
// Calculate difference in days // Calculate difference in days
long diffDays = long diffDays =
( this.startDate.getTime().getTime() - this.endDate.getTime().getTime() ) / ( 24 * 60 * 60 * 1000 ); ( this.endDate.getTime().getTime() - this.startDate.getTime().getTime() ) / ( 24 * 60 * 60 * 1000 );
if ( diffDays > 30 ) { if ( diffDays > 30 ) {
throw new KettleException( BaseMessages.getString( PKG, "SalesforceInput.Error.StartDateTooOlder" ) ); throw new KettleException( BaseMessages.getString( PKG, "SalesforceInput.Error.StartDateTooOlder" ) );
} }
} }


public void setCondition( String condition ) {
this.condition = condition;
}

public String getCondition() {
return this.condition;
}

public void setSQL( String sql ) { public void setSQL( String sql ) {
this.sql = sql; this.sql = sql;
} }
Expand Down Expand Up @@ -215,6 +229,11 @@ public QueryResult getQueryResult() {
return this.qr; return this.qr;
} }


public void createBinding() throws ServiceException {
if ( this.binding == null ) {
this.binding = (SoapBindingStub) new SforceServiceLocator().getSoap();
}
}
public SoapBindingStub getBinding() { public SoapBindingStub getBinding() {
return this.binding; return this.binding;
} }
Expand Down Expand Up @@ -251,46 +270,36 @@ public void setPassword( String value ) {
this.password = value; this.password = value;
} }


/**
* It is extracted method for test goal, should use only for test purpose
*
* @return SoapBindingStub - new soap binding stub
* @throws ServiceException
*/
protected SoapBindingStub getSoapBinding() throws ServiceException {
return (SoapBindingStub) new SforceServiceLocator().getSoap();
}

public void connect() throws KettleException { public void connect() throws KettleException {
try { try {
this.binding = getSoapBinding(); createBinding();
if ( log.isDetailed() ) { if ( log.isDetailed() ) {
log.logDetailed( BaseMessages.getString( PKG, "SalesforceInput.Log.LoginURL", binding log.logDetailed( BaseMessages.getString( PKG, "SalesforceInput.Log.LoginURL", getBinding()
._getProperty( SoapBindingStub.ENDPOINT_ADDRESS_PROPERTY ) ) ); ._getProperty( SoapBindingStub.ENDPOINT_ADDRESS_PROPERTY ) ) );
} }


// Set timeout // Set timeout
if ( getTimeOut() > 0 ) { if ( getTimeOut() > 0 ) {
this.binding.setTimeout( getTimeOut() ); getBinding().setTimeout( getTimeOut() );
if ( log.isDebug() ) { if ( log.isDebug() ) {
log.logDebug( BaseMessages.getString( PKG, "SalesforceInput.Log.SettingTimeout", "" + this.timeout ) ); log.logDebug( BaseMessages.getString( PKG, "SalesforceInput.Log.SettingTimeout", "" + this.timeout ) );
} }
} }


// Set URL // Set URL
this.binding._setProperty( SoapBindingStub.ENDPOINT_ADDRESS_PROPERTY, getURL() ); getBinding()._setProperty( SoapBindingStub.ENDPOINT_ADDRESS_PROPERTY, getURL() );


// Do we need compression? // Do we need compression?
if ( isUsingCompression() ) { if ( isUsingCompression() ) {
this.binding._setProperty( HTTPConstants.MC_ACCEPT_GZIP, useCompression ); getBinding()._setProperty( HTTPConstants.MC_ACCEPT_GZIP, useCompression );
this.binding._setProperty( HTTPConstants.MC_GZIP_REQUEST, useCompression ); getBinding()._setProperty( HTTPConstants.MC_GZIP_REQUEST, useCompression );
} }
if ( isRollbackAllChangesOnError() ) { if ( isRollbackAllChangesOnError() ) {
// Set the SOAP header to rollback all changes // Set the SOAP header to rollback all changes
// unless all records are processed successfully. // unless all records are processed successfully.
AllOrNoneHeader allOrNoneHeader = new AllOrNoneHeader(); AllOrNoneHeader allOrNoneHeader = new AllOrNoneHeader();
allOrNoneHeader.setAllOrNone( true ); allOrNoneHeader.setAllOrNone( true );
this.binding.setHeader( getBinding().setHeader(
new SforceServiceLocator().getServiceName().getNamespaceURI(), "AllOrNoneHeader", allOrNoneHeader ); new SforceServiceLocator().getServiceName().getNamespaceURI(), "AllOrNoneHeader", allOrNoneHeader );
} }
// Attempt the login giving the user feedback // Attempt the login giving the user feedback
Expand All @@ -302,9 +311,6 @@ public void connect() throws KettleException {
if ( getModule() != null ) { if ( getModule() != null ) {
log.logDetailed( BaseMessages.getString( PKG, "SalesforceInput.Log.LoginModule", getModule() ) ); log.logDetailed( BaseMessages.getString( PKG, "SalesforceInput.Log.LoginModule", getModule() ) );
} }
if ( getCondition() != null ) {
log.logDetailed( BaseMessages.getString( PKG, "SalesforceInput.Log.LoginCondition", getCondition() ) );
}
log.logDetailed( "<-----------------------------------------" ); log.logDetailed( "<-----------------------------------------" );
} }


Expand All @@ -319,16 +325,16 @@ public void connect() throws KettleException {
} }


// set the session header for subsequent call authentication // set the session header for subsequent call authentication
this.binding._setProperty( SoapBindingStub.ENDPOINT_ADDRESS_PROPERTY, this.loginResult.getServerUrl() ); getBinding()._setProperty( SoapBindingStub.ENDPOINT_ADDRESS_PROPERTY, this.loginResult.getServerUrl() );


// Create a new session header object and set the session id to that // Create a new session header object and set the session id to that
// returned by the login // returned by the login
SessionHeader sh = new SessionHeader(); SessionHeader sh = new SessionHeader();
sh.setSessionId( loginResult.getSessionId() ); sh.setSessionId( loginResult.getSessionId() );
this.binding.setHeader( new SforceServiceLocator().getServiceName().getNamespaceURI(), "SessionHeader", sh ); getBinding().setHeader( new SforceServiceLocator().getServiceName().getNamespaceURI(), "SessionHeader", sh );


// Return the user Infos // Return the user Infos
this.userInfo = this.binding.getUserInfo(); this.userInfo = getBinding().getUserInfo();
if ( log.isDebug() ) { if ( log.isDebug() ) {
log.logDebug( BaseMessages.getString( PKG, "SalesforceInput.Log.UserInfos" ) log.logDebug( BaseMessages.getString( PKG, "SalesforceInput.Log.UserInfos" )
+ " : " + this.userInfo.getUserFullName() ); + " : " + this.userInfo.getUserFullName() );
Expand Down
Expand Up @@ -2,7 +2,7 @@
* *
* Pentaho Data Integration * Pentaho Data Integration
* *
* Copyright (C) 2002-2013 by Pentaho : http://www.pentaho.com * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
* *
******************************************************************************* *******************************************************************************
* *
Expand All @@ -20,9 +20,10 @@
* *
******************************************************************************/ ******************************************************************************/


package org.pentaho.di.trans.steps.salesforceinput; package org.pentaho.di.trans.steps.salesforce;


import org.pentaho.di.i18n.BaseMessages; import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.trans.steps.salesforceinput.SalesforceInputMeta;


public class SalesforceConnectionUtils { public class SalesforceConnectionUtils {


Expand Down
Expand Up @@ -2,7 +2,7 @@
* *
* Pentaho Data Integration * Pentaho Data Integration
* *
* Copyright (C) 2002-2013 by Pentaho : http://www.pentaho.com * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
* *
******************************************************************************* *******************************************************************************
* *
Expand All @@ -20,7 +20,7 @@
* *
******************************************************************************/ ******************************************************************************/


package org.pentaho.di.trans.steps.salesforceinput; package org.pentaho.di.trans.steps.salesforce;


import java.util.Date; import java.util.Date;


Expand Down
@@ -0,0 +1,102 @@
/*! ******************************************************************************
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/

package org.pentaho.di.trans.steps.salesforce;

import org.pentaho.di.core.Const;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.BaseStep;
import org.pentaho.di.trans.step.StepDataInterface;
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;

public abstract class SalesforceStep extends BaseStep implements StepInterface {

public static Class<?> PKG = SalesforceStep.class;

public SalesforceStepMeta meta;
public SalesforceStepData data;

public SalesforceStep( StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta,
Trans trans ) {
super( stepMeta, stepDataInterface, copyNr, transMeta, trans );
}

@Override
public boolean init( StepMetaInterface smi, StepDataInterface sdi ) {
if ( !super.init( smi, sdi ) ) {
return false;
}
meta = (SalesforceStepMeta) smi;
data = (SalesforceStepData) sdi;

String realUrl = environmentSubstitute( meta.getTargetURL() );
String realUsername = environmentSubstitute( meta.getUsername() );
String realPassword = environmentSubstitute( meta.getPassword() );
String realModule = environmentSubstitute( meta.getModule() );

if ( Const.isEmpty( realUrl ) ) {
log.logError( BaseMessages.getString( PKG, "SalesforceStep.TargetURLMissing.Error" ) );
return false;
}
if ( Const.isEmpty( realUsername ) ) {
log.logError( BaseMessages.getString( PKG, "SalesforceInput.UsernameMissing.Error" ) );
return false;
}
if ( Const.isEmpty( realPassword ) ) {
log.logError( BaseMessages.getString( PKG, "SalesforceInput.PasswordMissing.Error" ) );
return false;
}
if ( Const.isEmpty( realModule ) ) {
log.logError( BaseMessages.getString( PKG, "SalesforceInputDialog.ModuleMissing.DialogMessage" ) );
return false;
}
try {
// The final step should call data.connection.connect(), as other settings may set additional options
data.connection = new SalesforceConnection( log, realUrl, realUsername, realPassword );
data.connection.setModule( realModule );
data.connection.setTimeOut( Const.toInt( environmentSubstitute( meta.getTimeout() ), 0 ) );
data.connection.setUsingCompression( meta.isCompression() );
} catch ( KettleException ke ) {
logError( BaseMessages.getString( PKG, "SalesforceInput.Log.ErrorOccurredDuringStepInitialize" )
+ ke.getMessage() );
return false;
}
return true;
}

public void dispose( StepMetaInterface smi, StepDataInterface sdi ) {
if ( data.connection != null ) {
try {
data.connection.close();
} catch ( KettleException ignored ) {
/* Ignore */
}
data.connection = null;
}
super.dispose( smi, sdi );
}
}

0 comments on commit 4850f52

Please sign in to comment.