Skip to content

Commit

Permalink
[PDI-13759] excel input step when using apache POI streaming does not…
Browse files Browse the repository at this point in the history
… release file xlsx in case of error
  • Loading branch information
Sergey_Travin authored and Sergey_Travin committed Jun 19, 2015
1 parent af82d87 commit a13f917
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 71 deletions.
1 change: 1 addition & 0 deletions build.xml
Expand Up @@ -758,6 +758,7 @@
haltonerror="false"/> haltonerror="false"/>
<test todir="${testreports.xml.dir}" name="org.pentaho.di.trans.steps.excelinput.PoiWorkBookTest" <test todir="${testreports.xml.dir}" name="org.pentaho.di.trans.steps.excelinput.PoiWorkBookTest"
haltonerror="false"/> haltonerror="false"/>
<test todir="${testreports.xml.dir}" name="org.pentaho.di.trans.steps.excelinput.StaxWorkBookTest" haltonerror="false"/>
<test todir="${testreports.xml.dir}" name="org.pentaho.di.trans.steps.execsqlrow.ExecSQLRowTest" <test todir="${testreports.xml.dir}" name="org.pentaho.di.trans.steps.execsqlrow.ExecSQLRowTest"
haltonerror="false"/> haltonerror="false"/>
<test todir="${testreports.xml.dir}" name="org.pentaho.di.trans.steps.getxmldata.GetXMLDataTest" <test todir="${testreports.xml.dir}" name="org.pentaho.di.trans.steps.getxmldata.GetXMLDataTest"
Expand Down
Expand Up @@ -33,12 +33,16 @@
import org.apache.poi.ss.usermodel.Sheet; import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook; import org.apache.poi.ss.usermodel.Workbook;
import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.KettleLogStore;
import org.pentaho.di.core.logging.LogChannelInterface;
import org.pentaho.di.core.spreadsheet.KSheet; import org.pentaho.di.core.spreadsheet.KSheet;
import org.pentaho.di.core.spreadsheet.KWorkbook; import org.pentaho.di.core.spreadsheet.KWorkbook;
import org.pentaho.di.core.vfs.KettleVFS; import org.pentaho.di.core.vfs.KettleVFS;


