Skip to content

Commit

Permalink
Added filtering, more awesome sauce.
Browse files Browse the repository at this point in the history
  • Loading branch information
edwardcapriolo committed Dec 30, 2012
1 parent c32c050 commit 828e636
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 20 deletions.
7 changes: 7 additions & 0 deletions src/main/java/org/usergrid/vx/experimental/Filter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.usergrid.vx.experimental;

import java.util.Map;

public interface Filter {
public Map filter(Map row);
}
22 changes: 20 additions & 2 deletions src/main/java/org/usergrid/vx/experimental/IntraOp.java
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,22 @@ public static IntraOp dropKeyspaceOp(String ksname){
i.set("keyspace", ksname);
return i;
}


public static IntraOp createFilterOp(String name, String spec, String value) {
IntraOp i = new IntraOp(Type.CREATEFILTER);
i.set("name", name);
i.set("spec", spec);
i.set("value", value);
return i;
}

public static IntraOp filterModeOp(String name, boolean on) {
IntraOp i = new IntraOp(Type.FILTERMODE);
i.set("name", name);
i.set("on", on);
return i;
}

public Type getType() {
return type;
}
Expand Down Expand Up @@ -204,7 +219,10 @@ public enum Type {
ASSUME,
CREATEPROCESSOR,
PROCESS,
DROPKEYSPACE
DROPKEYSPACE,
CREATEFILTER,
FILTERMODE;

}

}
75 changes: 59 additions & 16 deletions src/main/java/org/usergrid/vx/experimental/IntraService.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ protected boolean executeReq(IntraReq req, IntraRes res, IntraState state, Vertx
createProcessor(req,res,state,i,vertx);
} else if (op.getType().equals(IntraOp.Type.PROCESS)){
process(req,res,state,i,vertx);
} else if (op.getType().equals(IntraOp.Type.CREATEFILTER)){
createFilter(req,res,state,i,vertx);
} else if (op.getType().equals(IntraOp.Type.FILTERMODE)){
filterMode(req,res,state,i,vertx);
}
} catch (Exception ex){
res.setExceptionAndId(ex,i);
Expand Down Expand Up @@ -269,14 +273,7 @@ private void slice(IntraReq req, IntraRes res, IntraState state,int i){
ColumnFamily cf = results.get(0).cf;
if (cf == null){ //cf= null is no data
} else {
Iterator <IColumn> it = cf.iterator();
while (it.hasNext()){
IColumn ic = it.next();
HashMap m = new HashMap();
m.put("name", ic.name());
m.put("value", ic.value());
finalResults.add(m);
}
readCf(cf, finalResults,state);
}
res.getOpsRes().put(i,finalResults);
} catch (ReadTimeoutException e) {
Expand Down Expand Up @@ -331,14 +328,7 @@ private void get(IntraReq req, IntraRes res, IntraState state, int i) {
ColumnFamily cf = rows.get(0).cf;
if (cf == null) { // cf= null is no data
} else {
Iterator<IColumn> it = cf.iterator();
while (it.hasNext()) {
IColumn ic = it.next();
HashMap m = new HashMap();
m.put("name", TypeHelper.getTypedIfPossible(state, "column", ic.name()));
m.put("value", TypeHelper.getTypedIfPossible(state, "value", ic.value()));
finalResults.add(m);
}
readCf(cf, finalResults,state);
}
res.getOpsRes().put(i, finalResults);
} catch (ReadTimeoutException e) {
Expand All @@ -357,6 +347,24 @@ private void get(IntraReq req, IntraRes res, IntraState state, int i) {

}

private void readCf(ColumnFamily cf , List<Map> finalResults, IntraState state){
Iterator<IColumn> it = cf.iterator();
while (it.hasNext()) {
IColumn ic = it.next();
HashMap m = new HashMap();
m.put("name", TypeHelper.getTypedIfPossible(state, "column", ic.name()));
m.put("value", TypeHelper.getTypedIfPossible(state, "value", ic.value()));
if (state.currentFilter != null){
Map newMap = state.currentFilter.filter(m);
if (newMap != null){
finalResults.add(newMap);
}
} else {
finalResults.add(m);
}
}
}

private void assume(IntraReq req, IntraRes res, IntraState state, int i) {
IntraOp op = req.getE().get(i);
IntraMetaData imd = new IntraMetaData();
Expand Down Expand Up @@ -385,6 +393,41 @@ private void createProcessor(IntraReq req, IntraRes res, IntraState state, int i
IntraState.processors.put(name,p);
}

private void createFilter(IntraReq req, IntraRes res, IntraState state, int i,Vertx vertx) {
IntraOp op = req.getE().get(i);
String name = (String) op.getOp().get("name");
GroovyClassLoader gc = new GroovyClassLoader();
Class c = gc.parseClass((String) op.getOp().get("value") );
Filter f = null;
try {
f = (Filter) c.newInstance();
} catch (InstantiationException e) {
res.setExceptionAndId(e, i);
return;
} catch (IllegalAccessException e) {
res.setExceptionAndId(e, i);
return;
}
IntraState.filters.put(name, f);
}

private void filterMode(IntraReq req, IntraRes res, IntraState state, int i,Vertx vertx) {
IntraOp op = req.getE().get(i);
String name = (String) op.getOp().get("name");
Boolean on = (Boolean) op.getOp().get("on");
if (on){
Filter f = state.filters.get(name);
if (f == null){
res.setExceptionAndId("filter "+name +" not found", i);
return;
} else {
state.currentFilter=f;
}
} else {
state.currentFilter = null;
}
}

private void process(IntraReq req, IntraRes res, IntraState state, int i,Vertx vertx) {
IntraOp op = req.getE().get(i);
String processorName = (String) op.getOp().get("processorname");
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/usergrid/vx/experimental/IntraState.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

/* class that holds properties for the request lifecycle */
public class IntraState {

String currentKeyspace="";
String currentColumnFamily="";
boolean autoTimestamp= true;
Expand All @@ -20,4 +19,6 @@ public class IntraState {
Map<IntraMetaData,String> meta = new HashMap<IntraMetaData,String>();
//TODO separate per/request state from application/session state
static Map<String,Processor> processors = new HashMap<String,Processor>();
static Map<String,Filter> filters = new HashMap<String,Filter>();
Filter currentFilter;
}
60 changes: 59 additions & 1 deletion src/main/java/org/usergrid/vx/experimental/PROCESSING.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ Terminology

* CREATEPROCESSOR: Creates and loads a processor
* PROCESS: Instructs a processor to operate on the result of an operation
* CREATEFILTER: Creates and loads a filter
* FILTERMODE: Enables or disables filters

Example
Processor example
----
GET and SLICE verbs both return List<Map>. A user wishes to modify the results on the server side before they are
returned to the client. In the case the value of the columns are String and the user wishes to upper-case those values.
Expand Down Expand Up @@ -70,3 +72,59 @@ The value of the get from step 7 was "wow".
The result of step 9 has converted the value to upper case

Assert.assertEquals( "WOW", x.get(0).get("value") );

Filter example
----
Filtering using a Filter operates on rows as they are being read. This has an advantage over processors because it saves memory request and keeps the request size low. For example, imagine a user wants to take the results of a slice and limit the rows in the slice. A filter can do this as the rows are being read.

API
--------
Filter takes as input a Map that represents the column and produces a map or null. Returning null eliminates the column from the list.

package org.usergrid.vx.experimental;

import java.util.Map;

public interface Filter {
public Map filter(Map row);
}


IntraReq req = new IntraReq();
req.add( IntraOp.setKeyspaceOp("filterks") ); //0
eq.add( IntraOp.createKsOp("filterks", 1)); //1
req.add( IntraOp.createCfOp("filtercf")); //2
req.add( IntraOp.setColumnFamilyOp("filtercf") ); //3
req.add( IntraOp.setAutotimestampOp() ); //4
req.add( IntraOp.assumeOp("filterks", "filtercf", "value", "UTF-8"));//5

Insert two columns, one with a value of 20 and one with a value greater then 20

req.add( IntraOp.setOp("rowa", "col1", "20")); //6
req.add( IntraOp.setOp("rowa", "col2", "22")); //7

We will create a filter that returns rows that only have a column value greater then 21.

req.add( IntraOp.createFilterOp("over21", "groovy",
"public class Over21 implements org.usergrid.vx.experimental.Filter { \n"+
" public Map filter(Map row){ \n" +
" if (Integer.parseInt( row.get(\"value\") ) >21){ \n"+
" return row; \n" +
" } else { return null; } \n" +
" } \n" +
"} \n"
)); //8

Enabled the filter. The results from any get or slice requests are automatically filtered.

req.add( IntraOp.filterModeOp("over21", true)); //9
req.add( IntraOp.sliceOp("rowa", "col1", "col3", 10)); //10
IntraRes res = new IntraRes();
is.handleIntraReq(req, res, x);

Notice although the slice should have returned two columns the filter removed one of them.

List<Map> results = (List<Map>) res.getOpsRes().get(10);
Assert.assertEquals( "22", results.get(0).get("value") );
Assert.assertEquals(1, results.size());

30 changes: 30 additions & 0 deletions src/test/java/org/usergrid/vx/experimental/IntraServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,36 @@ public void assumeTest() throws CharacterCodingException{
Assert.assertEquals( "wow", x.get(0).get("value") );
}

@Test
public void filterTest() throws CharacterCodingException{
IntraReq req = new IntraReq();
req.add( IntraOp.setKeyspaceOp("filterks") ); //0
req.add( IntraOp.createKsOp("filterks", 1)); //1
req.add( IntraOp.createCfOp("filtercf")); //2
req.add( IntraOp.setColumnFamilyOp("filtercf") ); //3
req.add( IntraOp.setAutotimestampOp() ); //4
req.add( IntraOp.assumeOp("filterks", "filtercf", "value", "UTF-8"));//5
req.add( IntraOp.setOp("rowa", "col1", "20")); //6
req.add( IntraOp.setOp("rowa", "col2", "22")); //7
req.add( IntraOp.createFilterOp("over21", "groovy",
"public class Over21 implements org.usergrid.vx.experimental.Filter { \n"+
" public Map filter(Map row){ \n" +
" if (Integer.parseInt( row.get(\"value\") ) >21){ \n"+
" return row; \n" +
" } else { return null; } \n" +
" } \n" +
"} \n"
)); //8
req.add( IntraOp.filterModeOp("over21", true)); //9
req.add( IntraOp.sliceOp("rowa", "col1", "col3", 10)); //10
IntraRes res = new IntraRes();
is.handleIntraReq(req, res, x);
System.out.println ( res.getException() );
List<Map> results = (List<Map>) res.getOpsRes().get(10);
Assert.assertEquals( "22", results.get(0).get("value") );
Assert.assertEquals(1, results.size());

}

@Test
public void processorTest() throws CharacterCodingException{
Expand Down

0 comments on commit 828e636

Please sign in to comment.