Skip to content

Commit

Permalink
[BACKLOG-8265] Changes in engine to support MDI to the Hadoop File Input
Browse files Browse the repository at this point in the history
  • Loading branch information
ecuellar committed May 24, 2016
1 parent 1bb9f3e commit 10ed949
Show file tree
Hide file tree
Showing 4 changed files with 334 additions and 10 deletions.
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2015 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -113,6 +113,24 @@ protected void addInjectionProperty( Injection metaInj, BeanLevelInfo leaf ) {
gr.groupProperties.add( prop );
}

public String getDescription( String name ) {
String description = BaseMessages.getString( clazz, clazzAnnotation.localizationPrefix() + name );
if ( description != null && description.startsWith( "!" ) && description.endsWith( "!" ) ) {
Class baseClass = clazz.getSuperclass();
while ( baseClass != null ) {
InjectionSupported baseAnnotation = (InjectionSupported) baseClass.getAnnotation( InjectionSupported.class );
if ( baseAnnotation != null ) {
description = BaseMessages.getString( baseClass, baseAnnotation.localizationPrefix() + name );
if ( description != null && !description.startsWith( "!" ) && !description.endsWith( "!" ) ) {
return description;
}
}
baseClass = baseClass.getSuperclass();
}
}
return description;
}

public class Property {
private final String name;
private final String groupName;
Expand Down Expand Up @@ -141,7 +159,7 @@ public String getGroupName() {
}

public String getDescription() {
return BaseMessages.getString( clazz, clazzAnnotation.localizationPrefix() + name );
return BeanInjectionInfo.this.getDescription( name );
}
}

Expand All @@ -162,7 +180,7 @@ public List<Property> getGroupProperties() {
}

public String getDescription() {
return BaseMessages.getString( clazz, clazzAnnotation.localizationPrefix() + name );
return BeanInjectionInfo.this.getDescription( name );
}
}
}
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2013 by Pentaho : http://www.pentaho.com
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
Expand All @@ -22,17 +22,26 @@

package org.pentaho.di.trans.steps.fileinput.text;

import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.commons.vfs2.FileObject;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.exception.KettleFileException;
import org.pentaho.di.core.logging.LogChannelInterface;
import org.pentaho.di.core.playlist.FilePlayListAll;
import org.pentaho.di.core.playlist.FilePlayListReplay;
import org.pentaho.di.core.variables.VariableSpace;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepDataInterface;
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.steps.fileinput.BaseFileInputField;
import org.pentaho.di.trans.steps.fileinput.BaseFileInputStep;
import org.pentaho.di.trans.steps.fileinput.IBaseFileInputReader;

