Skip to content

Commit

Permalink
ISPN-9008 RpcManager.invokeCommandOnAll ignores cache member missing …
Browse files Browse the repository at this point in the history
…from cluster view
  • Loading branch information
danberindei authored and rvansa committed May 2, 2018
1 parent 4026bfc commit d1da51d
Show file tree
Hide file tree
Showing 14 changed files with 286 additions and 58 deletions.
@@ -0,0 +1,104 @@
package org.infinispan.commons.util;

import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;

/**
* Wrap a {@link HopscotchHashMap} and allow using it as a {@link Set}.
*
* @param <E> The element type
* @since 9.3
* @author Dan Berindei
*/
public class ImmutableHopscotchHashSet<E> implements Set<E> {
private static final Object PRESENT = new Object();

private final HopscotchHashMap<E, Object> map;

public ImmutableHopscotchHashSet(Collection<E> collection) {
map = new HopscotchHashMap<>(collection.size());
for (E e : collection) {
map.put(e, PRESENT);
}
}

@Override
public int size() {
return map.size();
}

@Override
public boolean isEmpty() {
return map.isEmpty();
}

@Override
public boolean contains(Object o) {
return map.containsKey(o);
}

@Override
public Iterator<E> iterator() {
return new Immutables.ImmutableIteratorWrapper<>(map.keySet().iterator());
}

@Override
public void forEach(Consumer<? super E> action) {
map.keySet().forEach(action);
}

@Override
public Object[] toArray() {
return map.keySet().toArray();
}

@Override
public <T> T[] toArray(T[] a) {
return map.keySet().toArray(a);
}

@Override
public boolean add(E e) {
throw new UnsupportedOperationException("add");
}

@Override
public boolean remove(Object o) {
throw new UnsupportedOperationException("remove");
}

@Override
public boolean containsAll(Collection<?> c) {
return map.keySet().containsAll(c);
}

@Override
public boolean addAll(Collection<? extends E> c) {
throw new UnsupportedOperationException("addAll");
}

@Override
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException("retainAll");
}

@Override
public boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException("removeAll");
}

@Override
public boolean removeIf(Predicate<? super E> filter) {
throw new UnsupportedOperationException("removeIf");
}

@Override
public void clear() {
throw new UnsupportedOperationException("clear");
}

// Use the default implementation of spliterator(), stream(), and parallelStream()
}
Expand Up @@ -18,6 +18,7 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;

