Skip to content

Commit

Permalink
refactor: Removed superfluous AwaitTimeoutException
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Apr 27, 2024
1 parent e63ab8a commit b1347a1
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -34,7 +35,6 @@
import com.redis.spring.batch.reader.MemKeyValue;
import com.redis.spring.batch.reader.MemKeyValueRead;
import com.redis.spring.batch.util.Await;
import com.redis.spring.batch.util.AwaitTimeoutException;
import com.redis.spring.batch.util.BatchUtils;

import io.lettuce.core.AbstractRedisClient;
Expand Down Expand Up @@ -123,7 +123,7 @@ protected synchronized void doOpen() throws Exception {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw e;
} catch (AwaitTimeoutException e) {
} catch (TimeoutException e) {
List<Throwable> exceptions = jobExecution.getAllFailureExceptions();
if (!CollectionUtils.isEmpty(exceptions)) {
throw new JobExecutionException("Job execution unsuccessful", exceptions.get(0));
Expand Down Expand Up @@ -205,7 +205,7 @@ private StatefulRedisModulesConnection<K, V> connection() {
}

@Override
protected synchronized void doClose() throws InterruptedException {
protected synchronized void doClose() throws TimeoutException, InterruptedException {
if (!queue.isEmpty()) {
log.warn(String.format("%s queue still contains %,d elements", getName(), queue.size()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -135,7 +136,7 @@ void teardown() {
}

@BeforeEach
void flushAll() throws InterruptedException {
void flushAll() throws TimeoutException, InterruptedException {
redisCommands.flushall();
awaitUntilNoSubscribers();
}
Expand Down Expand Up @@ -212,10 +213,12 @@ protected void awaitUntilSubscribers() {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ItemStreamException("Interrupted", e);
} catch (TimeoutException e) {
throw new ItemStreamException("Timeout while waiting for subscribers", e);
}
}

protected void awaitUntilNoSubscribers() throws InterruptedException {
protected void awaitUntilNoSubscribers() throws TimeoutException, InterruptedException {
awaitUntil(() -> redisCommands.pubsubNumpat() == 0);
}

Expand Down Expand Up @@ -289,19 +292,19 @@ public static AbstractRedisClient client(RedisServer server) {
return RedisModulesClient.create(server.getRedisURI());
}

public void awaitRunning(JobExecution jobExecution) throws InterruptedException {
public void awaitRunning(JobExecution jobExecution) throws TimeoutException, InterruptedException {
awaitUntil(jobExecution::isRunning);
}

public void awaitTermination(JobExecution jobExecution) throws InterruptedException {
public void awaitTermination(JobExecution jobExecution) throws TimeoutException, InterruptedException {
awaitUntilFalse(jobExecution::isRunning);
}

protected void awaitUntilFalse(BooleanSupplier evaluator) throws InterruptedException {
protected void awaitUntilFalse(BooleanSupplier evaluator) throws TimeoutException, InterruptedException {
awaitUntil(() -> !evaluator.getAsBoolean());
}

protected void awaitUntil(BooleanSupplier evaluator) throws InterruptedException {
protected void awaitUntil(BooleanSupplier evaluator) throws TimeoutException, InterruptedException {
Await.await().initialDelay(pollDelay).delay(awaitPollInterval).timeout(awaitTimeout).until(evaluator);
}

Expand All @@ -317,50 +320,50 @@ protected void generateAsync(TestInfo info, GeneratorItemReader reader) {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ItemStreamException("Data gen interrupted", e);
} catch (JobExecutionException e) {
} catch (Exception e) {
throw new ItemStreamException("Could not run data gen", e);
}
});
}

protected void generate(TestInfo info, GeneratorItemReader reader)
throws JobExecutionException, InterruptedException {
throws JobExecutionException, TimeoutException, InterruptedException {
generate(info, redisClient, reader);
}

protected void generate(TestInfo info, AbstractRedisClient client, GeneratorItemReader reader)
throws JobExecutionException, InterruptedException {
throws JobExecutionException, TimeoutException, InterruptedException {
TestInfo testInfo = testInfo(info, "generate");
RedisItemWriter<String, String, KeyValue<String, Object>> writer = RedisItemWriter.struct(StringCodec.UTF8);
writer.setClient(client);
run(testInfo, reader, writer);
}

protected void run(TestInfo info, GeneratorItemReader reader, ItemWriter<KeyValue<String, Object>> writer)
throws JobExecutionException, InterruptedException {
throws JobExecutionException, TimeoutException, InterruptedException {
run(info, reader, genItemProcessor, writer);
}

protected <T> JobExecution run(TestInfo info, ItemReader<? extends T> reader, ItemWriter<T> writer)
throws JobExecutionException, InterruptedException {
throws JobExecutionException, TimeoutException, InterruptedException {
return run(info, reader, null, writer);
}

protected <I, O> JobExecution run(TestInfo info, ItemReader<I> reader, ItemProcessor<I, O> processor,
ItemWriter<O> writer) throws JobExecutionException, InterruptedException {
ItemWriter<O> writer) throws JobExecutionException, TimeoutException, InterruptedException {
return run(info, step(info, reader, processor, writer));
}

protected <I, O> JobExecution run(TestInfo info, SimpleStepBuilder<I, O> step)
throws JobExecutionException, InterruptedException {
throws JobExecutionException, TimeoutException, InterruptedException {
return run(job(info).start(faultTolerant(step).build()).build());
}

protected <I, O> FaultTolerantStepBuilder<I, O> faultTolerant(SimpleStepBuilder<I, O> step) {
return step.faultTolerant().retryPolicy(new MaxAttemptsRetryPolicy()).retry(RedisCommandTimeoutException.class);
}

protected JobExecution run(Job job) throws JobExecutionException, InterruptedException {
protected JobExecution run(Job job) throws JobExecutionException, TimeoutException, InterruptedException {
JobExecution execution = jobFactory.run(job);
awaitUntilFalse(execution::isRunning);
return execution;
Expand All @@ -375,7 +378,8 @@ protected <I, S extends I, O> FlushingStepBuilder<S, O> flushingStep(TestInfo in
return new FlushingStepBuilder<>(step(info, reader, writer)).idleTimeout(idleTimeout);
}

protected void generateStreams(TestInfo info, int messageCount) throws JobExecutionException, InterruptedException {
protected void generateStreams(TestInfo info, int messageCount)
throws JobExecutionException, TimeoutException, InterruptedException {
GeneratorItemReader gen = generator(3, Item.Type.STREAM);
StreamOptions streamOptions = new StreamOptions();
streamOptions.setMessageCount(Range.of(messageCount));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ void readStruct(TestInfo info) throws Exception {
}

@Test
void readStreamAutoAck(TestInfo info) throws InterruptedException {
void readStreamAutoAck(TestInfo info) throws Exception {
String stream = "stream1";
String consumerGroup = "batchtests-readStreamAutoAck";
Consumer<String> consumer = Consumer.from(consumerGroup, "consumer1");
Expand Down Expand Up @@ -390,7 +390,7 @@ void readStreamManualAck(TestInfo info) throws Exception {
}

@Test
void readStreamManualAckRecover(TestInfo info) throws InterruptedException {
void readStreamManualAckRecover(TestInfo info) throws Exception {
String stream = "stream1";
Consumer<String> consumer = Consumer.from("batchtests-readStreamManualAckRecover", "consumer1");
final StreamItemReader<String, String> reader = streamReader(info, stream, consumer);
Expand Down Expand Up @@ -426,7 +426,7 @@ void readStreamManualAckRecover(TestInfo info) throws InterruptedException {
}

@Test
void readStreamManualAckRecoverUncommitted(TestInfo info) throws InterruptedException {
void readStreamManualAckRecoverUncommitted(TestInfo info) throws Exception {
String stream = "stream1";
String consumerGroup = "batchtests-readStreamManualAckRecoverUncommitted";
Consumer<String> consumer = Consumer.from(consumerGroup, "consumer1");
Expand Down Expand Up @@ -506,7 +506,7 @@ void readStreamManualAckRecoverFromOffset(TestInfo info) throws Exception {
}

@Test
void readStreamRecoverManualAckToAutoAck(TestInfo info) throws InterruptedException {
void readStreamRecoverManualAckToAutoAck(TestInfo info) throws Exception {
String stream = "stream1";
String consumerGroup = "readStreamRecoverManualAckToAutoAck";
Consumer<String> consumer = Consumer.from(consumerGroup, "consumer1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BooleanSupplier;

public class Await {
Expand Down Expand Up @@ -35,11 +36,11 @@ public Await timeout(Duration timeout) {
* Blocks until test is true
*
* @param test boolean supplier to wait for
* @throws InterruptedException if interrupted while waiting
* @throws AwaitTimeoutException if condition was not fulfilled within timeout
* duration
* @throws InterruptedException if interrupted while waiting
* @throws TimeoutException if condition not fulfilled within timeout
* duration
*/
public void until(BooleanSupplier test) throws InterruptedException {
public void until(BooleanSupplier test) throws TimeoutException, InterruptedException {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
try {
executor.scheduleWithFixedDelay(() -> {
Expand All @@ -49,14 +50,14 @@ public void until(BooleanSupplier test) throws InterruptedException {
}, initialDelay.toMillis(), delay.toMillis(), TimeUnit.MILLISECONDS);
boolean terminated = executor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
if (!terminated) {
throw new AwaitTimeoutException(String.format("Condition not fulfilled within %s", timeout));
throw new TimeoutException(String.format("Condition not fulfilled within %s", timeout));
}
} finally {
executor.shutdown();
}
}

public void untilFalse(BooleanSupplier test) throws InterruptedException {
public void untilFalse(BooleanSupplier test) throws TimeoutException, InterruptedException {
until(() -> !test.getAsBoolean());
}

Expand Down

This file was deleted.

0 comments on commit b1347a1

Please sign in to comment.