Skip to content

Commit

Permalink
merged in TypeHelper changes
Browse files Browse the repository at this point in the history
  • Loading branch information
zznate committed Feb 21, 2013
2 parents 5c0f72c + 6d3d3e1 commit 7c00e1b
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 113 deletions.
85 changes: 26 additions & 59 deletions src/main/java/org/usergrid/vx/experimental/TypeHelper.java
Expand Up @@ -16,73 +16,40 @@
package org.usergrid.vx.experimental;

import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.List;

import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.TypeParser;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;

public class TypeHelper {
public static Object getTypedIfPossible(IntraState state, String type, ByteBuffer bb, IntraOp op){

IntraMetaData imd = new IntraMetaData(IntraService.determineKs(null ,op, state),IntraService.determineCf(null, op, state),type);
String s = state.meta.get(imd);
if (s == null){
return bb;
} else if (s.equals("UTF-8")){
try {
return ByteBufferUtil.string(bb);
} catch (Exception ex){ throw new RuntimeException(ex); }
} else if (s.equals("int32")) {
return ByteBufferUtil.toInt(bb);
} else if ( s.equals("long") ) {
return ByteBufferUtil.toLong(bb);
} else if (s.startsWith("CompositeType")){
int start = s.indexOf("(");
int end = s.indexOf(")");
String list = s.substring(start+1,end);
//System.out.println("list is" + list);

String [] parts = list.split(",");
Object [] results = new Object[parts.length] ;
//System.out.println("parts " + parts.length);
byte[] by = new byte[bb.remaining()];
bb.get(by);
List<byte[]> comp = CompositeTool.readComposite(by);
//System.out.println("results size "+results.length);
//System.out.println("comp size"+ comp.size());
for (int i=0;i<parts.length;i++){
results[i]= getTyped(parts[i], ByteBuffer.wrap(comp.get(i)) );
if (s == null) {
return bb;
}
return results;
} else {
throw new RuntimeException("Do not know what to do with "+s);
}
return compose(bb, s);
}
public static Object getTyped(String type, ByteBuffer bb){
if (type.equals("UTF-8")){
try {
return ByteBufferUtil.string(bb);
} catch (Exception ex){ throw new RuntimeException(ex); }
} else if (type.equals("int32")) {
return ByteBufferUtil.toInt(bb);
} else {
return bb;

public static Object getTyped(String type, ByteBuffer bb) {
return compose(bb, type);
}

public static Object getCqlTyped(String type, ByteBuffer bb) {
if (bb == null) {
return null;
}
return compose(bb, type);
}

private static Object compose(ByteBuffer bb, String s) {
try {
AbstractType<?> abstractType = TypeParser.parse(s);
return abstractType.compose(bb);
} catch (SyntaxException | ConfigurationException e) {
throw new RuntimeException("Failed to parse type [" + s + "]", e);
}
}
}

public static Object getCqlTyped(String type, ByteBuffer bb){
if (bb == null){
return null;
}
if (type.equals("UTF8Type")){
try {
return ByteBufferUtil.string(bb);
} catch (Exception ex){ throw new RuntimeException(ex); }
}
if (type.equals("Int32Type")){
return Int32Type.instance.compose(bb);
}
throw new RuntimeException("wahat is "+type +" ?" );
}
}
121 changes: 67 additions & 54 deletions src/test/java/org/usergrid/vx/experimental/IntraServiceITest.java
Expand Up @@ -28,12 +28,14 @@
import java.util.Map;
import java.util.Set;

import com.google.common.collect.ImmutableMap;

import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.marshal.AbstractCompositeType;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.CqlResult;
import org.apache.cassandra.thrift.ThriftClientState;
Expand Down Expand Up @@ -120,7 +122,7 @@ public void assumeTest() throws CharacterCodingException{
req.add( Operations.createCfOp("asscf")); //2
req.add( Operations.setColumnFamilyOp("asscf") ); //3
req.add( Operations.setAutotimestampOp() ); //4
req.add( Operations.assumeOp("assks", "asscf", "value", "UTF-8"));//5
req.add( Operations.assumeOp("assks", "asscf", "value", "UTF8Type"));//5
req.add( Operations.setOp("rowa", "col1", "wow")); //6
req.add( Operations.getOp("rowa", "col1")); //7
IntraRes res = new IntraRes();
Expand All @@ -137,7 +139,7 @@ public void filterTest() throws CharacterCodingException {
req.add(Operations.createCfOp("filtercf")); // 2
req.add(Operations.setColumnFamilyOp("filtercf")); // 3
req.add(Operations.setAutotimestampOp()); // 4
req.add(Operations.assumeOp("filterks", "filtercf", "value", "UTF-8"));// 5
req.add(Operations.assumeOp("filterks", "filtercf", "value", "UTF8Type"));// 5
req.add(Operations.setOp("rowa", "col1", "20")); // 6
req.add(Operations.setOp("rowa", "col2", "22")); // 7
req.add(Operations
Expand All @@ -153,7 +155,7 @@ public void filterTest() throws CharacterCodingException {
Assert.assertEquals(1, results.size());

IntraReq req2 = new IntraReq();
req2.add(Operations.assumeOp("filterks", "filtercf", "value", "UTF-8"));// 0
req2.add(Operations.assumeOp("filterks", "filtercf", "value", "UTF8Type"));// 0
req2.add(Operations.filterModeOp("over21", true)); // 1
req2.add( Operations.sliceOp("rowa", "col1", "col3", 10) //2
.set("keyspace", "filterks")
Expand All @@ -177,7 +179,7 @@ public void executeJavaScriptFilter() throws CharacterCodingException {
req.add(Operations.createCfOp("filtercf")); //2
req.add(Operations.setColumnFamilyOp("filtercf")); //3
req.add(Operations.setAutotimestampOp()); //4
req.add(Operations.assumeOp("jsFilterks", "filtercf", "value", "UTF-8"));//5
req.add(Operations.assumeOp("jsFilterks", "filtercf", "value", "UTF8Type"));//5
req.add(Operations.setOp("rowa", "col1", "20")); //6
req.add(Operations.setOp("rowa", "col2", "22")); //7
req.add(Operations.createFilterOp("over21", "javascript",
Expand All @@ -201,7 +203,7 @@ public void processorTest() throws CharacterCodingException{
req.add( Operations.createCfOp("proccf")); //2
req.add( Operations.setColumnFamilyOp("proccf") ); //3
req.add( Operations.setAutotimestampOp() ); //4
req.add( Operations.assumeOp("procks", "proccf", "value", "UTF-8"));//5
req.add( Operations.assumeOp("procks", "proccf", "value", "UTF8Type"));//5
req.add( Operations.setOp("rowa", "col1", "wow")); //6
req.add( Operations.getOp("rowa", "col1")); //7
req.add( Operations.createProcessorOp("capitalize", "groovy",
Expand Down Expand Up @@ -238,8 +240,8 @@ public void intTest() throws CharacterCodingException{
req.add( Operations.createCfOp("intcf")); //2
req.add( Operations.setColumnFamilyOp("intcf") ); //3
req.add( Operations.setAutotimestampOp() ); //4
req.add( Operations.assumeOp("intks", "intcf", "value", "UTF-8"));//5
req.add( Operations.assumeOp("intks", "intcf", "column", "int32"));//6
req.add( Operations.assumeOp("intks", "intcf", "value", "UTF8Type"));//5
req.add( Operations.assumeOp("intks", "intcf", "column", "Int32Type"));//6
req.add( Operations.setOp("rowa", 1, "wow")); //7
req.add( Operations.getOp("rowa", 1)); //8

Expand All @@ -258,8 +260,8 @@ public void ttlTest () {
req.add( Operations.createCfOp("ttlcf")); //2
req.add( Operations.setColumnFamilyOp("ttlcf") ); //3
req.add( Operations.setAutotimestampOp() ); //4
req.add( Operations.assumeOp("ttlks", "ttlcf", "value", "UTF-8"));//5
req.add( Operations.assumeOp("ttlks", "ttlcf", "column", "int32"));//6
req.add( Operations.assumeOp("ttlks", "ttlcf", "value", "UTF8Type"));//5
req.add( Operations.assumeOp("ttlks", "ttlcf", "column", "Int32Type"));//6
req.add( Operations.setOp("rowa", 1, "wow")); //7
req.add( Operations.setOp("rowa", 2, "wow").set("ttl", 1)); //8
//req.add( Operations.sliceOp("rowa", 1, 5, 4) ); //9
Expand All @@ -275,8 +277,8 @@ public void ttlTest () {
IntraReq r = new IntraReq();
r.add( Operations.setKeyspaceOp("ttlks") ); //0
r.add( Operations.setColumnFamilyOp("ttlcf") ); //1
r.add( Operations.assumeOp("ttlks", "ttlcf", "value", "UTF-8"));//2
r.add( Operations.assumeOp("ttlks", "ttlcf", "column", "int32"));//3
r.add( Operations.assumeOp("ttlks", "ttlcf", "value", "UTF8Type"));//2
r.add( Operations.assumeOp("ttlks", "ttlcf", "column", "Int32Type"));//3
r.add( Operations.sliceOp("rowa", 1, 5, 4) ); //4
IntraRes rs = new IntraRes();

Expand All @@ -287,27 +289,38 @@ public void ttlTest () {
Assert.assertEquals(1, x.size());

}

@Test
public void compositeTest() throws CharacterCodingException{
IntraReq req = new IntraReq();
req.add( Operations.setKeyspaceOp("compks") ); //0
req.add( Operations.createKsOp("compks", 1)); //1
req.add( Operations.createCfOp("compcf")); //2
req.add( Operations.setColumnFamilyOp("compcf") ); //3
req.add( Operations.setAutotimestampOp() ); //4
req.add( Operations.assumeOp("compks", "compcf", "value", "CompositeType(UTF-8,int32)"));//5
req.add( Operations.assumeOp("compks", "compcf", "column", "int32"));//6
req.add( Operations.setOp("rowa", 1, new Object[] {"yo",0, 2,0})); //7
req.add( Operations.getOp("rowa", 1)); //8

IntraRes res = new IntraRes();
is.handleIntraReq(req, res, x);
List<Map> x = (List<Map>) res.getOpsRes().get(8);
Assert.assertEquals( 1, x.get(0).get("name") );
Assert.assertEquals( "yo", ((Object [])x.get(0).get("value"))[0] );
Assert.assertEquals( 2, ((Object [])x.get(0).get("value"))[1] );
}

@Test
public void compositeTest() throws CharacterCodingException {
IntraReq req = new IntraReq();
req.add(Operations.setKeyspaceOp("compks")); //0
req.add(Operations.createKsOp("compks", 1)); //1
req.add(Operations.createCfOp("compcf")); //2
req.add(Operations.setColumnFamilyOp("compcf")); //3
req.add(Operations.setAutotimestampOp()); //4
req.add(Operations.assumeOp("compks", "compcf", "value", "CompositeType(UTF8Type,Int32Type)"));//5
req.add(Operations.assumeOp("compks", "compcf", "column", "Int32Type"));//6
req.add(Operations.setOp("rowa", 1, new Object[]{"yo", 0, 2, 0})); //7
req.add(Operations.getOp("rowa", 1)); //8

IntraRes res = new IntraRes();
is.handleIntraReq(req, res, x);
List<Map> x = (List<Map>) res.getOpsRes().get(8);
Assert.assertEquals(1, x.get(0).get("name"));

ByteBuffer bytes = (ByteBuffer) x.get(0).get("value");
List<AbstractType<?>> comparators = new ArrayList<>();
comparators.add(UTF8Type.instance);
comparators.add(Int32Type.instance);
CompositeType comparator = CompositeType.getInstance(comparators);

List<AbstractCompositeType.CompositeComponent> components = comparator.deconstruct(bytes);
AbstractCompositeType.CompositeComponent c1 = components.get(0);
AbstractCompositeType.CompositeComponent c2 = components.get(1);

Assert.assertEquals("yo", c1.comparator.compose(c1.value));
Assert.assertEquals(2, c2.comparator.compose(c2.value));
}


@Test
Expand All @@ -318,8 +331,8 @@ public void CqlTest() throws CharacterCodingException{
req.add( Operations.createCfOp("cqlcf")); //2
req.add( Operations.setColumnFamilyOp("cqlcf") ); //3
req.add( Operations.setAutotimestampOp() ); //4
req.add( Operations.assumeOp("cqlks", "cqlcf", "value", "int32"));//5
req.add( Operations.assumeOp("cqlks", "cqlcf", "column", "int32"));//6
req.add( Operations.assumeOp("cqlks", "cqlcf", "value", "Int32Type"));//5
req.add( Operations.assumeOp("cqlks", "cqlcf", "column", "Int32Type"));//6
req.add( Operations.setOp("rowa", 1, 2)); //7
req.add( Operations.getOp("rowa", 1)); //8
req.add( Operations.cqlQuery("select * from cqlcf", "3.0.0"));//9
Expand Down Expand Up @@ -358,7 +371,7 @@ public void clearTest() throws CharacterCodingException{
req.add( Operations.createCfOp("clearcf")); //2
req.add( Operations.setColumnFamilyOp("clearcf") ); //3
req.add( Operations.setAutotimestampOp() ); //4
req.add( Operations.assumeOp("clearks", "clearcf", "value", "UTF-8")); //5
req.add( Operations.assumeOp("clearks", "clearcf", "value", "UTF8Type")); //5
req.add( Operations.setOp("rowa", 1, "wow")); //6
req.add( Operations.getOp("rowa", 1)); //7
req.add( Operations.getOp("rowa", 1)); //8
Expand Down Expand Up @@ -407,7 +420,7 @@ public void multiProcessTest() throws Exception {
req.add( Operations.setKeyspaceOp("myks") ); //0
req.add( Operations.setColumnFamilyOp("mycf") ); //1
req.add(Operations.setAutotimestampOp()); //2
req.add(Operations.assumeOp("myks", "mycf", "value", "UTF-8")); //3
req.add(Operations.assumeOp("myks", "mycf", "value", "UTF8Type")); //3
req.add(Operations.setOp("rowzz", "col1", "7")); //4
req.add(Operations.setOp("rowzz", "col2", "8")); //5
req.add( Operations.setOp("rowyy", "col4", "9")); //6
Expand Down Expand Up @@ -458,8 +471,8 @@ public void batchSetTest() throws Exception {
req.add( Operations.setKeyspaceOp("myks") ); //0
req.add( Operations.setColumnFamilyOp("mycf") ); //1
req.add( Operations.setAutotimestampOp() ); //2
req.add( Operations.assumeOp("myks", "mycf", "value", "UTF-8")); //3
req.add( Operations.assumeOp("myks", "mycf", "column", "UTF-8")); //4
req.add( Operations.assumeOp("myks", "mycf", "value", "UTF8Type")); //3
req.add( Operations.assumeOp("myks", "mycf", "column", "UTF8Type")); //4
Map row1 = new HashMap();
row1.put("rowkey", "batchkeya");
row1.put("name", "col1");
Expand Down Expand Up @@ -521,8 +534,8 @@ public void jsonTest() throws Exception {
req.add( Operations.setKeyspaceOp("myks") ); //0
req.add( Operations.setColumnFamilyOp("mycf") ); //1
req.add( Operations.setAutotimestampOp() ); //2
req.add( Operations.assumeOp("myks", "mycf", "value", "UTF-8")); //3
req.add( Operations.assumeOp("myks", "mycf", "column", "UTF-8")); //4
req.add( Operations.assumeOp("myks", "mycf", "value", "UTF8Type")); //3
req.add( Operations.assumeOp("myks", "mycf", "column", "UTF8Type")); //4
Map row1 = new HashMap();
row1.put("rowkey", "jsonkey");
row1.put("name", "data");
Expand Down Expand Up @@ -573,7 +586,7 @@ public void saveStateTest() throws Exception {
r.add( Operations.setColumnFamilyOp("mycf")); //1
r.add( Operations.setAutotimestampOp() ); //2
r.add( Operations.setOp("a", "b", "c") ); //3
r.add( Operations.assumeOp("myks", "mycf", "value", "UTF-8") );//4
r.add( Operations.assumeOp("myks", "mycf", "value", "UTF8Type") );//4
r.add( Operations.saveState() );//5

IntraRes res = new IntraRes();
Expand Down Expand Up @@ -632,8 +645,8 @@ public void optioanKSandCSTest() {
IntraReq req = new IntraReq();

req.add( Operations.setAutotimestampOp() ); //0
req.add( Operations.assumeOp("myks", "mycf", "value", "UTF-8"));//1
req.add( Operations.assumeOp("myks", "mycf", "column", "int32"));//2
req.add( Operations.assumeOp("myks", "mycf", "value", "UTF8Type"));//1
req.add( Operations.assumeOp("myks", "mycf", "column", "Int32Type"));//2
IntraOp setOp = Operations.setOp("optional", 1, "wow"); //3
setOp.set("keyspace", "myks");
setOp.set("columnfamily", "mycf");
Expand All @@ -656,8 +669,8 @@ public void optioanKSandCSTest() {
public void componentTest() {
IntraReq req = new IntraReq();
req.add(Operations.setAutotimestampOp()); // 0
req.add(Operations.assumeOp("myks", "mycf", "value", "UTF-8"));// 1
req.add(Operations.assumeOp("myks", "mycf", "column", "int32"));// 2
req.add(Operations.assumeOp("myks", "mycf", "value", "UTF8Type"));// 1
req.add(Operations.assumeOp("myks", "mycf", "column", "Int32Type"));// 2
IntraOp setOp = Operations.setOp("optional", 1, "wow"); // 3
setOp.set("keyspace", "myks");
setOp.set("columnfamily", "mycf");
Expand Down Expand Up @@ -721,7 +734,7 @@ public void timeoutOpTest() throws CharacterCodingException {
req.add(Operations.createCfOp("timeoutcf")); // 2
req.add(Operations.setColumnFamilyOp("timeoutcf")); // 3
req.add(Operations.setAutotimestampOp()); // 4
req.add(Operations.assumeOp("timeoutks", "timeoutcf", "value", "UTF-8"));// 5
req.add(Operations.assumeOp("timeoutks", "timeoutcf", "value", "UTF8Type"));// 5
req.add(Operations.setOp("rowa", "col1", "20")); // 6
req.add(Operations.setOp("rowa", "col2", "22")); // 7
req.add(Operations.createFilterOp("ALongOne", "groovy", "{ row -> Thread.sleep(5000) }")); // 8
Expand Down Expand Up @@ -760,8 +773,8 @@ public void batchAcrossKeyspaces() throws CharacterCodingException {
.add( Operations.setKeyspaceOp("ks2"))
.add( Operations.createCfOp("cf2"))
.add( Operations.batchSetOp(batch).set("timeout", 1000000))
.add( Operations.assumeOp("ks1", "cf1", "value", "UTF-8"))
.add( Operations.assumeOp("ks2", "cf2", "value", "UTF-8"))
.add( Operations.assumeOp("ks1", "cf1", "value", "UTF8Type"))
.add( Operations.assumeOp("ks2", "cf2", "value", "UTF8Type"))
.add( Operations.getOp("mykey", "mycol")
.set("keyspace", "ks1")
.set("columnfamily", "cf1"));
Expand Down Expand Up @@ -818,8 +831,8 @@ public void scannerTest() throws Exception {
+ "import org.usergrid.vx.experimental.scan.* \n"
+ "public class MyScanner extends PeopleFromNY { \n"
+ " public MyScanner() { super(); } \n" + "} \n"))
.add(Operations.assumeOp("myks", "mycf", "value", "UTF-8"))
.add(Operations.assumeOp("myks", "mycf", "column", "UTF-8"))
.add(Operations.assumeOp("myks", "mycf", "value", "UTF8Type"))
.add(Operations.assumeOp("myks", "mycf", "column", "UTF8Type"))
.add(Operations.setKeyspaceOp("myks"))
.add(Operations.setColumnFamilyOp("mycf"))
.add(Operations.setOp("scannerrow", "ed", "NY")) // 3
Expand Down Expand Up @@ -849,8 +862,8 @@ public void scannerTest() throws Exception {
public void sliceNamesTest() throws Exception {
IntraReq req = new IntraReq();
req.add(
Operations.assumeOp("myks", "mycf", "value", "UTF-8"))
.add(Operations.assumeOp("myks", "mycf", "column", "UTF-8"))
Operations.assumeOp("myks", "mycf", "value", "UTF8Type"))
.add(Operations.assumeOp("myks", "mycf", "column", "UTF8Type"))
.add(Operations.setKeyspaceOp("myks"))
.add(Operations.setColumnFamilyOp("mycf"))
.add(Operations.setOp("slicename", "ed", "NY")) // 4
Expand Down

0 comments on commit 7c00e1b

Please sign in to comment.