Skip to content

Commit

Permalink
Fixes a deadlock in client listeners
Browse files Browse the repository at this point in the history
After a straight forward fix made to offload listener call
to Executor, I came accross more problem in the design.
Since most of remote requests are blocking if client is not
connected to remote(if client has connected to node with saying you
are my owner), the operation that trying to connect to cluster and
others should be in seperate executor pool. We have two executor pools
one is for internal operations and other for alien code to hazelcast
like listeners and CompletableFuture.andThen calls. Since these
two are doing a remote call that can potentially block on waiting owner
address to be determined, cluster thread is moved to singleThreadExecutor
to its own. And more cleanup done to differentiate alien and internal
executor usage.

fixes hazelcast#6168
  • Loading branch information
sancar committed Sep 15, 2015
1 parent 0454e62 commit 4196958
Show file tree
Hide file tree
Showing 16 changed files with 226 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ public void shutdown() {

public void doShutdown() {
proxyManager.destroy();
clusterService.shutdown();
executionService.shutdown();
partitionService.stop();
transactionManager.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.hazelcast.client.nearcache;

import com.hazelcast.client.spi.ClientContext;
import com.hazelcast.client.spi.ClientExecutionService;
import com.hazelcast.client.spi.impl.ClientExecutionServiceImpl;
import com.hazelcast.config.EvictionPolicy;
import com.hazelcast.config.InMemoryFormat;
import com.hazelcast.config.NearCacheConfig;
Expand Down Expand Up @@ -119,8 +119,8 @@ public void put(K key, Object object) {
private void fireEvictCache() {
if (canEvict.compareAndSet(true, false)) {
try {
final ClientExecutionService executionService = context.getExecutionService();
executionService.execute(new Runnable() {
final ClientExecutionServiceImpl executionService = (ClientExecutionServiceImpl) context.getExecutionService();
executionService.executeInternal(new Runnable() {
public void run() {
try {
TreeSet<NearCacheRecord> records = new TreeSet<NearCacheRecord>(selectedComparator);
Expand All @@ -138,7 +138,7 @@ public void run() {
}
if (cache.size() >= maxSize && canEvict.compareAndSet(true, false)) {
try {
executionService.execute(this);
executionService.executeInternal(this);
} catch (RejectedExecutionException e) {
canEvict.set(true);
}
Expand All @@ -160,7 +160,8 @@ private void fireTtlCleanup() {

if (canCleanUp.compareAndSet(true, false)) {
try {
context.getExecutionService().execute(new Runnable() {
ClientExecutionServiceImpl executionService = (ClientExecutionServiceImpl) context.getExecutionService();
executionService.executeInternal(new Runnable() {
public void run() {
try {
lastCleanup = Clock.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
*/
public interface ClientExecutionService {

/**
* Execute alien(user code) on execution service
*
* @param command to run
*/
void execute(Runnable command);

ICompletableFuture<?> submit(Runnable task);
Expand All @@ -40,6 +45,9 @@ public interface ClientExecutionService {

ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long period, TimeUnit unit);

/**
* @return executorService that alien(user code) runs on
*/
ExecutorService getAsyncExecutor();

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

public <T> ICompletableFuture<T> submitInternal(Runnable runnable) {
CompletableFutureTask futureTask = new CompletableFutureTask(runnable, null, getAsyncExecutor());
executor.submit(futureTask);
internalExecutor.submit(futureTask);
return futureTask;
}

Expand All @@ -93,22 +93,22 @@ public void execute(Runnable command) {
@Override
public ICompletableFuture<?> submit(Runnable task) {
CompletableFutureTask futureTask = new CompletableFutureTask(task, null, getAsyncExecutor());
executor.submit(futureTask);
internalExecutor.submit(futureTask);
return futureTask;
}

@Override
public <T> ICompletableFuture<T> submit(Callable<T> task) {
CompletableFutureTask<T> futureTask = new CompletableFutureTask<T>(task, getAsyncExecutor());
executor.submit(futureTask);
internalExecutor.submit(futureTask);
return futureTask;
}

@Override
public ScheduledFuture<?> schedule(final Runnable command, long delay, TimeUnit unit) {
return scheduledExecutor.schedule(new Runnable() {
public void run() {
execute(command);
executeInternal(command);
}
}, delay, unit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ private boolean handleRetry() {

try {
sleep();
executionService.execute(this);
((ClientExecutionServiceImpl) executionService).executeInternal(this);
} catch (RejectedExecutionException e) {
if (LOGGER.isFinestEnabled()) {
LOGGER.finest("Retry could not be scheduled ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ public void connectionRemoved(Connection connection) {
public void cleanConnectionResources(ClientConnection connection) {
if (connectionManager.isAlive()) {
try {
executionService.execute(new CleanResourcesTask(connection));
((ClientExecutionServiceImpl) executionService).executeInternal(new CleanResourcesTask(connection));
} catch (RejectedExecutionException e) {
logger.warning("Execution rejected ", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.hazelcast.client.impl.protocol.codec.ClientAuthenticationCodec;
import com.hazelcast.client.impl.protocol.codec.ClientAuthenticationCustomCodec;
import com.hazelcast.client.spi.ClientClusterService;
import com.hazelcast.client.spi.ClientExecutionService;
import com.hazelcast.core.LifecycleEvent;
import com.hazelcast.core.Member;
import com.hazelcast.logging.ILogger;
Expand All @@ -42,6 +43,7 @@
import com.hazelcast.security.UsernamePasswordCredentials;
import com.hazelcast.util.Clock;
import com.hazelcast.util.ExceptionUtil;
import com.hazelcast.util.executor.PoolExecutorThreadFactory;

import java.io.IOException;
import java.net.InetSocketAddress;
Expand All @@ -51,17 +53,22 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;

public abstract class ClusterListenerSupport implements ConnectionListener, ConnectionHeartbeatListener,
ClientClusterService {

private static final ILogger LOGGER = Logger.getLogger(ClusterListenerSupport.class);
private static final long TERMINATE_TIMEOUT_SECONDS = 30;

protected final HazelcastClientInstanceImpl client;
private final Collection<AddressProvider> addressProviders;
private final ManagerAuthenticator managerAuthenticator = new ManagerAuthenticator();
private final ExecutorService clusterExecutor;
private final boolean shuffleMemberList;

private Credentials credentials;
Expand All @@ -74,7 +81,16 @@ public abstract class ClusterListenerSupport implements ConnectionListener, Conn
public ClusterListenerSupport(HazelcastClientInstanceImpl client, Collection<AddressProvider> addressProviders) {
this.client = client;
this.addressProviders = addressProviders;
shuffleMemberList = client.getClientProperties().getShuffleMemberList().getBoolean();
this.shuffleMemberList = client.getClientProperties().getShuffleMemberList().getBoolean();
this.clusterExecutor = createSingleThreadExecutorService(client);
}

private ExecutorService createSingleThreadExecutorService(HazelcastClientInstanceImpl client) {
ThreadGroup threadGroup = client.getThreadGroup();
ClassLoader classLoader = client.getClientConfig().getClassLoader();
PoolExecutorThreadFactory threadFactory =
new PoolExecutorThreadFactory(threadGroup, client.getName() + ".cluster-", classLoader);
return Executors.newSingleThreadExecutor(threadFactory);
}

protected void init() {
Expand All @@ -90,6 +106,19 @@ public Address getOwnerConnectionAddress() {
return ownerConnectionAddress;
}

public void shutdown() {
clusterExecutor.shutdown();
try {
boolean success = clusterExecutor.awaitTermination(TERMINATE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (!success) {
LOGGER.warning("ClientClusterService shutdown could not completed in "
+ TERMINATE_TIMEOUT_SECONDS + " seconds");
}
} catch (InterruptedException e) {
LOGGER.warning("ClientClusterService shutdown is interrupted", e);
}
}

private class ManagerAuthenticator implements Authenticator {

@Override
Expand Down Expand Up @@ -223,9 +252,15 @@ private boolean connect(Set<InetSocketAddress> triedAddresses) throws Exception
return false;
}

private void fireConnectionEvent(LifecycleEvent.LifecycleState state) {
final LifecycleServiceImpl lifecycleService = (LifecycleServiceImpl) client.getLifecycleService();
lifecycleService.fireLifecycleEvent(state);
private void fireConnectionEvent(final LifecycleEvent.LifecycleState state) {
ClientExecutionService executionService = client.getClientExecutionService();
executionService.execute(new Runnable() {
@Override
public void run() {
final LifecycleServiceImpl lifecycleService = (LifecycleServiceImpl) client.getLifecycleService();
lifecycleService.fireLifecycleEvent(state);
}
});
}

@Override
Expand All @@ -235,10 +270,9 @@ public void connectionAdded(Connection connection) {

@Override
public void connectionRemoved(Connection connection) {
ClientExecutionServiceImpl executionService = (ClientExecutionServiceImpl) client.getClientExecutionService();
if (connection.getEndPoint().equals(ownerConnectionAddress)) {
if (client.getLifecycleService().isRunning()) {
executionService.executeInternal(new Runnable() {
clusterExecutor.execute(new Runnable() {
@Override
public void run() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -654,9 +654,60 @@ public void entryAdded(EntryEvent<Object, Object> event) {
map1.put(i, i);
}

assertOpenEventually("dadas", latch);
assertOpenEventually(latch);
}

@Test
public void testDeadlock_WhenDoingOperationFromLifecycleListener() {
HazelcastInstance instance = hazelcastFactory.newHazelcastInstance();
final ClientConfig clientConfig = new ClientConfig();
HazelcastInstance client = hazelcastFactory.newHazelcastClient(clientConfig.setExecutorPoolSize(1));

hazelcastFactory.newHazelcastInstance();
final CountDownLatch latch = new CountDownLatch(1);
final IMap<Object, Object> map = client.getMap(randomMapName());

client.getLifecycleService().addLifecycleListener(new LifecycleListener() {
@Override
public void stateChanged(LifecycleEvent event) {
if (event.getState() == LifecycleState.CLIENT_DISCONNECTED) {
map.get(1);
latch.countDown();
}
}
});

instance.shutdown();
assertOpenEventually(latch);
}

@Test
public void testDeadlock_WhenDoingOperationFromLifecycleListener_and_NearCache() {
HazelcastInstance instance = hazelcastFactory.newHazelcastInstance();
final ClientConfig clientConfig = new ClientConfig();
final NearCacheConfig nearCacheConfig = new NearCacheConfig();
nearCacheConfig.setMaxSize(1);
clientConfig.addNearCacheConfig(nearCacheConfig);
HazelcastInstance client = hazelcastFactory.newHazelcastClient(clientConfig.setExecutorPoolSize(1));

hazelcastFactory.newHazelcastInstance();
final CountDownLatch latch = new CountDownLatch(1);
final IMap<Object, Object> map = client.getMap(randomMapName());

client.getLifecycleService().addLifecycleListener(new LifecycleListener() {
@Override
public void stateChanged(LifecycleEvent event) {
if (event.getState() == LifecycleState.CLIENT_DISCONNECTED) {
map.get(1);
map.get(2);
latch.countDown();
}
}
});

instance.shutdown();
assertOpenEventually(latch);
}

@Test(expected = ExecutionException.class, timeout = 120000)
public void testGithubIssue3557()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ public void shutdown() {

public void doShutdown() {
proxyManager.destroy();
clusterService.shutdown();
executionService.shutdown();
partitionService.stop();
transactionManager.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.hazelcast.client.nearcache;

import com.hazelcast.client.spi.ClientContext;
import com.hazelcast.client.spi.ClientExecutionService;
import com.hazelcast.client.spi.impl.ClientExecutionServiceImpl;
import com.hazelcast.config.EvictionPolicy;
import com.hazelcast.config.InMemoryFormat;
import com.hazelcast.config.NearCacheConfig;
Expand Down Expand Up @@ -119,8 +119,8 @@ public void put(K key, Object object) {
private void fireEvictCache() {
if (canEvict.compareAndSet(true, false)) {
try {
final ClientExecutionService executionService = context.getExecutionService();
executionService.execute(new Runnable() {
final ClientExecutionServiceImpl executionService = (ClientExecutionServiceImpl) context.getExecutionService();
executionService.executeInternal(new Runnable() {
public void run() {
try {
TreeSet<NearCacheRecord> records = new TreeSet<NearCacheRecord>(selectedComparator);
Expand All @@ -138,7 +138,7 @@ public void run() {
}
if (cache.size() >= maxSize && canEvict.compareAndSet(true, false)) {
try {
executionService.execute(this);
executionService.executeInternal(this);
} catch (RejectedExecutionException e) {
canEvict.set(true);
}
Expand All @@ -160,7 +160,8 @@ private void fireTtlCleanup() {

if (canCleanUp.compareAndSet(true, false)) {
try {
context.getExecutionService().execute(new Runnable() {
ClientExecutionServiceImpl executionService = (ClientExecutionServiceImpl) context.getExecutionService();
executionService.executeInternal(new Runnable() {
public void run() {
try {
lastCleanup = Clock.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
*/
public interface ClientExecutionService {

/**
* Execute alien(user code) on execution service
*
* @param command to run
*/
void execute(Runnable command);

ICompletableFuture<?> submit(Runnable task);
Expand All @@ -40,6 +45,9 @@ public interface ClientExecutionService {

ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long period, TimeUnit unit);

/**
* @return executorService that alien(user code) runs on
*/
ExecutorService getAsyncExecutor();

}
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {

public <T> ICompletableFuture<T> submitInternal(Runnable runnable) {
CompletableFutureTask futureTask = new CompletableFutureTask(runnable, null, getAsyncExecutor());
executor.submit(futureTask);
internalExecutor.submit(futureTask);
return futureTask;
}

Expand All @@ -93,22 +93,22 @@ public void execute(Runnable command) {
@Override
public ICompletableFuture<?> submit(Runnable task) {
CompletableFutureTask futureTask = new CompletableFutureTask(task, null, getAsyncExecutor());
executor.submit(futureTask);
internalExecutor.submit(futureTask);
return futureTask;
}

@Override
public <T> ICompletableFuture<T> submit(Callable<T> task) {
CompletableFutureTask<T> futureTask = new CompletableFutureTask<T>(task, getAsyncExecutor());
executor.submit(futureTask);
internalExecutor.submit(futureTask);
return futureTask;
}

@Override
public ScheduledFuture<?> schedule(final Runnable command, long delay, TimeUnit unit) {
return scheduledExecutor.schedule(new Runnable() {
public void run() {
execute(command);
executeInternal(command);
}
}, delay, unit);
}
Expand Down
Loading

0 comments on commit 4196958

Please sign in to comment.