Skip to content

Commit

Permalink
BSE-7710 - [TSK-3436] Concurrent access to valueMetaPluginClasses
Browse files Browse the repository at this point in the history
  • Loading branch information
petrprochy committed May 5, 2023
1 parent 626ae5b commit f19deb7
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 44 deletions.
5 changes: 3 additions & 2 deletions core/src/main/java/org/pentaho/di/core/database/Database.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2022 by Hitachi Vantara : http://www.pentaho.com
* Copyright (C) 2002-2023 by Hitachi Vantara : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -203,9 +203,10 @@ public class Database implements VariableSpace, LoggingObjectInterface, Closeabl

private static void initValueMetaPluginClasses() {
try {
valueMetaPluginClasses = ValueMetaFactory.getValueMetaPluginClasses();
final List<ValueMetaInterface> valueMetaPluginClasses = ValueMetaFactory.getValueMetaPluginClasses();
// Reverse the sort list
valueMetaPluginClasses.sort( ( o1, o2 ) -> ( Integer.compare( o1.getType(), o2.getType() ) ) * -1 );
Database.valueMetaPluginClasses = valueMetaPluginClasses;
} catch ( Exception e ) {
throw new IllegalStateException( "Unable to get list of instantiated value meta plugin classes", e );
}
Expand Down
142 changes: 100 additions & 42 deletions core/src/test/java/org/pentaho/di/core/database/DatabaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2022 by Hitachi Vantara : http://www.pentaho.com
* Copyright (C) 2002-2023 by Hitachi Vantara : http://www.pentaho.com
*
*******************************************************************************
*
Expand All @@ -22,6 +22,51 @@

package org.pentaho.di.core.database;

import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.KettleClientEnvironment;
import org.pentaho.di.core.database.DataSourceProviderInterface.DatasourceType;
import org.pentaho.di.core.exception.KettleDatabaseBatchException;
import org.pentaho.di.core.exception.KettleDatabaseException;
import org.pentaho.di.core.exception.KettlePluginException;
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.core.logging.LoggingObjectInterface;
import org.pentaho.di.core.plugins.PluginInterface;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.core.row.ValueMetaInterface;
import org.pentaho.di.core.row.value.ValueMetaNumber;
import org.pentaho.di.core.row.value.ValueMetaPluginType;
import org.pentaho.di.core.variables.VariableSpace;
import org.pentaho.di.junit.rules.RestorePDIEnvironment;
import org.springframework.mock.jndi.SimpleNamingContextBuilder;

import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.naming.spi.InitialContextFactoryBuilder;
import javax.naming.spi.NamingManager;
import javax.sql.DataSource;
import java.lang.reflect.Field;
import java.sql.BatchUpdateException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.core.StringContains.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand All @@ -30,11 +75,11 @@
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.AdditionalMatchers.aryEq;
import static org.mockito.AdditionalMatchers.or;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.AdditionalMatchers.aryEq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
Expand All @@ -45,46 +90,6 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.lang.reflect.Field;
import java.sql.BatchUpdateException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Types;
import java.util.List;
import java.util.Properties;

import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.naming.spi.InitialContextFactoryBuilder;
import javax.naming.spi.NamingManager;
import javax.sql.DataSource;

import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.KettleClientEnvironment;
import org.pentaho.di.core.database.DataSourceProviderInterface.DatasourceType;
import org.pentaho.di.core.exception.KettleDatabaseBatchException;
import org.pentaho.di.core.exception.KettleDatabaseException;
import org.pentaho.di.core.logging.LogLevel;
import org.pentaho.di.core.logging.LoggingObjectInterface;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.core.row.ValueMetaInterface;
import org.pentaho.di.core.row.value.ValueMetaNumber;
import org.pentaho.di.core.variables.VariableSpace;
import org.pentaho.di.junit.rules.RestorePDIEnvironment;
import org.springframework.mock.jndi.SimpleNamingContextBuilder;

@SuppressWarnings( "deprecation" )
public class DatabaseTest {

Expand Down Expand Up @@ -959,5 +964,58 @@ public void testGetTableFieldsMetaTrueProperty() throws Exception {
verify( db, times( 1 ) ).getTableFieldsMetaByDbMeta( any(), any() );
}

@Test
public void concurrentValueMetaModification() throws Exception {
AtomicBoolean stopMod = new AtomicBoolean();
Runnable mod = () -> {
while (!stopMod.get()) {
final PluginRegistry reg = PluginRegistry.getInstance();
final PluginInterface plugin = reg.getPlugin( ValueMetaPluginType.class, "10" ); // Timestamp type
try {
reg.registerPlugin( ValueMetaPluginType.class, plugin );
} catch ( KettlePluginException e ) {
throw new RuntimeException( e );
}
}
};
new Thread(mod).start();

DatabaseMeta meta = new DatabaseMeta();
final Database db = spy( new Database( log, meta ) );
db.setConnection( conn );

final Statement stm = mock( Statement.class );
when( conn.createStatement() ).thenReturn( stm );
when( stm.executeQuery( any( String.class ) ) ).thenReturn( rs );
when( rs.getMetaData() ).thenReturn( rsMetaData );
when( rsMetaData.getColumnCount() ).thenReturn( 2 );

// id BIGSERIAL, SQL type: BIGINT
when( rsMetaData.getColumnLabel( 1 ) ).thenReturn( "id_directory" );
when( rsMetaData.getColumnType( 1 ) ).thenReturn( Types.BIGINT );
when( rsMetaData.isSigned( 1 ) ).thenReturn( true );
when( rsMetaData.getColumnTypeName( 1 ) ).thenReturn( "bigserial" );
when( rsMetaData.getPrecision( 1 ) ).thenReturn( 19 );

// log_date TIMESTAMP, SQL type: TIMESTAMP
when( rsMetaData.getColumnLabel( 2 ) ).thenReturn( "log_date" );
when( rsMetaData.getColumnType( 2 ) ).thenReturn( Types.TIMESTAMP );
when( rsMetaData.isSigned( 2 ) ).thenReturn( false );
when( rsMetaData.getColumnTypeName( 2 ) ).thenReturn( "timestamp" );
when( rsMetaData.getPrecision( 2 ) ).thenReturn( 29 );
when( rsMetaData.getScale( 2 ) ).thenReturn( 6 );

int i = 0;
try {
while ( i < 5000 ) {
assertNotNull( db.openQuery( sql ) );
assertEquals( db.getReturnRowMeta().getValueMeta( 0 ).getType(), ValueMetaInterface.TYPE_INTEGER );
assertEquals( db.getReturnRowMeta().getValueMeta( 1 ).getType(), ValueMetaInterface.TYPE_TIMESTAMP );
i++;
}
} finally {
System.out.println( "Finished iteration: #" + i );
stopMod.set( true );
}
}
}

0 comments on commit f19deb7

Please sign in to comment.