Skip to content

Commit

Permalink
Updated reply channel in the MessageChannePartitionHandler
Browse files Browse the repository at this point in the history
When using the `MessageChannelPartitionHandler`, the reply channel is an
instance variable.  However, when the handle method is called, if a
reply channel is not injected, it will create a new one.  If multiple
jobs recycle the same instance of the partition handler, you may end up
having messages crossed/lost.  This PR removes the creation of the reply
channel for each call to the handle method and documents that this
comonent should be step scoped.

Resolves BATCH-2288
  • Loading branch information
mminella committed Oct 13, 2016
1 parent 0c46633 commit ad5348f
Showing 1 changed file with 11 additions and 8 deletions.
Expand Up @@ -54,6 +54,10 @@
* repository, we can poll the store to determine the state without the need of the slaves to formally respond.</li>
* </ul>
*
* Note: The reply channel for this is instance based. Sharing this component across
* multiple step instances may result in the crossing of messages. It's recommended that
* this component be step or job scoped.
*
* @author Dave Syer
* @author Will Schipp
* @author Michael Minella
Expand Down Expand Up @@ -102,6 +106,11 @@ public void afterPropertiesSet() throws Exception {
jobExplorerFactoryBean.afterPropertiesSet();
jobExplorer = jobExplorerFactoryBean.getObject();
}

if (!pollRepositoryForResults && replyChannel == null) {
replyChannel = new QueueChannel();
}//end if

}

/**
Expand Down Expand Up @@ -209,23 +218,17 @@ public Collection<StepExecution> handle(StepExecutionSplitter stepExecutionSplit

int count = 0;

PollableChannel currentReplyChannel = replyChannel;

if (!pollRepositoryForResults && currentReplyChannel == null) {
currentReplyChannel = new QueueChannel();
}//end if

for (StepExecution stepExecution : split) {
Message<StepExecutionRequest> request = createMessage(count++, split.size(), new StepExecutionRequest(
stepName, stepExecution.getJobExecutionId(), stepExecution.getId()), currentReplyChannel);
stepName, stepExecution.getJobExecutionId(), stepExecution.getId()), replyChannel);
if (logger.isDebugEnabled()) {
logger.debug("Sending request: " + request);
}
messagingGateway.send(request);
}

if(!pollRepositoryForResults) {
return receiveReplies(currentReplyChannel);
return receiveReplies(replyChannel);
}
else {
return pollReplies(masterStepExecution, split);
Expand Down

0 comments on commit ad5348f

Please sign in to comment.