Skip to content

Commit

Permalink
[PDI-15258] Step fails when no input received
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Tucker committed May 24, 2016
1 parent 1bb9f3e commit 945b3f6
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 331 deletions.
Expand Up @@ -166,6 +166,11 @@ protected void prepareToRowProcessing() throws KettleException, KettleStepExcept
} else {
data.readrow = getRow();
data.inputRowMeta = getInputRowMeta();
if ( data.inputRowMeta == null ) {
data.hasFirstRow = false;
return;
}
data.hasFirstRow = true;
data.outputRowMeta = data.inputRowMeta.clone();

// Check if source field is provided
Expand Down Expand Up @@ -320,6 +325,9 @@ public void fileCloseError( FileObject file, FileSystemException e ) {
* get final row for output
*/
private Object[] getOneOutputRow() throws KettleException {
if ( meta.isInFields() && !data.hasFirstRow ) {
return null;
}
Object[] rawReaderRow = null;
while ( ( rawReaderRow = data.readerRowSet.getRow() ) == null ) {
if ( data.inputs.hasNext() && data.readerRowSet.isDone() ) {
Expand Down Expand Up @@ -447,6 +455,7 @@ private void createReader() throws KettleException {
data.reader.setIgnoreMissingPath( meta.isIgnoreMissingPath() );
}

@Override
public void dispose( StepMetaInterface smi, StepDataInterface sdi ) {
meta = (JsonInputMeta) smi;
data = (JsonInputData) sdi;
Expand Down
Expand Up @@ -22,12 +22,9 @@

package org.pentaho.di.trans.steps.jsoninput;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.BitSet;
import java.util.Iterator;
import java.util.List;

import org.pentaho.di.core.RowSet;
import org.pentaho.di.core.row.RowMetaInterface;
Expand All @@ -43,17 +40,10 @@ public class JsonInputData extends BaseFileInputStepData implements StepDataInte
public Object[] previousRow;
public RowMetaInterface inputRowMeta;

@Deprecated
// is this used?
public int nr_repeats;
public boolean hasFirstRow;

public int nrInputFields;

@Deprecated
public int recordnr;

@Deprecated
public int nrrecords;
/**
* last row read
*/
Expand All @@ -62,32 +52,12 @@ public class JsonInputData extends BaseFileInputStepData implements StepDataInte

public int filenr;

@Deprecated //used?
public FileInputStream fr;
@Deprecated //used?
public BufferedInputStream is;

@Deprecated //used?
public String itemElement;
@Deprecated //used?
public int itemCount;
@Deprecated //used?
public int itemPosition;

/**
* output row counter
*/
public long rownr;
public int indexSourceField;

@Deprecated
public JsonReader jsonReader;
@Deprecated
public List<NJSONArray> resultList;

@Deprecated
public String stringToParse;

public Iterator<InputStream> inputs;
public IJsonReader reader;
public RowSet readerRowSet;
Expand All @@ -99,13 +69,9 @@ public JsonInputData() {
previousRow = null;
filenr = 0;

fr = null;
is = null;
indexSourceField = -1;

nrInputFields = -1;
recordnr = 0;
nrrecords = 0;

readrow = null;
totalpreviousfields = 0;
Expand Down
Expand Up @@ -135,7 +135,7 @@ public static class AdditionalFileOutputFields extends BaseFileInputStepMeta.Add

public void getFields( RowMetaInterface r, String name, RowMetaInterface[] info,
VariableSpace space, Repository repository, IMetaStore metaStore ) throws KettleStepException {
// TextFileInput is the same, this can be refactored further
// TextFileInput is the same, this can be refactored further
if ( shortFilenameField != null ) {
ValueMetaInterface v =
new ValueMetaString( space.environmentSubstitute( shortFilenameField ) );
Expand Down Expand Up @@ -387,6 +387,7 @@ public void setInputFields( JsonInputField[] inputFields ) {
/**
* @deprecated use {@link#getExcludeFileMask()}
*/
@Deprecated
public String[] getExludeFileMask() {
return getExcludeFileMask();
}
Expand Down Expand Up @@ -558,19 +559,23 @@ public String[] getIncludeSubFolders() {
return inputFiles.includeSubFolders;
}

@Override
public void loadXML( Node stepnode, List<DatabaseMeta> databases, IMetaStore metaStore ) throws KettleXMLException {
readData( stepnode );
}

@Override
public JsonInputMeta clone() {
JsonInputMeta clone = (JsonInputMeta) super.clone();
// comp classes handled by super
clone.setFileName( getFileName() );
clone.setFileMask( getFileMask() );
clone.setExcludeFileMask( getExcludeFileMask() );
return clone;
}

@Override
public String getXML() {
StringBuffer retval = new StringBuffer( 400 );
StringBuilder retval = new StringBuilder( 400 );

retval.append( " " ).append( XMLHandler.addTagValue( "include", includeFilename ) );
retval.append( " " ).append( XMLHandler.addTagValue( "include_field", filenameField ) );
Expand Down Expand Up @@ -633,6 +638,7 @@ public String getRequiredFilesDesc( String tt ) {
}
}

@Override
public String getRequiredFilesCode( String tt ) {
if ( tt == null ) {
return RequiredFilesCode[0];
Expand Down Expand Up @@ -714,6 +720,7 @@ private void initArrayFields( int nrfiles, int nrfields ) {
getInputFiles().allocate( nrfiles, nrfields );
}

@Override
public void setDefault() {
additionalFileOutputFields = new AdditionalFileOutputFields();

Expand Down Expand Up @@ -785,6 +792,7 @@ public void getFields( RowMetaInterface rowMeta, String name, RowMetaInterface[]
additionalFileOutputFields.getFields( rowMeta, name, info, space, repository, metaStore );
}

@Override
public void readRep( Repository rep, IMetaStore metaStore, ObjectId id_step, List<DatabaseMeta> databases )
throws KettleException {

Expand Down Expand Up @@ -847,6 +855,7 @@ public void readRep( Repository rep, IMetaStore metaStore, ObjectId id_step, Lis
setPathField( rep.getStepAttributeString( id_step, "pathFieldName" ) );
setIsHiddenField( rep.getStepAttributeString( id_step, "hiddenFieldName" ) );
setLastModificationDateField( rep.getStepAttributeString( id_step, "lastModificationTimeFieldName" ) );
setUriField( rep.getStepAttributeString( id_step, "uriNameFieldName" ) );
setRootUriField( rep.getStepAttributeString( id_step, "rootUriNameFieldName" ) );
setExtensionField( rep.getStepAttributeString( id_step, "extensionFieldName" ) );
setSizeField( rep.getStepAttributeString( id_step, "sizeFieldName" ) );
Expand All @@ -856,6 +865,7 @@ public void readRep( Repository rep, IMetaStore metaStore, ObjectId id_step, Lis
}
}

@Override
public void saveRep( Repository rep, IMetaStore metaStore, ObjectId id_transformation, ObjectId id_step )
throws KettleException {
try {
Expand Down Expand Up @@ -910,6 +920,7 @@ public void saveRep( Repository rep, IMetaStore metaStore, ObjectId id_transform
rep.saveStepAttribute( id_transformation, id_step, "uriNameFieldName", getUriField() );
rep.saveStepAttribute( id_transformation, id_step, "rootUriNameFieldName", getRootUriField() );
rep.saveStepAttribute( id_transformation, id_step, "extensionFieldName", getExtensionField() );
rep.saveStepAttribute( id_transformation, id_step, "sizeFieldName", getSizeField() );
} catch ( Exception e ) {
throw new KettleException( BaseMessages.getString(
PKG, "JsonInputMeta.Exception.ErrorSavingToRepository", "" + id_step ), e );
Expand All @@ -921,6 +932,7 @@ public FileInputList getFiles( VariableSpace space ) {
space, getFileName(), getFileMask(), getExcludeFileMask(), getFileRequired(), includeSubFolderBoolean() );
}

@Override
public void check( List<CheckResultInterface> remarks, TransMeta transMeta, StepMeta stepMeta, RowMetaInterface prev,
String[] input, String[] output, RowMetaInterface info, VariableSpace space, Repository repository,
IMetaStore metaStore ) {
Expand Down Expand Up @@ -977,15 +989,18 @@ public void check( List<CheckResultInterface> remarks, TransMeta transMeta, Step
}
}

@Override
public StepInterface getStep( StepMeta stepMeta, StepDataInterface stepDataInterface, int cnr, TransMeta tr,
Trans trans ) {
return new JsonInput( stepMeta, stepDataInterface, cnr, tr, trans );
}

@Override
public StepDataInterface getStepData() {
return new JsonInputData();
}

@Override
public boolean supportsErrorHandling() {
return true;
}
Expand All @@ -1007,6 +1022,7 @@ public boolean supportsErrorHandling() {
*
* @return the filename of the exported resource
*/
@Override
public String exportResources( VariableSpace space, Map<String, ResourceDefinition> definitions,
ResourceNamingInterface resourceNamingInterface, Repository repository, IMetaStore metaStore ) throws KettleException {
try {
Expand Down

0 comments on commit 945b3f6

Please sign in to comment.