Skip to content

Commit

Permalink
refactor: Removed parameter from AbstractCommand.execute()
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Aug 24, 2024
1 parent 1726f54 commit 73ff68d
Show file tree
Hide file tree
Showing 52 changed files with 899 additions and 1,328 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import picocli.CommandLine.Option;

@Command
public abstract class AbstractCommand<C extends ExecutionContext> extends BaseCommand implements Callable<Integer> {
public abstract class AbstractCommand extends BaseCommand implements Callable<Integer> {

@Option(names = "--help", usageHelp = true, description = "Show this help message and exit.")
private boolean helpRequested;
Expand All @@ -29,14 +29,11 @@ public Integer call() throws Exception {
setupLogging();
log = LoggerFactory.getLogger(getClass());
}
try (C context = executionContext()) {
context.afterPropertiesSet();
execute(context);
}
execute();
return 0;
}

protected abstract C executionContext();
protected abstract void execute() throws Exception;

private void setupLogging() {
Level level = logLevel();
Expand Down Expand Up @@ -73,8 +70,6 @@ private static void setBoolean(String property, boolean value) {
System.setProperty(property, String.valueOf(value));
}

protected abstract void execute(C context);

public LoggingArgs getLoggingArgs() {
return loggingArgs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,16 @@
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.builder.SimpleJobBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.TaskExecutorJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy;
import org.springframework.batch.core.step.skip.NeverSkipItemSkipPolicy;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStreamReader;
Expand All @@ -26,11 +31,14 @@
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.retry.policy.AlwaysRetryPolicy;
import org.springframework.retry.policy.NeverRetryPolicy;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.util.Assert;

import com.redis.spring.batch.JobUtils;
import com.redis.spring.batch.item.AbstractAsyncItemReader;
import com.redis.spring.batch.item.AbstractPollableItemReader;
import com.redis.spring.batch.step.FlushingStepBuilder;

Expand All @@ -39,7 +47,7 @@
import picocli.CommandLine.Option;

@Command
public abstract class AbstractJobCommand<C extends JobExecutionContext> extends AbstractCommand<C> {
public abstract class AbstractJobCommand extends AbstractCommand {

public static final String DEFAULT_JOB_REPOSITORY_NAME = "riot";

Expand All @@ -54,36 +62,38 @@ public abstract class AbstractJobCommand<C extends JobExecutionContext> extends
private PlatformTransactionManager transactionManager;
private JobLauncher jobLauncher;

@Override
protected C executionContext() {
C context = newExecutionContext();
context.setJobName(jobName());
context.setJobRepositoryName(jobRepositoryName);
context.setJobRepository(jobRepository);
context.setTransactionManager(transactionManager);
context.setJobLauncher(jobLauncher);
return context;
private TaskExecutorJobLauncher taskExecutorJobLauncher() throws Exception {
TaskExecutorJobLauncher launcher = new TaskExecutorJobLauncher();
launcher.setJobRepository(jobRepository);
launcher.setTaskExecutor(new SyncTaskExecutor());
launcher.afterPropertiesSet();
return launcher;
}

protected abstract C newExecutionContext();
protected void configureAsyncReader(AbstractAsyncItemReader<?, ?> reader) {
reader.setJobRepository(jobRepository);
}

private String jobName() {
if (jobName == null) {
Assert.notNull(commandSpec, "Command spec not set");
return commandSpec.name();
}
return jobName;
private JobBuilder jobBuilder() {
return new JobBuilder(jobName, jobRepository);
}

@Override
protected void execute(C context) {
Job job = job(context);
JobExecution jobExecution;
try {
jobExecution = context.getJobLauncher().run(job, new JobParameters());
} catch (JobExecutionException e) {
throw new RiotException("Job failed", e);
protected void execute() throws Exception {
if (jobName == null) {
Assert.notNull(commandSpec, "Command spec not set");
jobName = commandSpec.name();
}
if (jobRepository == null) {
jobRepository = JobUtils.jobRepositoryFactoryBean(jobRepositoryName).getObject();
}
if (transactionManager == null) {
transactionManager = JobUtils.resourcelessTransactionManager();
}
if (jobLauncher == null) {
jobLauncher = taskExecutorJobLauncher();
}
JobExecution jobExecution = jobLauncher.run(job(), new JobParameters());
if (JobUtils.isFailed(jobExecution.getExitStatus())) {
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
if (JobUtils.isFailed(stepExecution.getExitStatus())) {
Expand All @@ -94,23 +104,23 @@ protected void execute(C context) {
}
}

private RiotException wrapException(List<Throwable> throwables) {
private JobExecutionException wrapException(List<Throwable> throwables) {
if (throwables.isEmpty()) {
return new RiotException("Job failed");
return new JobExecutionException("Job failed");
}
return new RiotException("Job failed", throwables.get(0));
return new JobExecutionException("Job failed", throwables.get(0));
}

protected Job job(C context, Step<?, ?>... steps) {
return job(context, Arrays.asList(steps));
protected Job job(Step<?, ?>... steps) {
return job(Arrays.asList(steps));
}

protected Job job(C context, Collection<Step<?, ?>> steps) {
protected Job job(Collection<Step<?, ?>> steps) {
Assert.notEmpty(steps, "At least one step must be specified");
Iterator<Step<?, ?>> iterator = steps.iterator();
SimpleJobBuilder job = context.jobBuilder().start(step(context, iterator.next()));
SimpleJobBuilder job = jobBuilder().start(step(iterator.next()));
while (iterator.hasNext()) {
job.next(step(context, iterator.next()));
job.next(step(iterator.next()));
}
return job.build();
}
Expand All @@ -119,41 +129,58 @@ protected boolean shouldShowProgress() {
return stepArgs.getProgressArgs().getStyle() != ProgressStyle.NONE;
}

protected abstract Job job(C context);
protected abstract Job job() throws Exception;

private <I, O> TaskletStep step(C context, Step<I, O> step) {
SimpleStepBuilder<I, O> builder = simpleStep(context, step);
private <I, O> TaskletStep step(Step<I, O> step) {
SimpleStepBuilder<I, O> builder = simpleStep(step);
if (stepArgs.getRetryPolicy() == RetryPolicy.NEVER && stepArgs.getSkipPolicy() == SkipPolicy.NEVER) {
return builder.build();
}
FaultTolerantStepBuilder<I, O> ftStep = JobUtils.faultTolerant(builder);
if (stepArgs.getSkipPolicy() == SkipPolicy.LIMIT) {
ftStep.skipLimit(stepArgs.getSkipLimit());
step.getSkip().forEach(ftStep::skip);
step.getNoSkip().forEach(ftStep::noSkip);
} else {
ftStep.skipPolicy(stepArgs.skipPolicy());
step.getSkip().forEach(ftStep::skip);
step.getNoSkip().forEach(ftStep::noSkip);
step.getRetry().forEach(ftStep::retry);
step.getNoRetry().forEach(ftStep::noRetry);
ftStep.retryLimit(stepArgs.getRetryLimit());
ftStep.retryPolicy(retryPolicy());
ftStep.skipLimit(stepArgs.getSkipLimit());
ftStep.skipPolicy(skipPolicy());
return ftStep.build();
}

private org.springframework.retry.RetryPolicy retryPolicy() {
switch (stepArgs.getRetryPolicy()) {
case ALWAYS:
return new AlwaysRetryPolicy();
case NEVER:
return new NeverRetryPolicy();
default:
return null;
}
if (stepArgs.getRetryPolicy() == RetryPolicy.LIMIT) {
ftStep.retryLimit(stepArgs.getRetryLimit());
step.getRetry().forEach(ftStep::retry);
step.getNoRetry().forEach(ftStep::noRetry);
} else {
ftStep.retryPolicy(stepArgs.retryPolicy());
}

private org.springframework.batch.core.step.skip.SkipPolicy skipPolicy() {
switch (stepArgs.getSkipPolicy()) {
case ALWAYS:
return new AlwaysSkipItemSkipPolicy();
case NEVER:
return new NeverSkipItemSkipPolicy();
default:
return null;
}
return ftStep.build();
}

@SuppressWarnings("removal")
private <I, O> SimpleStepBuilder<I, O> simpleStep(C context, Step<I, O> step) {
String stepName = context.getJobName() + "-" + step.getName();
private <I, O> SimpleStepBuilder<I, O> simpleStep(Step<I, O> step) {
String name = jobName + "-" + step.getName();
if (step.getReader() instanceof ItemStreamSupport) {
ItemStreamSupport support = (ItemStreamSupport) step.getReader();
Assert.notNull(support.getName(), "No name specified for reader in step " + stepName);
support.setName(stepName + "-" + support.getName());
Assert.notNull(support.getName(), "No name specified for reader in step " + name);
support.setName(name + "-" + support.getName());
}
log.info("Creating step {} with chunk size {}", stepName, stepArgs.getChunkSize());
SimpleStepBuilder<I, O> builder = context.step(stepName, stepArgs.getChunkSize());
log.info("Creating step {} with chunk size {}", name, stepArgs.getChunkSize());
SimpleStepBuilder<I, O> builder = new StepBuilder(name, jobRepository).<I, O>chunk(stepArgs.getChunkSize(),
transactionManager);
builder.reader(reader(step));
builder.writer(writer(step));
builder.processor(step.getProcessor());
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public class PrintExceptionMessageHandler implements IExecutionExceptionHandler
public int handleExecutionException(Exception ex, CommandLine cmd, ParseResult parseResult) {

if (cmd.getCommand() instanceof AbstractCommand) {
if (((AbstractCommand<?>) cmd.getCommand()).getLoggingArgs().isStacktrace()) {
if (((AbstractCommand) cmd.getCommand()).getLoggingArgs().isStacktrace()) {
ex.printStackTrace(cmd.getErr());
}
}
Expand Down

This file was deleted.

Loading

0 comments on commit 73ff68d

Please sign in to comment.