Skip to content

Commit

Permalink
PDI-12555 - changes for VWloadMonitor correct work
Browse files Browse the repository at this point in the history
  • Loading branch information
YuryBY committed Jan 14, 2015
1 parent 1351ea2 commit 6e3b757
Showing 1 changed file with 11 additions and 6 deletions.
Expand Up @@ -152,7 +152,6 @@ public boolean execute( IngresVectorwiseLoaderMeta meta ) throws KettleException
// any output?
data.outputLogger = new StreamLogger( log, data.sqlProcess.getInputStream(), "OUT_SQL" );


// Where do we send the data to? --> To STDIN of the sql process
//
data.sqlOutputStream = data.sqlProcess.getOutputStream();
Expand All @@ -161,7 +160,7 @@ public boolean execute( IngresVectorwiseLoaderMeta meta ) throws KettleException
logWriteThread = new Thread( logWriter, "IngresVecorWiseStepLogWriter" );
logWriteThread.start();

vwLoadMonitor = new VWloadMonitor( data.sqlProcess, data.outputLogger, logWriteThread );
vwLoadMonitor = new VWloadMonitor( data.sqlProcess, logWriter, logWriteThread );
vwLoadMonitorThread = new Thread( vwLoadMonitor );
vwLoadMonitorThread.start();

Expand Down Expand Up @@ -771,12 +770,12 @@ void checkExcn() throws Exception {

public class VWloadMonitor implements Runnable {
private Process vwloadProcess;
private StreamLogger outputLogger;
private LogWriter logWriter;
private Thread outputLoggerThread;

VWloadMonitor( Process loadProcess, StreamLogger outputLogger, Thread outputLoggerThread ) {
VWloadMonitor( Process loadProcess, LogWriter logWriter, Thread outputLoggerThread ) {
this.vwloadProcess = loadProcess;
this.outputLogger = outputLogger;
this.logWriter = logWriter;
this.outputLoggerThread = outputLoggerThread;
}

Expand Down Expand Up @@ -808,7 +807,7 @@ private Long[] tryToParseVWloadResultMessage() throws InterruptedException, IOEx
outputLoggerThread.join();
Long[] result = new Long[3];
if ( meta.isUsingVwload() ) {
String lastLine = outputLogger.getLastLine();
String lastLine = logWriter.getLastInputStreamLine();
Scanner sc = null;
try {
sc = new Scanner( lastLine );
Expand Down Expand Up @@ -920,6 +919,7 @@ String substitute( String input, String regex, String substitution ) {
class LogWriter implements Runnable {
final InputStream is;
boolean isErrorsOccured;
String lastLine;

public LogWriter( InputStream outStream ) {
this.is = outStream;
Expand All @@ -937,6 +937,7 @@ private void printLog() {
String line = null;
String ingresErrorRegex = ".*E_[A-Z]{1,2}[0-9]{3,4}.*";
while ( ( line = br.readLine() ) != null ) {
lastLine = line;
if ( !line.matches( ingresErrorRegex ) ) {
log.logBasic( LogLevelEnum.OUT.getPredicateMessage() + line );
} else {
Expand All @@ -952,6 +953,10 @@ private void printLog() {
boolean isErrorsOccured() {
return isErrorsOccured;
}

String getLastInputStreamLine() {
return lastLine;
}
}

/**
Expand Down

0 comments on commit 6e3b757

Please sign in to comment.