From ad5348f32d930904ea8df4f643970b1b633df445 Mon Sep 17 00:00:00 2001 From: Michael Minella Date: Thu, 13 Oct 2016 12:42:39 -0500 Subject: [PATCH] Updated reply channel in the MessageChannePartitionHandler When using the `MessageChannelPartitionHandler`, the reply channel is an instance variable. However, when the handle method is called, if a reply channel is not injected, it will create a new one. If multiple jobs recycle the same instance of the partition handler, you may end up having messages crossed/lost. This PR removes the creation of the reply channel for each call to the handle method and documents that this comonent should be step scoped. Resolves BATCH-2288 --- .../MessageChannelPartitionHandler.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandler.java b/spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandler.java index a971a0ce5d..db7356fe07 100644 --- a/spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandler.java +++ b/spring-batch-integration/src/main/java/org/springframework/batch/integration/partition/MessageChannelPartitionHandler.java @@ -54,6 +54,10 @@ * repository, we can poll the store to determine the state without the need of the slaves to formally respond. * * + * Note: The reply channel for this is instance based. Sharing this component across + * multiple step instances may result in the crossing of messages. It's recommended that + * this component be step or job scoped. + * * @author Dave Syer * @author Will Schipp * @author Michael Minella @@ -102,6 +106,11 @@ public void afterPropertiesSet() throws Exception { jobExplorerFactoryBean.afterPropertiesSet(); jobExplorer = jobExplorerFactoryBean.getObject(); } + + if (!pollRepositoryForResults && replyChannel == null) { + replyChannel = new QueueChannel(); + }//end if + } /** @@ -209,15 +218,9 @@ public Collection handle(StepExecutionSplitter stepExecutionSplit int count = 0; - PollableChannel currentReplyChannel = replyChannel; - - if (!pollRepositoryForResults && currentReplyChannel == null) { - currentReplyChannel = new QueueChannel(); - }//end if - for (StepExecution stepExecution : split) { Message request = createMessage(count++, split.size(), new StepExecutionRequest( - stepName, stepExecution.getJobExecutionId(), stepExecution.getId()), currentReplyChannel); + stepName, stepExecution.getJobExecutionId(), stepExecution.getId()), replyChannel); if (logger.isDebugEnabled()) { logger.debug("Sending request: " + request); } @@ -225,7 +228,7 @@ public Collection handle(StepExecutionSplitter stepExecutionSplit } if(!pollRepositoryForResults) { - return receiveReplies(currentReplyChannel); + return receiveReplies(replyChannel); } else { return pollReplies(masterStepExecution, split);