Skip to content

Commit

Permalink
ISPN-6300 Add missing bindings to dist remote exec
Browse files Browse the repository at this point in the history
* Added cacheManager and context parameters as bindings for distributed
  remote task execution.
* Marshaller not currently passed since the marshaller in the context
  can't easily be initialised in the distributed tasks.
* Added tests to verify that bindings are correctly set for normal
  remote execution and typed remote execution.
  • Loading branch information
galderz authored and tristantarrant committed Apr 29, 2016
1 parent 4f0f1bd commit e23b912
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 28 deletions.
Expand Up @@ -9,16 +9,19 @@
import org.infinispan.scripting.ScriptingManager; import org.infinispan.scripting.ScriptingManager;
import org.infinispan.scripting.utils.ScriptingUtils; import org.infinispan.scripting.utils.ScriptingUtils;
import org.infinispan.test.TestingUtil; import org.infinispan.test.TestingUtil;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterMethod; import org.testng.annotations.AfterMethod;
import org.testng.annotations.DataProvider; import org.testng.annotations.DataProvider;
import org.testng.annotations.Test; import org.testng.annotations.Test;


import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.function.BiConsumer;


import static org.infinispan.client.hotrod.test.HotRodClientTestingUtil.loadScript; import static org.infinispan.client.hotrod.test.HotRodClientTestingUtil.loadScript;
import static org.infinispan.client.hotrod.test.HotRodClientTestingUtil.withScript; import static org.infinispan.client.hotrod.test.HotRodClientTestingUtil.withScript;
Expand Down Expand Up @@ -169,9 +172,40 @@ public void testExecReturnNull(String cacheName) throws IOException {
}); });
} }


@Test(dataProvider = "CacheNameProvider")
public void testLocalExecPutGet(String cacheName) {
execPutGet(cacheName, "/test-put-get.js", ExecMode.LOCAL, "local-key", "local-value");
}

@Test(dataProvider = "CacheNameProvider")
public void testDistExecPutGet(String cacheName) {
execPutGet(cacheName, "/test-put-get-dist.js", ExecMode.DIST, "dist-key", "dist-value");
}

private void execPutGet(String cacheName, String path, ExecMode mode, String key, String value) {
withScript(manager(0), path, scriptName -> {
Map<String, String> params = new HashMap<>();
params.put("k", key);
params.put("v", value);
Object results = clients.get(0).getCache(cacheName).execute(scriptName, params);
mode.assertResult.accept(value, results);
});
}

@DataProvider(name = "CacheNameProvider") @DataProvider(name = "CacheNameProvider")
private static Object[][] provideCacheMode() { private static Object[][] provideCacheMode() {
return new Object[][] {{REPL_CACHE}, {DIST_CACHE}}; return new Object[][] {{REPL_CACHE}, {DIST_CACHE}};
} }


enum ExecMode {
LOCAL(AssertJUnit::assertEquals),
DIST((v, r) -> assertEquals(Arrays.asList(v, v), r));

final BiConsumer<String, Object> assertResult;

ExecMode(BiConsumer<String, Object> assertResult) {
this.assertResult = assertResult;
}
}

} }
Expand Up @@ -6,11 +6,13 @@
import org.infinispan.commons.marshall.StringMarshaller; import org.infinispan.commons.marshall.StringMarshaller;
import org.infinispan.configuration.cache.CacheMode; import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.testng.AssertJUnit;
import org.testng.annotations.Test; import org.testng.annotations.Test;


import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.function.BiConsumer;


import static org.infinispan.client.hotrod.test.HotRodClientTestingUtil.withScript; import static org.infinispan.client.hotrod.test.HotRodClientTestingUtil.withScript;
import static org.testng.AssertJUnit.assertEquals; import static org.testng.AssertJUnit.assertEquals;
Expand Down Expand Up @@ -53,27 +55,15 @@ private RemoteCacheManager createAddScriptClient() {
return new InternalRemoteCacheManager(clientBuilder.build()); return new InternalRemoteCacheManager(clientBuilder.build());
} }


public void testRemoteTypedScriptPutGetExecute() throws Exception { public void testLocalTypedExecPutGet() {
withScript(addScriptClient.getCache(SCRIPT_CACHE), "/typed-put-get.js", scriptName -> { execPutGet("/typed-put-get.js", ExecMode.LOCAL, "local-typed-key", "local-typed-value");
Map<String, String> params = new HashMap<>();
params.put("k", "typed-key");
params.put("v", "typed-value");
String result = execClient.getCache().execute(scriptName, params);
assertEquals("typed-value", result);
});
} }


