Skip to content

Commit

Permalink
ISPN-6551 - Port some tests for Remote Server Task execution to domai…
Browse files Browse the repository at this point in the history
…n mode
  • Loading branch information
andyuk1986 authored and tristantarrant committed May 16, 2016
1 parent 1d8741d commit 22a3cae
Show file tree
Hide file tree
Showing 9 changed files with 379 additions and 176 deletions.
Expand Up @@ -69,6 +69,7 @@ public static void beforeClass() throws Exception {
client.enableJmx();
if (isDistributedMode()) {
testCache = "distTestCache";
client.addDistributedCacheConfiguration("distCacheConfiguration", "clustered");
client.addDistributedCache(testCache, "clustered", "distCacheConfiguration");
} else if (isLocalMode()) {
final String targetContainer = "local";
Expand Down
Expand Up @@ -39,6 +39,7 @@ public static void beforeClass() throws Exception {
client.enableJmx();
if (isDistributedMode()) {
testCache = "cmDistTestCache";
client.addDistributedCacheConfiguration("distCacheConfiguration", "clustered");
client.addDistributedCache(testCache, "clustered", "distCacheConfiguration");
} else {
testCache = "cmReplTestCache";
Expand Down
@@ -0,0 +1,165 @@
package org.infinispan.server.test.task;

import org.infinispan.arquillian.core.RemoteInfinispanServer;
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.configuration.ConfigurationBuilder;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.server.test.task.servertask.DistributedCacheUsingTask;
import org.infinispan.server.test.task.servertask.DistributedJSExecutingServerTask;
import org.infinispan.server.test.task.servertask.DistributedMapReduceServerTask;
import org.infinispan.server.test.task.servertask.DistributedTestServerTask;
import org.infinispan.server.test.task.servertask.JSExecutingServerTask;
import org.infinispan.server.test.task.servertask.LocalMapReduceServerTask;
import org.infinispan.tasks.ServerTask;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.*;

/**
* Abstract class holding tests for Remote Task Execution in Distributed mode.
*
* @author amanukya
*/
public abstract class AbstractDistributedServerTaskIT {

@Rule
public ExpectedException exceptionRule = ExpectedException.none();
private static List<RemoteCacheManager> managers = null;

protected abstract List<RemoteInfinispanServer> getServers();
protected static List<String> expectedServerList;
protected static final String CACHE_NAME = DistributedMapReduceServerTask.CACHE_NAME;
protected static final String CACHE_NAME_TX = DistributedCacheUsingTask.CACHE_NAME;

@Before
public void setUp() {
if (managers == null) {
Configuration conf;
managers = new ArrayList<>();
for (RemoteInfinispanServer server : getServers()) {
conf = new ConfigurationBuilder().addServer().host(server.getHotrodEndpoint().getInetAddress().getHostName())
.port(server.getHotrodEndpoint().getPort()).build();
managers.add(new RemoteCacheManager(conf, true));
}
}

for (RemoteCacheManager rcm : managers) {
rcm.getCache().clear();
rcm.getCache(CACHE_NAME).clear();
rcm.getCache(CACHE_NAME_TX).clear();
}
}

@AfterClass
public static void release() {
if (managers != null && !managers.isEmpty()) {
for (RemoteCacheManager manager : managers) {
manager.stop();
}
}
}

protected static JavaArchive createJavaArchive() {
JavaArchive jar = ShrinkWrap.create(JavaArchive.class);
jar.addClass(DistributedTestServerTask.class);
jar.addClass(DistributedCacheUsingTask.class);
jar.addClass(DistributedMapReduceServerTask.class);
jar.addClass(DistributedJSExecutingServerTask.class);
jar.addClass(LocalMapReduceServerTask.class);
jar.addClass(JSExecutingServerTask.class);
jar.addAsServiceProvider(ServerTask.class, DistributedTestServerTask.class, DistributedCacheUsingTask.class,
DistributedMapReduceServerTask.class, DistributedJSExecutingServerTask.class);

return jar;
}

@Test
@SuppressWarnings("unchecked")
public void shouldGatherNodeNamesInRemoteTasks() throws Exception {
Object resultObject = managers.get(0).getCache().execute(DistributedTestServerTask.NAME, Collections.emptyMap());
assertNotNull(resultObject);
List<String> result = (List<String>) resultObject;
assertEquals(2, result.size());
System.out.println("The RESULT IS: " + result);
assertTrue("result list does not contain expected items.", result.containsAll(expectedServerList));
}

@Test
@SuppressWarnings("unchecked")
public void shouldThrowExceptionInRemoteTasks() throws Exception {
Map<String, Boolean> params = new HashMap<String, Boolean>();
params.put("throwException", true);

exceptionRule.expect(HotRodClientException.class);
exceptionRule.expectMessage("Intentionally Thrown Exception");

managers.get(0).getCache().execute(DistributedTestServerTask.NAME, params);
}

@Test
@SuppressWarnings("unchecked")
public void shouldPutNewValueInRemoteCache() throws Exception {
String key = "key";
String value = "value";
String paramValue = "parameter";

Map<String, String> params = new HashMap<>();
params.put(DistributedCacheUsingTask.PARAM_KEY, paramValue);
managers.get(1).getCache(CACHE_NAME_TX);
managers.get(0).getCache(CACHE_NAME_TX).put(key, value);

managers.get(0).getCache(CACHE_NAME_TX).execute(DistributedCacheUsingTask.NAME, params);
assertEquals("modified:modified:value:parameter:parameter", managers.get(0).getCache(CACHE_NAME_TX).get(key));
}

@Test
@SuppressWarnings("unchecked")
public void shouldExecuteMapReduceOnReplCacheViaTask() throws Exception {
RemoteCache remoteCache = managers.get(1).getCache(DistributedMapReduceServerTask.CACHE_NAME);
remoteCache.put(1, "word1 word2 word3");
remoteCache.put(2, "word1 word2");
remoteCache.put(3, "word1");

List<Map<String, Long>> result = (List<Map<String, Long>>)remoteCache.execute(DistributedMapReduceServerTask.NAME, Collections.emptyMap());
assertEquals(2, result.size());
verifyMapReduceResult(result.get(0));
verifyMapReduceResult(result.get(1));

}

@Test
@Ignore(value="Is disabled until the issue ISPN-6303 and ISPN-6173 are fixed.")
public void shouldExecuteMapReduceViaJavaScriptInTask() throws Exception {
RemoteCache remoteCache = managers.get(1).getCache(DistributedJSExecutingServerTask.CACHE_NAME);
remoteCache.put(1, "word1 word2 word3");
remoteCache.put(2, "word1 word2");
remoteCache.put(3, "word1");

List<Map<String, Long>> result = (List<Map<String, Long>>)remoteCache.execute(DistributedJSExecutingServerTask.NAME, Collections.emptyMap());
assertEquals(2, result.size());
verifyMapReduceResult(result.get(0));
verifyMapReduceResult(result.get(1));
}

private void verifyMapReduceResult(Map<String, Long> result) {
assertEquals(3, result.size());
assertEquals(3, result.get("word1").intValue());
assertEquals(2, result.get("word2").intValue());
assertEquals(1, result.get("word3").intValue());
}
}
@@ -0,0 +1,118 @@
package org.infinispan.server.test.task;

import org.infinispan.arquillian.core.InfinispanResource;
import org.infinispan.arquillian.core.RemoteInfinispanServer;
import org.infinispan.server.test.category.HotRodClusteredDomain;
import org.infinispan.server.test.category.Task;
import org.infinispan.server.test.util.ManagementClient;
import org.jboss.arquillian.container.test.api.Deployment;
import org.jboss.arquillian.container.test.api.TargetsContainer;
import org.jboss.arquillian.junit.Arquillian;
import org.jboss.shrinkwrap.api.Archive;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static java.util.Arrays.asList;
import static org.infinispan.server.test.util.ITestUtils.isDistributedMode;
import static org.infinispan.server.test.util.ITestUtils.isReplicatedMode;

/**
* Tests running the remote task execution tests in Domain mode.
*
* @author amanukya
*/
@RunWith(Arquillian.class)
@Category({HotRodClusteredDomain.class, Task.class})
public class DistributedServerTaskDomainIT extends AbstractDistributedServerTaskIT {
@InfinispanResource(value = "master:server-one", jmxPort = 4447)
RemoteInfinispanServer server1;

@InfinispanResource(value = "master:server-two", jmxPort = 4597)
RemoteInfinispanServer server2;

private static final String CUSTOM_TEMPLATE_NAME = "testConf";
private static final String CUSTOM_TX_TEMPLATE_NAME = "testConfTx";

@Deployment(testable = false, name = "custom-distributed-task")
@TargetsContainer("cluster")
public static Archive<?> deploy() {
JavaArchive jar = createJavaArchive();
jar.addAsResource(new File("/stream_serverTask.js"));
jar.addAsManifestResource("MANIFEST.MF");

return jar;
}

@Override
protected List<RemoteInfinispanServer> getServers() {
List<RemoteInfinispanServer> servers = new ArrayList<RemoteInfinispanServer>();
servers.add(server1);
servers.add(server2);

return Collections.unmodifiableList(servers);
}

@BeforeClass
public static void beforeClass() throws Exception {
ManagementClient client = ManagementClient.getInstance();
client.enableJmx();

//Adding TX configuration & Cache with enabled compatibility
Map<String, String> txAttrs = new HashMap<>();
txAttrs.put("mode", "NON_XA");
txAttrs.put("locking", "PESSIMISTIC");

if (isDistributedMode()) {
client.addDistributedCacheConfiguration(CUSTOM_TEMPLATE_NAME, "clustered");
client.addDistributedCache(CACHE_NAME, "clustered", CUSTOM_TEMPLATE_NAME);
client.enableCompatibilityForDistConfiguration(CUSTOM_TEMPLATE_NAME, "clustered");

client.addDistributedCacheConfiguration(CUSTOM_TX_TEMPLATE_NAME, "clustered");
client.enableTransactionForDistConfiguration(CUSTOM_TX_TEMPLATE_NAME, "clustered", txAttrs);
client.enableCompatibilityForDistConfiguration(CUSTOM_TX_TEMPLATE_NAME, "clustered");
client.addDistributedCache(CACHE_NAME_TX, "clustered", CUSTOM_TX_TEMPLATE_NAME);
} else if (isReplicatedMode()) {
client.addReplicatedCacheConfiguration(CUSTOM_TEMPLATE_NAME, "clustered");
client.enableCompatibilityForReplConfiguration(CUSTOM_TEMPLATE_NAME, "clustered");
client.addReplicatedCache(CACHE_NAME, "clustered", CUSTOM_TEMPLATE_NAME);

client.addReplicatedCacheConfiguration(CUSTOM_TX_TEMPLATE_NAME, "clustered");
client.enableTransactionForReplConfiguration(CUSTOM_TX_TEMPLATE_NAME, "clustered", txAttrs);
client.enableCompatibilityForReplConfiguration(CUSTOM_TX_TEMPLATE_NAME, "clustered");
client.addReplicatedCache(CACHE_NAME_TX, "clustered", CUSTOM_TX_TEMPLATE_NAME);
}

//@TODO The next line is a workaround for JDG-314. Please remove this line when the JDG-314 is fixed.
client.reloadServer();

expectedServerList = asList("master:server-two", "master:server-one");
}

@AfterClass
public static void afterClass() throws Exception {
ManagementClient client = ManagementClient.getInstance();
if (isDistributedMode()) {
client.removeDistributedCache(CACHE_NAME, "clustered");
client.removeDistributedCache(CACHE_NAME_TX, "clustered");
client.removeDistributedCacheConfiguration(CUSTOM_TEMPLATE_NAME, "clustered");
client.removeDistributedCacheConfiguration(CUSTOM_TX_TEMPLATE_NAME, "clustered");
} else if (isReplicatedMode()) {
client.removeReplicatedCache(CACHE_NAME, "clustered");
client.removeReplicatedCache(CACHE_NAME_TX, "clustered");
client.removeReplicatedCacheConfiguration(CUSTOM_TEMPLATE_NAME, "clustered");
client.removeReplicatedCacheConfiguration(CUSTOM_TX_TEMPLATE_NAME, "clustered");
}

client.disableJmx();
}
}

0 comments on commit 22a3cae

Please sign in to comment.