Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BATCH-2686: add builders for master/worker beans in remote chunking #599

Closed
wants to merge 1 commit into from

Conversation

fmbenhassine
Copy link
Contributor

This PR resolves BATCH-2686.

Copy link
Member

@mminella mminella left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initial thoughts on the PR. I know we're going to discuss this tomorrow but I figured I'd share my gut reaction first.

private String inputChannelName = INPUT_CHANNEL_DEFAULT_NAME;
private String outputChannelName = OUTPUT_CHANNEL_DEFAULT_NAME;

public RemoteChunkingMasterStepBuilder<I, O> stepBuilder(SimpleStepBuilder<I, O> stepBuilder) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Method level javadoc
  • This feels clunky. Requiring a half built builder to be passed to another builder just doesn't feel right. I realize it's what I said in our conversation before so don't hate me for this comment ;) I'm wondering if there is some way to extend the existing builders and have our logic flow into the existing stuff without having to copy the entire set of builders used to construct a step.

return this;
}

public RemoteChunkingMasterStepBuilder<I, O> inputChannelName(String inputChannelName) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method level javadoc

return this;
}

public RemoteChunkingMasterStepBuilder<I, O> outputChannelName(String outputChannelName) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method level javadoc

return this;
}

public TaskletStep build() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method level javadoc

applicationContext.registerBeanDefinition(outputChannelName, repliesBeanDefinition);
}

MessagingTemplate messagingTemplate = new MessagingTemplate();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to allow users to inject their own MessagingTemplate. There are a lot of options that can be configured on it that we don't want to take away from a user.

return this;
}

public RemoteChunkingSlaveStepBuilder<I, O> itemWriter(ItemWriter<O> itemWriter) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method level javadoc

return this;
}

public RemoteChunkingSlaveStepBuilder<I, O> inputChannelName(String inputChannelName) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method level javadoc

return this;
}

public RemoteChunkingSlaveStepBuilder<I, O> outputChannelName(String outputChannelName) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method level javadoc

return this;
}

public RemoteChunkingSlaveStepBuilder<I, O> applicationContext(GenericApplicationContext applicationContext) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method level javadoc

* @since 4.1
* @author Mahmoud Ben Hassine
*/
public class RemoteChunkingSlaveStepBuilder<I, O> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we call this a worker step instead of a slave step?

@fmbenhassine fmbenhassine force-pushed the BATCH-2686 branch 7 times, most recently from 652c73c to 3e5bb95 Compare April 27, 2018 07:56
@fmbenhassine fmbenhassine changed the title BATCH-2686: add builders for master/slave steps in remote chunking BATCH-2686: add builders for master/slave beans in remote chunking Apr 27, 2018
@fmbenhassine fmbenhassine force-pushed the BATCH-2686 branch 3 times, most recently from 3a9d8d9 to 429f567 Compare April 30, 2018 14:18
@fmbenhassine fmbenhassine changed the title BATCH-2686: add builders for master/slave beans in remote chunking BATCH-2686: add builders for master/worker beans in remote chunking Apr 30, 2018
Copy link
Member

@mminella mminella left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some minor updates. Also, please be sure to use this. when referring to fields (the use of that paradigm in this PR is inconsistent).