import org.infinispan.commons.marshall.AbstractExternalizer;
import org.infinispan.commons.marshall.Ids;
Expand Down Expand Up @@ -281,8 +282,7 @@ public interface Immutable {
* We have to re-implement Collections.unmodifiableXXX, since it is not
* simple to detect them (the class names are JDK dependent).
*/

private static class ImmutableIteratorWrapper<E> implements Iterator<E> {
public static class ImmutableIteratorWrapper<E> implements Iterator<E> {
private Iterator<? extends E> iterator;

public ImmutableIteratorWrapper(Iterator<? extends E> iterator) {
Expand All @@ -299,9 +299,11 @@ public E next() {
return iterator.next();
}

// Use the default remove() implementation

@Override
public void remove() {
throw new UnsupportedOperationException();
public void forEachRemaining(Consumer<? super E> action) {
iterator.forEachRemaining(action);
}
}

Expand Down
Expand Up @@ -9,6 +9,7 @@
import java.util.Set;

import org.infinispan.commons.hash.MurmurHash3;
import org.infinispan.commons.util.ImmutableHopscotchHashSet;
import org.infinispan.commons.util.ImmutableIntSet;
import org.infinispan.commons.util.Immutables;
import org.infinispan.commons.util.IntSet;
Expand All @@ -30,6 +31,7 @@
public class LocalizedCacheTopology extends CacheTopology {

private final Address localAddress;
private final Set<Address> membersSet;
private final KeyPartitioner keyPartitioner;
private final boolean isDistributed;
private final boolean allLocal;
Expand Down Expand Up @@ -57,6 +59,7 @@ public LocalizedCacheTopology(CacheMode cacheMode, CacheTopology cacheTopology,
ConsistentHash writeCH = getWriteConsistentHash();

this.localAddress = localAddress;
this.membersSet = new ImmutableHopscotchHashSet<>(cacheTopology.getMembers());
this.keyPartitioner = keyPartitioner;
this.isDistributed = cacheMode.isDistributed();
isScattered = cacheMode.isScattered();
Expand Down Expand Up @@ -210,4 +213,8 @@ public IntSet getLocalReadSegments() {
public Address getLocalAddress() {
return localAddress;
}

public Set<Address> getMembersSet() {
return membersSet;
}
}
23 changes: 12 additions & 11 deletions core/src/main/java/org/infinispan/remoting/rpc/RpcManagerImpl.java
Expand Up @@ -22,6 +22,7 @@
import org.infinispan.commons.configuration.attributes.AttributeListener;
import org.infinispan.configuration.cache.ClusteringConfiguration;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
Expand All @@ -41,7 +42,6 @@
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.remoting.transport.impl.MapResponseCollector;
import org.infinispan.statetransfer.StateTransferManager;
import org.infinispan.topology.CacheTopology;
import org.infinispan.util.TimeService;
import org.infinispan.util.concurrent.CompletableFutures;
Expand All @@ -68,7 +68,7 @@ public class RpcManagerImpl implements RpcManager, JmxStatisticsExposer {
@Inject private Transport t;
@Inject private Configuration configuration;
@Inject private CommandsFactory cf;
@Inject private StateTransferManager stateTransferManager;
@Inject private DistributionManager distributionManager;
@Inject private TimeService timeService;

private final Function<ReplicableCommand, ReplicableCommand> toCacheRpcCommand = this::toCacheRpcCommand;
Expand Down Expand Up @@ -112,7 +112,7 @@ private void updateRpcOptions(Attribute<Long> attribute, Long oldValue) {

@ManagedAttribute(description = "Retrieves the committed view.", displayName = "Committed view", dataType = DataType.TRAIT)
public String getCommittedViewAsString() {
CacheTopology cacheTopology = stateTransferManager.getCacheTopology();
CacheTopology cacheTopology = distributionManager.getCacheTopology();
if (cacheTopology == null)
return "N/A";

Expand All @@ -121,7 +121,7 @@ public String getCommittedViewAsString() {

@ManagedAttribute(description = "Retrieves the pending view.", displayName = "Pending view", dataType = DataType.TRAIT)
public String getPendingViewAsString() {
CacheTopology cacheTopology = stateTransferManager.getCacheTopology();
CacheTopology cacheTopology = distributionManager.getCacheTopology();
if (cacheTopology == null)
return "N/A";

Expand Down Expand Up @@ -196,17 +196,18 @@ private <T> T updateStatistics(long startTimeNanos, T response, Throwable throwa
public <T> CompletionStage<T> invokeCommandOnAll(ReplicableCommand command, ResponseCollector<T> collector,
RpcOptions rpcOptions) {
CacheRpcCommand cacheRpc = toCacheRpcCommand(command);
List<Address> cacheMembers = distributionManager.getCacheTopology().getMembers();

if (!statisticsEnabled) {
return t.invokeCommandOnAll(cacheRpc, collector, rpcOptions.deliverOrder(), rpcOptions.timeout(),
rpcOptions.timeUnit());
return t.invokeCommandOnAll(cacheMembers, cacheRpc, collector, rpcOptions.deliverOrder(),
rpcOptions.timeout(), rpcOptions.timeUnit());
}

long startTimeNanos = timeService.time();
CompletionStage<T> invocation;
try {
invocation = t.invokeCommandOnAll(cacheRpc, collector, rpcOptions.deliverOrder(), rpcOptions.timeout(),
rpcOptions.timeUnit());
invocation = t.invokeCommandOnAll(cacheMembers, cacheRpc, collector, rpcOptions.deliverOrder(),
rpcOptions.timeout(), rpcOptions.timeUnit());
} catch (Exception e) {
return errorReplicating(e);
}
Expand Down Expand Up @@ -410,7 +411,7 @@ private void setTopologyId(ReplicableCommand command) {
if (command instanceof TopologyAffectedCommand) {
TopologyAffectedCommand topologyAffectedCommand = (TopologyAffectedCommand) command;
if (topologyAffectedCommand.getTopologyId() == -1) {
int currentTopologyId = stateTransferManager.getCacheTopology().getTopologyId();
int currentTopologyId = distributionManager.getCacheTopology().getTopologyId();
if (trace) {
log.tracef("Topology id missing on command %s, setting it to %d", command, currentTopologyId);
}
Expand Down Expand Up @@ -511,7 +512,7 @@ public Address getAddress() {

@Override
public int getTopologyId() {
CacheTopology cacheTopology = stateTransferManager.getCacheTopology();
CacheTopology cacheTopology = distributionManager.getCacheTopology();
return cacheTopology != null ? cacheTopology.getTopologyId() : -1;
}

Expand Down Expand Up @@ -548,6 +549,6 @@ public RpcOptions getDefaultRpcOptions(boolean sync, DeliverOrder deliverOrder)

@Override
public List<Address> getMembers() {
return stateTransferManager.getCacheTopology().getMembers();
return distributionManager.getCacheTopology().getMembers();
}
}
Expand Up @@ -323,6 +323,21 @@ default <T> CompletionStage<T> invokeCommandOnAll(ReplicableCommand command, Res
return invokeCommand(getMembers(), command, collector, deliverOrder, timeout, unit);
}

/**
* Invoke a command on all the nodes in the cluster and pass the responses to a {@link ResponseCollector}.
* <p>
* The command is only executed on the local node if the delivery order is {@link DeliverOrder#TOTAL}.
* The command is not sent across RELAY2 bridges to remote sites.
*
* @since 9.3
*/
@Experimental
default <T> CompletionStage<T> invokeCommandOnAll(Collection<Address> requiredTargets, ReplicableCommand command,
ResponseCollector<T> collector, DeliverOrder deliverOrder,
long timeout, TimeUnit unit) {
return invokeCommand(requiredTargets, command, collector, deliverOrder, timeout, unit);
}

/**
* Invoke a command on a collection of nodes and pass the responses to a {@link ResponseCollector}.
* <p>
Expand Down
@@ -1,10 +1,9 @@
package org.infinispan.remoting.transport.jgroups;

import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.infinispan.commons.util.ImmutableHopscotchHashSet;
import org.infinispan.commons.util.Immutables;
import org.infinispan.remoting.transport.Address;

Expand All @@ -27,7 +26,7 @@ public class ClusterView {
ClusterView(int viewId, List<Address> members, Address self) {
this.viewId = viewId;
this.members = Immutables.immutableListCopy(members);
this.membersSet = Collections.unmodifiableSet(new HashSet<>(members));
this.membersSet = new ImmutableHopscotchHashSet<>(members);
if (!members.isEmpty()) {
this.coordinator = members.get(0);
this.isCoordinator = coordinator.equals(self);
Expand Down
Expand Up @@ -872,6 +872,29 @@ public <T> CompletionStage<T> invokeCommandOnAll(ReplicableCommand command, Resp
return request;
}

@Override
public <T> CompletionStage<T> invokeCommandOnAll(Collection<Address> requiredTargets, ReplicableCommand command,
ResponseCollector<T> collector, DeliverOrder deliverOrder,
long timeout, TimeUnit unit) {
long requestId = requests.newRequestId();
logRequest(requestId, command, "all-required");
Address excludedTarget = deliverOrder == DeliverOrder.TOTAL ? null : getAddress();
MultiTargetRequest<T> request =
new MultiTargetRequest<>(collector, requestId, requests, requiredTargets, excludedTarget);
try {
addRequest(request);
request.onNewView(clusterView.getMembersSet());
sendCommandToAll(command, requestId, deliverOrder, isRsvpCommand(command));
} catch (Throwable t) {
request.cancel(true);
throw t;
}
if (timeout > 0) {
request.setTimeout(timeoutExecutor, timeout, unit);
}
return request;
}

@Override
public <T> CompletionStage<T> invokeCommandStaggered(Collection<Address> targets, ReplicableCommand command,
ResponseCollector<T> collector, DeliverOrder deliverOrder,
Expand Down
Expand Up @@ -10,24 +10,19 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import javax.transaction.RollbackException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;

import org.infinispan.AdvancedCache;
import org.infinispan.api.mvcc.LockAssert;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.write.ClearCommand;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcManagerImpl;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
Expand Down Expand Up @@ -208,8 +203,7 @@ public void testCacheMode() throws Exception {

when(mockTransport.getMembers()).thenReturn(members);
when(mockTransport.getAddress()).thenReturn(addressOne);
when(mockTransport.invokeCommandOnAll(any(ReplicableCommand.class), any(ResponseCollector.class),
any(DeliverOrder.class), anyLong(), any(TimeUnit.class)))
when(mockTransport.invokeCommandOnAll(any(), any(), any(), any(), anyLong(), any()))
.thenReturn(CompletableFutures.completedNull());

cache1.put("k", "v");
Expand Down

0 comments on commit d1da51d

Please sign in to comment.