Skip to content

Commit

Permalink
[PDI-13961] - Spoon firstly tries to conect via JNDI although connect…
Browse files Browse the repository at this point in the history
…ion type is JDBC

- introduce a new method for distinguishing the source where to look for
  data sources
- remove some legacy code, that is not used anymore
- add tests
  • Loading branch information
Andrey Khayrutdinov committed Aug 28, 2015
1 parent 0168dcc commit 21313c1
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 105 deletions.
Expand Up @@ -38,6 +38,19 @@ public interface DataSourceProviderInterface {
* @param datasourceName
* @return javax.sql.DataSource
*/
public DataSource getNamedDataSource( String datasourceName ) throws DataSourceNamingException;
DataSource getNamedDataSource( String datasourceName ) throws DataSourceNamingException;

/**
* Returns the named data source of respecting its <code>type</code>
*
* @param datasourceName name of the desired data source
* @param type data source's type
* @return named data source
* @throws DataSourceNamingException
*/
DataSource getNamedDataSource( String datasourceName, DatasourceType type ) throws DataSourceNamingException;

enum DatasourceType {
JNDI, POOLED
}
}
148 changes: 44 additions & 104 deletions core/src/org/pentaho/di/core/database/Database.java
Expand Up @@ -3,7 +3,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2013 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2015 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -54,8 +54,6 @@
import java.util.Properties;
import java.util.Set;

import javax.sql.DataSource;

import org.apache.commons.vfs2.FileObject;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.Counter;
Expand All @@ -64,6 +62,7 @@
import org.pentaho.di.core.ProgressMonitorListener;
import org.pentaho.di.core.Result;
import org.pentaho.di.core.RowMetaAndData;
import org.pentaho.di.core.database.DataSourceProviderInterface.DatasourceType;
import org.pentaho.di.core.database.map.DatabaseConnectionMap;
import org.pentaho.di.core.database.util.DatabaseLogExceptionFactory;
import org.pentaho.di.core.database.util.DatabaseUtil;
Expand Down Expand Up @@ -370,7 +369,8 @@ public synchronized void connect( String group, String partitionId ) throws Kett
}
}

