Skip to content

Commit

Permalink
[PDI-13884] Access Output Truncate Table option
Browse files Browse the repository at this point in the history
Adds the ability to truncate an Access table before writing new rows
  • Loading branch information
Matt Tucker committed May 31, 2016
1 parent 57a7776 commit 0997b09
Show file tree
Hide file tree
Showing 8 changed files with 403 additions and 32 deletions.
Expand Up @@ -2,7 +2,7 @@
* *
* Pentaho Data Integration * Pentaho Data Integration
* *
* Copyright (C) 2002-2013 by Pentaho : http://www.pentaho.com * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
* *
******************************************************************************* *******************************************************************************
* *
Expand Down Expand Up @@ -41,8 +41,6 @@
import org.pentaho.di.trans.step.StepMeta; import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface; import org.pentaho.di.trans.step.StepMetaInterface;


import com.healthmarketscience.jackcess.Database;

/** /**
* Writes rows to a database table. * Writes rows to a database table.
* *
Expand Down Expand Up @@ -75,7 +73,7 @@ public boolean processRow( StepMetaInterface smi, StepDataInterface sdi ) throws


if ( first && meta.isDoNotOpenNewFileInit() ) { if ( first && meta.isDoNotOpenNewFileInit() ) {
try { try {
if ( !OpenFile() ) { if ( !openFile() ) {
return false; return false;
} }


Expand Down Expand Up @@ -126,15 +124,15 @@ private boolean writeToTable( Object[] rowData ) throws KettleValueException {
if ( data.table == null ) { if ( data.table == null ) {
if ( meta.isTableCreated() ) { if ( meta.isTableCreated() ) {
// Create the table // Create the table
data.columns = AccessOutputMeta.getColumns( data.outputRowMeta ); data.createTable( realTablename, data.outputRowMeta );
data.db.createTable( realTablename, data.columns );
data.table = data.db.getTable( realTablename );
} else { } else {
logError( BaseMessages.getString( PKG, "AccessOutput.Error.TableDoesNotExist", realTablename ) ); logError( BaseMessages.getString( PKG, "AccessOutput.Error.TableDoesNotExist", realTablename ) );
setErrors( 1 ); setErrors( 1 );
stopAll(); stopAll();
return false; return false;
} }
} else if ( meta.isTableTruncated() ) {
data.truncateTable();
} }
// All OK: we have an open database and a table to write to. // All OK: we have an open database and a table to write to.
// //
Expand All @@ -157,11 +155,11 @@ private boolean writeToTable( Object[] rowData ) throws KettleValueException {
data.rows.add( columnValues ); data.rows.add( columnValues );
if ( meta.getCommitSize() > 0 ) { if ( meta.getCommitSize() > 0 ) {
if ( data.rows.size() >= meta.getCommitSize() ) { if ( data.rows.size() >= meta.getCommitSize() ) {
data.table.addRows( data.rows ); data.addRowsToTable( data.rows );
data.rows.clear(); data.rows.clear();
} }
} else { } else {
data.table.addRow( columnValues ); data.addRowToTable( columnValues );
} }
} catch ( IOException e ) { } catch ( IOException e ) {
logError( BaseMessages.getString( logError( BaseMessages.getString(
Expand All @@ -182,7 +180,7 @@ public boolean init( StepMetaInterface smi, StepDataInterface sdi ) {
if ( super.init( smi, sdi ) ) { if ( super.init( smi, sdi ) ) {
if ( !meta.isDoNotOpenNewFileInit() ) { if ( !meta.isDoNotOpenNewFileInit() ) {
try { try {
return OpenFile(); return openFile();


} catch ( Exception e ) { } catch ( Exception e ) {
logError( "An error occurred intialising this step: " + e.getMessage() ); logError( "An error occurred intialising this step: " + e.getMessage() );
Expand All @@ -196,7 +194,7 @@ public boolean init( StepMetaInterface smi, StepDataInterface sdi ) {
return false; return false;
} }


private boolean OpenFile() throws Exception { boolean openFile() throws Exception {
data.oneFileOpened = true; data.oneFileOpened = true;
String realFilename = environmentSubstitute( meta.getFilename() ); String realFilename = environmentSubstitute( meta.getFilename() );
if ( log.isBasic() ) { if ( log.isBasic() ) {
Expand All @@ -208,13 +206,13 @@ private boolean OpenFile() throws Exception {
// First open or create the access file // First open or create the access file
if ( !file.exists() ) { if ( !file.exists() ) {
if ( meta.isFileCreated() ) { if ( meta.isFileCreated() ) {
data.db = Database.create( file ); data.createDatabase( file );
} else { } else {
logError( BaseMessages.getString( PKG, "AccessOutput.InitError.FileDoesNotExist", realFilename ) ); logError( BaseMessages.getString( PKG, "AccessOutput.InitError.FileDoesNotExist", realFilename ) );
return false; return false;
} }
} else { } else {
data.db = Database.open( file ); data.openDatabase( file );
} }


// Add the filename to the result object... // Add the filename to the result object...
Expand All @@ -236,14 +234,14 @@ public void dispose( StepMetaInterface smi, StepDataInterface sdi ) {
try { try {
// Put the last records in the table as well! // Put the last records in the table as well!
if ( data.table != null ) { if ( data.table != null ) {
data.table.addRows( data.rows ); data.addRowsToTable( data.rows );
} }


// Just for good measure. // Just for good measure.
data.rows.clear(); data.rows.clear();


if ( data.db != null ) { if ( data.db != null ) {
data.db.close(); data.closeDatabase();
} }
} catch ( IOException e ) { } catch ( IOException e ) {
logError( "Error closing the database: " + e.toString() ); logError( "Error closing the database: " + e.toString() );
Expand Down
Expand Up @@ -2,7 +2,7 @@
* *
* Pentaho Data Integration * Pentaho Data Integration
* *
* Copyright (C) 2002-2013 by Pentaho : http://www.pentaho.com * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
* *
******************************************************************************* *******************************************************************************
* *
Expand All @@ -22,6 +22,8 @@


package org.pentaho.di.trans.steps.accessoutput; package org.pentaho.di.trans.steps.accessoutput;


import java.io.File;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;


Expand All @@ -30,6 +32,7 @@
import org.pentaho.di.trans.step.StepDataInterface; import org.pentaho.di.trans.step.StepDataInterface;


import com.healthmarketscience.jackcess.Column; import com.healthmarketscience.jackcess.Column;
import com.healthmarketscience.jackcess.Cursor;
import com.healthmarketscience.jackcess.Database; import com.healthmarketscience.jackcess.Database;
import com.healthmarketscience.jackcess.Table; import com.healthmarketscience.jackcess.Table;


Expand All @@ -40,7 +43,6 @@
public class AccessOutputData extends BaseStepData implements StepDataInterface { public class AccessOutputData extends BaseStepData implements StepDataInterface {
public Database db; public Database db;
public Table table; public Table table;
public List<Column> columns;
public List<Object[]> rows; public List<Object[]> rows;
public RowMetaInterface outputRowMeta; public RowMetaInterface outputRowMeta;
public boolean oneFileOpened; public boolean oneFileOpened;
Expand All @@ -51,4 +53,39 @@ public AccessOutputData() {
oneFileOpened = false; oneFileOpened = false;
} }


void createDatabase( File databaseFile ) throws IOException {
db = Database.create( databaseFile );
}

void openDatabase( File databaseFile ) throws IOException {
db = Database.open( databaseFile );
}

void closeDatabase() throws IOException {
db.close();
}

void createTable( String tableName, RowMetaInterface rowMeta ) throws IOException {
List<Column> columns = AccessOutputMeta.getColumns( rowMeta );
db.createTable( tableName, columns );
table = db.getTable( tableName );
}

void addRowToTable( Object... row ) throws IOException {
table.addRow( row );
}

void addRowsToTable( List<Object[]> rows ) throws IOException {
table.addRows( rows );
}

void truncateTable() throws IOException {
if ( table == null ) {
return;
}
Cursor tableRows = Cursor.createCursor( table );
while ( tableRows.moveToNextRow() ) {
tableRows.deleteCurrentRow();
}
}
} }
Expand Up @@ -2,7 +2,7 @@
* *
* Pentaho Data Integration * Pentaho Data Integration
* *
* Copyright (C) 2002-2013 by Pentaho : http://www.pentaho.com * Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
* *
******************************************************************************* *******************************************************************************
* *
Expand Down Expand Up @@ -35,12 +35,14 @@
import org.pentaho.di.core.Const; import org.pentaho.di.core.Const;
import org.pentaho.di.core.database.DatabaseMeta; import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.exception.KettlePluginException;
import org.pentaho.di.core.exception.KettleStepException;
import org.pentaho.di.core.exception.KettleValueException; import org.pentaho.di.core.exception.KettleValueException;
import org.pentaho.di.core.exception.KettleXMLException; import org.pentaho.di.core.exception.KettleXMLException;
import org.pentaho.di.core.row.RowMeta; import org.pentaho.di.core.row.RowMeta;
import org.pentaho.di.core.row.RowMetaInterface; import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.core.row.ValueMeta;
import org.pentaho.di.core.row.ValueMetaInterface; import org.pentaho.di.core.row.ValueMetaInterface;
import org.pentaho.di.core.row.value.ValueMetaFactory;
import org.pentaho.di.core.variables.VariableSpace; import org.pentaho.di.core.variables.VariableSpace;
import org.pentaho.di.core.vfs.KettleVFS; import org.pentaho.di.core.vfs.KettleVFS;
import org.pentaho.di.core.xml.XMLHandler; import org.pentaho.di.core.xml.XMLHandler;
Expand Down Expand Up @@ -272,7 +274,7 @@ public RowMetaInterface getRequiredFields( VariableSpace space ) throws KettleEx
} }
} }


public static final RowMetaInterface getLayout( Table table ) throws SQLException { public static final RowMetaInterface getLayout( Table table ) throws SQLException, KettleStepException {
RowMetaInterface row = new RowMeta(); RowMetaInterface row = new RowMeta();
List<Column> columns = table.getColumns(); List<Column> columns = table.getColumns();
for ( int i = 0; i < columns.size(); i++ ) { for ( int i = 0; i < columns.size(); i++ ) {
Expand Down Expand Up @@ -374,9 +376,13 @@ public static final RowMetaInterface getLayout( Table table ) throws SQLExceptio
break; break;
} }


ValueMetaInterface v = new ValueMeta( column.getName(), valtype ); ValueMetaInterface v;
try {
v = ValueMetaFactory.createValueMeta( column.getName(), valtype );
} catch ( KettlePluginException e ) {
throw new KettleStepException( e );
}
v.setLength( length, precision ); v.setLength( length, precision );

row.addValueMeta( v ); row.addValueMeta( v );
} }


Expand Down
Expand Up @@ -38,3 +38,5 @@ AccessOutputMeta.AddFileToResult.Tooltip=Check this if you wan to add filenames
AccessOutputDialog.DoNotOpenNewFileInit.Label=Do not create file at start AccessOutputDialog.DoNotOpenNewFileInit.Label=Do not create file at start
AccessOutputDialog.DoNotOpenNewFileInit.Tooltip=Check this if you don''t want to create file at transformation starts.\n PDI will create file at first row received. AccessOutputDialog.DoNotOpenNewFileInit.Tooltip=Check this if you don''t want to create file at transformation starts.\n PDI will create file at first row received.


AccessOutputDialog.TruncateTable.Label=Truncate table
AccessOutputDialog.TruncateTable.Tooltip=Remove all existing rows in table before inserting new rows
@@ -0,0 +1,107 @@
/*! ******************************************************************************
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2016 by Pentaho : http://www.pentaho.com
*
*******************************************************************************
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/

package org.pentaho.di.trans.steps.accessoutput;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import org.junit.Before;
import org.junit.Test;
import org.pentaho.di.core.row.RowMeta;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.core.row.value.ValueMetaInteger;
import org.pentaho.di.core.row.value.ValueMetaString;

public class AccessOutputDataTest {

AccessOutputData data;
File mdbFile;

@Before
public void setUp() throws IOException {
data = new AccessOutputData();
mdbFile = File.createTempFile( "PDI_AccessOutputDataTest", ".mdb" );
mdbFile.deleteOnExit();
}

RowMetaInterface generateRowMeta() {
RowMetaInterface row = new RowMeta();
row.addValueMeta( new ValueMetaInteger( "id" ) );
row.addValueMeta( new ValueMetaString( "UUID" ) );
return row;
}

List<Object[]> generateRowData( int rowCount ) {
List<Object[]> rows = new ArrayList<Object[]>();
for ( int i = 0; i < rowCount; i++ ) {
rows.add( new Object[]{ i, UUID.randomUUID().toString() } );
}
return rows;
}

@Test
public void testCreateDatabase() throws IOException {
assertNull( data.db );
data.createDatabase( mdbFile );
assertNotNull( data.db );
assertTrue( mdbFile.exists() );

assertNull( data.table );
data.truncateTable();
assertNull( data.table );

data.closeDatabase();
}

@Test
public void testCreateTable() throws IOException {
data.createDatabase( mdbFile );
data.createTable( "thisSampleTable", generateRowMeta() );
assertTrue( data.db.getTableNames().contains( "thisSampleTable" ) );
data.closeDatabase();
}

@Test
public void testTruncateTable() throws IOException {
data.createDatabase( mdbFile );
data.createTable( "TruncatingThisTable", generateRowMeta() );

data.addRowsToTable( generateRowData( 10 ) );
assertEquals( 10, data.table.getRowCount() );

data.truncateTable();
assertEquals( 0, data.table.getRowCount() );

data.addRowToTable( generateRowData( 1 ).get( 0 ) );
assertEquals( 1, data.table.getRowCount() );
data.closeDatabase();
}
}
Expand Up @@ -21,6 +21,10 @@
******************************************************************************/ ******************************************************************************/
package org.pentaho.di.trans.steps.accessoutput; package org.pentaho.di.trans.steps.accessoutput;