public class PoiWorkbook implements KWorkbook { public class PoiWorkbook implements KWorkbook {


private LogChannelInterface log;

private Workbook workbook; private Workbook workbook;
private String filename; private String filename;
private String encoding; private String encoding;
Expand All @@ -50,7 +54,7 @@ public class PoiWorkbook implements KWorkbook {
public PoiWorkbook( String filename, String encoding ) throws KettleException { public PoiWorkbook( String filename, String encoding ) throws KettleException {
this.filename = filename; this.filename = filename;
this.encoding = encoding; this.encoding = encoding;

this.log = KettleLogStore.getLogChannelInterfaceFactory().create( this );
try { try {
FileObject fileObject = KettleVFS.getFileObject( filename ); FileObject fileObject = KettleVFS.getFileObject( filename );
if ( fileObject instanceof LocalFile ) { if ( fileObject instanceof LocalFile ) {
Expand Down Expand Up @@ -101,7 +105,7 @@ public void close() {
opcpkg.revert(); opcpkg.revert();
} }
} catch ( IOException ex ) { } catch ( IOException ex ) {
// Ignore errors log.logError( "Could not close workbook", ex );
} }
} }


Expand Down
Expand Up @@ -112,9 +112,6 @@ public StaxPoiSheet( XSSFReader reader, String sheetName, String sheetID ) {


@Override @Override
public KCell[] getRow( int rownr ) { public KCell[] getRow( int rownr ) {

// convert 0 based index to 1 based
rownr += 1;
try { try {
while ( sheetReader.hasNext() ) { while ( sheetReader.hasNext() ) {
int event = sheetReader.next(); int event = sheetReader.next();
Expand All @@ -126,14 +123,23 @@ public KCell[] getRow( int rownr ) {
} }


KCell[] cells = new StaxPoiCell[numCols]; KCell[] cells = new StaxPoiCell[numCols];
boolean richedRowEnd = false;
for ( int i = 0; i < numCols; i++ ) { for ( int i = 0; i < numCols; i++ ) {
// go to the "c" <cell> tag // go to the "c" <cell> tag
while ( sheetReader.hasNext() ) { while ( sheetReader.hasNext() ) {
if ( event == XMLStreamConstants.START_ELEMENT && sheetReader.getLocalName().equals( "c" ) ) { if ( event == XMLStreamConstants.START_ELEMENT && sheetReader.getLocalName().equals( "c" ) ) {
break; break;
} }
//if we have empty cell than we could reach and of row before fill all cells, so break all cycle
if ( event == XMLStreamConstants.END_ELEMENT && sheetReader.getLocalName().equals( "row" ) ) {
richedRowEnd = true;
break;
}
event = sheetReader.next(); event = sheetReader.next();
} }
if ( richedRowEnd ) {
break;
}
String cellLocation = sheetReader.getAttributeValue( null, "r" ); String cellLocation = sheetReader.getAttributeValue( null, "r" );
int columnIndex = StaxUtil.extractColumnNumber( cellLocation ) - 1; int columnIndex = StaxUtil.extractColumnNumber( cellLocation ) - 1;
String cellType = sheetReader.getAttributeValue( null, "t" ); String cellType = sheetReader.getAttributeValue( null, "t" );
Expand Down Expand Up @@ -191,7 +197,7 @@ public KCell getCell( int colnr, int rownr ) {
} }


public void close() throws IOException, XMLStreamException { public void close() throws IOException, XMLStreamException {
sheetStream.close();
sheetReader.close(); sheetReader.close();
sheetStream.close();
} }
} }
Expand Up @@ -26,24 +26,28 @@


package org.pentaho.di.trans.steps.excelinput.staxpoi; package org.pentaho.di.trans.steps.excelinput.staxpoi;


import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;


import javax.xml.stream.XMLInputFactory; import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants; import javax.xml.stream.XMLStreamConstants;
import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader; import javax.xml.stream.XMLStreamReader;


import org.apache.poi.openxml4j.opc.OPCPackage; import org.apache.poi.openxml4j.opc.OPCPackage;
import org.apache.poi.xssf.eventusermodel.XSSFReader; import org.apache.poi.xssf.eventusermodel.XSSFReader;
import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.KettleLogStore;
import org.pentaho.di.core.logging.LogChannelInterface;
import org.pentaho.di.core.spreadsheet.KSheet; import org.pentaho.di.core.spreadsheet.KSheet;
import org.pentaho.di.core.spreadsheet.KWorkbook; import org.pentaho.di.core.spreadsheet.KWorkbook;


public class StaxPoiWorkbook implements KWorkbook { public class StaxPoiWorkbook implements KWorkbook {


private LogChannelInterface log;

private XSSFReader reader; private XSSFReader reader;


// maintain the mapping of the sheet name to its ID // maintain the mapping of the sheet name to its ID
Expand All @@ -52,54 +56,66 @@ public class StaxPoiWorkbook implements KWorkbook {
// mapping of the sheet object with its ID/Name // mapping of the sheet object with its ID/Name
private Map<String, StaxPoiSheet> openSheetsMap; private Map<String, StaxPoiSheet> openSheetsMap;


private OPCPackage opcpkg;

public StaxPoiWorkbook() { public StaxPoiWorkbook() {
openSheetsMap = new HashMap<String, StaxPoiSheet>(); openSheetsMap = new HashMap<String, StaxPoiSheet>();
this.log = KettleLogStore.getLogChannelInterfaceFactory().create( this );
} }


public StaxPoiWorkbook( String filename, String encoding ) throws KettleException { public StaxPoiWorkbook( String filename, String encoding ) throws KettleException {
this(); this();
try { try {
OPCPackage pkg = OPCPackage.open( filename ); opcpkg = OPCPackage.open( filename );
openFile( pkg, encoding ); openFile( opcpkg, encoding );
} catch ( Exception e ) { } catch ( Exception e ) {
throw new KettleException( e ); throw new KettleException( e );
} }

} }


public StaxPoiWorkbook( InputStream inputStream, String encoding ) throws KettleException { public StaxPoiWorkbook( InputStream inputStream, String encoding ) throws KettleException {
this(); this();
try { try {
OPCPackage pkg = OPCPackage.open( inputStream ); opcpkg = OPCPackage.open( inputStream );
openFile( pkg, encoding ); openFile( opcpkg, encoding );
} catch ( Exception e ) { } catch ( Exception e ) {
throw new KettleException( e ); throw new KettleException( e );
} }
} }


private void openFile( OPCPackage pkg, String encoding ) throws KettleException { private void openFile( OPCPackage pkg, String encoding ) throws KettleException {
InputStream workbookData = null;
XMLStreamReader workbookReader = null;
try { try {
reader = new XSSFReader( pkg ); reader = new XSSFReader( pkg );
sheetNameIDMap = new HashMap<String, String>(); sheetNameIDMap = new HashMap<String, String>();
List<String> sheetList = new ArrayList<String>();
InputStream workbookData;
workbookData = reader.getWorkbookData(); workbookData = reader.getWorkbookData();
XMLInputFactory factory = XMLInputFactory.newInstance(); XMLInputFactory factory = XMLInputFactory.newInstance();
XMLStreamReader workbookReader = factory.createXMLStreamReader( workbookData ); workbookReader = factory.createXMLStreamReader( workbookData );
while ( workbookReader.hasNext() ) { while ( workbookReader.hasNext() ) {
if ( workbookReader.next() == XMLStreamConstants.START_ELEMENT if ( workbookReader.next() == XMLStreamConstants.START_ELEMENT && workbookReader.getLocalName().equals( "sheet" ) ) {
&& workbookReader.getLocalName().equals( "sheet" ) ) {
String sheetName = workbookReader.getAttributeValue( null, "name" ); String sheetName = workbookReader.getAttributeValue( null, "name" );
String sheetID = String sheetID = workbookReader.getAttributeValue( "http://schemas.openxmlformats.org/officeDocument/2006/relationships", "id" );
workbookReader.getAttributeValue(
"http://schemas.openxmlformats.org/officeDocument/2006/relationships", "id" );
sheetList.add( sheetName );
sheetNameIDMap.put( sheetName, sheetID ); sheetNameIDMap.put( sheetName, sheetID );
} }
} }
workbookData.close();
} catch ( Exception e ) { } catch ( Exception e ) {
throw new KettleException( e ); throw new KettleException( e );
} finally {
if ( workbookReader != null ) {
try {
workbookReader.close();
} catch ( XMLStreamException e ) {
throw new KettleException( e );
}
}
if ( workbookData != null ) {
try {
workbookData.close();
} catch ( IOException e ) {
throw new KettleException( e );
}
}
} }
} }


Expand Down Expand Up @@ -129,10 +145,16 @@ public void close() {
for ( StaxPoiSheet sheet : openSheetsMap.values() ) { for ( StaxPoiSheet sheet : openSheetsMap.values() ) {
try { try {
sheet.close(); sheet.close();
} catch ( Exception e ) { } catch ( IOException e ) {
e.printStackTrace(); log.logError( "Could not close workbook", e );
} catch ( XMLStreamException e ) {
log.logError( "Could not close xmlstream", e );
} }
} }
if ( opcpkg != null ) {
//We should not save change in xlsx because it is input step.
opcpkg.revert();
}
} }


@Override @Override
Expand Down
57 changes: 47 additions & 10 deletions test/org/pentaho/di/trans/steps/excelinput/PoiWorkBookTest.java
Expand Up @@ -2,7 +2,7 @@
* *
* Pentaho Data Integration * Pentaho Data Integration
* *
* Copyright (C) 2002-2013 by Pentaho : http://www.pentaho.com * Copyright (C) 2002-2015 by Pentaho : http://www.pentaho.com
* *
******************************************************************************* *******************************************************************************
* *
Expand All @@ -22,21 +22,61 @@


package org.pentaho.di.trans.steps.excelinput; package org.pentaho.di.trans.steps.excelinput;


import static org.junit.Assert.assertEquals; import static org.junit.Assert.*;


import java.io.File; import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.Date; import java.util.Date;


import junit.framework.TestCase; import org.junit.Test;

import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.spreadsheet.KCell; import org.pentaho.di.core.spreadsheet.KCell;
import org.pentaho.di.core.spreadsheet.KCellType; import org.pentaho.di.core.spreadsheet.KCellType;
import org.pentaho.di.core.spreadsheet.KSheet; import org.pentaho.di.core.spreadsheet.KSheet;
import org.pentaho.di.core.spreadsheet.KWorkbook; import org.pentaho.di.core.spreadsheet.KWorkbook;
import org.apache.commons.io.FileUtils;

public class PoiWorkBookTest {

@Test
public void testReadData() throws KettleException {
readData();
}


public class PoiWorkBookTest extends TestCase { @Test
public void testRead() throws Exception { public void testFileDoesNotChange() throws KettleException, IOException {
File fileBeforeRead = new File( "testfiles/sample-file.xlsx" ); File fileBeforeRead = new File( "testfiles/sample-file.xlsx" );
readData();
File fileAfterRead = new File( "testfiles/sample-file.xlsx" );
assertTrue( FileUtils.contentEquals(fileBeforeRead, fileAfterRead ) );
}

@Test
public void testResourceFree() throws Exception {
FileLock lock = null;
RandomAccessFile randomAccessFile = null;
try {
readData();
File fileAfterRead = new File( "testfiles/sample-file.xlsx" );
randomAccessFile = new RandomAccessFile( fileAfterRead, "rw" );
FileChannel fileChannel = randomAccessFile.getChannel();
lock = fileChannel.tryLock();
// check that we could lock file
assertTrue( lock.isValid() );
} finally {
if ( lock != null ) {
lock.release();
}
if ( randomAccessFile != null ) {
randomAccessFile.close();
}
}
}

private void readData() throws KettleException {
KWorkbook workbook = WorkbookFactory.getWorkbook( SpreadSheetType.POI, "testfiles/sample-file.xlsx", null ); KWorkbook workbook = WorkbookFactory.getWorkbook( SpreadSheetType.POI, "testfiles/sample-file.xlsx", null );
int numberOfSheets = workbook.getNumberOfSheets(); int numberOfSheets = workbook.getNumberOfSheets();
assertEquals( 3, numberOfSheets ); assertEquals( 3, numberOfSheets );
Expand Down Expand Up @@ -85,13 +125,10 @@ public void testRead() throws Exception {


try { try {
sheet1.getRow( 5 ); sheet1.getRow( 5 );
throw new Exception( "No out of bounds exception thrown when expected" ); fail( "No out of bounds exception thrown when expected" );
} catch ( ArrayIndexOutOfBoundsException e ) { } catch ( ArrayIndexOutOfBoundsException e ) {
// OK! // OK!
} }

workbook.close(); workbook.close();
File fileAfterRead = new File( "testfiles/sample-file.xlsx" );
assertEquals( fileBeforeRead, fileAfterRead );
} }
} }

0 comments on commit a13f917

Please sign in to comment.