Permalink
Browse files

ISPN-1910 - State transfer should not force all invocations to be syn…

…chronous

I removed this limitation and replay support altogether, since it was no longer used.
  • Loading branch information...
danberindei authored and galderz committed Mar 14, 2012
1 parent 91441ac commit 29701b7d8e2a77052d2aa94593e35357a6b10d8b
@@ -32,7 +32,6 @@
import org.infinispan.notifications.cachemanagerlistener.annotation.Merged;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
-import org.infinispan.remoting.MembershipArithmetic;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.ResponseMode;
@@ -212,7 +211,7 @@ public void join(String cacheName, CacheViewListener listener) throws Exception
CacheViewControlCommand.Type.REQUEST_JOIN, self);
// If we get a SuspectException we can ignore it, the new coordinator will come asking for our state anyway
Map<Address,Response> rspList = transport.invokeRemotely(Collections.singleton(coordinator), cmd,
- ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, timeout, false, null, false);
+ ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, timeout, false, null);
checkRemoteResponse(cacheName, cmd, rspList);
}
}
@@ -231,7 +230,7 @@ public void leave(String cacheName) {
final CacheViewControlCommand cmd = new CacheViewControlCommand(cacheName,
CacheViewControlCommand.Type.REQUEST_LEAVE, self);
// ignore any response from the other members
- transport.invokeRemotely(members, cmd, ResponseMode.ASYNCHRONOUS, timeout, false, null, false);
+ transport.invokeRemotely(members, cmd, ResponseMode.ASYNCHRONOUS, timeout, false, null);
} catch (Exception e) {
log.debugf(e, "%s: Error while leaving cache view", cacheName);
}
@@ -301,7 +300,7 @@ private CacheView clusterPrepareView(final String cacheName, final CacheView pen
@Override
public Map<Address, Response> call() throws Exception {
Map<Address, Response> rspList = transport.invokeRemotely(pendingView.getMembers(), cmd,
- ResponseMode.SYNCHRONOUS, timeout, false, null, false);
+ ResponseMode.SYNCHRONOUS, timeout, false, null);
return rspList;
}
});
@@ -343,7 +342,7 @@ private void clusterRollbackView(final String cacheName, int committedViewId, Li
CacheViewControlCommand.Type.ROLLBACK_VIEW, self, newViewId, null, committedViewId, null);
// wait until we get all the responses, but ignore the results
Map<Address, Response> rspList = transport.invokeRemotely(validTargets, cmd,
- ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, timeout, false, null, false);
+ ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, timeout, false, null);
checkRemoteResponse(cacheName, cmd, rspList);
} catch (Throwable t) {
log.cacheViewRollbackFailure(t, committedViewId, cacheName);
@@ -375,7 +374,7 @@ private void clusterCommitView(final String cacheName, final int viewId, List<Ad
CacheViewControlCommand.Type.COMMIT_VIEW, self, viewId);
// wait until we get all the responses, but ignore the results
Map<Address, Response> rspList = transport.invokeRemotely(validTargets, cmd,
- ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, timeout, false, null, false);
+ ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, timeout, false, null);
checkRemoteResponse(cacheName, cmd, rspList);
} catch (Throwable t) {
log.cacheViewCommitFailure(t, viewId, cacheName);
@@ -661,7 +660,7 @@ private void recoverViews() {
@Override
public Map<Address, Response> call() throws Exception {
return transport.invokeRemotely(Collections.singleton(member), cmd,
- ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, timeout, true, null, false);
+ ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, timeout, true, null);
}
});
futures.add(future);
@@ -594,7 +594,7 @@ public void removeCache(String cacheName) {
if (transport != null) {
Configuration c = getConfiguration(cacheName);
// Use sync replication timeout
- transport.invokeRemotely(null, cmd, ResponseMode.SYNCHRONOUS, c.getSyncReplTimeout(), false, null, false);
+ transport.invokeRemotely(null, cmd, ResponseMode.SYNCHRONOUS, c.getSyncReplTimeout(), false, null);
}
// Once sent to the cluster, remove the local cache
cmd.perform(null);
@@ -78,8 +78,6 @@
import org.infinispan.marshall.exts.SetExternalizer;
import org.infinispan.marshall.exts.SingletonListExternalizer;
import org.infinispan.remoting.responses.ExceptionResponse;
-import org.infinispan.remoting.responses.ExtendedResponse;
-import org.infinispan.remoting.responses.RequestIgnoredResponse;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.responses.UnsuccessfulResponse;
import org.infinispan.remoting.responses.UnsureResponse;
@@ -245,10 +243,8 @@ private void loadInternalMarshallables() {
addInternalExternalizer(new Immutables.ImmutableMapWrapperExternalizer());
addInternalExternalizer(new MarshalledValue.Externalizer(globalMarshaller));
- addInternalExternalizer(new ExtendedResponse.Externalizer());
addInternalExternalizer(new SuccessfulResponse.Externalizer());
addInternalExternalizer(new ExceptionResponse.Externalizer());
- addInternalExternalizer(new RequestIgnoredResponse.Externalizer());
addInternalExternalizer(new UnsuccessfulResponse.Externalizer());
addInternalExternalizer(new UnsureResponse.Externalizer());
@@ -1,84 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2009 Red Hat Inc. and/or its affiliates and other
- * contributors as indicated by the @author tags. All rights reserved.
- * See the copyright.txt in the distribution for a full listing of
- * individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.infinispan.remoting.responses;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Set;
-
-import org.infinispan.marshall.AbstractExternalizer;
-import org.infinispan.marshall.Ids;
-import org.infinispan.util.Util;
-
-/**
- * A response with extended information
- *
- * @author Jason T. Greene
- */
-public class ExtendedResponse extends ValidResponse {
- private final boolean replayIgnoredRequests;
- private final Response response;
-
- public ExtendedResponse(Response response, boolean replayIgnoredRequests) {
- this.response = response;
- this.replayIgnoredRequests = replayIgnoredRequests;
- }
-
- public boolean isReplayIgnoredRequests() {
- return replayIgnoredRequests;
- }
-
- public Response getResponse() {
- return response;
- }
-
- public boolean isSuccessful() {
- return response.isSuccessful();
- }
-
- public static class Externalizer extends AbstractExternalizer<ExtendedResponse> {
- @Override
- public void writeObject(ObjectOutput output, ExtendedResponse er) throws IOException {
- output.writeBoolean(er.replayIgnoredRequests);
- output.writeObject(er.response);
- }
-
- @Override
- public ExtendedResponse readObject(ObjectInput input) throws IOException, ClassNotFoundException {
- boolean replayIgnoredRequests = input.readBoolean();
- Response response = (Response) input.readObject();
- return new ExtendedResponse(response, replayIgnoredRequests);
- }
-
- @Override
- public Integer getId() {
- return Ids.EXTENDED_RESPONSE;
- }
-
- @Override
- public Set<Class<? extends ExtendedResponse>> getTypeClasses() {
- return Util.<Class<? extends ExtendedResponse>>asSet(ExtendedResponse.class);
- }
- }
-}
@@ -1,76 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2009 Red Hat Inc. and/or its affiliates and other
- * contributors as indicated by the @author tags. All rights reserved.
- * See the copyright.txt in the distribution for a full listing of
- * individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.infinispan.remoting.responses;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Set;
-
-import org.infinispan.marshall.AbstractExternalizer;
-import org.infinispan.marshall.Ids;
-import org.infinispan.util.Util;
-
-/**
- * Indicates that the request was ignored,
- *
- * @author Jason T. Greene
- */
-public class RequestIgnoredResponse extends InvalidResponse {
- public static final RequestIgnoredResponse INSTANCE = new RequestIgnoredResponse();
-
- private RequestIgnoredResponse() {
- }
-
- @Override
- public boolean isValid() {
- return true;
- }
-
- @Override
- public String toString() {
- return "RequestIgnoredResponse";
- }
-
- public static class Externalizer extends AbstractExternalizer<RequestIgnoredResponse> {
- @Override
- public void writeObject(ObjectOutput output, RequestIgnoredResponse object) throws IOException {
- // no-op
- }
-
- @Override
- public RequestIgnoredResponse readObject(ObjectInput input) throws IOException, ClassNotFoundException {
- return INSTANCE;
- }
-
- @Override
- public Integer getId() {
- return Ids.REQUEST_IGNORED_RESPONSE;
- }
-
- @Override
- public Set<Class<? extends RequestIgnoredResponse>> getTypeClasses() {
- return Util.<Class<? extends RequestIgnoredResponse>>asSet(RequestIgnoredResponse.class);
- }
- }
-}
@@ -145,7 +145,7 @@ private boolean useReplicationQueue(boolean sync) {
responseFilter = new IgnoreExtraResponsesValidityFilter(cacheMembers, getAddress());
}
}
- Map<Address, Response> result = t.invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue, responseFilter, stateTransferEnabled);
+ Map<Address, Response> result = t.invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue, responseFilter);
if (statisticsEnabled) replicationCount.incrementAndGet();
return result;
} catch (CacheException e) {
@@ -77,6 +77,7 @@ void initialize(@ComponentName(GLOBAL_MARSHALLER) StreamingMarshaller marshaller
/**
* Invokes an RPC call on other caches in the cluster.
*
+ *
* @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to the
* entire cluster.
* @param rpcCommand the cache command to invoke
@@ -85,12 +86,11 @@ void initialize(@ComponentName(GLOBAL_MARSHALLER) StreamingMarshaller marshaller
* @param usePriorityQueue if true, a priority queue is used to deliver messages. May not be supported by all
* implementations.
* @param responseFilter a response filter with which to filter out failed/unwanted/invalid responses.
- * @param supportReplay whether replays of missed messages is supported
* @return a map of responses from each member contacted.
* @throws Exception in the event of problems.
*/
Map<Address, Response> invokeRemotely(Collection<Address> recipients, ReplicableCommand rpcCommand, ResponseMode mode, long timeout,
- boolean usePriorityQueue, ResponseFilter responseFilter, boolean supportReplay) throws Exception;
+ boolean usePriorityQueue, ResponseFilter responseFilter) throws Exception;
/**
* @return true if the current Channel is the coordinator of the cluster.
Oops, something went wrong.

0 comments on commit 29701b7

Please sign in to comment.