private synchronized void shareConnectionWith( String partitionId, Database anotherDb ) throws KettleDatabaseException {
private synchronized void shareConnectionWith( String partitionId, Database anotherDb )
throws KettleDatabaseException {
// inside synchronized block we can increment 'opened' directly
this.opened++;

Expand All @@ -388,7 +388,13 @@ private synchronized void shareConnectionWith( String partitionId, Database anot
}

/**
* Open the database connection.
* Open the database connection. The algorithm is:
* <ol>
* <li>If <code>databaseMeta.getAccessType()</code> returns
* <code>DatabaseMeta.TYPE_ACCESS_JNDI</code>, then the connection's datasource is looked up in JNDI </li>
* <li>If <code>databaseMeta.isUsingConnectionPool()</code>, then the connection's datasource is looked up in the pool</li>
* <li>otherwise, the connection is established via {@linkplain java.sql.DriverManager}</li>
* </ol>
*
* @param partitionId the partition ID in the cluster to connect to.
* @throws KettleDatabaseException if something went wrong.
Expand All @@ -399,37 +405,45 @@ public void normalConnect( String partitionId ) throws KettleDatabaseException {
}

try {
// First see if we use connection pooling...
//
// isUsingConnectionPool defaults to false for backward compatibility
//
// JNDI does pooling on it's own.
DataSourceProviderInterface dsp = DataSourceProviderFactory.getDataSourceProviderInterface();
if ( dsp == null ) {
// since DataSourceProviderFactory is initialised with new DatabaseUtil(),
// this assignment is correct
dsp = new DatabaseUtil();
}

if ( databaseMeta.isUsingConnectionPool() && databaseMeta.getAccessType() != DatabaseMeta.TYPE_ACCESS_JNDI ) {
if ( databaseMeta.getAccessType() == DatabaseMeta.TYPE_ACCESS_JNDI ) {
String jndiName = environmentSubstitute( databaseMeta.getDatabaseName() );
try {
this.connection = ConnectionPoolUtil.getConnection( log, databaseMeta, partitionId );
if ( getConnection().getAutoCommit() != isAutoCommit() ) {
setAutoCommit( isAutoCommit() );
}
} catch ( Exception e ) {
throw new KettleDatabaseException( "Error occurred while trying to connect to the database", e );
this.connection = dsp.getNamedDataSource( jndiName, DatasourceType.JNDI ).getConnection();
} catch ( DataSourceNamingException e ) {
log.logError( "Unable to find datasource by JNDI name: " + jndiName, e );
throw e;
}
} else if ( databaseMeta.getAccessType() == DatabaseMeta.TYPE_ACCESS_JNDI ) {
final String jndiName = environmentSubstitute( databaseMeta.getDatabaseName() );
try {
connectUsingJNDIDataSource( jndiName );
} catch ( KettleDatabaseException kde ) {
// This was a new path that was added. If in case we did not find this datasource in JNDI,
// we were throwing exception and exiting out of this method. We will attempt to load this datasource
// using the classs if JNDI lookup fail. This is how it was working in 5.3
if ( log.isDetailed() ) {
log.logDetailed( "Unable to find datasource using JNDI. Cause: " + kde.getLocalizedMessage() );
} else {
if ( databaseMeta.isUsingConnectionPool() ) {
String name = databaseMeta.getName();
try {
try {
this.connection = dsp.getNamedDataSource( name, DatasourceType.POOLED ).getConnection();
} catch ( UnsupportedOperationException e ) {
// DatabaseUtil doesn't support pooled DS,
// use legacy routine
this.connection = ConnectionPoolUtil.getConnection( log, databaseMeta, partitionId );
}
if ( getConnection().getAutoCommit() != isAutoCommit() ) {
setAutoCommit( isAutoCommit() );
}
} catch ( DataSourceNamingException e ) {
log.logError( "Unable to find pooled datasource by its name: " + name, e );
throw e;
}
connectUsingClass();
} else {
// using non-jndi and non-pooled connection -- just a simple JDBC
connectUsingClass( databaseMeta.getDriverClass(), partitionId );
}
} else {
connectUsingClass();
}

// See if we need to execute extra SQL statement...
String sql = environmentSubstitute( databaseMeta.getConnectSQL() );

Expand All @@ -446,57 +460,6 @@ public void normalConnect( String partitionId ) throws KettleDatabaseException {
}
}

/**
* Initialize by getting the connection from a javax.sql.DataSource. This method uses the DataSourceProviderFactory to
* get the provider of DataSource objects.
*
* @param dataSourceName
* @throws KettleDatabaseException
*/
private void connectUsingNamedDataSource( String dataSourceName ) throws KettleDatabaseException {
connection = null;
DataSource dataSource =
DataSourceProviderFactory.getDataSourceProviderInterface().getNamedDataSource( dataSourceName );
if ( dataSource != null ) {
try {
connection = dataSource.getConnection();
} catch ( SQLException e ) {
throw new KettleDatabaseException( "Invalid named connection " + dataSourceName + " : " + e.getMessage() );
}
if ( connection == null ) {
throw new KettleDatabaseException( "Invalid named connection " + dataSourceName );
}
} else {
throw new KettleDatabaseException( "Invalid named connection " + dataSourceName );
}
}

/**
* Initialize by getting the connection from a javax.sql.DataSource.
* <p/>
* This method now _does_not_use the DataSourceProviderFactory to get the provider of DataSource objects.
*
* @param jndiName
* @throws KettleDatabaseException
*/
private void connectUsingJNDIDataSource( String jndiName ) throws KettleDatabaseException {
Connection connection = null;
DataSource dataSource = ( new DatabaseUtil() ).getNamedDataSource( jndiName );
if ( dataSource != null ) {
try {
connection = dataSource.getConnection();
} catch ( SQLException e ) {
throw new KettleDatabaseException( "Invalid JNDI connection " + jndiName + " : " + e.getMessage() );
}
if ( connection == null ) {
throw new KettleDatabaseException( "Invalid JNDI connection " + jndiName );
}
} else {
throw new KettleDatabaseException( "Invalid JNDI connection " + jndiName );
}
this.connection = connection;
}

/**
* Connect using the correct classname
*
Expand Down Expand Up @@ -4700,27 +4663,4 @@ public void setForcingSeparateLogging( boolean forcingSeparateLogging ) {
log.setForcingSeparateLogging( forcingSeparateLogging );
}
}

private void connectUsingClass() throws KettleDatabaseException {
// TODO connectUsingNamedDataSource can be called here but the current implementation of
// org.pentaho.platform.plugin.action.kettle.PlatformKettleDataSourceProvider can cause collision of name and
// JNDI name. See also [PDI-13633], [SP-1776].
// We will first try to find the connection in the named datasources. If we can't find it there,
// we will connect using the class.
try {
if ( log.isDetailed() ) {
log.logDetailed( "Attempting to find connection in Named Datasources" );
}
connectUsingNamedDataSource( environmentSubstitute( databaseMeta.getDatabaseName() ) );
} catch ( KettleDatabaseException kde ) {
if ( log.isDetailed() ) {
log.logDetailed( "Unable to find datasource in Named Datasources."
+ " Finally will try to attempt connecting using class " );
}
connectUsingClass( databaseMeta.getDriverClass(), partitionId );
}
if ( log.isDetailed() ) {
log.logDetailed( "Connected to database." );
}
}
}
15 changes: 15 additions & 0 deletions core/src/org/pentaho/di/core/database/util/DatabaseUtil.java
Expand Up @@ -133,4 +133,19 @@ public DataSource getNamedDataSource( String datasourceName ) throws DataSourceN
throw new DataSourceNamingException( ex );
}
}

@Override
public DataSource getNamedDataSource( String datasourceName, DatasourceType type )
throws DataSourceNamingException {
if ( type != null ) {
switch( type ) {
case JNDI:
return getNamedDataSource( datasourceName );
case POOLED:
throw new UnsupportedOperationException(
getClass().getName() + " does not support providing pooled data sources" );
}
}
throw new IllegalArgumentException( "Unsupported data source type: " + type );
}
}
58 changes: 58 additions & 0 deletions core/test-src/org/pentaho/di/core/database/DatabaseUnitTest.java
Expand Up @@ -24,6 +24,8 @@

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import static org.pentaho.di.core.database.DataSourceProviderInterface.DatasourceType.JNDI;
import static org.pentaho.di.core.database.DataSourceProviderInterface.DatasourceType.POOLED;

import java.sql.BatchUpdateException;
import java.sql.Connection;
Expand All @@ -48,6 +50,8 @@
import org.pentaho.di.core.row.ValueMetaInterface;
import org.pentaho.di.core.variables.VariableSpace;

import javax.sql.DataSource;

public class DatabaseUnitTest {

static LoggingObjectInterface log = new SimpleLoggingObject( "junit", LoggingObjectType.GENERAL, null );
Expand Down Expand Up @@ -451,4 +455,58 @@ private static Connection mockConnection( DatabaseMetaData dbMetaData ) throws S
when( connection.getMetaData() ).thenReturn( dbMetaData );
return connection;
}


@Test
public void usesCustomDsProviderIfSet_Pooling() throws Exception {
DatabaseMeta meta = new DatabaseMeta();
meta.setUsingConnectionPool( true );
testUsesCustomDsProviderIfSet( meta );
}

@Test
public void usesCustomDsProviderIfSet_Jndi() throws Exception {
DatabaseMeta meta = new DatabaseMeta();
meta.setAccessType( DatabaseMeta.TYPE_ACCESS_JNDI );
testUsesCustomDsProviderIfSet( meta );
}

private DataSourceProviderInterface testUsesCustomDsProviderIfSet( DatabaseMeta meta ) throws Exception {
Connection connection = mock( Connection.class );
DataSource ds = mock( DataSource.class );
when( ds.getConnection() ).thenReturn( connection );
when( ds.getConnection( anyString(), anyString() ) ).thenReturn( connection );

DataSourceProviderInterface provider = mock( DataSourceProviderInterface.class );
when( provider.getNamedDataSource( anyString(), any( DataSourceProviderInterface.DatasourceType.class ) ) )
.thenReturn( ds );

Database db = new Database( log, meta );

final DataSourceProviderInterface existing =
DataSourceProviderFactory.getDataSourceProviderInterface();
try {
DataSourceProviderFactory.setDataSourceProviderInterface( provider );
db.normalConnect( null );
} finally {
DataSourceProviderFactory.setDataSourceProviderInterface( existing );
}

assertEquals( connection, db.getConnection() );
return provider;
}


@Test
public void jndiAccessTypePrevailsPooled() throws Exception {
// this test is a guard of Database.normalConnect() contract:
// it firstly tries to use JNDI name
DatabaseMeta meta = new DatabaseMeta();
meta.setAccessType( DatabaseMeta.TYPE_ACCESS_JNDI );
meta.setUsingConnectionPool( true );

DataSourceProviderInterface provider = testUsesCustomDsProviderIfSet( meta );
verify( provider ).getNamedDataSource( anyString(), eq( JNDI ) );
verify( provider, never() ).getNamedDataSource( anyString(), eq( POOLED ) );
}
}

0 comments on commit 21313c1

Please sign in to comment.