diff --git a/subprojects/spring-batch-redis-core/src/main/java/com/redis/spring/batch/reader/ScanSizeEstimator.java b/subprojects/spring-batch-redis-core/src/main/java/com/redis/spring/batch/reader/ScanSizeEstimator.java index 55838682..27d84db1 100644 --- a/subprojects/spring-batch-redis-core/src/main/java/com/redis/spring/batch/reader/ScanSizeEstimator.java +++ b/subprojects/spring-batch-redis-core/src/main/java/com/redis/spring/batch/reader/ScanSizeEstimator.java @@ -87,7 +87,7 @@ public long getAsLong() { List> typeFutures = keys.stream().map(commands::type).collect(Collectors.toList()); connection.flushCommands(); List types = OperationExecutor.getAll(connection.getTimeout(), typeFutures); - Predicate matchPredicate = BatchUtils.globPredicate(keyPattern); + Predicate matchPredicate = matchPredicate(); Predicate typePredicate = typePredicate(); int total = 0; int matchCount = 0; @@ -117,6 +117,13 @@ public long getAsLong() { return UNKNOWN_SIZE; } + private Predicate matchPredicate() { + if (StringUtils.hasLength(keyPattern)) { + return BatchUtils.globPredicate(keyPattern); + } + return s -> true; + } + private Predicate typePredicate() { if (StringUtils.hasLength(keyType)) { return keyType::equalsIgnoreCase; diff --git a/subprojects/spring-batch-redis-infrastructure/src/main/java/com/redis/spring/batch/common/JobFactory.java b/subprojects/spring-batch-redis-infrastructure/src/main/java/com/redis/spring/batch/common/JobFactory.java index 5c63da9f..b9945f5f 100644 --- a/subprojects/spring-batch-redis-infrastructure/src/main/java/com/redis/spring/batch/common/JobFactory.java +++ b/subprojects/spring-batch-redis-infrastructure/src/main/java/com/redis/spring/batch/common/JobFactory.java @@ -1,5 +1,7 @@ package com.redis.spring.batch.common; +import java.util.Optional; + import org.hsqldb.jdbc.JDBCDataSource; import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.Job; @@ -33,6 +35,8 @@ public class JobFactory implements JobLauncher, InitializingBean { + private static final String STEP_ERROR = "Error executing step %s: %s"; + private JobRepository jobRepository; private PlatformTransactionManager platformTransactionManager; private JobLauncher jobLauncher; @@ -117,26 +121,33 @@ public JobExecution runAsync(Job job, JobParameters jobParameters) throws JobExe } public static JobExecution checkJobExecution(JobExecution jobExecution) throws JobExecutionException { - if (jobExecution.getExitStatus().getExitCode().equals(ExitStatus.FAILED.getExitCode())) { - for (StepExecution stepExecution : jobExecution.getStepExecutions()) { - ExitStatus exitStatus = stepExecution.getExitStatus(); - if (exitStatus.getExitCode().equals(ExitStatus.FAILED.getExitCode())) { - String message = String.format("Error executing step %s in job %s: %s", stepExecution.getStepName(), - jobExecution.getJobInstance().getJobName(), exitStatus.getExitDescription()); - if (stepExecution.getFailureExceptions().isEmpty()) { - throw new JobExecutionException(message); - } - throw new JobExecutionException(message, stepExecution.getFailureExceptions().get(0)); - } + if (isFailed(jobExecution.getExitStatus())) { + Optional stepExecutionException = jobExecution.getStepExecutions().stream() + .filter(e -> isFailed(e.getExitStatus())).map(JobFactory::stepExecutionException).findAny(); + if (stepExecutionException.isPresent()) { + throw stepExecutionException.get(); } if (jobExecution.getAllFailureExceptions().isEmpty()) { throw new JobExecutionException(String.format("Error executing job %s: %s", - jobExecution.getJobInstance().getJobName(), jobExecution.getExitStatus().getExitDescription())); + jobExecution.getJobInstance(), jobExecution.getExitStatus().getExitDescription())); } } return jobExecution; } + private static JobExecutionException stepExecutionException(StepExecution stepExecution) { + String message = String.format(STEP_ERROR, stepExecution.getStepName(), + stepExecution.getExitStatus().getExitDescription()); + if (stepExecution.getFailureExceptions().isEmpty()) { + return new JobExecutionException(message); + } + return new JobExecutionException(message, stepExecution.getFailureExceptions().get(0)); + } + + private static boolean isFailed(ExitStatus exitStatus) { + return exitStatus.getExitCode().equals(ExitStatus.FAILED.getExitCode()); + } + public static ThreadPoolTaskExecutor threadPoolTaskExecutor(int threads) { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setMaxPoolSize(threads); diff --git a/subprojects/spring-batch-redis-infrastructure/src/test/java/com/redis/spring/batch/common/JobFactoryTests.java b/subprojects/spring-batch-redis-infrastructure/src/test/java/com/redis/spring/batch/common/JobFactoryTests.java index 1a0c9216..2931383b 100644 --- a/subprojects/spring-batch-redis-infrastructure/src/test/java/com/redis/spring/batch/common/JobFactoryTests.java +++ b/subprojects/spring-batch-redis-infrastructure/src/test/java/com/redis/spring/batch/common/JobFactoryTests.java @@ -4,6 +4,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.springframework.batch.core.ExitStatus; import org.springframework.batch.core.JobExecution; import org.springframework.batch.core.JobExecutionException; import org.springframework.batch.core.JobInstance; @@ -22,7 +23,9 @@ class JobFactoryTests { @Test void testCheckExecution() throws JobExecutionException { JobExecution jobExecution = new JobExecution(10L); - Assertions.assertEquals(10L, JobFactory.checkJobExecution(jobExecution).getId()); + jobExecution.setExitStatus(ExitStatus.FAILED); + Assertions.assertThrows(JobExecutionException.class, () -> JobFactory.checkJobExecution(jobExecution)); + Assertions.assertEquals(10L, jobExecution.getId()); } private class SimpleJobRepository implements JobRepository { diff --git a/subprojects/spring-batch-redis-util/src/main/java/com/redis/spring/batch/util/BatchUtils.java b/subprojects/spring-batch-redis-util/src/main/java/com/redis/spring/batch/util/BatchUtils.java index 2880f2ec..5a6196ac 100644 --- a/subprojects/spring-batch-redis-util/src/main/java/com/redis/spring/batch/util/BatchUtils.java +++ b/subprojects/spring-batch-redis-util/src/main/java/com/redis/spring/batch/util/BatchUtils.java @@ -9,7 +9,6 @@ import java.util.function.Supplier; import org.springframework.util.FileCopyUtils; -import org.springframework.util.StringUtils; import com.hrakaroo.glob.GlobPattern; import com.redis.lettucemod.RedisModulesClient; @@ -54,11 +53,6 @@ public static Function toStringValueFunction(RedisCodec cod return encode.andThen(StringCodec.UTF8::decodeValue); } - public static Function byteArrayKeyFunction(RedisCodec codec) { - Function encode = ByteArrayCodec.INSTANCE::encodeKey; - return encode.andThen(codec::decodeKey); - } - public static Function toByteArrayKeyFunction(RedisCodec codec) { Function encode = codec::encodeKey; return encode.andThen(ByteArrayCodec.INSTANCE::decodeKey); @@ -90,9 +84,6 @@ public static StatefulRedisModulesConnection connection(RedisModule } public static Predicate globPredicate(String match) { - if (!StringUtils.hasLength(match)) { - return s -> true; - } return GlobPattern.compile(match)::matches; } }