Multi-threaded step with specific chunk size: the reader distributes duplicate records across threads #112

Closed
manoop opened this issue Mar 18, 2023 · 2 comments

Comments

@manoop

manoop commented Mar 18, 2023

Using Spring Batch version 4.3.7, I created a simple application with a USER entity. The RepositoryItemReader is built on a JPA repository, and the step is configured with a chunk size of 10, using the same entity type (User) as both input and output.

In the processor, each item (User) has its status set to 'PROCESSED'.

The writer finally persists the entities to the database.
There are 99 records in total.
When the batch starts with the multi-threaded step, it fails to complete, with the error below.

Workaround: when I configure the step builder chunk with two different input and output types, it works fine with no stuck threads.

The complete code is on GitHub for review:

https://github.com/manoop/batch/tree/bugfix/thread-transaction

12:59:47.946 [taskExecutor-1] ERROR org.hibernate.engine.jdbc.spi.SqlExceptionHelper - Deadlock found when trying to get lock; try restarting transaction
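A minimal sketch (plain Java, no Spring, hypothetical names) of why page-based reading over a status-filtered query can misbehave when the job itself mutates the filter column: once a page of NEW rows is flipped to PROCESSED, the remaining NEW rows shift into earlier page numbers, so a reader that keeps advancing its page counter misses them.

```java
import java.util.*;
import java.util.stream.*;

// Hypothetical stand-in (no Spring) for a paging reader over a
// status-filtered query, to illustrate how pages shift mid-job.
public class PagingSkipDemo {

    // Pages through records WHERE status = 'NEW' while the "writer"
    // flips each read chunk to 'PROCESSED'; returns how many records
    // were actually read.
    static int run(int total, int pageSize) {
        Map<Integer, String> status = new LinkedHashMap<>();
        for (int id = 1; id <= total; id++) status.put(id, "NEW");

        int seen = 0;
        for (int page = 0; ; page++) {
            int offset = page * pageSize;
            List<Integer> pageItems = status.entrySet().stream()
                    .filter(e -> "NEW".equals(e.getValue()))
                    .map(Map.Entry::getKey)
                    .skip(offset)
                    .limit(pageSize)
                    .collect(Collectors.toList());
            if (pageItems.isEmpty()) break;

            seen += pageItems.size();
            // The writer marks the chunk PROCESSED; the remaining NEW
            // rows shift into page numbers the reader already consumed.
            pageItems.forEach(id -> status.put(id, "PROCESSED"));
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("read " + run(99, 10) + " of 99 records");
    }
}
```

This single-threaded sketch shows skips; with multiple threads sharing reader state on top of the shifting pages, the same mechanism can also hand overlapping rows to different threads, which matches the duplicates reported here.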

BATCH CONFIG

import com.trix.man.batch.logic.DeptFilterProcessor;
import com.trix.man.batch.logic.JdbcItemReader;
import com.trix.man.batch.logic.UserDBWriter;
import com.trix.man.batch.model.User;
import com.trix.man.batch.repository.UserRepository;
import org.springframework.batch.core.*;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.*;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.data.RepositoryItemReader;
import org.springframework.batch.item.data.builder.RepositoryItemReaderBuilder;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.data.domain.Sort;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;

import javax.annotation.PreDestroy;
import javax.sql.DataSource;
import java.util.*;

@Configuration
public class BatchConfig {


    @Value("${taxis.batch.chunk.size}")
    private int chunkSize;

    @Value("${taxis.batch.thread.limit}")
    private int threadSize;

    @Value("${taxis.batch.thread.core.pool.size}")
    private int corePoolSize;

    @Value("${taxis.batch.thread.max.pool.size}")
    private int maxPoolSize;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    private static final String JOB_NAME = "taxis-batch";

    @Autowired
    JobOperator jobOperator;

    @Autowired
    private JobExplorer jobs;

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private DataSource dataSource;

    @Autowired private UserRepository userRepository;

    @Autowired PlatformTransactionManager jpaTransactionManager;

    @Bean
    public Job job(){
        return jobBuilderFactory.get(JOB_NAME)
            .incrementer(new RunIdIncrementer())
                .start(chunkStep(taskExecutor()))
                .build();

    }

    @Bean
    public TaskExecutor simpleTaskExecutor(){
        return new SimpleAsyncTaskExecutor("taxis_batch");
    }


    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(corePoolSize);
        pool.setMaxPoolSize(maxPoolSize);
        pool.setWaitForTasksToCompleteOnShutdown(false);
        return pool;
    }

    @PreDestroy
    public void destroy() throws NoSuchJobException, NoSuchJobExecutionException, JobExecutionNotRunningException {
        // Graceful shutdown
        jobs.getJobNames().forEach(name -> System.out.println("job name: " + name));
        Set<Long> executions = jobOperator.getRunningExecutions(JOB_NAME);
        jobOperator.stop(executions.iterator().next());
    }

    @Scheduled(fixedRate = 500000)
    public void run() throws Exception {
        JobExecution execution = jobLauncher.run(
                job(),
                new JobParametersBuilder().addLong("uniqueness", System.nanoTime()).toJobParameters()
        );
        System.out.println("Exit status: " + execution.getStatus());
    }

    @Bean
    public Step chunkStep(TaskExecutor taskExecutor) {
        Step step = stepBuilderFactory.get("taxis-load")
                .transactionManager(jpaTransactionManager)
                .<User, User>chunk(chunkSize)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .listener(itemWriteListener())
                .taskExecutor(taskExecutor)
                .throttleLimit(threadSize)
                .build();
        return step;
    }

    @StepScope
    @Bean
    public ItemWriter<User> writer() {
        return new UserDBWriter(userRepository);
    }


    @Bean
    public ItemWriteListener<User> itemWriteListener(){
        return new UserItemWriteListener();
    }


    @StepScope
    @Bean
    public ItemProcessor<User, User> processor() {
        final CompositeItemProcessor<User, User> processor = new CompositeItemProcessor<>();
        processor.setDelegates(Arrays.asList(new DeptFilterProcessor()));
        return processor;
    }
    

    @Bean
    @StepScope
    public RepositoryItemReader<User> reader() {
        Map<String, Sort.Direction> sorts = new HashMap<>();
        sorts.put("id", Sort.Direction.DESC);

        List<Object> methodArgs = new ArrayList<>();
        methodArgs.add("NEW");
        RepositoryItemReader<User> itemReader = new RepositoryItemReaderBuilder<User>()
                .repository(userRepository)
                .methodName("findByTempstatus")
                .arguments(methodArgs)
                .pageSize(10)
                .sorts(sorts)
                .saveState(false)
                .build();
        return itemReader;
    }

}
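One common adjustment for multi-threaded steps (a sketch, not the reporter's code) is to wrap the reader in Spring Batch's `SynchronizedItemStreamReader`, the general pattern the framework documents for sharing a stateful reader across threads, so concurrent threads cannot interleave `read()` calls:

```java
// Sketch only: org.springframework.batch.item.support.SynchronizedItemStreamReader
// serializes read() calls from the step's worker threads.
@Bean
@StepScope
public SynchronizedItemStreamReader<User> synchronizedReader() {
    SynchronizedItemStreamReader<User> wrapper = new SynchronizedItemStreamReader<>();
    wrapper.setDelegate(reader());
    return wrapper;
}
```

Note that even with a synchronized reader, paging on a column the job itself updates (`tempstatus` NEW to PROCESSED) shifts page boundaries between reads, so skips or duplicates can persist; paging on a criterion the job does not mutate, or using a cursor-based or partitioned approach, avoids that.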

Processor

import com.trix.man.batch.model.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemProcessor;


public class DeptFilterProcessor implements ItemProcessor<User, User> {

    private static final Logger LOGGER = LoggerFactory.getLogger(DeptFilterProcessor.class);

    public DeptFilterProcessor() {
    }

    @Override
    public User process(User item) throws Exception {
        item.setStatus("PROCESSED");
        LOGGER.info("Processed item {}", item.getId());
        return item;
    }
}

Writer

import com.trix.man.batch.model.User;
import com.trix.man.batch.repository.UserRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ItemWriter;

import java.util.List;

public class UserDBWriter implements ItemWriter<User>{
    private static final Logger LOGGER = LoggerFactory.getLogger(UserDBWriter.class);
    private final UserRepository userRepository;
    public UserDBWriter(UserRepository userRepository) {
        this.userRepository = userRepository;
    }

    @Override
    public void write(final List<? extends User> items) throws Exception {
        for (User item : items) {
            LOGGER.info(" --> Writing: {}", item.getId());
        }
        userRepository.saveAll(items);
        userRepository.flush();
    }


}
@dgray16
Collaborator

dgray16 commented Apr 1, 2023

I think this issue should be opened here: https://github.com/spring-projects/spring-batch

@fmbenhassine
Contributor

@manoop This issue is not related to any extension in this repository. Please open a discussion here: https://github.com/spring-projects/spring-batch/discussions. If the bug is valid, we will transform the discussion to an issue on the issue tracker. Thank you.

@fmbenhassine closed this as not planned on Apr 3, 2023