
Multi-Threaded Job PR Suggestion #300

Closed · ipropper opened this issue Oct 26, 2017 · 15 comments

ipropper commented Oct 26, 2017

Hi,

In reference to #299:

May I suggest an alternative to the fork-join method? These are the issues I have been having with it:

  1. It breaks up your job into multiple jobs.

  2. It does not have the constant-memory guarantee of a single batch job. A single batch job only processes a batch of size N, so you will only have N records in memory at any given time. With the fork-join model, memory grows somewhat unpredictably: if your fork threads are much slower than your file-reading thread (as is the case for me), then memory can grow very fast. I ran the fork-join tutorial with a million-line file, changing the following code:

private static Job buildForkJob(String jobName, File tweets, List<BlockingQueue<Record>> workQueues)
        throws FileNotFoundException {
    return aNewJob()
            .named(jobName)
            .reader(new FlatFileRecordReader(tweets))
            .writer(new RoundRobinBlockingQueueRecordWriter(workQueues))
            .jobListener(new PoisonRecordBroadcaster(workQueues))
            .build();
}

private static Job buildWorkerJob(String jobName, BlockingQueue<Record> workQueue, BlockingQueue<Record> joinQueue) {
    return aNewJob()
            .named(jobName)
            .reader(new BlockingQueueRecordReader(workQueue))
            .processor(new TweetProcessor(jobName))
            .processor(x -> {
                Thread.sleep(1000); // simulate workers that are much slower than the reader
                return x;
            })
            .writer(new BlockingQueueRecordWriter(joinQueue))
            .build();
}

private static Job buildJoinJob(String jobName, File out, BlockingQueue<Record> joinQueue) throws IOException {
    return aNewJob()
            .named(jobName)
            .reader(new BlockingQueueRecordReader(joinQueue, NB_WORKERS))
            .filter(new PoisonRecordFilter())
            .writer(new FileRecordWriter(out))
            .build();
}

[image: memory profile of the run, with heap usage climbing steadily toward 1.5 GB]

I stopped the process early, but as you can see the memory kept growing. I would not expect a batch size of 100 to use 1.5 GB of memory!

To solve this issue, I would like to either create a new job type or update the existing BatchJob class.

The current BatchJob code reads as follows:

while (moreRecords() && !isInterrupted()) {
    Batch batch = readAndProcessBatch();
    writeBatch(batch);
}

and I would like to implement something more like this (pseudo code):

while (moreRecords() && !isInterrupted()) {
    Batch batch = readBatch();
    for (RecordProcessor processor : recordProcessorList) {
        if (processor instanceof BatchProcessor) {
            // batch-aware processors operate on the whole batch at once
            batch = ((BatchProcessor) processor).processBatch(batch);
        } else {
            // regular processors are applied record by record
            Batch newBatch = new Batch();
            for (Record record : batch) {
                newBatch.addRecord(processor.processRecord(record));
            }
            batch = newBatch;
        }
    }
    writeBatch(batch);
}

The BatchProcessor class would allow a processor to run on a batch of records. With this I could create a multi-threaded processor that runs on batches. I think this code could provide the following:

  • allow for a new type of processor that can process batches instead of individual records
  • keep constant memory
  • keep all multi threaded code within the same job
  • keep records in a consistent order

Do you think this is a bad idea? Are there any major issues that I am not addressing? If you think this is a good idea, may I attempt a PR?
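To make this concrete, here is a minimal sketch of what the BatchProcessor contract and a multi-threaded implementation built on it could look like. All names below are hypothetical (not existing easy-batch API), and Batch is assumed to be iterable as in the pseudo code above:

// Hypothetical contract: a processor that operates on a whole batch at once.
public interface BatchProcessor {
    Batch processBatch(Batch batch) throws Exception;
}

// Hypothetical multi-threaded implementation: fans one batch out to a thread
// pool and joins the results, so memory stays bounded by the batch size.
public class MultiThreadedRecordProcessor implements BatchProcessor {

    private final RecordProcessor delegate;
    private final ExecutorService executor;

    public MultiThreadedRecordProcessor(RecordProcessor delegate, int nbThreads) {
        this.delegate = delegate;
        this.executor = Executors.newFixedThreadPool(nbThreads);
    }

    @Override
    public Batch processBatch(Batch batch) throws Exception {
        List<Callable<Record>> tasks = new ArrayList<>();
        for (Record record : batch) {
            tasks.add(() -> delegate.processRecord(record));
        }
        Batch result = new Batch();
        // invokeAll returns futures in task-submission order, so record
        // ordering is preserved even though processing runs in parallel
        for (Future<Record> future : executor.invokeAll(tasks)) {
            result.addRecord(future.get());
        }
        return result;
    }
}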


ipropper commented Oct 27, 2017

I think I could also update the queue writers in fork-join so they block if the queue reaches a certain size, but I don't think that addresses these issues:

  • allow for a new type of processor that can process batches instead of individual records
  • keep all multi threaded code within the same job
  • keep records in a consistent order

fmbenhassine commented:

Hi @ipropper

Thank you for this analysis! I haven't had time yet to look at this issue, but I will definitely try to do so this weekend and get back to you asap.

Kr
Mahmoud

fmbenhassine commented:

Hi @ipropper

> It breaks up your job into multiple jobs.

Correct. I thought it would be good to have each part of the process as a basic job and then compose them to create a fork/join job. But this was probably not a good decision. I like the approach of @DanieleS82 in #299 with a template method. It would be great to have three Job implementations in Easy Batch: BatchJob, ForkJoinJob, SingleTaskJob, and probably other implementations for common use cases.

> If your fork threads are much slower than your file-reading thread (as is the case for me), then memory can grow very fast

Indeed. There is no mechanism to handle backpressure in Easy Batch. But I was thinking of a ThrottlingRecordReader like in #290. What do you think?

> To solve this issue, I would like to either create a new job type or update the existing BatchJob class.

I would go for creating a new job type as said previously, something like ForkJoinJob.

> I would like to implement something more like this (pseudo code):
> The BatchProcessor class would allow a processor to run on a batch of records. With this I could create a multi-threaded processor that runs on batches

That was the case in v4, where the whole pipeline operated on batches rather than records. But this approach has many issues (see the change log of v5, especially #211).

> I think I could also update the queue writers in fork-join so they block if the queue reaches a certain size

I'm not sure I understand this point. The writers continuously take elements from the queue and write them. By design, we don't want them to block when the queue is full (or reaches a certain size); in contrast, they block when the queue is empty. That is the reason we use a BlockingQueue in the example. Sorry if I didn't get the point.
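For reference, this is the standard java.util.concurrent.BlockingQueue contract the example relies on (the capacity of 100 below is just an illustration):

BlockingQueue<Record> unbounded = new LinkedBlockingQueue<>();    // put() never blocks
BlockingQueue<Record> bounded   = new LinkedBlockingQueue<>(100); // put() blocks while 100 elements are queued

Record record = unbounded.take(); // take() blocks while the queue is empty, on either variant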

I hope I answered all your questions 😄

Kr
Mahmoud

ipropper commented Nov 1, 2017

Hi,

@benas Thanks for getting back to me, you're always prompt with responses!

> Correct. I thought it would be good to have each part of the process as a basic job and then compose them to create a fork/join job. But this was probably not a good decision. I like the approach of @DanieleS82 in #299 with a template method. It would be great to have three Job implementations in Easy Batch: BatchJob, ForkJoinJob, SingleTaskJob, and probably other implementations for common use cases.

Am I correct in assuming the batch job use case is the one I proposed? If so, I would like to take a crack at it and send you a PR.

I also implemented #299 in my project, so I definitely see the value 😄. In my implementation I abstracted the queues away from the user (for better or worse). If you want me to look at #299 and suggest some edits, I would be happy to do so.

> Indeed. There is no mechanism to handle backpressure in Easy Batch. But I was thinking of a ThrottlingRecordReader like in #290. What do you think?

This sounds good to me; however, I am not sure it meets my use case. I will elaborate at the bottom.

> I'm not sure I understand this point. The writers continuously take elements from the queue and write them. By design, we don't want them to block when the queue is full (or reaches a certain size); in contrast, they block when the queue is empty. That is the reason we use a BlockingQueue in the example. Sorry if I didn't get the point.

Yeah, let me show you a quick code snippet (pseudo code):

@Override
public void writeRecords(Batch batch) throws Exception {
    // write records to queues in round-robin fashion
    int batchSize = batch.size();
    for (Record record : batch) {
        BlockingQueue<Record> queue = queues.get(nextQueue++ % queuesNumber);
        // back off while the target queue already holds a full batch
        while (queue.size() >= batchSize) {
            Thread.sleep(5);
        }
        queue.put(record);
    }
}

The while loop and the batchSize variable are the only new code.

I added this because I believe batches exist to keep only a few records in memory at a time instead of an entire file. With the original BlockingQueue writer it was possible to have most of the file in memory, because the queues were unbounded. If the whole file can be stored in a BlockingQueue, why have batch sizes at all?

Throttling the reads as in issue #290 might alleviate this stress, but it does not solve it; it creates a magic-number situation. We would need to play with the duration to get a non-stressed solution. A high duration might be good, but if one day the third party processes records quickly, we lose performance. On the flip side, a low duration might leave too many records in memory (say the third party is particularly slow that day). A fixed-size queue means new records are only admitted as others finish, and it can be used in conjunction with throttling. Do you still feel this is a bad idea?
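For what it's worth, the poll-and-sleep loop could also be avoided by constructing the queues as bounded in the first place, since put() on a bounded BlockingQueue blocks until space frees up (standard JDK behavior). A sketch of the same writer under that assumption:

// Assuming the queues were created bounded, e.g. new ArrayBlockingQueue<>(batchSize),
// put() itself blocks when a queue is full, so no sleep loop or magic number is needed.
@Override
public void writeRecords(Batch batch) throws Exception {
    for (Record record : batch) {
        BlockingQueue<Record> queue = queues.get(nextQueue++ % queuesNumber);
        queue.put(record); // blocks until the worker drains some records
    }
}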

fmbenhassine commented:

> Am I correct in assuming the batch job use case is the one I proposed?

Not fully; I was referring to the current BatchJob class, which will be made public.
The approach you proposed is the workflow of v4. As said previously, that was abandoned.

> I also implemented #299 in my project, so I definitely see the value 😄. In my implementation I abstracted the queues away from the user (for better or worse).

Great! In hindsight, my approach of implementing fork/join using multiple jobs is not correct, because as a user I want my data source to be processed in parallel transparently, with one job report in the end. So it should be a single job, not multiple ones.

The idea of a template method is the way to go. Unfortunately, the implementation in #299 was inspired by the tutorial, so it also uses multiple jobs.

If you have an alternative (that you implemented in your project), please open a separate PR.

> Yeah, let me show you a quick code snippet

I see. But I think we are talking about different topologies.
For me, a multi-threaded job would be something like this:

[diagram: multi-threaded job topology — serial reader, parallel processing threads, serial writer]

Reading and writing data are done serially; processing is done in parallel by multiple threads. Record dispatching and collecting should be abstracted from the user, and the degree of parallelism should be a parameter:

Job job = new MultiThreadedJobBuilder()
   .reader(new FlatFileRecordReader("input.txt"))
   .processor(new MyProcessor1())
   .processor(new MyProcessor2())
   .writer(new FileRecordWriter("output.txt"))
   .degreeOfParallelism(4)
   .build();

Of course, processors should be thread-safe.
In this case, even when the reader is faster than the processors, the blocking reading queue will handle the pressure. The same goes for writing data: when a batch is ready to be written, the job implementation delegates the writing to the writer.
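To illustrate (a sketch only, not the actual implementation; names like dispatchQueue, pipeline, queueCapacity, and degreeOfParallelism are assumptions), the internals could look roughly like this:

// Bounded hand-off queues provide back pressure in both directions.
BlockingQueue<Record> dispatchQueue = new ArrayBlockingQueue<>(queueCapacity);
BlockingQueue<Record> collectQueue  = new ArrayBlockingQueue<>(queueCapacity);

// degreeOfParallelism worker threads, each taking records, running the
// processing pipeline, and handing results over to the single writer side.
ExecutorService workers = Executors.newFixedThreadPool(degreeOfParallelism);
for (int i = 0; i < degreeOfParallelism; i++) {
    workers.submit(() -> {
        try {
            while (true) {
                Record in = dispatchQueue.take();             // blocks when the reader is slower
                collectQueue.put(pipeline.processRecord(in)); // blocks when the writer is slower
            }
        } catch (Exception e) {
            Thread.currentThread().interrupt(); // simplified shutdown handling for the sketch
        }
    });
}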

Do you see?

Kr
Mahmoud

ipropper commented Nov 5, 2017

I like this API:

Job job = new MultiThreadedJobBuilder()
   .reader(new FlatFileRecordReader("input.txt"))
   .processor(new MyProcessor1())
   .processor(new MyProcessor2())
   .writer(new FileRecordWriter("output.txt"))
   .degreeOfParallelism(4)
   .build();

How would you feel about the reader reading batchSize records and then blocking until the writer has received (or written) that batch? This way the model can still work as you suggested, and the records can preserve their original ordering (once the batch is processed, sort it and send it to the writer). Preserving order could be a boolean flag.

fmbenhassine commented Nov 5, 2017

> How would you feel about the reader reading batchSize records and then blocking until the writer has received that batch?

Again this is the behaviour of v4:

[diagram: v4 batch processing — the reader blocks until the writer has handled the current batch]

The reader will read batchSize records and then block until the writer has received that batch.
You can still use v4 if you want this behaviour.

ipropper commented Nov 5, 2017

I am sorry to keep being annoying, but v4 is not the master branch, so support for bugs etc. is less likely.

All I am hoping to do is contribute something to easy-batch that:

  1. keeps ordering of records
  2. has constant memory
  3. is multithreaded

The current fork-join implementation does not do 1 and 2, because the unbounded BlockingQueues do not keep memory constant and the multiple worker jobs move records out of order (as outlined in the first post). I believe I can produce something that does 1, 2, and 3 using your model:

https://user-images.githubusercontent.com/1210553/32417196-0028ccfe-c256-11e7-9e0c-c600fd7a14f8.png

and this API:

Job job = new MultiThreadedJobBuilder()
   .reader(new FlatFileRecordReader("input.txt"))
   .processor(new MyProcessor1())
   .processor(new MyProcessor2())
   .writer(new FileRecordWriter("output.txt"))
   .degreeOfParallelism(4)
   .build();

I would be happy to create a version of this API that does not maintain order and does not keep fixed memory (i.e. the readerQueue only blocks when syncing threads). If you think that contribution is worthwhile, let me know. Otherwise, close this thread so I stop bugging you 😄

fmbenhassine commented Nov 5, 2017

> I am sorry to keep being annoying

No, you are not annoying, you are spending your time helping me. And thank you!

> v4 is not the master branch, so support for bugs etc. is less likely.

I do confirm.

> keeps ordering of records

Keeping record ordering makes no sense in a parallel setup; that is not the goal in the first place. We can't decide to go parallel if there is an ordering relationship between records.

> I would be happy to create versions of this API that do not maintain order and do not keep fixed memory (i.e. the readerQueue does not block ever)

You are welcome!

But I think we still need to use a bounded reading queue; otherwise we will end up with the whole data source in memory.

Also watch out for this tricky case: https://github.com/j-easy/easy-batch/wiki/readers#jdbcrecord-caveat

JDBC records cannot be distributed to workers, since their payload depends on the connection, which may be closed before the records are processed.

> I also implemented #299 in my project, so I definitely see the value 😄. In my implementation I abstracted the queues away from the user (for better or worse).

Are you willing to share this in a gist? I'm curious 😄

ipropper commented Nov 5, 2017

Hi, yes, this is what I had, but I don't think it is the way to go. Keep in mind that the gist does not compile; I had to abstract away some of my custom code.

https://gist.github.com/ipropper/da22825e33c0635dbc8acdab12f4e7a5

The API you mentioned is much better. I will get working on something like your API.

fmbenhassine added this to the v5.3.0 milestone Nov 8, 2017
fmbenhassine commented:

Great! Let me know if you have a working prototype.

I will also try to come up with something in the meantime.

I'm releasing v5.2 in the coming few days, so let me plan this for v5.3.

DanieleS82 commented:

Hi,
I have a doubt about this topic. JobBuilder already allows specifying more than one "processor" element (or other component types) and executes them in insertion order. How would the MultiThreadedJobBuilder you suggested change this behavior? Isn't it equivalent to the solution we saw in #299?
Thinking about a solution, I would imagine a component that internally starts X threads, where each thread executes the same "processor" (or other component) passed in at construction time.
It's only a high-level idea, but what do you think?

ipropper commented Nov 9, 2017

Hi, this is a prototype of the multi-threaded batch:

#305

It hasn't been fully tested; the basic idea:

  • MultiThreadedBatch extends the Batch class, overriding only the readAndProcess() function
  • the metrics and job report classes are made thread-safe so they can work with both single-threaded and multi-threaded batches (see the sketch at the end of this comment)

What are your thoughts on this model? I could fully decouple the multi-threaded and single-threaded batches, or I could build multithreading directly into the BatchJob class so there is no need for a multi-threaded batch class.

Additionally, reading happens separately from processing records, but I could change that so they happen simultaneously.
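On the thread-safety bullet above, a minimal sketch of what a thread-safe report counter could look like, assuming the metrics currently use plain long fields (field and method names hypothetical):

// java.util.concurrent.atomic.AtomicLong makes concurrent increments from
// multiple processing threads safe without external synchronization.
private final AtomicLong readCount = new AtomicLong();

public void incrementReadCount() {
    readCount.incrementAndGet();
}

public long getReadCount() {
    return readCount.get();
}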


fmbenhassine commented Nov 27, 2017

@ipropper

> What are your thoughts on this model?

I've added a couple of notes on this in your PR.


@DanieleS82

> I have a doubt about this topic. JobBuilder already allows specifying more than one "processor" element (or other component types) and executes them in insertion order. How would the MultiThreadedJobBuilder you suggested change this behavior? Isn't it equivalent to the solution we saw in #299?

Excellent point! The whole pipeline should be executed in parallel, as shown in the previous diagram.

Internally, a CompositeRecordProcessor is used. This component is what should be executed in parallel. It is very important to have the same result as BatchJob when the degree of parallelism is equal to 1.

As a user, when I specify my logic, executing the job in serial or parallel should yield the same result. Only the speed might be better with a parallel version (which is not always the case BTW).

Think of it like parallel streams in Java 8: using collection.stream() should give the same result as collection.parallelStream().

> Thinking about a solution, I would imagine a component that internally starts X threads, where each thread executes the same "processor" (or other component) passed in at construction time. It's only a high-level idea, but what do you think?

Yes, each thread should execute the whole pipeline. This is also important for the semantics of PipelineListener, which should be invoked each time the pipeline is executed (so as many times as each thread executes it).
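To pin down those semantics, here is a sketch of the composite idea (the real CompositeRecordProcessor may differ in detail): the composite applies the registered processors in insertion order, and that whole unit is what each worker thread executes.

// Sketch: applies the registered processors in insertion order. Running this
// whole unit on each worker thread means degreeOfParallelism = 1 reproduces
// the serial BatchJob behavior exactly.
public class CompositeRecordProcessor implements RecordProcessor {

    private final List<RecordProcessor> processors = new ArrayList<>();

    public void addRecordProcessor(RecordProcessor processor) {
        processors.add(processor);
    }

    @Override
    public Record processRecord(Record record) throws Exception {
        Record result = record;
        for (RecordProcessor processor : processors) {
            result = processor.processRecord(result);
            if (result == null) {
                break; // record was filtered out mid-pipeline
            }
        }
        return result;
    }
}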

fmbenhassine removed this from the v5.3.0 milestone Jan 5, 2020
fmbenhassine commented:

I was re-reading this thread and found it really interesting! Many thanks to all for sharing these nice ideas 👍 I have already explained my point of view in the previous messages, so I'm not going to repeat it here.

In hindsight, I would not introduce a multi-threaded job implementation, to avoid all the joy of thread-safety and whatnot. The implementation complexity and maintenance burden are higher than the added value of this feature. I believe Easy Batch jobs are lightweight Callable objects that can be composed to create more complex workflows (with parallel jobs, loops, conditionals, etc.) either manually (as shown in the tutorials) or using a workflow engine like Easy Flows.
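For instance, since jobs are Callable objects as stated above, two independent jobs can run in parallel with a plain ExecutorService (job1 and job2 are assumed to be built elsewhere; this sketch relies on Job being a Callable<JobReport>):

// Standard java.util.concurrent tooling is enough to compose a parallel workflow.
ExecutorService executor = Executors.newFixedThreadPool(2);
List<Future<JobReport>> reports = executor.invokeAll(Arrays.asList(job1, job2));
executor.shutdown();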

This feature can be implemented in a fork if needed. OSS FTW!
