Skip to content

Commit

Permalink
Create proxies of a client on cluster back when cluster restarted
Browse files Browse the repository at this point in the history
  • Loading branch information
sancar committed Mar 8, 2018
1 parent 9cbf79d commit 1de4c0e
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 3 deletions.
Expand Up @@ -455,6 +455,7 @@ public void onClusterConnect(Connection ownerConnection) throws Exception {
partitionService.listenPartitionTable(ownerConnection);
clusterService.listenMembershipEvents(ownerConnection);
userCodeDeploymentService.deploy(this, ownerConnection);
proxyManager.createDistributedObjectsOnCluster(ownerConnection);
}

public MetricsRegistryImpl getMetricsRegistry() {
Expand Down
Expand Up @@ -50,8 +50,8 @@
import com.hazelcast.client.proxy.ClientSetProxy;
import com.hazelcast.client.proxy.ClientTopicProxy;
import com.hazelcast.client.proxy.txn.xa.XAResourceProxy;
import com.hazelcast.client.spi.impl.ClientInvocation;
import com.hazelcast.client.spi.impl.AbstractClientInvocationService;
import com.hazelcast.client.spi.impl.ClientInvocation;
import com.hazelcast.client.spi.impl.ClientProxyFactoryWithContext;
import com.hazelcast.client.spi.impl.ClientServiceNotFoundException;
import com.hazelcast.client.spi.impl.ListenerMessageCodec;
Expand Down Expand Up @@ -81,6 +81,8 @@
import com.hazelcast.multimap.impl.MultiMapService;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.ClassLoaderUtil;
import com.hazelcast.nio.Connection;
import com.hazelcast.replicatedmap.ReplicatedMapCantBeCreatedOnLiteMemberException;
import com.hazelcast.replicatedmap.impl.ReplicatedMapService;
import com.hazelcast.ringbuffer.impl.RingbufferService;
import com.hazelcast.scheduledexecutor.impl.DistributedScheduledExecutorService;
Expand All @@ -89,7 +91,6 @@
import com.hazelcast.topic.impl.TopicService;
import com.hazelcast.topic.impl.reliable.ReliableTopicService;
import com.hazelcast.transaction.impl.xa.XAService;
import com.hazelcast.util.EmptyStatement;

import java.io.IOException;
import java.lang.reflect.Constructor;
Expand All @@ -101,6 +102,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;

import static com.hazelcast.util.EmptyStatement.ignore;
import static com.hazelcast.util.ExceptionUtil.rethrow;
import static com.hazelcast.util.ServiceLoader.classIterator;

Expand Down Expand Up @@ -359,7 +361,7 @@ private void sleepForProxyInitRetry() {
try {
Thread.sleep(invocationRetryPauseMillis);
} catch (InterruptedException ignored) {
EmptyStatement.ignore(ignored);
ignore(ignored);
}
}

Expand Down Expand Up @@ -414,6 +416,34 @@ public String addDistributedObjectListener(final DistributedObjectListener liste
return client.getListenerService().registerListener(distributedObjectListenerCodec, eventHandler);
}

public void createDistributedObjectsOnCluster(Connection ownerConnection) throws IOException {
if (proxies.isEmpty()) {
return;
}
Address initializationTarget = findNextAddressToSendCreateRequest();
if (initializationTarget == null) {
throw new IOException("Not able to find a member to create proxy on!");
}
for (ClientProxyFuture proxyFuture : proxies.values()) {
ClientProxy clientProxy = proxyFuture.get();
try {
invokeCreateProxy(ownerConnection, clientProxy, initializationTarget);
} catch (ReplicatedMapCantBeCreatedOnLiteMemberException e) {
ignore(e);
}
}
}

private void invokeCreateProxy(Connection ownerConnection, ClientProxy clientProxy, Address initializationTarget) {
ClientMessage clientMessage = ClientCreateProxyCodec.encodeRequest(clientProxy.getDistributedObjectName(),
clientProxy.getServiceName(), initializationTarget);
try {
new ClientInvocation(client, clientMessage, clientProxy.getServiceName(), ownerConnection).invoke().get();
} catch (Exception e) {
throw rethrow(e);
}
}

private final class DistributedObjectEventHandler extends ClientAddDistributedObjectListenerCodec.AbstractEventHandler
implements EventHandler<ClientMessage> {

Expand Down
Expand Up @@ -114,4 +114,28 @@ public void testGetDistributedObjectsAfterRemove_fromClient() {
assertEquals(1, client1.getDistributedObjects().size());
assertEquals(1, client2.getDistributedObjects().size());
}

@Test
public void distributedObjectsCreatedBack_whenClusterRestart() {
final HazelcastInstance instance = hazelcastFactory.newHazelcastInstance();

HazelcastInstance client = hazelcastFactory.newHazelcastClient();

client.getMap("test");

Collection<DistributedObject> distributedObjects = instance.getDistributedObjects();
assertEquals(1, distributedObjects.size());

instance.shutdown();

final HazelcastInstance instance2 = hazelcastFactory.newHazelcastInstance();

assertTrueEventually(new AssertTask() {
@Override
public void run() throws Exception {
Collection<DistributedObject> distributedObjects = instance2.getDistributedObjects();
assertEquals(1, distributedObjects.size());
}
});
}
}

0 comments on commit 1de4c0e

Please sign in to comment.