Skip to content

Commit

Permalink
Add new UTs and enhance existing UTs
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Nov 25, 2023
1 parent d9e39bc commit f7375fb
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public abstract class CloseableRetryableRefreshListener implements ReferenceMana
*/
private static final int TOTAL_PERMITS = 1;

private static final TimeValue DRAIN_TIMEOUT = TimeValue.timeValueMinutes(10);

private final AtomicBoolean closed = new AtomicBoolean(false);

private final Semaphore semaphore = new Semaphore(TOTAL_PERMITS);
Expand Down Expand Up @@ -187,7 +189,8 @@ private void scheduleRetry(boolean afterRefreshSuccessful, boolean didRefresh) {

public final Releasable drainRefreshes() {
try {
if (semaphore.tryAcquire(TOTAL_PERMITS, 10, TimeUnit.MINUTES)) {
TimeValue timeout = getDrainTimeout();
if (semaphore.tryAcquire(TOTAL_PERMITS, timeout.seconds(), TimeUnit.SECONDS)) {
boolean result = closed.compareAndSet(false, true);
assert result && semaphore.availablePermits() == 0;
getLogger().info("All permits are acquired and refresh listener is closed");
Expand All @@ -208,6 +211,14 @@ public final Releasable drainRefreshes() {

protected abstract Logger getLogger();

// Made available for unit testing purpose only
/**
* Returns the timeout which is used while draining refreshes.
*/
protected TimeValue getDrainTimeout() {
return DRAIN_TIMEOUT;
}

// Visible for testing
/**
* Returns if the retry is scheduled or not.
Expand All @@ -217,4 +228,14 @@ public final Releasable drainRefreshes() {
boolean getRetryScheduledStatus() {
return retryScheduled.get();
}

// Visible for testing
int availablePermits() {
return semaphore.availablePermits();
}

// Visible for testing
boolean isClosed() {
return closed.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
Expand Down Expand Up @@ -274,6 +275,7 @@ protected Logger getLogger() {
testRefreshListener.afterRefresh(randomBoolean());
testRefreshListener.drainRefreshes();
assertNotEquals(0, countDownLatch.getCount());
assertRefreshListenerClosed(testRefreshListener);
}

public void testCloseWaitsForAcquiringAllPermits() throws Exception {
Expand Down Expand Up @@ -308,6 +310,7 @@ protected Logger getLogger() {
thread.start();
assertBusy(() -> assertEquals(0, countDownLatch.getCount()));
testRefreshListener.drainRefreshes();
assertRefreshListenerClosed(testRefreshListener);
}

public void testScheduleRetryAfterClose() throws Exception {
Expand Down Expand Up @@ -368,6 +371,7 @@ protected TimeValue getNextRetryInterval() {
thread1.join();
thread2.join();
assertBusy(() -> assertEquals(1, runCount.get()));
assertRefreshListenerClosed(testRefreshListener);
}

public void testConcurrentScheduleRetry() throws Exception {
Expand Down Expand Up @@ -409,6 +413,7 @@ protected boolean isRetryEnabled() {
testRefreshListener.afterRefresh(true);
assertBusy(() -> assertEquals(3, runCount.get()));
testRefreshListener.drainRefreshes();
assertRefreshListenerClosed(testRefreshListener);
}

public void testExceptionDuringThreadPoolSchedule() throws Exception {
Expand Down Expand Up @@ -451,11 +456,135 @@ protected boolean isRetryEnabled() {
assertBusy(() -> assertFalse(testRefreshListener.getRetryScheduledStatus()));
assertEquals(1, runCount.get());
testRefreshListener.drainRefreshes();
assertRefreshListenerClosed(testRefreshListener);
}

public void testTimeoutDuringClose() throws Exception {
// This test checks the expected behaviour when the drainRefreshes times out.
CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(mock(ThreadPool.class)) {
@Override
protected boolean performAfterRefreshWithPermit(boolean didRefresh) {
try {
Thread.sleep(TimeValue.timeValueSeconds(2).millis());
} catch (InterruptedException e) {
throw new AssertionError(e);
}
return true;
}

@Override
public void beforeRefresh() {}

@Override
protected Logger getLogger() {
return logger;
}

@Override
protected TimeValue getDrainTimeout() {
return TimeValue.timeValueSeconds(1);
}
};
Thread thread1 = new Thread(() -> {
try {
testRefreshListener.afterRefresh(true);
} catch (IOException e) {
throw new AssertionError(e);
}
});
thread1.start();
assertBusy(() -> assertEquals(0, testRefreshListener.availablePermits()));
RuntimeException ex = assertThrows(RuntimeException.class, testRefreshListener::drainRefreshes);
assertEquals("Failed to acquire all permits", ex.getMessage());
thread1.join();
}

public void testThreadInterruptDuringClose() throws Exception {
// This test checks the expected behaviour when the thread performing the drainRefresh is interrupted.
CountDownLatch latch = new CountDownLatch(2);
CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(mock(ThreadPool.class)) {
@Override
protected boolean performAfterRefreshWithPermit(boolean didRefresh) {
try {
Thread.sleep(TimeValue.timeValueSeconds(2).millis());
} catch (InterruptedException e) {
throw new AssertionError(e);
}
return true;
}

@Override
public void beforeRefresh() {}

@Override
protected Logger getLogger() {
return logger;
}

@Override
protected TimeValue getDrainTimeout() {
return TimeValue.timeValueSeconds(2);
}
};
Thread thread1 = new Thread(() -> {
try {
testRefreshListener.afterRefresh(true);
latch.countDown();
} catch (IOException e) {
throw new AssertionError(e);
}
});
Thread thread2 = new Thread(() -> {
RuntimeException ex = assertThrows(RuntimeException.class, testRefreshListener::drainRefreshes);
assertEquals("Failed to acquire all permits", ex.getMessage());
latch.countDown();
});
thread1.start();
assertBusy(() -> assertEquals(0, testRefreshListener.availablePermits()));
thread2.start();
thread2.interrupt();
thread1.join();
thread2.join();
assertEquals(0, latch.getCount());
}

public void testResumeRefreshesAfterDrainRefreshes() {
// This test checks the expected behaviour when the refresh listener is drained, but then refreshes are resumed again
// by closing the releasables acquired by calling the drainRefreshes method.
CloseableRetryableRefreshListener testRefreshListener = new CloseableRetryableRefreshListener(mock(ThreadPool.class)) {
@Override
protected boolean performAfterRefreshWithPermit(boolean didRefresh) {
return true;
}

@Override
public void beforeRefresh() {}

@Override
protected Logger getLogger() {
return logger;
}
};
assertRefreshListenerOpen(testRefreshListener);
Releasable releasable = testRefreshListener.drainRefreshes();
assertRefreshListenerClosed(testRefreshListener);
releasable.close();
assertRefreshListenerOpen(testRefreshListener);
}

@After
public void tearDown() throws Exception {
super.tearDown();
terminate(threadPool);
}

private void assertRefreshListenerClosed(CloseableRetryableRefreshListener testRefreshListener) {
assertTrue(testRefreshListener.isClosed());
assertEquals(0, testRefreshListener.availablePermits());
}

private void assertRefreshListenerOpen(CloseableRetryableRefreshListener testRefreshListener) {
assertFalse(testRefreshListener.isClosed());
assertEquals(1, testRefreshListener.availablePermits());
}
}

0 comments on commit f7375fb

Please sign in to comment.