Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

MultiProcessor things like union, intersection, etc. Now can all be plug... #40

Merged
merged 1 commit into from

2 participants

@edwardcapriolo
Collaborator

Here it is. We can now do some of the redis ops like server side union/intersections etc.

@zznate
Owner

fuuuuuuuuuu......

@zznate zznate merged commit 2576b1b into master
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
View
19 src/main/java/org/usergrid/vx/experimental/IntraOp.java
@@ -205,6 +205,21 @@ public static IntraOp clear(int resultId){
return i;
}
+ public static IntraOp createMultiProcess(String name, String spec, String value){
+ IntraOp i = new IntraOp(Type.CREATEMULTIPROCESS);
+ i.set("name", name);
+ i.set("spec", spec);
+ i.set("value", value);
+ return i;
+ }
+
+ public static IntraOp multiProcess(String processorName, Map params){
+ IntraOp i = new IntraOp(Type.MULTIPROCESS);
+ i.set("name", processorName);
+ i.set("params", params);
+ return i;
+ }
+
public Type getType() {
return type;
}
@@ -238,7 +253,9 @@ private static void checkForBlankStr(String arg, String msg, Type type) {
CREATEFILTER,
FILTERMODE,
CQLQUERY,
- CLEAR
+ CLEAR,
+ CREATEMULTIPROCESS,
+ MULTIPROCESS
}
View
37 src/main/java/org/usergrid/vx/experimental/IntraService.java
@@ -63,6 +63,7 @@
import org.vertx.java.core.json.JsonObject;
import com.hazelcast.core.Message;
+import com.sun.xml.internal.ws.policy.privateutil.PolicyUtils.Collections;
public class IntraService {
@@ -122,6 +123,10 @@ protected boolean executeReq(IntraReq req, IntraRes res, IntraState state, Vertx
cqlQuery(req,res,state,i,vertx);
} else if (op.getType().equals(IntraOp.Type.CLEAR)){
clear(req,res,state,i,vertx);
+ } else if (op.getType().equals(IntraOp.Type.CREATEMULTIPROCESS)){
+ createMultiProcess(req,res,state,i,vertx);
+ } else if (op.getType().equals(IntraOp.Type.MULTIPROCESS)){
+ multiProcess(req,res,state,i,vertx);
}
} catch (Exception ex){
res.setExceptionAndId(ex,i);
@@ -131,7 +136,19 @@ protected boolean executeReq(IntraReq req, IntraRes res, IntraState state, Vertx
return true;
}
- private Object resolveObject(Object o, IntraReq req, IntraRes res,IntraState state, int i){
+ private void multiProcess(IntraReq req, IntraRes res, IntraState state,
+ int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ String name = (String) op.getOp().get("name");
+ Map params = (Map) op.getOp().get("params");
+ //Processor p = state.processors.get(processorName);
+ MultiProcessor p = state.multiProcessors.get(name);
+
+ List<Map> mpResults = p.multiProcess(res.getOpsRes(), params);
+ res.getOpsRes().put(i, mpResults);
+
+ }
+ private Object resolveObject(Object o, IntraReq req, IntraRes res,IntraState state, int i){
if (o instanceof Object[]){
return o;
} else if (o instanceof Integer){
@@ -522,4 +539,22 @@ private void clear(IntraReq req, IntraRes res, IntraState state, int i,Vertx ver
res.getOpsRes().put(id, new ArrayList<HashMap>());
}
+
+ private void createMultiProcess(IntraReq req, IntraRes res, IntraState state, int i,Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ String name = (String) op.getOp().get("name");
+ GroovyClassLoader gc = new GroovyClassLoader();
+ Class c = gc.parseClass((String) op.getOp().get("value") );
+ MultiProcessor p = null;
+ try {
+ p = (MultiProcessor) c.newInstance();
+ } catch (InstantiationException e) {
+ res.setExceptionAndId(e, i);
+ return;
+ } catch (IllegalAccessException e) {
+ res.setExceptionAndId(e, i);
+ return;
+ }
+ IntraState.multiProcessors.put(name, p);
+ }
}
View
1  src/main/java/org/usergrid/vx/experimental/IntraState.java
@@ -20,5 +20,6 @@
//TODO separate per/request state from application/session state
static Map<String,Processor> processors = new HashMap<String,Processor>();
static Map<String,Filter> filters = new HashMap<String,Filter>();
+ static Map<String,MultiProcessor> multiProcessors = new HashMap<String,MultiProcessor>();
Filter currentFilter;
}
View
8 src/main/java/org/usergrid/vx/experimental/MultiProcessor.java
@@ -0,0 +1,8 @@
+package org.usergrid.vx.experimental;
+
+import java.util.List;
+import java.util.Map;
+
+public interface MultiProcessor {
+ public List<Map> multiProcess(Map<Integer,Object> results, Map params);
+}
View
84 src/main/java/org/usergrid/vx/experimental/PROCESSING.md
@@ -8,6 +8,8 @@ Terminology
* PROCESS: Instructs a processor to operate on the result of an operation
* CREATEFILTER: Creates and loads a filter
* FILTERMODE: Enables or disables filters
+* CREATEMULTIPROCESSOR: Creates and loads a multi-processor
+* MULTIPROCESS: run a multiprocessor
Processor example
----
@@ -128,3 +130,85 @@ Notice although the slice should have returned two columns the filter removed on
Assert.assertEquals( "22", results.get(0).get("value") );
Assert.assertEquals(1, results.size());
+MultiProcessor example
+----
+Processors can operate on the result of one operation. When a user may wishes to combine
+the results of two or more operations a MultiProcessor is used.
+
+API
+----
+
+A MutliProcess has all the results of all the executed steps (res.getOptRes()). Params
+are supplied from the user in the request.
+
+ package org.usergrid.vx.experimental;
+
+ import java.util.List;
+ import java.util.Map;
+
+ public interface MultiProcessor {
+ public List<Map> multiProcess(Map<Integer,Object> results, Map params);
+ }
+
+Example
+----
+
+The user wishes to run two slice operations and do a server side union of the results.
+
+ @Test
+ @RequiresColumnFamily(ksName = "myks", cfName = "mycf")
+ public void multiProcessTest() throws Exception {
+ IntraReq req = new IntraReq();
+ req.add( IntraOp.setKeyspaceOp("myks") ); //0
+ req.add( IntraOp.setColumnFamilyOp("mycf") ); //1
+ req.add( IntraOp.setAutotimestampOp() ); //2
+ req.add( IntraOp.assumeOp("myks", "mycf", "value", "UTF-8")); //3
+ req.add( IntraOp.setOp("rowzz", "col1", "7")); //4
+ req.add( IntraOp.setOp("rowzz", "col2", "8")); //5
+ req.add( IntraOp.setOp("rowyy", "col4", "9")); //6
+ req.add( IntraOp.setOp("rowyy", "col2", "7")); //7
+
+ Perform two slice operations.
+
+ req.add( IntraOp.sliceOp("rowzz", "a", "z", 100));//8
+ req.add( IntraOp.sliceOp("rowyy", "a", "z", 100));//9
+
+ Create the union MultiProcess.
+
+ req.add( IntraOp.createMultiProcess("union", "groovy",
+ "public class Union implements org.usergrid.vx.experimental.MultiProcessor { \n"+
+ " public List<Map> multiProcess(Map<Integer,Object> results, Map params){ \n"+
+ " java.util.HashMap s = new java.util.HashMap(); \n"+
+ " List<Integer> ids = (List<Integer>) params.get(\"steps\");\n"+
+ " for (Integer id: ids) { \n"+
+ " List<Map> rows = results.get(id); \n"+
+ " for (Map row: rows){ \n"+
+ " s.put(row.get(\"value\"),\"\"); \n"+
+ " } \n"+
+ " } \n"+
+ " List<HashMap> ret = new ArrayList<HashMap>(); \n"+
+ " ret.add(s) \n"+
+ " return ret; \n" +
+ " } \n"+
+ "} \n" )); //10
+
+Create the parameters that will tell the MultiProcess what result ids to include.
+
+ Map paramsMap = new HashMap();
+ List<Integer> steps = new ArrayList<Integer>();
+ steps.add(8);
+ steps.add(9);
+ paramsMap.put("steps", steps);
+ req.add( IntraOp.multiProcess("union", paramsMap)); //11
+
+ IntraRes res = new IntraRes();
+ is.handleIntraReq(req, res, x);
+
+The results contain the results after being send to the processor.
+
+ List<Map> x = (List<Map>) res.getOpsRes().get(11);
+ Set<String> expectedResults = new HashSet<String>();
+ expectedResults.addAll( Arrays.asList(new String[] { "7", "8", "9"}));
+ Assert.assertEquals(expectedResults, x.get(0).keySet());
+
+ }
View
52 src/test/java/org/usergrid/vx/experimental/IntraServiceTest.java
@@ -304,4 +304,56 @@ public void cqlEngineTest() throws Exception {
String val2 = ByteBufferUtil.string(cols.get(2).bufferForValue());
assertNotNull(rm);
}
+
+
+ @Test
+ @RequiresColumnFamily(ksName = "myks", cfName = "mycf")
+ public void multiProcessTest() throws Exception {
+ IntraReq req = new IntraReq();
+ req.add( IntraOp.setKeyspaceOp("myks") ); //0
+ req.add( IntraOp.setColumnFamilyOp("mycf") ); //1
+ req.add( IntraOp.setAutotimestampOp() ); //2
+ req.add( IntraOp.assumeOp("myks", "mycf", "value", "UTF-8")); //3
+ req.add( IntraOp.setOp("rowzz", "col1", "7")); //4
+ req.add( IntraOp.setOp("rowzz", "col2", "8")); //5
+ req.add( IntraOp.setOp("rowyy", "col4", "9")); //6
+ req.add( IntraOp.setOp("rowyy", "col2", "7")); //7
+ req.add( IntraOp.sliceOp("rowzz", "a", "z", 100));//8
+ req.add( IntraOp.sliceOp("rowyy", "a", "z", 100));//9
+
+ req.add( IntraOp.createMultiProcess("union", "groovy",
+ "public class Union implements org.usergrid.vx.experimental.MultiProcessor { \n"+
+ " public List<Map> multiProcess(Map<Integer,Object> results, Map params){ \n"+
+ " java.util.HashMap s = new java.util.HashMap(); \n"+
+ " List<Integer> ids = (List<Integer>) params.get(\"steps\");\n"+
+ " for (Integer id: ids) { \n"+
+ " List<Map> rows = results.get(id); \n"+
+ " for (Map row: rows){ \n"+
+ " s.put(row.get(\"value\"),\"\"); \n"+
+ " } \n"+
+ " } \n"+
+ " List<HashMap> ret = new ArrayList<HashMap>(); \n"+
+ " ret.add(s) \n"+
+ " return ret; \n" +
+ " } \n"+
+ "} \n" )); //10
+ Map paramsMap = new HashMap();
+ List<Integer> steps = new ArrayList<Integer>();
+ steps.add(8);
+ steps.add(9);
+ paramsMap.put("steps", steps);
+ req.add( IntraOp.multiProcess("union", paramsMap)); //11
+
+ IntraRes res = new IntraRes();
+ is.handleIntraReq(req, res, x);
+
+ List<Map> x = (List<Map>) res.getOpsRes().get(11);
+
+ Set<String> expectedResults = new HashSet<String>();
+ expectedResults.addAll( Arrays.asList(new String[] { "7", "8", "9"}));
+ Assert.assertEquals(expectedResults, x.get(0).keySet());
+
+ }
+
+
}
Something went wrong with that request. Please try again.