Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge branch 'master' into virgil-rest-handler

  • Loading branch information...
commit d47b5beee8ea563eeca9d2b9fc2327e3d46aa7d1 2 parents 04481d2 + 3645c4f
Nate McCall authored
5 src/main/java/org/usergrid/vx/experimental/multiprocessor/FactoryProvider.java
View
@@ -1,13 +1,16 @@
package org.usergrid.vx.experimental.multiprocessor;
+import org.usergrid.vx.experimental.multiprocessor.groovy.GroovyClMultiProcessorFactory;
import org.usergrid.vx.experimental.multiprocessor.groovy.GroovyMultiProcessorFactory;
public class FactoryProvider {
public MultiProcessorFactory getFilterFactory(String spec) {
switch (spec) {
- case "groovyclassloader":
+ case "groovyscript":
return new GroovyMultiProcessorFactory();
+ case "groovyclassloader":
+ return new GroovyClMultiProcessorFactory();
default:
throw new IllegalArgumentException(spec + " is not yet supported for filters");
}
24 src/main/java/org/usergrid/vx/experimental/multiprocessor/groovy/GroovyClMultiProcessorFactory.java
View
@@ -0,0 +1,24 @@
+package org.usergrid.vx.experimental.multiprocessor.groovy;
+
+import groovy.lang.GroovyClassLoader;
+
+import org.usergrid.vx.experimental.multiprocessor.MultiProcessor;
+import org.usergrid.vx.experimental.multiprocessor.MultiProcessorFactory;
+
+public class GroovyClMultiProcessorFactory implements MultiProcessorFactory{
+
+ @Override
+ public MultiProcessor createMultiProcessor(String script) {
+ GroovyClassLoader gc = new GroovyClassLoader();
+ Class<?> c = gc.parseClass( script) ;
+ MultiProcessor p = null;
+ try {
+ p = (MultiProcessor) c.newInstance();
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new RuntimeException (e);
+ }
+ return p;
+ }
+
+}
+
19 src/main/java/org/usergrid/vx/handler/http/OperationsRequestHandler.java
View
@@ -1,6 +1,7 @@
package org.usergrid.vx.handler.http;
import org.usergrid.vx.experimental.IntraOp;
+import org.usergrid.vx.experimental.multiprocessor.MultiProcessor;
import org.usergrid.vx.experimental.processor.Processor;
import org.vertx.java.core.Handler;
import org.vertx.java.core.Vertx;
@@ -112,7 +113,23 @@ public void handle(Message<JsonObject> event) {
idGenerator.incrementAndGet();
TimeoutHandler timeoutHandler = new TimeoutHandler(this);
timerId = vertx.setTimer(timeout, timeoutHandler);
- if (operation.getString("type").equalsIgnoreCase("process")){
+ /*
+ * String name = (String) op.getOp().get("name");
+ Map params = (Map) op.getOp().get("params");
+ //Processor p = state.processors.get(processorName);
+ MultiProcessor p = state.multiProcessors.get(name);
+
+ List<Map> mpResults = p.multiProcess(res.getOpsRes(), params);
+ res.getOpsRes().put(i, mpResults);
+ */
+ if (operation.getString("type").equalsIgnoreCase("multiprocess")){
+ JsonObject params = operation.getObject("op");
+ JsonObject theParams = params.getObject("params");
+ operation.putObject("mpparams", theParams);
+ operation.putObject("mpres", results.getObject("opsRes"));
+ System.out.println("sendingevent to"+ params.getString("name"));
+ vertx.eventBus().send("multiprocessors." + params.getString("name"), operation, this);
+ } else if (operation.getString("type").equalsIgnoreCase("process")){
JsonObject params = operation.getObject("op");
Integer input = params.getInteger("input");
operation.putArray("input", this.results.getObject("opsRes").getArray(input+"") );
1  src/main/java/org/usergrid/vx/server/IntravertCassandraServer.java
View
@@ -149,6 +149,7 @@ public static void registerOperationHandlers(Vertx x) {
x.eventBus().registerHandler("request.createfilter", new CreateFilterHandler(x.eventBus()));
x.eventBus().registerHandler("request.createprocessor", new CreateProcessorHandler(x.eventBus()));
x.eventBus().registerHandler("request.filtermode", new FilterModeHandler());
+ x.eventBus().registerHandler("request.createmultiprocess", new CreateMultiProcessHandler(x.eventBus()));
}
}
43 src/main/java/org/usergrid/vx/server/operations/CreateMultiProcessHandler.java
View
@@ -0,0 +1,43 @@
+package org.usergrid.vx.server.operations;
+
+
+import org.usergrid.vx.experimental.multiprocessor.FactoryProvider;
+import org.usergrid.vx.experimental.multiprocessor.MultiProcessor;
+import org.usergrid.vx.experimental.multiprocessor.MultiProcessorFactory;
+import org.vertx.java.core.Handler;
+import org.vertx.java.core.eventbus.EventBus;
+import org.vertx.java.core.eventbus.Message;
+import org.vertx.java.core.json.JsonObject;
+
+public class CreateMultiProcessHandler implements Handler<Message<JsonObject>>{
+
+ private EventBus eb;
+
+ public CreateMultiProcessHandler(EventBus eb) {
+ this.eb = eb;
+ }
+
+ @Override
+ public void handle(Message<JsonObject> event) {
+ Integer id = event.body.getInteger("id");
+ JsonObject params = event.body.getObject("op");
+ String name = params.getString("name");
+ String lang = params.getString("spec");
+ String scriptSource = params.getString("value");
+
+ FactoryProvider factoryProvider = new FactoryProvider();
+ try {
+ MultiProcessorFactory processorFactory = factoryProvider.getFilterFactory(lang);
+ MultiProcessor processor = processorFactory.createMultiProcessor(scriptSource);
+ eb.registerHandler("multiprocessors." + name, new MultiProcessorHandler(processor));
+ System.out.println("created multprocessor "+name);
+ event.reply(new JsonObject().putString(id.toString(), "OK"));
+ } catch (IllegalArgumentException e) {
+ event.reply(new JsonObject()
+ .putString(id.toString(), e.getClass().getName())
+ .putString("exception", e.getMessage())
+ .putNumber("exceptionId", id));
+ }
+
+ }
+}
59 src/main/java/org/usergrid/vx/server/operations/MultiProcessorHandler.java
View
@@ -0,0 +1,59 @@
+package org.usergrid.vx.server.operations;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.usergrid.vx.experimental.IntraOp;
+import org.usergrid.vx.experimental.multiprocessor.MultiProcessor;
+import org.vertx.java.core.Handler;
+import org.vertx.java.core.eventbus.Message;
+import org.vertx.java.core.json.JsonArray;
+import org.vertx.java.core.json.JsonObject;
+
+public class MultiProcessorHandler implements Handler<Message<JsonObject>> {
+
+ MultiProcessor p;
+
+ public MultiProcessorHandler(MultiProcessor p){
+ this.p=p;
+ }
+ /*
+ * JsonObject params = operation.getObject("op");
+ JsonObject theParams = operation.getObject("params");
+ operation.putObject("mpparams", theParams);
+ operation.putObject("mpres", results.getObject("opsRes"));
+ System.out.println("sendingevent to"+ params.getString("name"));
+ vertx.eventBus().send("multiprocessors." + params.getString("name"), operation, this);
+ */
+
+ /*
+ * IntraOp op = req.getE().get(i);
+ String name = (String) op.getOp().get("name");
+ Map params = (Map) op.getOp().get("params");
+ //Processor p = state.processors.get(processorName);
+ MultiProcessor p = state.multiProcessors.get(name);
+
+ List<Map> mpResults = p.multiProcess(res.getOpsRes(), params);
+ res.getOpsRes().put(i, mpResults);
+ }
+ */
+ @Override
+ public void handle(Message<JsonObject> event) {
+ System.out.println(event);
+ Integer id = event.body.getInteger("id");
+ Map params = event.body.getObject("mpparams").toMap();
+ System.out.println(params);
+ Map mpres = event.body.getObject("mpres").toMap();
+ System.out.println(mpres);
+ List<Map> results = p.multiProcess(mpres, params);
+ System.out.println(results);
+ JsonArray ja = new JsonArray();
+ for (Map result: results){
+ ja.addObject( new JsonObject(result));
+ }
+ event.reply(new JsonObject().putArray(id.toString(), ja));
+
+ }
+
+}
12 src/test/java/org/usergrid/vx/experimental/IntraServiceITest.java
View
@@ -479,13 +479,13 @@ public void multiProcessTest() throws Exception {
req.add( Operations.sliceOp("rowzz", "a", "z", 100));//8
req.add(Operations.sliceOp("rowyy", "a", "z", 100));//9
- req.add( Operations.createMultiProcess("union", "groovy",
- "public class Union implements org.usergrid.vx.experimental.MultiProcessor { \n" +
+ req.add( Operations.createMultiProcess("union", "groovyclassloader",
+ "public class Union implements org.usergrid.vx.experimental.multiprocessor.MultiProcessor { \n" +
" public List<Map> multiProcess(Map<Integer,Object> results, Map params){ \n" +
" java.util.HashMap s = new java.util.HashMap(); \n" +
" List<Integer> ids = (List<Integer>) params.get(\"steps\");\n" +
" for (Integer id: ids) { \n" +
- " List<Map> rows = results.get(id); \n" +
+ " List<Map> rows = results.get(id+\"\"); \n" +
" for (Map row: rows){ \n" +
" s.put(row.get(\"value\"),\"\"); \n" +
" } \n" +
@@ -502,13 +502,15 @@ public void multiProcessTest() throws Exception {
paramsMap.put("steps", steps);
req.add( Operations.multiProcess("union", paramsMap)); //11
- IntraRes res = new IntraRes();
- is.handleIntraReq(req, res, x);
+ IntraClient2 ic = new IntraClient2("localhost",8080);
+ IntraRes res = ic.sendBlocking(req);
+
List<Map> x = (List<Map>) res.getOpsRes().get(11);
Set<String> expectedResults = new HashSet<String>();
expectedResults.addAll( Arrays.asList(new String[] { "7", "8", "9"}));
+ System.out.println(res);
Assert.assertEquals(expectedResults, x.get(0).keySet());
}
8 src/test/java/org/usergrid/vx/experimental/Rando.java
View
@@ -0,0 +1,8 @@
+package org.usergrid.vx.experimental;
+
+public class Rando {
+ public static void main (String [] args){
+ java.util.Random r = new java.util.Random();
+ System.out.println ( r.nextInt(45)+1);
+ }
+}
Please sign in to comment.
Something went wrong with that request. Please try again.