public void testRemoteTypedScriptPutGetExecuteCyrillic() throws Exception { public void testLocalTypedExecPutGetCyrillic() {
withScript(addScriptClient.getCache(SCRIPT_CACHE), "/typed-put-get.js", scriptName -> { execPutGet("/typed-put-get.js", ExecMode.LOCAL, "բարև", "привет");
Map<String, String> params = new HashMap<>();
params.put("k", "բարև");
params.put("v", "привет");
String result = execClient.getCache().execute(scriptName, params);
assertEquals("привет", result);
});
} }


public void testPutGetEmptyString() throws Exception { public void testLocalTypedExecPutGetEmptyString() {
withScript(addScriptClient.getCache(SCRIPT_CACHE), "/typed-put-get.js", scriptName -> { withScript(addScriptClient.getCache(SCRIPT_CACHE), "/typed-put-get.js", scriptName -> {
Map<String, String> params = new HashMap<>(); Map<String, String> params = new HashMap<>();
params.put("k", "empty-key"); params.put("k", "empty-key");
Expand All @@ -83,26 +73,51 @@ public void testPutGetEmptyString() throws Exception {
}); });
} }


public void testRemoteTypedScriptSizeExecute() throws Exception { public void testLocalTypedExecSize() {
withScript(addScriptClient.getCache(SCRIPT_CACHE), "/typed-size.js", scriptName -> { withScript(addScriptClient.getCache(SCRIPT_CACHE), "/typed-size.js", scriptName -> {
clients.get(0).getCache().clear(); clients.get(0).getCache().clear();
String result = execClient.getCache().execute(scriptName, new HashMap<>()); String result = execClient.getCache().execute(scriptName, new HashMap<>());
assertEquals("0", result); assertEquals("0", result);
}); });
} }


public void testRemoteTypedCacheManagerScriptExecute() throws Exception { public void testLocalTypedExecWithCacheManager() {
withScript(addScriptClient.getCache(SCRIPT_CACHE), "/typed-cachemanager-put-get.js", scriptName -> { withScript(addScriptClient.getCache(SCRIPT_CACHE), "/typed-cachemanager-put-get.js", scriptName -> {
String result = execClient.getCache().execute(scriptName, new HashMap<>()); String result = execClient.getCache().execute(scriptName, new HashMap<>());
assertEquals("a", result); assertEquals("a", result);
}); });
} }


public void testRemoteTypedScriptNullReturnExecute() throws Exception { public void testLocalTypedExecNullReturn() {
withScript(addScriptClient.getCache(SCRIPT_CACHE), "/typed-null-return.js", scriptName -> { withScript(addScriptClient.getCache(SCRIPT_CACHE), "/typed-null-return.js", scriptName -> {
String result = clients.get(0).getCache().execute(scriptName, new HashMap<>()); String result = clients.get(0).getCache().execute(scriptName, new HashMap<>());
assertEquals(null, result); assertEquals(null, result);
}); });
} }


public void testDistTypedExecPutGet() {
execPutGet("/typed-put-get-dist.js", ExecMode.DIST, "dist-typed-key", "dist-typed-value");
}

private void execPutGet(String path, ExecMode mode, String key, String value) {
withScript(addScriptClient.getCache(SCRIPT_CACHE), path, scriptName -> {
Map<String, String> params = new HashMap<>();
params.put("k", key);
params.put("v", value);
String result = execClient.getCache().execute(scriptName, params);
mode.assertResult.accept(value, result);
});
}

enum ExecMode {
LOCAL(AssertJUnit::assertEquals),
DIST((v, r) -> assertEquals(String.format("[\"%1$s\", \"%1$s\"]", v), r));

final BiConsumer<String, String> assertResult;

ExecMode(BiConsumer<String, String> assertResult) {
this.assertResult = assertResult;
}
}

} }
4 changes: 4 additions & 0 deletions client/hotrod-client/src/test/resources/typed-put-get-dist.js
@@ -0,0 +1,4 @@
// mode=distributed,language=javascript,parameters=[k, v],datatype='text/plain; charset=utf-8'
var cache = cacheManager.getCache();
cache.put(k, v);
cache.get(k);
Expand Up @@ -5,6 +5,7 @@
import org.infinispan.commons.util.Immutables; import org.infinispan.commons.util.Immutables;


import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
Expand Down Expand Up @@ -47,11 +48,15 @@ static final class Utf8Transformer implements Transformer {
} }


