Skip to content

Commit

Permalink
ISPN-1984 - submitEverywhere fails in REPL_SYNC mode
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladimir Blagojevic authored and galderz committed Apr 24, 2012
1 parent 58d957c commit 35beb57
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
*/
package org.infinispan.distexec;

import java.io.Externalizable;
import java.io.NotSerializableException;
import java.io.Serializable;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
Expand All @@ -42,6 +45,12 @@
* <p>
*
*
* Note that due to potential task migration to other nodes every {@link Callable},
* {@link Runnable} and/or {@link DistributedCallable} submitted must be either {@link Serializable}
* or {@link Externalizable}. Also the value returned from a callable must be {@link Serializable}
* or {@link Externalizable}. Unfortunately if the value returned is not serializable then a
* {@link NotSerializableException} will be thrown.
*
* @see DefaultExecutorService
* @see DistributedCallable
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
*/
package org.infinispan.remoting.responses;

import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.read.DistributedExecuteCommand;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commands.remote.SingleRpcCommand;
import org.infinispan.container.versioning.EntryVersionsMap;

/**
Expand All @@ -34,6 +37,15 @@
public class DefaultResponseGenerator implements ResponseGenerator {
@Override
public Response getResponse(CacheRpcCommand command, Object returnValue) {
if (command instanceof SingleRpcCommand) {
//https://issues.jboss.org/browse/ISPN-1984
SingleRpcCommand src = (SingleRpcCommand) command;
ReplicableCommand c = src.getCommand();
if (c.getCommandId()== DistributedExecuteCommand.COMMAND_ID) {
// Even null values should be wrapped in this case.
return new SuccessfulResponse(returnValue);
}
}
if (returnValue == null) return null;
if (returnValue instanceof EntryVersionsMap || command.isReturnValueExpected()) {
return SuccessfulResponse.create(returnValue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,13 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.infinispan.Cache;
import org.infinispan.distribution.BaseDistFunctionalTest;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.test.MultipleCacheManagersTest;
import org.testng.annotations.Test;

/**
Expand All @@ -41,18 +42,34 @@
* @author Vladimir Blagojevic
*/
@Test(groups = "functional", testName = "distexec.DistributedExecutorTest")
public class DistributedExecutorTest extends BaseDistFunctionalTest {
public class DistributedExecutorTest extends MultipleCacheManagersTest {

public DistributedExecutorTest() {
cleanup = CleanupPhase.AFTER_TEST;
}


@Override
protected void createCacheManagers() throws Throwable {
super.createCacheManagers();
ConfigurationBuilder builder = getDefaultClusteredCacheConfig(getCacheMode(), true);
createClusteredCaches(2, cacheName(), builder);
}

protected String cacheName() {
return "DistributedExecutorTest-DIST_SYNC";
}

protected CacheMode getCacheMode() {
return CacheMode.DIST_SYNC;
}

protected Cache<Object, Object> getCache(){
return cache(0, cacheName());
}


public void testBasicInvocation() throws Exception {

DistributedExecutorService des = new DefaultExecutorService(c1);
DistributedExecutorService des = new DefaultExecutorService(getCache());
Future<Integer> future = des.submit(new SimpleCallable());

Integer r = future.get();
Expand All @@ -61,7 +78,7 @@ public void testBasicInvocation() throws Exception {

public void testExceptionInvocation() throws Exception {

DistributedExecutorService des = new DefaultExecutorService(c1);
DistributedExecutorService des = new DefaultExecutorService(getCache());

Future<Integer> future = des.submit(new ExceptionThrowingCallable());
int exceptionCount = 0;
Expand All @@ -86,33 +103,20 @@ public void testExceptionInvocation() throws Exception {
assert exceptionCount == list.size();
}



public void testRunnableInvocation() throws Exception {

DistributedExecutorService des = new DefaultExecutorService(c1);
DistributedExecutorService des = new DefaultExecutorService(getCache());

Future<?> future = des.submit(new BoringRunnable());
Object object = future.get();
assert object == null;

des.execute(new BoringRunnable());
int exceptionCount = 0;
try {
des.execute(new Runnable() {
@Override
public void run() {
}
});
throw new Exception("Should not have happened");
} catch (IllegalArgumentException iae) {
exceptionCount++;
}

assert exceptionCount == 1;
}

public void testInvokeAny() throws Exception {

DistributedExecutorService des = new DefaultExecutorService(c1);
DistributedExecutorService des = new DefaultExecutorService(getCache());

List<SimpleCallable> tasks = new ArrayList<SimpleCallable>();
tasks.add(new SimpleCallable());
Expand All @@ -128,7 +132,7 @@ public void testInvokeAny() throws Exception {

public void testInvokeAll() throws Exception {

DistributedExecutorService des = new DefaultExecutorService(c1);
DistributedExecutorService des = new DefaultExecutorService(getCache());

List<SimpleCallable> tasks = new ArrayList<SimpleCallable>();
tasks.add(new SimpleCallable());
Expand Down Expand Up @@ -156,7 +160,7 @@ public void testInvokeAll() throws Exception {
* @throws Exception
*/
public void testCallableIsolation() throws Exception {
DefaultExecutorService des = new DefaultExecutorService(c1);
DefaultExecutorService des = new DefaultExecutorService(getCache());

List<Future<Integer>> list = des.submitEverywhere(new SimpleCallableWithField());
assert list != null && !list.isEmpty();
Expand All @@ -166,7 +170,7 @@ public void testCallableIsolation() throws Exception {
}

public void testTaskCancellation() throws Exception {
DistributedExecutorService des = new DefaultExecutorService(c2);
DistributedExecutorService des = new DefaultExecutorService(getCache());
Future<Integer> future = des.submit(new SimpleCallable());
if (future.cancel(true)){
assert future.isCancelled();
Expand All @@ -176,19 +180,20 @@ public void testTaskCancellation() throws Exception {

public void testBasicDistributedCallable() throws Exception {

DistributedExecutorService des = new DefaultExecutorService(c2);
DistributedExecutorService des = new DefaultExecutorService(getCache());
Future<Boolean> future = des.submit(new SimpleDistributedCallable(false));
Boolean r = future.get();
assert r;
}

public void testBasicDistributedCallableWitkKeys() throws Exception {
Cache<Object, Object> c1 = getCache();
c1.put("key1", "Manik");
c1.put("key2", "Mircea");
c1.put("key3", "Galder");
c1.put("key4", "Sanne");

DistributedExecutorService des = new DefaultExecutorService(c1);
DistributedExecutorService des = new DefaultExecutorService(getCache());

Future<Boolean> future = des.submit(new SimpleDistributedCallable(true), new String[] {
"key1", "key2" });
Expand All @@ -197,12 +202,13 @@ public void testBasicDistributedCallableWitkKeys() throws Exception {
}

public void testDistributedCallableEverywhereWithKeys() throws Exception {
Cache<Object, Object> c1 = getCache();
c1.put("key1", "Manik");
c1.put("key2", "Mircea");
c1.put("key3", "Galder");
c1.put("key4", "Sanne");

DefaultExecutorService des = new DefaultExecutorService(c1);
DefaultExecutorService des = new DefaultExecutorService(getCache());

List<Future<Boolean>> list = des.submitEverywhere(new SimpleDistributedCallable(true),
new String[] { "key1", "key2" });
Expand All @@ -214,7 +220,7 @@ public void testDistributedCallableEverywhereWithKeys() throws Exception {

public void testDistributedCallableEverywhere() throws Exception {

DefaultExecutorService des = new DefaultExecutorService(c1);
DefaultExecutorService des = new DefaultExecutorService(getCache());

List<Future<Boolean>> list = des.submitEverywhere(new SimpleDistributedCallable(false));
assert list != null && !list.isEmpty();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* JBoss, Home of Professional Open Source
* Copyright 2011 Red Hat Inc. and/or its affiliates and other
* contributors as indicated by the @author tags. All rights reserved.
* See the copyright.txt in the distribution for a full listing of
* individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.infinispan.distexec;

import org.infinispan.configuration.cache.CacheMode;
import org.testng.annotations.Test;

/**
* Tests org.infinispan.distexec.DistributedExecutorService in REPL_SYNC mode
*
* @author Vladimir Blagojevic
*/
@Test(groups = "functional", testName = "distexec.ReplSyncDistributedExecutorTest")
public class ReplSyncDistributedExecutorTest extends DistributedExecutorTest {

public ReplSyncDistributedExecutorTest() {
cleanup = CleanupPhase.AFTER_TEST;
}

protected String cacheName() {
return "DistributedExecutorTest-REPL_SYNC";
}

protected CacheMode getCacheMode() {
return CacheMode.REPL_SYNC;
}
}

0 comments on commit 35beb57

Please sign in to comment.