*/
public TaskletStep build() {

Assert.notNull(applicationContext, "A GenericApplicationContext must be provided");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why no assertions on the channels?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

private DirectChannel outputChannel;

private final int DEFAULT_MAX_WAIT_TIMEOUTS = 40;
private static final long DEFAULT_THROTTLE_LIMIT = 6;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do these defaults come from?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Taken from ChunkMessageChannelItemWriter

* @author Mahmoud Ben Hassine
*/

public class RemoteChunkingWorkerFlowBuilder<I, O> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to leave the word Flow out of the name of this class since a Flow in Spring Batch means something different from what we're building here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


@Bean
public IntegrationFlow integrationFlow(DirectChannel requests, DirectChannel replies) {
return workerFlowBuilder
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;

@Ignore("This test is ignored because a worker JVM should be running to process chunks")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't we launch the other JVM?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can launch the other JVM but the issue is when to stop it? Since on the worker side we don't have a job/step, I could not use a listener for example to stop the JVM. It's technically possible to stop the JVM once all chunks received from the master are processed, but this technical code will "pollute" the example and may disturb the user from the main purpose of the sample.

I wrote a unit test on the master side to be consistent with other samples. But AFAIK, this is the first sample requiring two JVMs (the amqp sample does not wait for the other jvm, it just sends some messages to a queue and dies). Since it's a sample, it's probably better to write a main method on the master side too and remove this unit test. What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd use @Before and @After to manage the worker. That keeps the management of the other JVMs separate from the test code.

* @since 4.1
* @author Mahmoud Ben Hassine
*/
public class RemoteChunkingMasterStepBuilderFactory {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason to expose this or should we make this an inner class to the BatchIntegrationConfiguration?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used the same approach as StepBuilderFactory and JobBuilderFactory which are public. It could be an inner class of BatchIntegrationConfiguration, but in this case, when we autowire the beans, we would write something like:

@Autowired
private BatchIntegrationConfiguration.RemoteChunkingMasterStepBuilderFactory factory;

IMO, the user has not to know about the "internal" BatchIntegrationConfiguration (even if it's public) just like AbstractBatchConfiguration and its subclasses.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough

import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.SkipListener;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.jsr.RetryListener;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrong import I believe...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is used in overridden method:

@Override
public RemoteChunkingMasterStepBuilder<I, O> listener(RetryListener listener) {
	super.listener(listener);
	return this;
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can remove that method. It's only used internally for our JSR implementation. JSR-352 does not support remote chunking or java configuration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I removed the method.

public BatchIntegrationConfiguration(
JobRepository jobRepository,
PlatformTransactionManager transactionManager,
GenericApplicationContext applicationContext) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See previous comment about BeanDefinitionRegistry.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Member

@mminella mminella May 2, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code here says otherwise... It looks like you fixed one spot and not the other.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed, my mistake, sorry for that.

* }
*
* }
* </pre>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nicely done javadoc

*/
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = {RemoteChunkingMasterStepBuilderTest.BatchConfiguration.class})
public class RemoteChunkingMasterStepBuilderTest {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hate to do this, but we'll need coverage for the dummy setters to make sure they are still there and that they don't modify the results...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought you were talking about setters of the current class: maxWaitTimeout, throttleLimit (not the ones overridden). So by fixed I meant here: ba2534e

Now for "dummy" setters, I added a test to cover them here: 61737be

import org.springframework.transaction.interceptor.TransactionAttribute;
import org.springframework.util.Assert;

import java.util.function.Function;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import statements are in the wrong order. You can configure this in your IDE. I can fix on merge. For record: https://github.com/spring-projects/spring-framework/wiki/Code-Style#import-statements

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in 8faecf4

*/
public IntegrationFlow build() {

Assert.notNull(itemWriter, "An ItemWriter must be provided");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why no assertions on the channels?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in 10271e1


@Bean
public RemoteChunkingMasterStepBuilderFactory remoteChunkingMasterStepBuilderFactory() {
return new RemoteChunkingMasterStepBuilderFactory(jobRepository, transactionManager, beanDefinitionRegistry);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe I made the comment before, please be sure to use this. when referring to class level fields.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added "this" keyword where I thought is appropriate in 0b6d812. But looks like you meant use the keyword systematically for every class level field. I don't see we use this convention in StepBuilderFactory and JobBuilderFactory for example. But I fixed this in 8fbc847

* @author Mahmoud Ben Hassine
*/
@Configuration
public class BatchIntegrationConfiguration implements ApplicationContextAware {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we've changed from an ApplicationContext to a BeanDefinitionRegistry, we probably shouldn't use ApplicationContextAware and it's mechanism for injection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In hindsight, we don't really need to register the RemoteChunkHandlerFactoryBean as a bean in the application context or bean definition registry. It is only required to have a ChunkProcessorChunkHandler which is already a SI message endpoint acting as a service activator (through annotations). So I used the RemoteChunkHandlerFactoryBean to create a ChunkProcessorChunkHandler and removed the dependency to the BeanDefinitionRegistry. Less is more!

Fixed in 94bd4d1.

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See previous note on import order. I can fix on merge.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in 8faecf4

import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.transaction.PlatformTransactionManager;

import java.util.Arrays;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Import order

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in 8faecf4

import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.util.Arrays;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import order

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in 8faecf4

@Before
public void setUp() {
workerApplicationContext = new AnnotationConfigApplicationContext(WorkerConfiguration.class);
System.out.println("Started a worker..");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should remove system.outs...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that's fine for the sample (not production code). Removed in 83d43d6


@After
public void tearDown() {
System.out.println("Stopping the worker..");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again the system.outs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed in 83d43d6

@ContextConfiguration(classes = {
JobRunnerConfiguration.class,
RemoteChunkingJobFunctionalTests.MasterConfiguration.class})
public class RemoteChunkingJobFunctionalTests {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stupid question, but since this is all within the same JVM now...does it make sense to use a broker that can be bootstrapped within the test? Something like ActiveMQ (I believe we use that elsewhere in tests)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. I updated the sample to use an embedded broker. See f21dbd5

public <I, O> RemoteChunkingMasterStepBuilder<I, O> get(String name) {
return new RemoteChunkingMasterStepBuilder<I, O>(name)
.repository(this.jobRepository)
.transactionManager(this.transactionManager);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking to this peace of code, I would say that I don't need this factory, the simple RemoteChunkingMasterStepBuilder is enough for me, since I really can inject any possible jobRepository and transactionManager.

However I guess I can override them since you return for me that RemoteChunkingMasterStepBuilder.
But again: I don't see too match benefit of this factory.

Just IMHO.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, in the beginning, we had only the builders. But then we thought it is better to add this factory to automatically set the job repository and the transaction manager (just like the StepBuilderFactory) and free the user from do it.

* private RemoteChunkingMasterStepBuilderFactory masterBuilderFactory;
*
* &#064;Autowired
* private RemoteChunkingWorkerBuilder workerBuilder;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, indeed, looking to this sample, it really would feel better to have RemoteChunkingMasterStepBuilder and RemoteChunkingWorkerBuilder - transparent and intuitive.
Otherwise if I would inject RemoteChunkingMasterStepBuilderFactory, I would expect similar for the worker.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, there is a factory on the master side but not on the worker side.. Because the worker side does not know anything about SB artefacts (step, job repository, etc) except a dumb item processor and writer.

.itemProcessor(itemProcessor())
.itemWriter(itemWriter())
.inputChannel(requests) // requests received from the master
.outputChannel(replies) // replies sent to the master
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you need to add explain somewhere that all these channels must be configured as well.
We are in Spring Batch: that might not be so obvious for end-user what to do for Spring Integration side.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does end-user have to provide writer, reader and processor?
I thought in this master-worker pattern everything is done by the RemoteChunkProcessor...

Thanks

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Never mind I see: itemReader is a source of data for this job - can be DB, file or any thing else.
The itemProcessor on the worker side is indeed the part we handle an item.

I can understand that itemWriter is for sinking data (e.g. DB, file etc.), but I can't understand where is the correlation with the replies? Do we send on our own after successful sinking?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, this is the contract between the user and SB to provide at least where to read data from and where to write it (as well as the business logic of how to optionally process it). So I think this is the minimum minimorum of what the user has to provide. As a first step now, the user has also to provide SI channels and Adapters.

  • For channels, we tried to create them automatically in the beginning but we faced some lifecycle issues and we decided to leave them for the user as a first step.
  • For adapters, we can't take them form the user because we don't know upfront which middleware (s)he is using (jms, amqp, etc)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we send on our own after successful sinking?

The worker uses the ItemWriter to write data directly to the sink and the ChunkProcessorChunkHandler to send back instances of ChunkResponse (see return type of handkeChunk method) to the master.

Copy link
Member

@artembilan artembilan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, @benas , for your answers!
Now things are much cleaner for me.

So, I retract my objections and approve PR.

I think you with Michael still will find what to discuss here, but that's already out of my knowledge. From Spring Integration perspective everything is fine.

This commit adds a new annotation `@EnableBatchIntegration` that makes
it possible to autowire two beans in the application context:

* RemoteChunkingMasterStepBuilderFactory: used to create a master step
* RemoteChunkingWorkerBuilder: used to create a worker integration flow

The goal of these new APIs is to simplify the setup of remote chunking
job by automatically configuring infrastructure beans.

Resolves BATCH-2686
@fmbenhassine
Copy link
Contributor Author

Hi @mminella ,

This PR has been rebased on the latest master and updated with correct example of remote chunking.

Kr,
Mahmoud

Copy link
Member

@mminella mminella left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll address these minor items as I merge.

@@ -944,6 +938,88 @@ when it receives chunks from the master.
For more information, see the section of the "Scalability" chapter on
link:$$http://docs.spring.io/spring-batch/reference/html/scalability.html#remoteChunking$$[Remote Chunking].

Starting from version 4.1, Spring Batch Integration introduces the `@EnableBatchIntegration`
annotation that can be used to simplify remote chunking setup. This annotation provides
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"simplify a remote chunking step"

* the output channel ("Outgoing requests") to send requests to workers
* the input channel ("Incoming replies") to receive replies from workers

There is no need anymore to explicitly configure the `ChunkMessageChannelItemWriter`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be worded so that in future versions it still sounds accurate. Because of this, it shouldn't have references to before and after the feature was added.

with the configured `ItemProcessor` and `ItemWriter`
* send replies on the output channel ("Outgoing replies") to the master

There is no need anymore to explicitly configure the `SimpleChunkProcessor`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above about the before/after

[source, java]
----
@EnableBatchIntegration
public class RemoteChunkingJobConfiguration {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add @EnableBatchProcessing so that it's clear that both are required?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

indeed.

* <pre class="code">
* &#064;Configuration
* &#064;EnableBatchIntegration
* public class RemoteChunkingAppConfig {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add the @EnableBatchProcessing here since this by itself would fail (no transaction manager or job repository), correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct.

.build();

// then
Assert.assertNotNull(taskletStep);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just because the step was created doesn't mean that all the values passed above were set correctly. We need to assert each one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. I thought the goal was just to make sure these setters are called somewhere so that they are covered (in case we remove on of them accidentally). I was too pragmatic there 😄

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm almost done refactoring the test for this and will merge once I'm done.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants