Skip to content

Commit

Permalink
[PDI-14232] "Log line timeout (days)" not deleting old log table entr…
Browse files Browse the repository at this point in the history
…ies (job logging)

- cleaning up log recodes ( due to log record timeout property ) after writing new info
- tests written
  • Loading branch information
Ivan Nikolaichuk authored and Ivan Nikolaichuk committed Jan 11, 2016
1 parent da2de70 commit f250b78
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 28 deletions.
51 changes: 33 additions & 18 deletions engine/src/org/pentaho/di/job/Job.java
Expand Up @@ -1177,24 +1177,7 @@ private boolean endProcessing() throws KettleJobException {
JobLogTable jobLogTable = jobMeta.getJobLogTable(); JobLogTable jobLogTable = jobMeta.getJobLogTable();
if ( jobLogTable.isDefined() ) { if ( jobLogTable.isDefined() ) {


String tableName = jobMeta.getJobLogTable().getActualTableName(); writeLogTableInformation( jobLogTable, status );
DatabaseMeta logcon = jobMeta.getJobLogTable().getDatabaseMeta();

Database ldb = new Database( this, logcon );
ldb.shareVariablesWith( this );
try {
ldb.connect();
ldb.setCommit( logCommitSize );
ldb.writeLogRecord( jobMeta.getJobLogTable(), status, this, null );
} catch ( KettleDatabaseException dbe ) {
addErrors( 1 );
throw new KettleJobException( "Unable to end processing by writing log record to table " + tableName, dbe );
} finally {
if ( !ldb.isAutoCommit() ) {
ldb.commitLog( true, jobMeta.getJobLogTable() );
}
ldb.disconnect();
}
} }


return true; return true;
Expand All @@ -1203,6 +1186,38 @@ private boolean endProcessing() throws KettleJobException {
} }
} }


/**
* Writes information to Job Log table.
* Cleans old records, in case job is finished.
*
*/
protected void writeLogTableInformation( JobLogTable jobLogTable, LogStatus status )
throws KettleJobException, KettleDatabaseException {
boolean cleanLogRecords = status.equals( LogStatus.END );
String tableName = jobLogTable.getActualTableName();
DatabaseMeta logcon = jobLogTable.getDatabaseMeta();

Database ldb = createDataBase( logcon );
ldb.shareVariablesWith( this );
try {
ldb.connect();
ldb.setCommit( logCommitSize );
ldb.writeLogRecord( jobLogTable, status, this, null );

if ( cleanLogRecords ) {
ldb.cleanupLogRecords( jobLogTable );
}

} catch ( KettleDatabaseException dbe ) {
addErrors( 1 );
throw new KettleJobException( "Unable to end processing by writing log record to table " + tableName, dbe );
} finally {
if ( !ldb.isAutoCommit() ) {
ldb.commitLog( true, jobLogTable );
}
ldb.disconnect();
}
}
/** /**
* Write log channel information. * Write log channel information.
* *
Expand Down
60 changes: 50 additions & 10 deletions engine/test-src/org/pentaho/di/job/JobTest.java
Expand Up @@ -22,36 +22,76 @@


package org.pentaho.di.job; package org.pentaho.di.job;


import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.pentaho.di.core.database.Database; import org.pentaho.di.core.database.Database;
import org.pentaho.di.core.database.DatabaseMeta; import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleException; import org.pentaho.di.core.logging.BaseLogTable;
import org.pentaho.di.core.logging.JobEntryLogTable; import org.pentaho.di.core.logging.JobEntryLogTable;
import org.pentaho.di.core.logging.JobLogTable;
import org.pentaho.di.core.logging.LogStatus;
import org.pentaho.di.core.logging.LogTableField;
import org.pentaho.di.core.variables.VariableSpace; import org.pentaho.di.core.variables.VariableSpace;
import org.pentaho.di.trans.HasDatabasesInterface; import org.pentaho.di.trans.HasDatabasesInterface;


import java.util.ArrayList;

import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;


public class JobTest { public class JobTest {
private final static String STRING_DEFAULT = "<def>";
private Job mockedJob;
private Database mockedDataBase;
private VariableSpace mockedVariableSpace;
private HasDatabasesInterface hasDatabasesInterface;


@Before
public void init() {
mockedDataBase = mock( Database.class );
mockedJob = mock( Job.class );
mockedVariableSpace = mock( VariableSpace.class );
hasDatabasesInterface = mock( HasDatabasesInterface.class );

when( mockedJob.createDataBase( any( DatabaseMeta.class ) ) ).thenReturn( mockedDataBase );
}


@Test @Test
public void recordsCleanUpMethodIsCalled() throws KettleException { public void recordsCleanUpMethodIsCalled_JobEntryLogTable() throws Exception {
Database mockedDataBase = mock( Database.class );
Job job = mock( Job.class );


JobEntryLogTable jobEntryLogTable = JobEntryLogTable.getDefault( mock( VariableSpace.class ), mock( HasDatabasesInterface.class ) ); JobEntryLogTable jobEntryLogTable = JobEntryLogTable.getDefault( mockedVariableSpace, hasDatabasesInterface );
jobEntryLogTable.setConnectionName( "connection" ); setAllTableParamsDefault( jobEntryLogTable );


JobMeta jobMeta = new JobMeta( ); JobMeta jobMeta = new JobMeta( );
jobMeta.setJobEntryLogTable( jobEntryLogTable ); jobMeta.setJobEntryLogTable( jobEntryLogTable );


when( job.createDataBase( any( DatabaseMeta.class ) ) ).thenReturn( mockedDataBase ); when( mockedJob.getJobMeta() ).thenReturn( jobMeta );
when( job.getJobMeta() ).thenReturn( jobMeta ); doCallRealMethod().when( mockedJob ).writeJobEntryLogInformation();
doCallRealMethod().when( job ).writeJobEntryLogInformation();


job.writeJobEntryLogInformation(); mockedJob.writeJobEntryLogInformation();


verify( mockedDataBase ).cleanupLogRecords( jobEntryLogTable ); verify( mockedDataBase ).cleanupLogRecords( jobEntryLogTable );
} }

@Test
public void recordsCleanUpMethodIsCalled_JobLogTable() throws Exception {
JobLogTable jobLogTable = JobLogTable.getDefault( mockedVariableSpace, hasDatabasesInterface );
setAllTableParamsDefault( jobLogTable );

doCallRealMethod().when( mockedJob ).writeLogTableInformation( jobLogTable, LogStatus.END );

mockedJob.writeLogTableInformation( jobLogTable, LogStatus.END );

verify( mockedDataBase ).cleanupLogRecords( jobLogTable );
}

public void setAllTableParamsDefault( BaseLogTable table ) {
table.setSchemaName( STRING_DEFAULT );
table.setConnectionName( STRING_DEFAULT );
table.setTimeoutInDays( STRING_DEFAULT );
table.setTableName( STRING_DEFAULT );
table.setFields( new ArrayList<LogTableField>() );
}

} }

0 comments on commit f250b78

Please sign in to comment.