Skip to content

Commit

Permalink
async processing
Browse files Browse the repository at this point in the history
  • Loading branch information
sungil-yu committed May 28, 2023
1 parent 777ca90 commit accf454
Show file tree
Hide file tree
Showing 11 changed files with 248 additions and 23 deletions.
1 change: 1 addition & 0 deletions exampleBatch/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies {
implementation 'org.springframework.boot:spring-boot-starter-batch'
implementation 'org.springframework.boot:spring-boot-starter-data-jdbc'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
implementation 'org.springframework.batch:spring-batch-integration'
compileOnly 'org.projectlombok:lombok'
runtimeOnly 'com.h2database:h2'
runtimeOnly 'com.mysql:mysql-connector-j'
Expand Down
6 changes: 6 additions & 0 deletions exampleBatch/output/2023년_5월_일별_주문_금액.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
total_amount, date
10000000,2023-05-01
2000000000,2023-05-02
3000000000,2023-05-03
5000000000,2023-05-04
10000000000,2023-05-05
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,28 @@
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@SpringBootApplication
@EnableBatchProcessing
public class ExampleBatchApplication {
public static void main(String[] args) {
SpringApplication.run(ExampleBatchApplication.class, args);
System.exit(SpringApplication.exit(SpringApplication.run(ExampleBatchApplication.class, args)));
}

@Bean
@Primary
TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(20);
taskExecutor.setThreadNamePrefix("batch-thread-");
taskExecutor.initialize();
return taskExecutor;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.springframework.batch.core.JobExecutionListener;

import java.time.LocalDate;
import java.util.Collection;
import java.util.List;

@RequiredArgsConstructor
Expand All @@ -21,7 +22,8 @@ public void beforeJob(JobExecution jobExecution) {
@Override
public void afterJob(JobExecution jobExecution) {

List<User> users = userRepository.findAllByUpdatedDate(LocalDate.now());

Collection<User> users = userRepository.findAllByUpdatedDate(LocalDate.now());

long time = jobExecution.getEndTime().getTime() - jobExecution.getStartTime().getTime();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
@RequiredArgsConstructor
public class SaveUserTasklet implements Tasklet {

private final int SIZE = 100;
private final int SIZE = 10000;
private final UserRepository userRepository;

@Override
Expand All @@ -37,7 +37,7 @@ private List<User> createUsers() {
users.add(User.builder()
.orders(Collections.singletonList(Orders.builder()
.amount(1_000)
.createdDate(LocalDate.of(2020, 11, 1))
.createdDate(LocalDate.of(2023, 5, 1))
.build()))
.name("test username" + i)
.build());
Expand All @@ -47,7 +47,7 @@ private List<User> createUsers() {
users.add(User.builder()
.orders(Collections.singletonList(Orders.builder()
.amount(200_000)
.createdDate(LocalDate.of(2020, 11, 2))
.createdDate(LocalDate.of(2023, 5, 2))
.build()))
.name("test username" + i)
.build());
Expand All @@ -57,7 +57,7 @@ private List<User> createUsers() {
users.add(User.builder()
.orders(Collections.singletonList(Orders.builder()
.amount(300_000)
.createdDate(LocalDate.of(2020, 11, 3))
.createdDate(LocalDate.of(2023, 5, 3))
.build()))
.name("test username" + i)
.build());
Expand All @@ -67,7 +67,7 @@ private List<User> createUsers() {
users.add(User.builder()
.orders(Collections.singletonList(Orders.builder()
.amount(500_000)
.createdDate(LocalDate.of(2020, 11, 4))
.createdDate(LocalDate.of(2023, 5, 4))
.build()))
.name("test username" + i)
.build());
Expand All @@ -77,7 +77,7 @@ private List<User> createUsers() {
users.add(User.builder()
.orders(Collections.singletonList(Orders.builder()
.amount(1_000_000)
.createdDate(LocalDate.of(2020, 12, 5))
.createdDate(LocalDate.of(2023, 5, 5))
.build()))
.name("test username" + i)
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class User {
@Enumerated(javax.persistence.EnumType.STRING)
private Level level = Level.NORMAL;

@OneToMany(cascade = CascadeType.PERSIST)
@OneToMany(cascade = CascadeType.PERSIST, fetch = FetchType.EAGER)
@JoinColumn(name = "user_id")
private List<Orders> orders = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,13 @@ public class UserConfig {
private final UserRepository userRepository;
private final EntityManagerFactory entityManagerFactory;
private final DataSource dataSource;
private final int CHUNKSIZE = 1000;

private final int CHUNKSIZE = 100;
private final String JOB_NAME = "userJob";

@Bean
@Bean(JOB_NAME)
public Job userJob() throws Exception {
return this.jobBuilderFactory.get("userJob")
return this.jobBuilderFactory.get(JOB_NAME)
.incrementer(new RunIdIncrementer())
.start(this.saveUserStep())
.next(this.userLevelUpStep())
Expand All @@ -63,10 +64,10 @@ public Job userJob() throws Exception {
.build();
}

@Bean
@Bean(JOB_NAME + "_orderStatisticsStep")
@JobScope
public Step orderStatisticsStep(@Value("#{jobParameters[date]}") String date) throws Exception {
return this.stepBuilderFactory.get("orderStatisticsStep")
return this.stepBuilderFactory.get(JOB_NAME + "_orderStatisticsStep")
.<OrderStatistics, OrderStatistics>chunk(CHUNKSIZE)
.reader(orderStatisticsItemReader(date))
.writer(orderStatisticsItemWriter(date))
Expand All @@ -90,7 +91,7 @@ private ItemWriter<? super OrderStatistics> orderStatisticsItemWriter(String dat
.resource(new FileSystemResource("output/" + filename))
.lineAggregator(lineAggregator)
.headerCallback(writer -> writer.write("total_amount, date"))
.name("orderStatisticsItemWriter")
.name(JOB_NAME + "_orderStatisticsItemWriter")
.build();

itemWriter.afterPropertiesSet();
Expand All @@ -116,7 +117,7 @@ private ItemReader<? extends OrderStatistics> orderStatisticsItemReader(String d
.date(LocalDate.parse(rs.getString(2), DateTimeFormatter.ISO_DATE))
.build())
.pageSize(CHUNKSIZE)
.name("orderStatisticsItemReader")
.name(JOB_NAME + "_orderStatisticsItemReader")
.selectClause("sum(amount), created_date")
.fromClause("orders")
.whereClause("created_date >= :startDate and created_date <= :endDate")
Expand All @@ -130,16 +131,16 @@ private ItemReader<? extends OrderStatistics> orderStatisticsItemReader(String d
return itemReader;
}

@Bean
@Bean(JOB_NAME + "_saveUserStep")
public Step saveUserStep() {
return this.stepBuilderFactory.get("saveUserStep")
return this.stepBuilderFactory.get(JOB_NAME + "_saveUserStep")
.tasklet(new SaveUserTasklet(userRepository))
.build();
}

@Bean
@Bean(JOB_NAME + "_userLevelUpStep")
public Step userLevelUpStep() throws Exception {
return stepBuilderFactory.get("userLevelUpStep")
return stepBuilderFactory.get(JOB_NAME + "_userLevelUpStep")
.<User, User>chunk(CHUNKSIZE)
.reader(itemReader())
.processor(itemProcessor())
Expand Down Expand Up @@ -167,7 +168,7 @@ private ItemReader<? extends User> itemReader() throws Exception {
.queryString("select u from User u")
.entityManagerFactory(entityManagerFactory)
.pageSize(CHUNKSIZE)
.name("userItemReader")
.name(JOB_NAME + "_userItemReader")
.build();

itemReader.afterPropertiesSet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@

public interface UserRepository extends JpaRepository<User, Long> {

List<User> findAllByUpdatedDate(LocalDate now);
Collection<User> findAllByUpdatedDate(LocalDate now);

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepE

String key = jobExecution.getJobParameters().getString(this.key);

System.out.println("---------------------key" + key);
if (!StringUtils.hasText(key)){
return FlowExecutionStatus.COMPLETED;
}
Expand Down

0 comments on commit accf454

Please sign in to comment.