Skip to content

Commit

Permalink
PDI-11215: Cannot Edit Multiway Merge Join After Deleting an Input Hop.
Browse files Browse the repository at this point in the history
  • Loading branch information
rpbouman committed Jun 19, 2015
1 parent a285d16 commit 0006b9e
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 145 deletions.
175 changes: 106 additions & 69 deletions engine/src/org/pentaho/di/trans/steps/multimerge/MultiMergeJoin.java
Expand Up @@ -22,13 +22,9 @@

package org.pentaho.di.trans.steps.multimerge;

import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;

import org.pentaho.di.core.RowSet;
import org.pentaho.di.core.exception.KettleException;
Expand All @@ -39,9 +35,11 @@
import org.pentaho.di.core.row.ValueMetaInterface;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.BaseStep;
import org.pentaho.di.trans.step.StepDataInterface;
import org.pentaho.di.trans.step.StepIOMetaInterface;
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;
Expand Down Expand Up @@ -72,101 +70,134 @@ public MultiMergeJoin( StepMeta stepMeta, StepDataInterface stepDataInterface, i
super( stepMeta, stepDataInterface, copyNr, transMeta, trans );
}

private void processFirstRow( StepMetaInterface smi, StepDataInterface sdi ) throws KettleException {
private boolean processFirstRow( StepMetaInterface smi, StepDataInterface sdi ) throws KettleException {
meta = (MultiMergeJoinMeta) smi;
data = (MultiMergeJoinData) sdi;
// Find the RowSet to read from
//
String[] prevStepNames = getTransMeta().getPrevStepNames( getStepname() );
Set<String> infoStepNameSet = new HashSet<String>();
if ( prevStepNames != null ) {
Collections.addAll( infoStepNameSet, prevStepNames );
}
String[] infoStepNames = meta.getStepIOMeta().getInfoStepnames();
infoStepNameSet.retainAll( Arrays.asList( infoStepNames ) );

int streamSize = infoStepNameSet.size();
String[] inputStepNames = infoStepNameSet.toArray( new String[streamSize] );
TransMeta transMeta = getTransMeta();
TransHopMeta transHopMeta;

StepIOMetaInterface stepIOMeta = meta.getStepIOMeta();
List<StreamInterface> infoStreams = stepIOMeta.getInfoStreams();
StreamInterface stream;
StepMeta toStepMeta = meta.getParentStepMeta();
StepMeta fromStepMeta;

ArrayList<String> inputStepNameList = new ArrayList<String>();
String[] inputStepNames = meta.getInputSteps();
String inputStepName;

for ( int i = 0; i < infoStreams.size(); i++ ) {
inputStepName = inputStepNames[i];
stream = infoStreams.get( i );
fromStepMeta = stream.getStepMeta();
if ( fromStepMeta == null ) {
//should not arrive here, should typically have been caught by init.
throw new KettleException(
BaseMessages.getString( PKG, "MultiMergeJoin.Log.UnableToFindReferenceStream", inputStepName ) );
}
//check the hop
transHopMeta = transMeta.findTransHop( fromStepMeta, toStepMeta, true );
//there is no hop: this is unexpected.
if ( transHopMeta == null ) {
//should not arrive here, should typically have been caught by init.
throw new KettleException(
BaseMessages.getString( PKG, "MultiMergeJoin.Log.UnableToFindReferenceStream", inputStepName ) );
} else if ( transHopMeta.isEnabled() ) {
inputStepNameList.add( inputStepName );
} else {
logDetailed( BaseMessages.getString( PKG, "MultiMergeJoin.Log.IgnoringStep", inputStepName ) );
}
}

int streamSize = inputStepNameList.size();
if ( streamSize == 0 ) {
return false;
}

String keyField;
String[] keyFields;

data.rowSets = new RowSet[streamSize];
RowSet rowSet;
Object[] row;
data.rows = new Object[streamSize][];
data.metas = new RowMetaInterface[streamSize];
data.rowLengths = new int[streamSize];
data.queue =
new PriorityQueue<MultiMergeJoinData.QueueEntry>( streamSize, new MultiMergeJoinData.QueueComparator(
data ) );
MultiMergeJoinData.QueueComparator comparator = new MultiMergeJoinData.QueueComparator( data );
data.queue = new PriorityQueue<MultiMergeJoinData.QueueEntry>( streamSize, comparator );
data.results = new ArrayList<List<Object[]>>( streamSize );
MultiMergeJoinData.QueueEntry queueEntry;
data.queueEntries = new MultiMergeJoinData.QueueEntry[streamSize];
data.drainIndices = new int[streamSize];
for ( int i = 0; i < streamSize; i++ ) {
data.keyNrs = new int[streamSize][];
data.dummy = new Object[streamSize][];

RowMetaInterface rowMeta;
data.outputRowMeta = new RowMeta();
for ( int i = 0, j = 0; i < inputStepNames.length; i++ ) {
inputStepName = inputStepNames[i];
data.queueEntries[i] = new MultiMergeJoinData.QueueEntry();
data.queueEntries[i].index = i;
data.results.add( new ArrayList<Object[]>() );
data.rowSets[i] = findInputRowSet( inputStepName );
if ( data.rowSets[i] == null ) {
throw new KettleException( BaseMessages.getString(
PKG, "MultiMergeJoin.Exception.UnableToFindSpecifiedStep", inputStepName ) );
}
data.rows[i] = getRowFrom( data.rowSets[i] );
if ( data.rows[i] == null ) {
data.metas[i] = getTransMeta().getStepFields( inputStepName );
} else {
data.queueEntries[i].row = data.rows[i];
data.metas[i] = data.rowSets[i].getRowMeta();
if ( !inputStepNameList.contains( inputStepName ) ) {
//ignore step with disabled hop.
continue;
}

data.rowLengths[i] = data.metas[i].size();
}
queueEntry = new MultiMergeJoinData.QueueEntry();
queueEntry.index = j;
data.queueEntries[j] = queueEntry;

//
data.outputRowMeta = new RowMeta();
for ( int i = 0; i < streamSize; i++ ) {
data.outputRowMeta.mergeRowMeta( data.metas[i].clone() );
}
data.results.add( new ArrayList<Object[]>() );

data.keyNrs = new int[streamSize][];
rowSet = findInputRowSet( inputStepName );
if ( rowSet == null ) {
throw new KettleException( BaseMessages.getString(
PKG, "MultiMergeJoin.Exception.UnableToFindSpecifiedStep", inputStepName ) );
}
data.rowSets[j] = rowSet;

for ( int j = 0; j < streamSize; j++ ) {
if ( data.rows[j] != null ) {
/*
* // Find the key indexes: data.keyNrs[j] = new int[meta.getKeyFields().length]; for (int
* i=0;i<meta.getKeyFields().length; i++) { data.keyNrs[j][i] =
* data.metas[j].indexOfValue(meta.getKeyFields()[i]); if (data.keyNrs[j][i]<0) { String message =
* BaseMessages.getString(PKG,
* "MultiMergeJoin.Exception.UnableToFindFieldInReferenceStream",meta.getKeyFields()[i]); logError(message);
* throw new KettleStepException(message); } }
*/
String[] keyFields = meta.getKeyFields()[j].split( "," );
data.keyNrs[j] = new int[keyFields.length];
for ( int i = 0; i < keyFields.length; i++ ) {
data.keyNrs[j][i] = data.metas[j].indexOfValue( keyFields[i] );
if ( data.keyNrs[j][i] < 0 ) {
row = getRowFrom( rowSet );
data.rows[j] = row;
if ( row == null ) {
rowMeta = getTransMeta().getStepFields( inputStepName );
data.metas[j] = rowMeta;
} else {
queueEntry.row = row;
rowMeta = rowSet.getRowMeta();

keyField = meta.getKeyFields()[i];
String[] keyFieldParts = keyField.split( "," );
String keyFieldPart;
data.keyNrs[j] = new int[keyFieldParts.length];
for ( int k = 0; k < keyFieldParts.length; k++ ) {
keyFieldPart = keyFieldParts[k];
data.keyNrs[j][k] = rowMeta.indexOfValue( keyFieldPart );
if ( data.keyNrs[j][k] < 0 ) {
String message =
BaseMessages.getString( PKG, "MultiMergeJoin.Exception.UnableToFindFieldInReferenceStream", meta
.getKeyFields()[i] );
BaseMessages.getString( PKG, "MultiMergeJoin.Exception.UnableToFindFieldInReferenceStream", keyFieldPart, inputStepName );
logError( message );
throw new KettleStepException( message );
}
}
data.metas[j] = rowMeta;
data.queue.add( data.queueEntries[j] );
}
data.outputRowMeta.mergeRowMeta( rowMeta.clone() );
data.rowLengths[j] = rowMeta.size();
data.dummy[j] = RowDataUtil.allocateRowData( rowMeta.size() );
j++;
}

data.dummy = new Object[streamSize][];
for ( int i = 0; i < streamSize; i++ ) {
// Calculate dummy... defaults to null
data.dummy[i] = RowDataUtil.allocateRowData( data.metas[i].size() );
}
return true;
}

public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws KettleException {
meta = (MultiMergeJoinMeta) smi;
data = (MultiMergeJoinData) sdi;

if ( first ) {
processFirstRow( smi, sdi );
if ( !processFirstRow( smi, sdi ) ) {
setOutputDone();
return false;
}
first = false;
}

Expand Down Expand Up @@ -352,10 +383,16 @@ public boolean init( StepMetaInterface smi, StepDataInterface sdi ) {
data = (MultiMergeJoinData) sdi;

if ( super.init( smi, sdi ) ) {
List<StreamInterface> infoStreams = meta.getStepIOMeta().getInfoStreams();
StepIOMetaInterface stepIOMeta = meta.getStepIOMeta();
String[] inputStepNames = meta.getInputSteps();
String inputStepName;
List<StreamInterface> infoStreams = stepIOMeta.getInfoStreams();
StreamInterface stream;
for ( int i = 0; i < infoStreams.size(); i++ ) {
if ( infoStreams.get( i ).getStepMeta() == null ) {
logError( BaseMessages.getString( PKG, "MultiMergeJoin.Log.BothTrueAndFalseNeeded" ) );
inputStepName = inputStepNames[i];
stream = infoStreams.get( i );
if ( stream.getStepMeta() == null ) {
logError( BaseMessages.getString( PKG, "MultiMergeJoin.Log.UnableToFindReferenceStream", inputStepName ) );
return false;
}
}
Expand Down
Expand Up @@ -3,24 +3,24 @@
#
#Wed Jul 06 09:28:28 CEST 2011
MultiMergeJoinDialog.InputNeedSort.DialogTitle=Warning\!
MultiMergeJoin.LineNumber=linenr
MultiMergeJoin.LineNumber=linenr
MultiMergeJoin.Exception.UnableToFindSpecifiedStep=Unable to find specified source step with name ''{0}''.
MultiMergeJoinDialog.InputNeedSort.Option2=Please, don''t show this warning anymore.
MultiMergeJoinDialog.Type.Label=Join Type\:
MultiMergeJoinDialog.InputNeedSort.Option1=I understand
MultiMergeJoinMeta.Exception.UnableToLoadStepInfo=Unable to load step info from XML
MultiMergeJoin.Exception.DuplicateFieldnamesInResult=The result of this merge join would contain duplicate fieldnames in the result (fieldname\={0}). At this time, this is not supported. Please change the names of the input fields.
MultiMergeJoinMeta.Exception.FlagFieldNotSpecified=The flag field is not specified.
MultiMergeJoin.Log.BothTrueAndFalseNeeded=Exactly two input streams must be supplied
MultiMergeJoin.Log.UnableToFindReferenceStream=Unable to find reference stream [{0}]
MultiMergeJoin.Exception.InvalidKeyLayoutDetected=Invalid layout detected in input streams, keys to join have to be of the same type in both streams
MultiMergeJoinMeta.Exception.UnableToSaveStepInfo=Unable to save step information to the repository for id_step\=
MultiMergeJoinMeta.InputStep=Input Step
MultiMergeJoinMeta.SelectKeys=Select Keys
MultiMergeJoin.Log.DataInfo=ONE\: {0} / TWO\:
MultiMergeJoin.Log.DataInfo=ONE\: {0} / TWO\:
MultiMergeJoinDialog.Keys.Label=Keys for input steps\:
MultiMergeJoinDialog.KeyFields.Button=\ Get key fields
MultiMergeJoinDialog.Stepname.Label=Step name
MultiMergeJoin.Exception.UnableToFindFieldInReferenceStream=Unable to find field [{0}] in reference stream.
MultiMergeJoinDialog.KeyFields.Button=\ Get key fields
MultiMergeJoinDialog.Stepname.Label=Step name
MultiMergeJoin.Exception.UnableToFindFieldInReferenceStream=Unable to find field [{0}] in reference stream [{1}].
MultiMergeJoinMeta.InfoStream.SecondStream.Description=Right hand side stream of the join
MultiMergeJoin.InfoStream.Description=Description
MultiMergeJoin.Log.InvalidJoinType=Invalid join type {0}
Expand All @@ -30,7 +30,7 @@ MultiMergeJoinDialog.Keys=Keys
MultiMergeJoinMeta.InfoStream.Description=Input stream of the join
MultiMergeJoinDialog.Shell.Label=Multiway Merge Join
MultiMergeJoinDialog.ErrorGettingFields.DialogTitle=Error getting fields
MultiMergeJoinDialog.ErrorGettingFields.DialogMessage=Unable to get the fields because of an error\:
MultiMergeJoinDialog.ErrorGettingFields.DialogMessage=Unable to get the fields because of an error\:
MultiMergeJoinMeta.CheckResult.StepNotVerified=This step is not yet verified\: not yet implemented.
MultiMergeJoinMeta.JoinKeys=Join Keys
MultiMergeJoinDialog.ColumnInfo.KeyField=Key field
Expand Up @@ -9,14 +9,14 @@ MultiMergeJoinDialog.InputNeedSort.Option2=Ne plus afficher ce message
MultiMergeJoinDialog.Type.Label=Type jointure
MultiMergeJoinDialog.InputNeedSort.Option1=J''ai compris
MultiMergeJoinMeta.Exception.UnableToLoadStepInfo=Erreur lors du chargement des informations de l''\u00E9tape depuis le fichier XML.
MultiMergeJoin.Log.BothTrueAndFalseNeeded=Deux flux entrants doivent \u00EAtre s\u00E9lectionn\u00E9s.
MultiMergeJoin.Log.UnableToFindReferenceStream=Le flux de r\u00E9f\u00E9rence [{0}] est introuvable.
MultiMergeJoinMeta.Exception.UnableToSaveStepInfo=Erreur lors de la sauvegarde dans le r\u00E9f\u00E9rentiel, des informations de l''\u00E9tape identifi\u00E9e par l''id\=
MultiMergeJoinMeta.InputStep=Etape source
MultiMergeJoinMeta.SelectKeys=S\u00E9lection de cl\u00E9s
MultiMergeJoin.Log.DataInfo=UN\: {0} / DEUX\:
MultiMergeJoinDialog.KeyFields.Button=R\u00E9cup\u00E9rer champs cl\u00E9s
MultiMergeJoinDialog.Stepname.Label=Nom \u00E9tape
MultiMergeJoin.Exception.UnableToFindFieldInReferenceStream=Le champ [{0}] est introuvable dans le flux de r\u00E9f\u00E9rence.
MultiMergeJoin.Exception.UnableToFindFieldInReferenceStream=Le champ [{0}] est introuvable dans le flux de r\u00E9f\u00E9rence [{1}].
MultiMergeJoin.InfoStream.Description=Description
MultiMergeJoin.Log.InvalidJoinType=Type de jointure {0} invalide
MultiMergeJoinMeta.Exception.UnexpectedErrorReadingStepInfo=Erreur lors de la lecture depuis le r\u00E9f\u00E9rentiel, des informations de l''\u00E9tape
Expand Down
Expand Up @@ -9,14 +9,14 @@ MultiMergeJoinDialog.InputNeedSort.Option2=Per favore, non mostrare pi\u00F9 que
MultiMergeJoinDialog.Type.Label=Tipo di join\:
MultiMergeJoinDialog.InputNeedSort.Option1=Ho capito
MultiMergeJoinMeta.Exception.UnableToLoadStepInfo=Impossibile caricare le informazioni del passo da XML
MultiMergeJoin.Log.BothTrueAndFalseNeeded=Devono essere forniti esattamente due input stream.
MultiMergeJoin.Log.UnableToFindReferenceStream=Impossibile trovare lo stream di riferimento [{0}].
MultiMergeJoinMeta.Exception.UnableToSaveStepInfo=Impossibile salvare le informazioni chiave del passo nel repository per id_step\=
MultiMergeJoinMeta.InputStep=Passo di input
MultiMergeJoinMeta.SelectKeys=Seleziona le chiavi
MultiMergeJoin.Log.DataInfo=UNO\: {0} / DUE\:
MultiMergeJoin.Log.DataInfo=UNO\: {0} / DUE\:
MultiMergeJoinDialog.KeyFields.Button=Preleva campi chiave
MultiMergeJoinDialog.Stepname.Label=Nome del passo
MultiMergeJoin.Exception.UnableToFindFieldInReferenceStream=Impossibile trovare il campo [{0}] nello stream di riferimento.
MultiMergeJoin.Exception.UnableToFindFieldInReferenceStream=Impossibile trovare il campo [{0}] nello stream di riferimento [{1}].
MultiMergeJoin.InfoStream.Description=Descrizione
MultiMergeJoin.Log.InvalidJoinType=Tipo di join invalido {0}
MultiMergeJoinMeta.Exception.UnexpectedErrorReadingStepInfo=Errore inatteso durante la lettura delle informazioni del passo dal repository
Expand Down
Expand Up @@ -3,24 +3,24 @@
#
#Wed Jul 06 09:28:28 CEST 2011
MultiMergeJoinDialog.InputNeedSort.DialogTitle=\u8b66\u544a\!
MultiMergeJoin.LineNumber=linenr
MultiMergeJoin.LineNumber=linenr
MultiMergeJoin.Exception.UnableToFindSpecifiedStep=Unable to find specified source step with name ''{0}''.
MultiMergeJoinDialog.InputNeedSort.Option2=\u4eca\u5f8c\u3053\u306e\u8b66\u544a\u3092\u8868\u793a\u3057\u306a\u3044
MultiMergeJoinDialog.Type.Label=Join\u30bf\u30a4\u30d7\:
MultiMergeJoinDialog.InputNeedSort.Option1=ok
MultiMergeJoinMeta.Exception.UnableToLoadStepInfo=Unable to load step info from XML
MultiMergeJoin.Exception.DuplicateFieldnamesInResult=The result of this merge join would contain duplicate fieldnames in the result (fieldname\={0}). At this time, this is not supported. Please change the names of the input fields.
MultiMergeJoinMeta.Exception.FlagFieldNotSpecified=The flag field is not specified.
MultiMergeJoin.Log.BothTrueAndFalseNeeded=Exactly two input streams must be supplied
MultiMergeJoin.Log.UnableToFindReferenceStream=Unable to find reference stream [{0}]
MultiMergeJoin.Exception.InvalidKeyLayoutDetected=Invalid layout detected in input streams, keys to join have to be of the same type in both streams
MultiMergeJoinMeta.Exception.UnableToSaveStepInfo=Unable to save step information to the repository for id_step\=
MultiMergeJoinMeta.InputStep=Input Step
MultiMergeJoinMeta.SelectKeys=Select Keys
MultiMergeJoin.Log.DataInfo=ONE\: {0} / TWO\:
MultiMergeJoin.Log.DataInfo=ONE\: {0} / TWO\:
MultiMergeJoinDialog.Keys.Label=\u5165\u529b\u30b9\u30c6\u30c3\u30d7\u306e\u30ad\u30fc\:
MultiMergeJoinDialog.KeyFields.Button=\ \u30ad\u30fc\u30d5\u30a3\u30fc\u30eb\u30c9\u53d6\u5f97
MultiMergeJoinDialog.KeyFields.Button=\ \u30ad\u30fc\u30d5\u30a3\u30fc\u30eb\u30c9\u53d6\u5f97
MultiMergeJoinDialog.Stepname.Label=\u30b9\u30c6\u30c3\u30d7\u540d
MultiMergeJoin.Exception.UnableToFindFieldInReferenceStream=Unable to find field [{0}] in reference stream.
MultiMergeJoin.Exception.UnableToFindFieldInReferenceStream=Unable to find field [{0}] in reference stream [{1}].
MultiMergeJoinMeta.InfoStream.SecondStream.Description=Right hand side stream of the join
MultiMergeJoin.InfoStream.Description=Description
MultiMergeJoin.Log.InvalidJoinType=Invalid join type {0}
Expand All @@ -30,7 +30,7 @@ MultiMergeJoinDialog.Keys=\u30ad\u30fc
MultiMergeJoinMeta.InfoStream.Description=Input stream of the join
MultiMergeJoinDialog.Shell.Label=\u591a\u91cd\u30de\u30fc\u30b8\u30b8\u30e7\u30a4\u30f3
MultiMergeJoinDialog.ErrorGettingFields.DialogTitle=\u30d5\u30a3\u30fc\u30eb\u30c9\u53d6\u5f97\u30a8\u30e9\u30fc
MultiMergeJoinDialog.ErrorGettingFields.DialogMessage=\u30a8\u30e9\u30fc\u306e\u70ba\u3001\u30d5\u30a3\u30fc\u30eb\u30c9\u3092\u53d6\u5f97\u3059\u308b\u4e8b\u304c\u51fa\u6765\u307e\u305b\u3093\:
MultiMergeJoinDialog.ErrorGettingFields.DialogMessage=\u30a8\u30e9\u30fc\u306e\u70ba\u3001\u30d5\u30a3\u30fc\u30eb\u30c9\u3092\u53d6\u5f97\u3059\u308b\u4e8b\u304c\u51fa\u6765\u307e\u305b\u3093\:
MultiMergeJoinMeta.CheckResult.StepNotVerified=This step is not yet verified\: not yet implemented.
MultiMergeJoinMeta.JoinKeys=Join Keys
MultiMergeJoinDialog.ColumnInfo.KeyField=\u30ad\u30fc\u30d5\u30a3\u30fc\u30eb\u30c9
Expand Down

0 comments on commit 0006b9e

Please sign in to comment.