Skip to content

Commit

Permalink
Follow up to fix for non blocking Authentication in clients
Browse files Browse the repository at this point in the history
Original fix was hazelcast#7673 for master, hazelcast#7674 for maintenance-3.x

Recent test failures revealed that some corner cases for that
implementation was missing. One obvious case is if getOrConnect
don't trigger a ConnectTask its Callback is not registered. And
it waits on that callback which will never be called.

fixes hazelcast#7718
fixes hazelcast#7709
fixes hazelcast#7963
  • Loading branch information
sancar committed Mar 11, 2016
1 parent d8532ea commit 784c4fb
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -79,18 +78,6 @@

public class ClientConnectionManagerImpl implements ClientConnectionManager {

private static final AuthenticationCallback dummyAuthCallback = new AuthenticationCallback() {
@Override
public void onSuccess(Connection connection) {

}

@Override
public void onFailure(Throwable exception) {

}
};

private final NonBlockingIOThreadOutOfMemoryHandler OUT_OF_MEMORY_HANDLER = new NonBlockingIOThreadOutOfMemoryHandler() {
@Override
public void handle(OutOfMemoryError error) {
Expand All @@ -114,8 +101,8 @@ public void handle(OutOfMemoryError error) {
private final AddressTranslator addressTranslator;
private final ConcurrentMap<Address, ClientConnection> connections
= new ConcurrentHashMap<Address, ClientConnection>();
private final Set<Address> connectionsInProgress =
Collections.newSetFromMap(new ConcurrentHashMap<Address, Boolean>());
private final ConcurrentMap<Address, AuthenticationCallback> connectionsInProgress =
new ConcurrentHashMap<Address, AuthenticationCallback>();
private final Set<ConnectionListener> connectionListeners = new CopyOnWriteArraySet<ConnectionListener>();

private final Set<ConnectionHeartbeatListener> heartbeatListeners =
Expand Down Expand Up @@ -225,31 +212,36 @@ public ClientConnection getConnection(Address target) {

@Override
public Connection getOrConnect(Address address, boolean asOwner) throws IOException {
BlockingCallback blockingCallback = new BlockingCallback();
Connection connection = getOrTriggerConnectInternal(address, asOwner, blockingCallback);
if (connection != null) {
return connection;
}
try {
return blockingCallback.get();
while (true) {
Connection connection = getConnection(address, asOwner);
if (connection != null) {
return connection;
}
AuthenticationCallback firstCallback = triggerConnect(address, asOwner);
connection = firstCallback.get();
if (firstCallback.authenticatedAsOwner) {
return connection;
}
}
} catch (Throwable e) {
throw ExceptionUtil.rethrow(e);
}
}

private static class BlockingCallback implements AuthenticationCallback {
private static class AuthenticationCallback {

private final CountDownLatch countDownLatch = new CountDownLatch(1);
private Connection connection;
private Throwable throwable;
private boolean authenticatedAsOwner;

@Override
public void onSuccess(Connection connection) {
public void onSuccess(Connection connection, boolean asOwner) {
this.connection = connection;
this.authenticatedAsOwner = asOwner;
countDownLatch.countDown();
}

@Override
public void onFailure(Throwable throwable) {
this.throwable = throwable;
countDownLatch.countDown();
Expand All @@ -264,19 +256,17 @@ Connection get() throws Throwable {
}
}

interface AuthenticationCallback {
void onSuccess(Connection connection);

void onFailure(Throwable exception);
}

@Override
public Connection getOrTriggerConnect(Address target, boolean asOwner) {
return getOrTriggerConnectInternal(target, asOwner, dummyAuthCallback);
Connection connection = getConnection(target, asOwner);
if (connection != null) {
return connection;
}
triggerConnect(target, asOwner);
return null;
}

private Connection getOrTriggerConnectInternal(Address target, boolean asOwner,
AuthenticationCallback callback) {
private Connection getConnection(Address target, boolean asOwner) {
target = addressTranslator.translate(target);

if (target == null) {
Expand All @@ -286,20 +276,25 @@ private Connection getOrTriggerConnectInternal(Address target, boolean asOwner,
ClientConnection connection = connections.get(target);

if (connection != null) {
if (asOwner && connection.isAuthenticatedAsOwner()) {
if (!asOwner) {
return connection;
}
if (!asOwner) {
if (connection.isAuthenticatedAsOwner()) {
return connection;
}
}
return null;
}

if (connectionsInProgress.add(target)) {
private AuthenticationCallback triggerConnect(Address target, boolean asOwner) {
AuthenticationCallback callback = new AuthenticationCallback();
AuthenticationCallback firstCallback = connectionsInProgress.putIfAbsent(target, callback);
if (firstCallback == null) {
ClientExecutionServiceImpl executionService = (ClientExecutionServiceImpl) client.getClientExecutionService();
executionService.executeInternal(new InitConnectionTask(target, asOwner, callback));
return callback;
}

return null;
return firstCallback;
}

private void fireConnectionAddedEvent(ClientConnection connection) {
Expand Down Expand Up @@ -479,7 +474,7 @@ public void onResponse(ClientMessage response) {
clusterService.setPrincipal(new ClientPrincipal(result.uuid, result.ownerUuid));
}
authenticated(target, connection);
callback.onSuccess(connection);
callback.onSuccess(connection, asOwner);
break;
case CREDENTIALS_FAILED:
AuthenticationException e = new AuthenticationException("Invalid credentials!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import com.hazelcast.transaction.TransactionOptions;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
Expand All @@ -50,7 +49,6 @@
@RunWith(Parameterized.class)
@Parameterized.UseParametersRunnerFactory(HazelcastParametersRunnerFactory.class)
@Category({QuickTest.class, ParallelTest.class})
@Ignore //https://github.com/hazelcast/hazelcast/issues/7693
public class ClientTransactionalMapQuorumTest extends HazelcastTestSupport {

static PartitionedCluster cluster;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.hazelcast.transaction.TransactionContext;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -70,7 +69,6 @@ public void tearDown() {
}

@Test
@Ignore //https://github.com/hazelcast/hazelcast/issues/7709
public void testCommitConcurrently() throws InterruptedException, XAException {
int count = 10000;
String name = randomString();
Expand Down

0 comments on commit 784c4fb

Please sign in to comment.