[PDI-3314] - very slow database lookup if "not all equal conditions" and "load all rows at start"

- change DatabaseLookupData
  - introduce a Cache interface, with DefaultCache as its default implementation
  - copy the existing cache code into DefaultCache
  - fix the remaining code so that it compiles (the call-site change is sketched just below)
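
In effect, the step stops calling its own private cache methods and delegates through the new interface instead. Taken directly from the diff below, the call sites change like this:

// Before: cache logic lives in private methods of DatabaseLookup itself
add = getRowFromCache( data.lookupMeta, lookupRow );
// ...
storeRowInCache( data.lookupMeta, lookupRow, add );

// After: cache logic sits behind the new DatabaseLookupData.Cache interface
add = data.cache.getRowFromCache( data.lookupMeta, lookupRow );
// ...
data.cache.storeRowInCache( meta, data.lookupMeta, lookupRow, add );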
Andrey Khayrutdinov committed Jan 11, 2016
1 parent da2de70 commit 2b6d041
Showing 4 changed files with 219 additions and 164 deletions.
DatabaseLookup.java
@@ -118,7 +118,7 @@ private synchronized Object[] lookupValues( RowMetaInterface inputRowMeta, Objec

// First, check if we looked up before
if ( meta.isCached() ) {
add = getRowFromCache( data.lookupMeta, lookupRow );
add = data.cache.getRowFromCache( data.lookupMeta, lookupRow );
if ( add != null ) {
cacheHit = true;
}
@@ -205,7 +205,7 @@ private synchronized Object[] lookupValues( RowMetaInterface inputRowMeta, Objec
// If we already loaded all data into the cache, storing more makes no sense.
//
if ( meta.isCached() && cache_now && !meta.isLoadingAllDataInCache() && data.allEquals ) {
storeRowInCache( data.lookupMeta, lookupRow, add );
data.cache.storeRowInCache( meta, data.lookupMeta, lookupRow, add );
}

for ( int i = 0; i < data.returnMeta.size(); i++ ) {
@@ -215,142 +215,6 @@ private synchronized Object[] lookupValues( RowMetaInterface inputRowMeta, Objec
return outputRow;
}

@VisibleForTesting
void storeRowInCache( RowMetaInterface lookupMeta, Object[] lookupRow, Object[] add ) {

RowMetaAndData rowMetaAndData = new RowMetaAndData( lookupMeta, lookupRow );
// DEinspanjer 2009-02-01 XXX: I want to write a test case to prove this point before checking in.
// /* Don't insert a row with a duplicate key into the cache. It doesn't seem
// * to serve a useful purpose and can potentially cause the step to return
// * different values over the life of the transformation (if the source DB rows change)
// * Additionally, if using the load all data feature, re-inserting would reverse the order
// * specified in the step.
// */
// if (!data.look.containsKey(rowMetaAndData)) {
// data.look.put(rowMetaAndData, new TimedRow(add));
// }
data.look.put( rowMetaAndData, new TimedRow( add ) );

// See if we have to limit the cache_size.
// Sample 10% of the rows in the cache.
// Remove everything below the second lowest date.
// That should on average remove more than 10% of the entries
// It's not exact science, but it will be faster than the old algorithm

// DEinspanjer 2009-02-01: If you had previously set a cache size and then turned on load all, this
// method would throw out entries if the previous cache size wasn't big enough.
if ( !meta.isLoadingAllDataInCache() && meta.getCacheSize() > 0 && data.look.size() > meta.getCacheSize() ) {
List<RowMetaAndData> keys = new ArrayList<RowMetaAndData>( data.look.keySet() );
List<Date> samples = new ArrayList<Date>();
int incr = keys.size() / 10;
if ( incr == 0 ) {
incr = 1;
}
for ( int k = 0; k < keys.size(); k += incr ) {
RowMetaAndData key = keys.get( k );
TimedRow timedRow = data.look.get( key );
samples.add( timedRow.getLogDate() );
}

Collections.sort( samples );

if ( samples.size() > 1 ) {
Date smallest = samples.get( 1 );

// Everything below the smallest date goes away...
for ( RowMetaAndData key : keys ) {
TimedRow timedRow = data.look.get( key );

if ( timedRow.getLogDate().compareTo( smallest ) < 0 ) {
data.look.remove( key );
}
}
}
}
}

@VisibleForTesting
Object[] getRowFromCache( RowMetaInterface lookupMeta, Object[] lookupRow ) throws KettleException {
if ( data.allEquals ) {
// only do the hashtable lookup when all equals otherwise conditions >, <, <> will give wrong results
TimedRow timedRow = data.look.get( new RowMetaAndData( data.lookupMeta, lookupRow ) );
if ( timedRow != null ) {
return timedRow.getRow();
}
} else { // special handling of conditions <,>, <> etc.
if ( !data.hasDBCondition ) { // e.g. LIKE not handled by this routine, yet
// TODO: find an alternative way to look up the data based on the condition.
// Not all conditions are "=" so we are going to have to evaluate row by row
// A sorted list or index might be a good solution here...
//
for ( RowMetaAndData key : data.look.keySet() ) {
// Now verify that the key is matching our conditions...
//
boolean match = true;
int lookupIndex = 0;
for ( int i = 0; i < data.conditions.length && match; i++ ) {
ValueMetaInterface cmpMeta = lookupMeta.getValueMeta( lookupIndex );
Object cmpData = lookupRow[ lookupIndex ];
ValueMetaInterface keyMeta = key.getValueMeta( i );
Object keyData = key.getData()[ i ];

switch( data.conditions[ i ] ) {
case DatabaseLookupMeta.CONDITION_EQ:
match = ( cmpMeta.compare( cmpData, keyMeta, keyData ) == 0 );
break;
case DatabaseLookupMeta.CONDITION_NE:
match = ( cmpMeta.compare( cmpData, keyMeta, keyData ) != 0 );
break;
case DatabaseLookupMeta.CONDITION_LT:
match = ( cmpMeta.compare( cmpData, keyMeta, keyData ) > 0 );
break;
case DatabaseLookupMeta.CONDITION_LE:
match = ( cmpMeta.compare( cmpData, keyMeta, keyData ) >= 0 );
break;
case DatabaseLookupMeta.CONDITION_GT:
match = ( cmpMeta.compare( cmpData, keyMeta, keyData ) < 0 );
break;
case DatabaseLookupMeta.CONDITION_GE:
match = ( cmpMeta.compare( cmpData, keyMeta, keyData ) <= 0 );
break;
case DatabaseLookupMeta.CONDITION_IS_NULL:
match = keyMeta.isNull( keyData );
break;
case DatabaseLookupMeta.CONDITION_IS_NOT_NULL:
match = !keyMeta.isNull( keyData );
break;
case DatabaseLookupMeta.CONDITION_BETWEEN:
// Between key >= cmp && key <= cmp2
ValueMetaInterface cmpMeta2 = lookupMeta.getValueMeta( lookupIndex + 1 );
Object cmpData2 = lookupRow[ lookupIndex + 1 ];
match = ( keyMeta.compare( keyData, cmpMeta, cmpData ) >= 0 );
if ( match ) {
match = ( keyMeta.compare( keyData, cmpMeta2, cmpData2 ) <= 0 );
}
lookupIndex++;
break;
// TODO: add LIKE operator (think of changing the hasDBCondition logic then)
default:
match = false;
data.hasDBCondition = true; // avoid looping in here the next time, also safety when a new condition
// will be introduced
break;

}
lookupIndex++;
}
if ( match ) {
TimedRow timedRow = data.look.get( key );
if ( timedRow != null ) {
return timedRow.getRow();
}
}
}
}
}
return null;
}

private void determineFieldsTypesQueryingDb() throws KettleException {
final String[] keyFields = meta.getTableKeyField();
data.keytypes = new int[ keyFields.length ];
@@ -466,14 +330,6 @@ public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws
data.outputRowMeta = getInputRowMeta().clone();
meta.getFields( data.outputRowMeta, getStepname(), null, null, this, repository, metaStore );

if ( meta.isCached() ) {
if ( meta.getCacheSize() > 0 ) {
data.look = new LinkedHashMap<RowMetaAndData, TimedRow>( (int) ( meta.getCacheSize() * 1.5 ) );
} else {
data.look = new LinkedHashMap<RowMetaAndData, TimedRow>();
}
}

data.db.setLookup(
environmentSubstitute( meta.getSchemaName() ), environmentSubstitute( meta.getTablename() ),
meta.getTableKeyField(), meta.getKeyCondition(), meta.getReturnValueField(),
@@ -516,6 +372,10 @@ public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws
}
}

if ( meta.isCached() ) {
data.cache = DefaultCache.newCache( data, meta.getCacheSize() );
}

determineFieldsTypesQueryingDb();

initNullIf();
@@ -627,7 +487,7 @@ private void loadAllTableDataIntoTheCache() throws KettleException {
}
// Store the data...
//
storeRowInCache( keyMeta, keyData, valueData );
data.cache.storeRowInCache( meta, keyMeta, keyData, valueData );
incrementLinesInput();
}
}
@@ -707,7 +567,7 @@ public void dispose( StepMetaInterface smi, StepDataInterface sdi ) {

// Recover memory immediately, allow in-memory data to be garbage collected
//
data.look = null;
data.cache = null;

super.dispose( smi, sdi );
}
DatabaseLookupData.java
@@ -22,11 +22,8 @@

package org.pentaho.di.trans.steps.databaselookup;

import java.util.LinkedHashMap;

import org.pentaho.di.core.RowMetaAndData;
import org.pentaho.di.core.TimedRow;
import org.pentaho.di.core.database.Database;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.trans.step.BaseStepData;
import org.pentaho.di.trans.step.StepDataInterface;
Expand All @@ -36,7 +33,7 @@
* @since 24-jan-2005
*/
public class DatabaseLookupData extends BaseStepData implements StepDataInterface {
public LinkedHashMap<RowMetaAndData, TimedRow> look; // to store values in used to look up things...
public Cache cache;
public Database db;

public Object[] nullif; // Not found: default values...
Expand All @@ -58,4 +55,9 @@ public DatabaseLookupData() {
db = null;
}

public interface Cache {
Object[] getRowFromCache( RowMetaInterface lookupMeta, Object[] lookupRow ) throws KettleException;

void storeRowInCache( DatabaseLookupMeta meta, RowMetaInterface lookupMeta, Object[] lookupRow, Object[] add );
}
}
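
DefaultCache itself is one of the four changed files, but its source is not shown on this page. As rough orientation only — a simplified sketch, not the actual implementation — it presumably wraps the LinkedHashMap<RowMetaAndData, TimedRow> that this commit removes from the step, with the bodies moved over from the DatabaseLookup methods deleted above. The sketch below covers only the "all equals" fast path; the real class must also implement the condition-by-condition scan and the cache-size eviction visible in the removed code:

package org.pentaho.di.trans.steps.databaselookup;

import java.util.LinkedHashMap;

import org.pentaho.di.core.RowMetaAndData;
import org.pentaho.di.core.TimedRow;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.row.RowMetaInterface;

public class DefaultCache implements DatabaseLookupData.Cache {

  private final LinkedHashMap<RowMetaAndData, TimedRow> map;

  public static DefaultCache newCache( DatabaseLookupData data, int cacheSize ) {
    // Pre-size the map when a cache limit is configured, mirroring the
    // LinkedHashMap initialization this commit removes from processRow().
    // (The real class presumably also keeps "data" around to consult
    // allEquals, conditions and hasDBCondition for the non-equality path.)
    return new DefaultCache( cacheSize > 0 ? (int) ( cacheSize * 1.5 ) : 16 );
  }

  private DefaultCache( int capacity ) {
    map = new LinkedHashMap<RowMetaAndData, TimedRow>( capacity );
  }

  @Override
  public Object[] getRowFromCache( RowMetaInterface lookupMeta, Object[] lookupRow ) throws KettleException {
    // Fast path only: a straight hash lookup, which is valid when every
    // condition is "=" (data.allEquals in the removed code).
    TimedRow timedRow = map.get( new RowMetaAndData( lookupMeta, lookupRow ) );
    return timedRow == null ? null : timedRow.getRow();
  }

  @Override
  public void storeRowInCache( DatabaseLookupMeta meta, RowMetaInterface lookupMeta,
      Object[] lookupRow, Object[] add ) {
    // Eviction by sampling meta.getCacheSize() is omitted here; see the
    // removed storeRowInCache() above for the full algorithm.
    map.put( new RowMetaAndData( lookupMeta, lookupRow ), new TimedRow( add ) );
  }
}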