@Override @Override
@SuppressWarnings("unchecked")
public Object fromDataType(Object obj, Optional<Marshaller> marshaller) { public Object fromDataType(Object obj, Optional<Marshaller> marshaller) {
return Objects.isNull(obj) if (obj instanceof List)
? null : obj instanceof String return ((List) obj).stream()
? ((String) obj).getBytes(CHARSET_UTF8) .map(Object::toString)
: obj.toString().getBytes(CHARSET_UTF8); .collect(Collectors.joining("\", \"", "[\"", "\"]"))
.toString().getBytes(CHARSET_UTF8);

return Objects.isNull(obj) ? null : obj.toString().getBytes(CHARSET_UTF8);
} }


private static String asString(Object v) { private static String asString(Object v) {
Expand Down
@@ -1,10 +1,10 @@
package org.infinispan.scripting.impl; package org.infinispan.scripting.impl;


import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodHandles;
import java.util.ArrayList; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;


import org.infinispan.Cache; import org.infinispan.Cache;
import org.infinispan.distexec.DefaultExecutorService; import org.infinispan.distexec.DefaultExecutorService;
Expand Down Expand Up @@ -33,11 +33,19 @@ public <T> CompletableFuture<T> runScript(ScriptingManagerImpl scriptManager, Sc
} }
DefaultExecutorService des = new DefaultExecutorService(masterCacheNode); DefaultExecutorService des = new DefaultExecutorService(masterCacheNode);
try { try {
List<CompletableFuture<T>> tasks = des.submitEverywhere(new DistributedScript<T>(metadata)); Map<String, Object> ctxParams = extractContextParams(metadata, binding);
List<CompletableFuture<T>> tasks = des.submitEverywhere(new DistributedScript<T>(metadata, ctxParams));


return (CompletableFuture<T>) CompletableFutures.sequence(tasks); return (CompletableFuture<T>) CompletableFutures.sequence(tasks);
} finally { } finally {
des.shutdown(); des.shutdown();
} }
} }

private Map<String, Object> extractContextParams(ScriptMetadata metadata, CacheScriptBindings binding) {
Map<String, Object> params = new HashMap<>();
metadata.parameters().stream().forEach(paramName -> params.put(paramName, binding.get(paramName)));
return params;
}

} }
@@ -1,6 +1,7 @@
package org.infinispan.scripting.impl; package org.infinispan.scripting.impl;


import java.io.Serializable; import java.io.Serializable;
import java.util.Map;
import java.util.Set; import java.util.Set;


import javax.script.Bindings; import javax.script.Bindings;
Expand All @@ -18,11 +19,13 @@
*/ */
class DistributedScript<T> implements DistributedCallable<Object, Object, T>, Serializable { class DistributedScript<T> implements DistributedCallable<Object, Object, T>, Serializable {
private final ScriptMetadata metadata; private final ScriptMetadata metadata;
private final Map<String, ?> ctxParams;
private transient ScriptingManagerImpl scriptManager; private transient ScriptingManagerImpl scriptManager;
private transient Bindings bindings; private transient Bindings bindings;


DistributedScript(ScriptMetadata metadata) { DistributedScript(ScriptMetadata metadata, Map<String, ?> ctxParams) {
this.metadata = metadata; this.metadata = metadata;
this.ctxParams = ctxParams;
} }


@Override @Override
Expand All @@ -36,5 +39,7 @@ public void setEnvironment(Cache<Object, Object> cache, Set<Object> inputKeys) {
bindings = new SimpleBindings(); bindings = new SimpleBindings();
bindings.put("inputKeys", inputKeys); bindings.put("inputKeys", inputKeys);
bindings.put("cache", cache); bindings.put("cache", cache);
bindings.put("cacheManager", cache.getCacheManager());
ctxParams.entrySet().stream().forEach(e -> bindings.put(e.getKey(), e.getValue()));
} }
} }
4 changes: 4 additions & 0 deletions scripting/src/test/resources/test-put-get-dist.js
@@ -0,0 +1,4 @@
// mode=distributed,language=javascript,parameters=[k, v]
var cache = cacheManager.getCache();
cache.put(k, v);
cache.get(k);
4 changes: 4 additions & 0 deletions scripting/src/test/resources/test-put-get.js
@@ -0,0 +1,4 @@
// mode=local,language=javascript,parameters=[k, v]
var cache = cacheManager.getCache();
cache.put(k, v);
cache.get(k);

0 comments on commit e23b912

Please sign in to comment.