Skip to content

Commit

Permalink
Fix for issue apache#2868 : hop-run exits too quickly to properly han…
Browse files Browse the repository at this point in the history
…dle all finished listeners
  • Loading branch information
mattcasters committed Apr 27, 2023
1 parent b17a0c6 commit 6665e1b
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 30 deletions.
12 changes: 8 additions & 4 deletions engine/src/main/java/org/apache/hop/pipeline/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -1328,10 +1328,6 @@ public void firePipelineExecutionFinishedListeners() throws HopException {
badGuys.add(e);
}
}
if (pipelineWaitUntilFinishedBlockingQueue != null) {
// Signal for the waitUntilFinished blocker...
pipelineWaitUntilFinishedBlockingQueue.add(new Object());
}
if (!badGuys.isEmpty()) {
// FIFO
throw new HopException(badGuys.get(0));
Expand All @@ -1348,6 +1344,14 @@ public void firePipelineExecutionFinishedListeners() throws HopException {
log, this, HopExtensionPoint.PipelineCompleted.id, this);
}

public void pipelineCompleted() throws HopException {
// Signal for the waitUntilFinished blocker.
// This will cause the wait loop in the waitUntilFinished() method to stop.
//
if (pipelineWaitUntilFinishedBlockingQueue != null) {
pipelineWaitUntilFinishedBlockingQueue.add(new Object());
}
}
/**
* Fires the start-event listeners (if any are registered).
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@

package org.apache.hop.pipeline.engines.local;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import org.apache.commons.lang.StringUtils;
import org.apache.hop.core.Const;
import org.apache.hop.core.IExtensionData;
Expand Down Expand Up @@ -52,14 +59,6 @@
import org.apache.hop.pipeline.transform.IRowListener;
import org.apache.hop.pipeline.transform.TransformMetaDataCombi;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;

@PipelineEnginePlugin(
id = "Local",
name = "Hop local pipeline engine",
Expand All @@ -70,6 +69,7 @@ public class LocalPipelineEngine extends Pipeline implements IPipelineEngine<Pip

private ExecutionInfoLocation executionInfoLocation;
private Timer transformExecutionInfoTimer;
private TimerTask transformExecutionInfoTimerTask;

private Map<String, List<IExecutionDataSamplerStore>> samplerStoresMap;

Expand Down Expand Up @@ -383,7 +383,7 @@ public void startTransformExecutionInfoTimer() throws HopException {

final IExecutionInfoLocation iLocation = executionInfoLocation.getExecutionInfoLocation();
//
TimerTask sampleTask =
TimerTask transformExecutionInfoTimerTask =
new TimerTask() {
@Override
public void run() {
Expand Down Expand Up @@ -429,7 +429,7 @@ public void run() {
// Schedule the task to run regularly
//
transformExecutionInfoTimer = new Timer();
transformExecutionInfoTimer.schedule(sampleTask, delay, interval);
transformExecutionInfoTimer.schedule(transformExecutionInfoTimerTask, delay, interval);
}

/**
Expand Down Expand Up @@ -463,49 +463,62 @@ public void lookupExecutionInformationLocation() throws HopException {
@Override
public void pipelineCompleted() throws HopException {
stopTransformExecutionInfoTimer();
super.pipelineCompleted();
}

public void stopTransformExecutionInfoTimer() {
try {
if (transformExecutionInfoTimer != null) {
if (transformExecutionInfoTimerTask != null) {
transformExecutionInfoTimerTask.cancel();
}
transformExecutionInfoTimer.cancel();
transformExecutionInfoTimer.purge();
transformExecutionInfoTimer = null;
}

if (executionInfoLocation == null) {
return;
}

IExecutionInfoLocation iLocation = executionInfoLocation.getExecutionInfoLocation();
String dataProfileName = resolve(pipelineRunConfiguration.getExecutionDataProfileName());
if (StringUtils.isNotEmpty(dataProfileName)) {
// Register the collected transform data for the last time
//
ExecutionDataBuilder dataBuilder =
ExecutionDataBuilder.fromAllTransformData(
LocalPipelineEngine.this, samplerStoresMap, true);
iLocation.registerData(dataBuilder.build());
}

// Register one final last state of the pipeline
//
ExecutionState executionState =
ExecutionStateBuilder.fromExecutor(LocalPipelineEngine.this, -1).build();
IPipelineEngine pipelineEngine = LocalPipelineEngine.this;

ExecutionStateBuilder stateBuilder = ExecutionStateBuilder.fromExecutor(pipelineEngine, -1);
ExecutionState executionState = stateBuilder.build();
iLocation.updateExecutionState(executionState);

// Update the state of all the transforms one final time
//
for (IEngineComponent component : getComponents()) {
ExecutionState transformState =
ExecutionStateBuilder.fromTransform(LocalPipelineEngine.this, component)
.build();
ExecutionStateBuilder.fromTransform(LocalPipelineEngine.this, component).build();
iLocation.updateExecutionState(transformState);
}

String dataProfileName = resolve(pipelineRunConfiguration.getExecutionDataProfileName());
if (StringUtils.isNotEmpty(dataProfileName)) {
// Register the collected transform data for the last time
//
ExecutionDataBuilder dataBuilder =
ExecutionDataBuilder.fromAllTransformData(
LocalPipelineEngine.this, samplerStoresMap, true);
iLocation.registerData(dataBuilder.build());
}
} catch (Throwable e) {
log.logError("Error handling writing final pipeline state to location (non-fatal)", e);
} finally {
// We're now certain all listeners fired. We can close the location.
//
executionInfoLocation.getExecutionInfoLocation().close();
} catch (Exception e) {
log.logError("Error handling writing final pipeline state to location (non-fatal)", e);
try {
executionInfoLocation.getExecutionInfoLocation().close();
} catch (Exception e) {
log.logError(
"Error closing execution information location: " + executionInfoLocation.getName(), e);
}
}
}

Expand Down

0 comments on commit 6665e1b

Please sign in to comment.