Skip to content

Commit

Permalink
[BACKLOG-6496] - Database lookup step with "=" comparator fails on 6.1
Browse files Browse the repository at this point in the history
- use only those fields that are needed for look-up (fix an error introduced in PDI-3314)
- add a test
  • Loading branch information
Andrey Khayrutdinov committed Feb 8, 2016
1 parent d305a1e commit 44cfdb2
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 22 deletions.
Expand Up @@ -481,16 +481,18 @@ private void loadAllTableDataIntoTheCache() throws KettleException {
} }


private void putToDefaultCache( Database db, List<Object[]> rows ) { private void putToDefaultCache( Database db, List<Object[]> rows ) {
RowMetaInterface returnRowMeta = db.getReturnRowMeta(); final int keysAmount = meta.getStreamKeyField1().length;
RowMetaInterface prototype = copyValueMetasFrom( db.getReturnRowMeta(), keysAmount );

// Copy the data into 2 parts: key and value... // Copy the data into 2 parts: key and value...
// //
for ( Object[] row : rows ) { for ( Object[] row : rows ) {
int index = 0; int index = 0;
// not sure it is efficient to re-create the same on every row, // not sure it is efficient to re-create the same on every row,
// but this was done earlier, so I'm keeping this behaviour // but this was done earlier, so I'm keeping this behaviour
RowMetaInterface keyMeta = returnRowMeta.clone(); RowMetaInterface keyMeta = prototype.clone();
Object[] keyData = new Object[ meta.getStreamKeyField1().length ]; Object[] keyData = new Object[ keysAmount ];
for ( int i = 0; i < meta.getStreamKeyField1().length; i++ ) { for ( int i = 0; i < keysAmount; i++ ) {
keyData[ i ] = row[ index++ ]; keyData[ i ] = row[ index++ ];
} }
// RowMeta valueMeta = new RowMeta(); // RowMeta valueMeta = new RowMeta();
Expand All @@ -506,6 +508,16 @@ private void putToDefaultCache( Database db, List<Object[]> rows ) {
} }
} }


private RowMetaInterface copyValueMetasFrom( RowMetaInterface source, int n ) {
RowMeta result = new RowMeta();
for ( int i = 0; i < n; i++ ) {
// don't need cloning here,
// because value metas will be cloned during iterating through rows
result.addValueMeta( source.getValueMeta( i ) );
}
return result;
}

private void putToReadOnlyCache( Database db, List<Object[]> rows ) { private void putToReadOnlyCache( Database db, List<Object[]> rows ) {
ReadAllCache.Builder cacheBuilder = new ReadAllCache.Builder( data, rows.size() ); ReadAllCache.Builder cacheBuilder = new ReadAllCache.Builder( data, rows.size() );


Expand Down
Expand Up @@ -2,7 +2,7 @@
* *
* Pentaho Data Integration * Pentaho Data Integration
* *
* Copyright (C) 2002-2015 by Pentaho : http://www.pentaho.com * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
* *
******************************************************************************* *******************************************************************************
* *
Expand Down Expand Up @@ -52,13 +52,14 @@
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.ResultSetMetaData; import java.sql.ResultSetMetaData;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


import static java.util.Collections.singletonList;
import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
Expand Down Expand Up @@ -112,9 +113,9 @@ private StepMockHelper<DatabaseLookupMeta, DatabaseLookupData> createMockHelper(
when( mockHelper.trans.findRowSet( anyString(), anyInt(), anyString(), anyInt() ) ).thenReturn( rowSet ); when( mockHelper.trans.findRowSet( anyString(), anyInt(), anyString(), anyInt() ) ).thenReturn( rowSet );


when( mockHelper.transMeta.findNextSteps( Matchers.any( StepMeta.class ) ) ) when( mockHelper.transMeta.findNextSteps( Matchers.any( StepMeta.class ) ) )
.thenReturn( singletonList( mock( StepMeta.class ) ) ); .thenReturn( Collections.singletonList( mock( StepMeta.class ) ) );
when( mockHelper.transMeta.findPreviousSteps( any( StepMeta.class ), anyBoolean() ) ) when( mockHelper.transMeta.findPreviousSteps( any( StepMeta.class ), anyBoolean() ) )
.thenReturn( singletonList( mock( StepMeta.class ) ) ); .thenReturn( Collections.singletonList( mock( StepMeta.class ) ) );


return mockHelper; return mockHelper;
} }
Expand Down Expand Up @@ -276,41 +277,82 @@ private DatabaseLookupData getCreatedData( boolean allEquals ) throws Exception
returnRowMeta.addValueMeta( new ValueMetaInteger() ); returnRowMeta.addValueMeta( new ValueMetaInteger() );
when( db.getReturnRowMeta() ).thenReturn( returnRowMeta ); when( db.getReturnRowMeta() ).thenReturn( returnRowMeta );


DatabaseMeta dbMeta = mock( DatabaseMeta.class );

StepMockHelper<DatabaseLookupMeta, DatabaseLookupData> mockHelper = createMockHelper(); StepMockHelper<DatabaseLookupMeta, DatabaseLookupData> mockHelper = createMockHelper();
DatabaseLookupMeta meta = createTestMeta();
DatabaseLookupData data = new DatabaseLookupData();

DatabaseLookup step = createSpiedStep( db, mockHelper, meta );
step.init( meta, data );



data.db = db;
data.keytypes = new int[] { ValueMetaInterface.TYPE_INTEGER };
if ( allEquals ) {
data.allEquals = true;
data.conditions = new int[] { DatabaseLookupMeta.CONDITION_EQ };
} else {
data.allEquals = false;
data.conditions = new int[] { DatabaseLookupMeta.CONDITION_LT };
}
step.processRow( meta, data );

return data;
}

private DatabaseLookupMeta createTestMeta() {
DatabaseLookupMeta meta = new DatabaseLookupMeta(); DatabaseLookupMeta meta = new DatabaseLookupMeta();
meta.setCached( true ); meta.setCached( true );
meta.setLoadingAllDataInCache( true ); meta.setLoadingAllDataInCache( true );
meta.setDatabaseMeta( dbMeta ); meta.setDatabaseMeta( mock( DatabaseMeta.class ) );
// it's ok here, we won't do actual work // it's ok here, we won't do actual work
meta.allocate( 1, 0 ); meta.allocate( 1, 0 );
meta.setStreamKeyField1( new String[] { "Test" } ); meta.setStreamKeyField1( new String[] { "Test" } );
return meta;
}


DatabaseLookupData data = new DatabaseLookupData(); private DatabaseLookup createSpiedStep( Database db,

StepMockHelper<DatabaseLookupMeta, DatabaseLookupData> mockHelper,
DatabaseLookupMeta meta ) throws KettleException {
DatabaseLookup step = spyLookup( mockHelper, db, meta.getDatabaseMeta() ); DatabaseLookup step = spyLookup( mockHelper, db, meta.getDatabaseMeta() );
doNothing().when( step ).determineFieldsTypesQueryingDb(); doNothing().when( step ).determineFieldsTypesQueryingDb();
doReturn( null ).when( step ).lookupValues( any( RowMetaInterface.class ), any( Object[].class ) ); doReturn( null ).when( step ).lookupValues( any( RowMetaInterface.class ), any( Object[].class ) );


RowMeta input = new RowMeta(); RowMeta input = new RowMeta();
input.addValueMeta( new ValueMetaInteger( "Test" ) ); input.addValueMeta( new ValueMetaInteger( "Test" ) );
step.setInputRowMeta( input ); step.setInputRowMeta( input );
step.init( meta, data ); return step;
}



@Test
public void createsReadDefaultCache_AndUsesOnlyNeededFieldsFromMeta() throws Exception {
Database db = mock( Database.class );
when( db.getRows( anyString(), anyInt() ) )
.thenReturn( Arrays.asList( new Object[] { 1L }, new Object[] { 2L } ) );

RowMeta returnRowMeta = new RowMeta();
returnRowMeta.addValueMeta( new ValueMetaInteger() );
returnRowMeta.addValueMeta( new ValueMetaInteger() );
when( db.getReturnRowMeta() ).thenReturn( returnRowMeta );

StepMockHelper<DatabaseLookupMeta, DatabaseLookupData> mockHelper = createMockHelper();
DatabaseLookupMeta meta = createTestMeta();
DatabaseLookupData data = new DatabaseLookupData();

DatabaseLookup step = createSpiedStep( db, mockHelper, meta );
step.init( meta, data );


data.db = db; data.db = db;
data.keytypes = new int[] { ValueMetaInterface.TYPE_INTEGER }; data.keytypes = new int[] { ValueMetaInterface.TYPE_INTEGER };
if ( allEquals ) { data.allEquals = true;
data.allEquals = true; data.conditions = new int[] { DatabaseLookupMeta.CONDITION_EQ };
data.conditions = new int[] { DatabaseLookupMeta.CONDITION_EQ };
} else {
data.allEquals = false;
data.conditions = new int[] { DatabaseLookupMeta.CONDITION_LT };
}
step.processRow( meta, data ); step.processRow( meta, data );


return data; data.lookupMeta = new RowMeta();
data.lookupMeta.addValueMeta( new ValueMetaInteger() );

assertNotNull( data.cache.getRowFromCache( data.lookupMeta, new Object[] { 1L } ) );
assertNotNull( data.cache.getRowFromCache( data.lookupMeta, new Object[] { 2L } ) );
} }
} }

0 comments on commit 44cfdb2

Please sign in to comment.