import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
Expand All @@ -43,21 +47,32 @@ public void setUpLoadSave() throws Exception {
KettleEnvironment.init(); KettleEnvironment.init();
PluginRegistry.init( true ); PluginRegistry.init( true );
List<String> attributes = List<String> attributes =
Arrays.asList( "filename", "fileCreated", "tablename", "tableCreated", "tableTruncated", "commitSize", "addToResultFiles", "DoNotOpenNewFileInit" ); Arrays.asList( "filename", "fileCreated", "tablename", "tableCreated", "tableTruncated", "commitSize", "addToResultFiles", "DoNotOpenNewFileInit" );


Map<String, String> getterMap = new HashMap<String, String>(); Map<String, String> getterMap = new HashMap<String, String>();
Map<String, String> setterMap = new HashMap<String, String>(); Map<String, String> setterMap = new HashMap<String, String>();


Map<String, FieldLoadSaveValidator<?>> attrValidatorMap = new HashMap<String, FieldLoadSaveValidator<?>>(); Map<String, FieldLoadSaveValidator<?>> attrValidatorMap = new HashMap<String, FieldLoadSaveValidator<?>>();

Map<String, FieldLoadSaveValidator<?>> typeValidatorMap = new HashMap<String, FieldLoadSaveValidator<?>>(); Map<String, FieldLoadSaveValidator<?>> typeValidatorMap = new HashMap<String, FieldLoadSaveValidator<?>>();


loadSaveTester = loadSaveTester =
new LoadSaveTester( testMetaClass, attributes, getterMap, setterMap, attrValidatorMap, typeValidatorMap ); new LoadSaveTester( testMetaClass, attributes, getterMap, setterMap, attrValidatorMap, typeValidatorMap );
} }


@Test @Test
public void testSerialization() throws KettleException { public void testSerialization() throws KettleException {
loadSaveTester.testSerialization(); loadSaveTester.testSerialization();
} }

@Test
public void testDefaults() {
AccessOutputMeta stepMeta = new AccessOutputMeta();
stepMeta.setDefault();
assertTrue( stepMeta.isFileCreated() );
assertTrue( stepMeta.isTableCreated() );
assertTrue( stepMeta.isAddToResultFiles() );
assertEquals( 500, stepMeta.getCommitSize() );
assertFalse( stepMeta.isTableTruncated() );
assertFalse( stepMeta.isDoNotOpenNewFileInit() );
}
} }

0 comments on commit 0997b09

Please sign in to comment.