Expand Down Expand Up @@ -93,4 +102,289 @@ public boolean init() {
public boolean isWaitingForData() {
return true;
}

public static final String[] guessStringsFromLine( VariableSpace space, LogChannelInterface log, String line,
TextFileInputMeta inf, String delimiter, String enclosure, String escapeCharacter ) throws
KettleException {
List<String> strings = new ArrayList<String>();

String pol; // piece of line

try {
if ( line == null ) {
return null;
}

if ( inf.content.fileType.equalsIgnoreCase( "CSV" ) ) {

// Split string in pieces, only for CSV!

int pos = 0;
int length = line.length();
boolean dencl = false;

int len_encl = ( enclosure == null ? 0 : enclosure.length() );
int len_esc = ( escapeCharacter == null ? 0 : escapeCharacter.length() );

while ( pos < length ) {
int from = pos;
int next;

boolean encl_found;
boolean contains_escaped_enclosures = false;
boolean contains_escaped_separators = false;

// Is the field beginning with an enclosure?
// "aa;aa";123;"aaa-aaa";000;...
if ( len_encl > 0 && line.substring( from, from + len_encl ).equalsIgnoreCase( enclosure ) ) {
if ( log.isRowLevel() ) {
log.logRowlevel( BaseMessages.getString( PKG, "TextFileInput.Log.ConvertLineToRowTitle" ), BaseMessages
.getString( PKG, "TextFileInput.Log.ConvertLineToRow", line.substring( from, from + len_encl ) ) );
}
encl_found = true;
int p = from + len_encl;

boolean is_enclosure =
len_encl > 0 && p + len_encl < length && line.substring( p, p + len_encl )
.equalsIgnoreCase( enclosure );
boolean is_escape =
len_esc > 0 && p + len_esc < length
&& line.substring( p, p + len_esc ).equalsIgnoreCase( escapeCharacter );

boolean enclosure_after = false;

// Is it really an enclosure? See if it's not repeated twice or escaped!
if ( ( is_enclosure || is_escape ) && p < length - 1 ) {
String strnext = line.substring( p + len_encl, p + 2 * len_encl );
if ( strnext.equalsIgnoreCase( enclosure ) ) {
p++;
enclosure_after = true;
dencl = true;

// Remember to replace them later on!
if ( is_escape ) {
contains_escaped_enclosures = true;
}
}
}

// Look for a closing enclosure!
while ( ( !is_enclosure || enclosure_after ) && p < line.length() ) {
p++;
enclosure_after = false;
is_enclosure =
len_encl > 0 && p + len_encl < length && line.substring( p, p + len_encl ).equals( enclosure );
is_escape =
len_esc > 0 && p + len_esc < length && line.substring( p, p + len_esc ).equals( escapeCharacter );

// Is it really an enclosure? See if it's not repeated twice or escaped!
if ( ( is_enclosure || is_escape ) && p < length - 1 ) {

String strnext = line.substring( p + len_encl, p + 2 * len_encl );
if ( strnext.equals( enclosure ) ) {
p++;
enclosure_after = true;
dencl = true;

// Remember to replace them later on!
if ( is_escape ) {
contains_escaped_enclosures = true; // remember
}
}
}
}

if ( p >= length ) {
next = p;
} else {
next = p + len_encl;
}

if ( log.isRowLevel() ) {
log.logRowlevel( BaseMessages.getString( PKG, "TextFileInput.Log.ConvertLineToRowTitle" ), BaseMessages
.getString( PKG, "TextFileInput.Log.EndOfEnclosure", "" + p ) );
}
} else {
encl_found = false;
boolean found = false;
int startpoint = from;
// int tries = 1;
do {
next = line.indexOf( delimiter, startpoint );

// See if this position is preceded by an escape character.
if ( len_esc > 0 && next - len_esc > 0 ) {
String before = line.substring( next - len_esc, next );

if ( escapeCharacter.equals( before ) ) {
// take the next separator, this one is escaped...
startpoint = next + 1;
// tries++;
contains_escaped_separators = true;
} else {
found = true;
}
} else {
found = true;
}
} while ( !found && next >= 0 );
}
if ( next == -1 ) {
next = length;
}

if ( encl_found ) {
pol = line.substring( from + len_encl, next - len_encl );
if ( log.isRowLevel() ) {
log.logRowlevel( BaseMessages.getString( PKG, "TextFileInput.Log.ConvertLineToRowTitle" ), BaseMessages
.getString( PKG, "TextFileInput.Log.EnclosureFieldFound", "" + pol ) );
}
} else {
pol = line.substring( from, next );
if ( log.isRowLevel() ) {
log.logRowlevel( BaseMessages.getString( PKG, "TextFileInput.Log.ConvertLineToRowTitle" ), BaseMessages
.getString( PKG, "TextFileInput.Log.NormalFieldFound", "" + pol ) );
}
}

if ( dencl ) {
StringBuilder sbpol = new StringBuilder( pol );
int idx = sbpol.indexOf( enclosure + enclosure );
while ( idx >= 0 ) {
sbpol.delete( idx, idx + enclosure.length() );
idx = sbpol.indexOf( enclosure + enclosure );
}
pol = sbpol.toString();
}

// replace the escaped enclosures with enclosures...
if ( contains_escaped_enclosures ) {
String replace = escapeCharacter + enclosure;
String replaceWith = enclosure;

pol = Const.replace( pol, replace, replaceWith );
}

// replace the escaped separators with separators...
if ( contains_escaped_separators ) {
String replace = escapeCharacter + delimiter;
String replaceWith = delimiter;

pol = Const.replace( pol, replace, replaceWith );
}

// Now add pol to the strings found!
strings.add( pol );

pos = next + delimiter.length();
}
if ( pos == length ) {
if ( log.isRowLevel() ) {
log.logRowlevel( BaseMessages.getString( PKG, "TextFileInput.Log.ConvertLineToRowTitle" ), BaseMessages
.getString( PKG, "TextFileInput.Log.EndOfEmptyLineFound" ) );
}
strings.add( "" );
}
} else {
// Fixed file format: Simply get the strings at the required positions...
for ( int i = 0; i < inf.inputFiles.inputFields.length; i++ ) {
BaseFileInputField field = inf.inputFiles.inputFields[i];

int length = line.length();

if ( field.getPosition() + field.getLength() <= length ) {
strings.add( line.substring( field.getPosition(), field.getPosition() + field.getLength() ) );
} else {
if ( field.getPosition() < length ) {
strings.add( line.substring( field.getPosition() ) );
} else {
strings.add( "" );
}
}
}
}
} catch ( Exception e ) {
throw new KettleException( BaseMessages.getString( PKG, "TextFileInput.Log.Error.ErrorConvertingLine", e
.toString() ), e );
}

return strings.toArray( new String[strings.size()] );
}

public static final String getLine( LogChannelInterface log, InputStreamReader reader, int formatNr,
StringBuilder line ) throws KettleFileException {
EncodingType type = EncodingType.guessEncodingType( reader.getEncoding() );
return getLine( log, reader, type, formatNr, line );
}

public static final String getLine( LogChannelInterface log, InputStreamReader reader, EncodingType encodingType,
int formatNr, StringBuilder line ) throws KettleFileException {
int c = 0;
line.setLength( 0 );
try {
switch ( formatNr ) {
case TextFileInputMeta.FILE_FORMAT_DOS:
while ( c >= 0 ) {
c = reader.read();

if ( encodingType.isReturn( c ) || encodingType.isLinefeed( c ) ) {
c = reader.read(); // skip \n and \r
if ( !encodingType.isReturn( c ) && !encodingType.isLinefeed( c ) ) {
// make sure its really a linefeed or cariage return
// raise an error this is not a DOS file
// so we have pulled a character from the next line
throw new KettleFileException( BaseMessages.getString( PKG, "TextFileInput.Log.SingleLineFound" ) );
}
return line.toString();
}
if ( c >= 0 ) {
line.append( (char) c );
}
}
break;
case TextFileInputMeta.FILE_FORMAT_UNIX:
while ( c >= 0 ) {
c = reader.read();

if ( encodingType.isLinefeed( c ) || encodingType.isReturn( c ) ) {
return line.toString();
}
if ( c >= 0 ) {
line.append( (char) c );
}
}
break;
case TextFileInputMeta.FILE_FORMAT_MIXED:
// in mixed mode we suppose the LF is the last char and CR is ignored
// not for MAC OS 9 but works for Mac OS X. Mac OS 9 can use UNIX-Format
while ( c >= 0 ) {
c = reader.read();

if ( encodingType.isLinefeed( c ) ) {
return line.toString();
} else if ( !encodingType.isReturn( c ) ) {
if ( c >= 0 ) {
line.append( (char) c );
}
}
}
break;
default:
break;
}
} catch ( KettleFileException e ) {
throw e;
} catch ( Exception e ) {
if ( line.length() == 0 ) {
throw new KettleFileException( BaseMessages.getString( PKG, "TextFileInput.Log.Error.ExceptionReadingLine", e
.toString() ), e );
}
return line.toString();
}
if ( line.length() > 0 ) {
return line.toString();
}

return null;
}
}

0 comments on commit 10ed949

Please sign in to comment.