Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Key affinity tests hanging #1077

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -221,27 +221,30 @@ public void handleCacheStopped(CacheStoppedEvent cse) {
}


public class KeyGeneratorWorker implements Runnable {
private class KeyGeneratorWorker implements Runnable {

private volatile boolean isActive;
private volatile boolean isStopped = false;
private volatile Thread runner;
private volatile boolean isStopped = false;

@Override
public void run() {
this.runner = Thread.currentThread();
try {
while (true) {
if (waitToBeWakenUp()) break;
isActive = true;
log.trace("KeyGeneratorWorker marked as ACTIVE");
generateKeys();

isActive = false;
log.trace("KeyGeneratorWorker marked as INACTIVE");
while (isStopped == false) {
keyProducerStartLatch.await();
if (isStopped == false) {
isActive = true;
log.trace("KeyGeneratorWorker marked as ACTIVE");
generateKeys();

isActive = false;
log.trace("KeyGeneratorWorker marked as INACTIVE");
}
}
} finally {
isStopped = true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
finally {
log.debugf("Shutting down KeyAffinity service for key set: %s", filter);
}
}

Expand All @@ -257,8 +260,7 @@ private void generateKeys() {
// in order to fill all the queues
int maxMisses = maxNumberOfKeys.get();
int missCount = 0;
while (!Thread.currentThread().isInterrupted() && existingKeyCount.get() < maxNumberOfKeys.get()
&& missCount < maxMisses) {
while (existingKeyCount.get() < maxNumberOfKeys.get() && missCount < maxMisses) {
K key = keyGenerator.getKey();
Address addressForKey = getAddressForKey(key);
if (interestedInAddress(addressForKey)) {
Expand All @@ -276,18 +278,6 @@ private void generateKeys() {
}
}

private boolean waitToBeWakenUp() {
try {
keyProducerStartLatch.await();
} catch (InterruptedException e) {
if (log.isDebugEnabled()) {
log.debugf("Shutting down KeyAffinity service for key set: %s", filter);
}
return true;
}
return false;
}

private boolean tryAddKey(Address address, K key) {
BlockingQueue<K> queue = address2key.get(address);
// on node stop the distribution manager might still return the dead server for a while after we have already removed its queue
Expand All @@ -304,7 +294,8 @@ public boolean isActive() {
}

public void stop() {
runner.interrupt();
isStopped = true;
keyProducerStartLatch.open();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import junit.framework.Assert;

import static junit.framework.Assert.assertEquals;

/**
Expand All @@ -49,7 +51,7 @@ public class BaseKeyAffinityServiceTest extends BaseDistFunctionalTest {

protected ThreadFactory threadFactory = new ThreadFactory() {
public Thread newThread(Runnable r) {
return new Thread(r, "KeyGeneratorThread," + this.getClass().getSimpleName());
return new Thread(r, "KeyGeneratorThread," + BaseKeyAffinityServiceTest.this.getClass().getSimpleName());
}
};
protected ExecutorService executor = Executors.newSingleThreadExecutor(threadFactory);
Expand All @@ -59,7 +61,11 @@ public Thread newThread(Runnable r) {
public void stopExecutorService() throws InterruptedException {
keyAffinityService.stop();
executor.shutdown();
assert executor.awaitTermination(100, TimeUnit.MILLISECONDS);
boolean terminatedGracefully = executor.awaitTermination(100, TimeUnit.MILLISECONDS);
if (!terminatedGracefully) {
executor.shutdownNow();
Assert.fail("KeyGenerator Executor not terminated in expected time");
}
}

protected void assertMapsToAddress(Object o, Address addr) {
Expand Down