Permalink
Browse files

Merge pull request #46 from zznate/cleanup-ops

moved service execution logic into enum for implicity execution w/o look...
  • Loading branch information...
2 parents 56d7533 + 06c1059 commit 9c4f70695ed41d23b560392708a228c6daefe58e @edwardcapriolo edwardcapriolo committed Jan 3, 2013
View
419 src/main/java/org/usergrid/vx/experimental/IntraOp.java
@@ -1,11 +1,30 @@
package org.usergrid.vx.experimental;
import com.google.common.base.Preconditions;
+import groovy.lang.GroovyClassLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.vertx.java.core.Vertx;
+import java.io.IOException;
import java.io.Serializable;
-import java.util.Map;
-import java.util.TreeMap;
+import java.nio.ByteBuffer;
+import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
public class IntraOp implements Serializable{
@@ -232,30 +251,378 @@ private static void checkForBlankStr(String arg, String msg, Type type) {
public enum Type {
- LISTCOLUMNFAMILY,
- LISTKEYSPACES,
- CONSISTENCY,
- CREATEKEYSPACE,
- CREATECOLUMNFAMILY,
- FOREACH,
- COLUMNPREDICATE,
- SLICE,
- GETREF,
- GET,
- SET,
- AUTOTIMESTAMP,
- SETKEYSPACE,
- SETCOLUMNFAMILY,
- ASSUME,
- CREATEPROCESSOR,
- PROCESS,
- DROPKEYSPACE,
- CREATEFILTER,
- FILTERMODE,
- CQLQUERY,
- CLEAR,
- CREATEMULTIPROCESS,
- MULTIPROCESS
+ LISTCOLUMNFAMILY {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ String keyspace = (String) op.getOp().get("keyspace");
+ KSMetaData ks = Schema.instance.getKSMetaData(keyspace);
+ res.getOpsRes().put(i, ks.cfMetaData().keySet());
+ }
+ },
+ LISTKEYSPACES {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ res.getOpsRes().put(i, Schema.instance.getNonSystemTables());
+ }
+ },
+ CONSISTENCY {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ ConsistencyLevel level = ConsistencyLevel.valueOf((String) op.getOp().get("level"));
+ res.getOpsRes().put(i, "OK");
+ state.consistency = level;
+ }
+ },
+ CREATEKEYSPACE {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ Collection<CFMetaData> cfDefs = new ArrayList<CFMetaData>(0);
+ IntraOp op = req.getE().get(i);
+ String ks = (String) op.getOp().get("name");
+ KsDef def = new KsDef();
+ def.setName(ks);
+ def.setStrategy_class("SimpleStrategy");
+ Map<String, String> strat = new HashMap<String, String>();
+ strat.put("replication_factor", "1");
+ def.setStrategy_options(strat);
+ KSMetaData ksm = null;
+ try {
+ ksm = KSMetaData.fromThrift(def,
+ cfDefs.toArray(new CFMetaData[cfDefs.size()]));
+ } catch (ConfigurationException e) {
+ res.setExceptionAndId(e.getMessage(), i);
+ return;
+ }
+
+ try {
+ MigrationManager.announceNewKeyspace(ksm);
+ } catch (ConfigurationException e) {
+ res.setExceptionAndId(e.getMessage(), i);
+ return;
+ }
+ res.getOpsRes().put(i, "OK");
+ }
+ },
+ CREATECOLUMNFAMILY {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ String cf = (String) op.getOp().get("name");
+ CfDef def = new CfDef();
+ def.setName(cf);
+ def.setKeyspace(state.currentKeyspace);
+ def.unsetId();
+ CFMetaData cfm = null;
+
+ try {
+ cfm = CFMetaData.fromThrift(def);
+ cfm.addDefaultIndexNames();
+ } catch (org.apache.cassandra.exceptions.InvalidRequestException e) {
+ res.setExceptionAndId(e.getMessage(), i);
+ return;
+ } catch (ConfigurationException e) {
+ res.setExceptionAndId(e.getMessage(), i);
+ return;
+ }
+ try {
+ MigrationManager.announceNewColumnFamily(cfm);
+ } catch (ConfigurationException e) {
+ res.setExceptionAndId(e.getMessage(), i);
+ return;
+ }
+ res.getOpsRes().put(i, "OK");
+ }
+ },
+ FOREACH {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+
+ }
+ },
+ COLUMNPREDICATE {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+
+ }
+ },
+ SLICE {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ List<Map> finalResults = new ArrayList<Map>();
+ ByteBuffer rowkey = IntraService.byteBufferForObject(IntraService.resolveObject(op.getOp().get("rowkey"), req, res, state, i));
+ ByteBuffer start = IntraService.byteBufferForObject(IntraService.resolveObject(op.getOp().get("start"), req, res, state, i));
+ ByteBuffer end = IntraService.byteBufferForObject(IntraService.resolveObject(op.getOp().get("end"), req, res, state, i));
+ List<ReadCommand> commands = new ArrayList<ReadCommand>(1);
+ ColumnPath cp = new ColumnPath();
+ cp.setColumn_family(state.currentColumnFamily);
+ QueryPath qp = new QueryPath(cp);
+ SliceFromReadCommand sr = new SliceFromReadCommand(state.currentKeyspace, rowkey, qp, start, end, false, 100);
+ commands.add(sr);
+
+ List<Row> results = null;
+ try {
+ results = StorageProxy.read(commands, state.consistency);
+ ColumnFamily cf = results.get(0).cf;
+ if (cf == null){ //cf= null is no data
+ } else {
+ IntraService.readCf(cf, finalResults, state);
+ }
+ res.getOpsRes().put(i,finalResults);
+ } catch (ReadTimeoutException | org.apache.cassandra.exceptions.UnavailableException
+ | IsBootstrappingException | IOException e) {
+ res.setExceptionAndId(e.getMessage(), i);
+ return;
+ }
+ res.getOpsRes().put(i, finalResults);
+ }
+ },
+ GETREF {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+
+ }
+ },
+ GET {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ List<Map> finalResults = new ArrayList<Map>();
+ ByteBuffer rowkey = IntraService.byteBufferForObject(IntraService.resolveObject(
+ op.getOp().get("rowkey"), req, res, state, i));
+ ByteBuffer column = IntraService.byteBufferForObject(IntraService.resolveObject(
+ op.getOp().get("column"), req, res, state, i));
+ QueryPath path = new QueryPath(state.currentColumnFamily, null);
+ List<ByteBuffer> nameAsList = Arrays.asList(column);
+ ReadCommand command = new SliceByNamesReadCommand(state.currentKeyspace,
+ rowkey, path, nameAsList);
+ List<Row> rows = null;
+
+ try {
+ rows = StorageProxy.read(Arrays.asList(command), state.consistency);
+ ColumnFamily cf = rows.get(0).cf;
+ if (cf == null) { // cf= null is no data
+ } else {
+ IntraService.readCf(cf, finalResults, state);
+ }
+ res.getOpsRes().put(i, finalResults);
+ } catch (ReadTimeoutException | org.apache.cassandra.exceptions.UnavailableException
+ | IsBootstrappingException | IOException e) {
+ res.getOpsRes().put(i, e.getMessage());
+ return;
+ }
+ }
+ },
+ SET {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ RowMutation rm = new RowMutation(state.currentKeyspace,
+ IntraService.byteBufferForObject(
+ IntraService.resolveObject(op.getOp().get("rowkey"), req, res, state, i)
+ ));
+ QueryPath qp = new QueryPath(state.currentColumnFamily,null, IntraService.byteBufferForObject(
+ IntraService.resolveObject(op.getOp().get("columnName"), req, res, state, i)
+ ) );
+ Object val = op.getOp().get("value");
+ rm.add(qp, IntraService.byteBufferForObject(
+ IntraService.resolveObject(val, req, res, state, i)
+ ), (Long) (state.autoTimestamp ? state.nanotime : op.getOp().get("timestamp")));
+ try {
+ StorageProxy.mutate(Arrays.asList(rm), state.consistency);
+ res.getOpsRes().put(i, "OK");
+ } catch (WriteTimeoutException | org.apache.cassandra.exceptions.UnavailableException | OverloadedException e) {
+ res.setExceptionAndId(e.getMessage(), i);
+ return;}
+ }
+ },
+ AUTOTIMESTAMP {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ state.autoTimestamp = true;
+ res.getOpsRes().put(i, "OK");
+ }
+ },
+ SETKEYSPACE {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ state.currentKeyspace = (String) op.getOp().get("keyspace");
+ res.getOpsRes().put(i, "OK");
+ }
+ },
+ SETCOLUMNFAMILY {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ state.currentColumnFamily = (String) op.getOp().get("columnfamily");
+ res.getOpsRes().put(i, "OK");
+ }
+ },
+ ASSUME {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ IntraMetaData imd = new IntraMetaData();
+ imd.keyspace = (String) op.getOp().get("keyspace");
+ imd.columnfamily = (String) op.getOp().get("columnfamily");
+ imd.type = (String) op.getOp().get("type");
+ state.meta.put( imd , (String) op.getOp().get("clazz") );
+ res.getOpsRes().put(i, "OK");
+ }
+ },
+ CREATEPROCESSOR {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ String name = (String) op.getOp().get("name");
+ GroovyClassLoader gc = new GroovyClassLoader();
+ Class c = gc.parseClass((String) op.getOp().get("value") );
+ Processor p = null;
+ try {
+ p = (Processor) c.newInstance();
+ } catch (InstantiationException | IllegalAccessException e) {
+ res.setExceptionAndId(e, i);
+ return;
+ }
+ IntraState.processors.put(name,p);
+ }
+ },
+ PROCESS {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ String processorName = (String) op.getOp().get("processorname");
+ Map params = (Map) op.getOp().get("params");
+ Processor p = state.processors.get(processorName);
+ Integer inputId = (Integer) op.getOp().get("input");
+ List<Map> toProcess = (List<Map>)res.getOpsRes().get(inputId);
+ List<Map> results = p.process(toProcess);
+ res.getOpsRes().put(i, results);
+ }
+ },
+ DROPKEYSPACE {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+
+ }
+ },
+ CREATEFILTER {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ String name = (String) op.getOp().get("name");
+ GroovyClassLoader gc = new GroovyClassLoader();
+ Class c = gc.parseClass((String) op.getOp().get("value") );
+ Filter f = null;
+ try {
+ f = (Filter) c.newInstance();
+ } catch (InstantiationException | IllegalAccessException e) {
+ res.setExceptionAndId(e, i);
+ return;
+ }
+ IntraState.filters.put(name, f);
+ }
+ },
+ FILTERMODE {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ String name = (String) op.getOp().get("name");
+ Boolean on = (Boolean) op.getOp().get("on");
+ if (on){
+ Filter f = state.filters.get(name);
+ if (f == null){
+ res.setExceptionAndId("filter "+name +" not found", i);
+ return;
+ } else {
+ state.currentFilter=f;
+ }
+ } else {
+ state.currentFilter = null;
+ }
+ }
+ },
+ CQLQUERY {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ ClientState clientState = new ClientState();
+ try {
+ clientState.setCQLVersion((String) op.getOp().get("version"));
+ clientState.setKeyspace(state.currentKeyspace);
+ } catch (InvalidRequestException e) {
+ res.setExceptionAndId(e, i);
+ return;
+ }
+ QueryState queryState = new QueryState(clientState);
+ ResultMessage rm = null;
+ try {
+ rm = QueryProcessor.process((String) op.getOp().get("query"), state.consistency, queryState);
+ } catch (RequestExecutionException | RequestValidationException e) {
+ res.setExceptionAndId(e, i);
+ return;
+ }
+ //ToDo maybe processInternal
+ CqlResult result = rm.toThriftResult();
+
+ List<CqlRow> rows = result.getRows();
+ List<HashMap> returnRows = new ArrayList<HashMap>();
+ for (CqlRow row: rows){
+ List<org.apache.cassandra.thrift.Column> columns = row.getColumns();
+ for (org.apache.cassandra.thrift.Column c: columns){
+ HashMap m = new HashMap();
+ m.put("value", c.value);
+ m.put("name", c.name);
+ returnRows.add(m);
+ }
+ }
+ res.getOpsRes().put(i,returnRows);
+ }
+ },
+ CLEAR {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ int id = (Integer) op.getOp().get("id");
+ res.getOpsRes().put(id, new ArrayList<HashMap>());
+ }
+ },
+ CREATEMULTIPROCESS {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ String name = (String) op.getOp().get("name");
+ GroovyClassLoader gc = new GroovyClassLoader();
+ Class c = gc.parseClass((String) op.getOp().get("value") );
+ MultiProcessor p = null;
+ try {
+ p = (MultiProcessor) c.newInstance();
+ } catch (InstantiationException | IllegalAccessException e) {
+ res.setExceptionAndId(e, i);
+ return;
+ }
+ IntraState.multiProcessors.put(name, p);
+ }
+ },
+ MULTIPROCESS {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ String name = (String) op.getOp().get("name");
+ Map params = (Map) op.getOp().get("params");
+ //Processor p = state.processors.get(processorName);
+ MultiProcessor p = state.multiProcessors.get(name);
+
+ List<Map> mpResults = p.multiProcess(res.getOpsRes(), params);
+ res.getOpsRes().put(i, mpResults);
+ }
+ };
+
+ public abstract void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx);
}
View
438 src/main/java/org/usergrid/vx/experimental/IntraService.java
@@ -1,69 +1,13 @@
package org.usergrid.vx.experimental;
-import groovy.lang.GroovyClassLoader;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.db.ReadCommand;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.SliceByNamesReadCommand;
-
-import org.apache.cassandra.db.SliceFromReadCommand;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.Int32Type;
-import org.apache.cassandra.db.marshal.IntegerType;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.exceptions.IsBootstrappingException;
-import org.apache.cassandra.exceptions.OverloadedException;
-import org.apache.cassandra.exceptions.ReadTimeoutException;
-import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.exceptions.RequestValidationException;
-import org.apache.cassandra.exceptions.UnavailableException;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
-import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.service.QueryState;
-import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.CassandraServer;
-import org.apache.cassandra.thrift.CfDef;
-import org.apache.cassandra.thrift.Column;
-import org.apache.cassandra.thrift.ColumnParent;
-import org.apache.cassandra.thrift.ColumnPath;
-import org.apache.cassandra.thrift.CqlResult;
-import org.apache.cassandra.thrift.CqlRow;
-import org.apache.cassandra.thrift.KsDef;
-import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.vertx.java.core.Handler;
import org.vertx.java.core.Vertx;
-import org.vertx.java.core.buffer.Buffer;
-import org.vertx.java.core.eventbus.impl.MessageFactory;
-import org.vertx.java.core.json.JsonArray;
-import org.vertx.java.core.json.JsonObject;
-import com.hazelcast.core.Message;
-import com.sun.xml.internal.ws.policy.privateutil.PolicyUtils.Collections;
+import java.nio.ByteBuffer;
+import java.util.*;
public class IntraService {
@@ -84,50 +28,7 @@ protected boolean executeReq(IntraReq req, IntraRes res, IntraState state, Vertx
for (int i=0;i<req.getE().size() && res.getException() == null ;i++){
IntraOp op = req.getE().get(i);
try {
- if (op.getType().equals(IntraOp.Type.SETKEYSPACE)) {
- state.currentKeyspace = (String) op.getOp().get("keyspace");
- res.getOpsRes().put(i, "OK");
- } else if (op.getType().equals(IntraOp.Type.CREATEKEYSPACE)) {
- createKeyspace(req, res, state, i);
- } else if (op.getType().equals(IntraOp.Type.CREATECOLUMNFAMILY)) {
- createColumnFamily(req, res, state, i);
- } else if (op.getType().equals(IntraOp.Type.SETCOLUMNFAMILY)){
- state.currentColumnFamily = (String) op.getOp().get("columnfamily");
- res.getOpsRes().put(i, "OK");
- } else if (op.getType().equals(IntraOp.Type.AUTOTIMESTAMP)){
- state.autoTimestamp = true;
- res.getOpsRes().put(i, "OK");
- } else if (op.getType().equals(IntraOp.Type.SET)){
- set(req, res, state, i);
- } else if (op.getType().equals(IntraOp.Type.SLICE)){
- slice(req, res, state, i);
- } else if (op.getType().equals(IntraOp.Type.GET)){
- get(req,res,state,i);
- } else if (op.getType().equals(IntraOp.Type.CONSISTENCY)){
- consistency(req, res, state, i);
- } else if (op.getType().equals(IntraOp.Type.LISTKEYSPACES)){
- listKeyspaces(req, res, state, i);
- } else if (op.getType().equals(IntraOp.Type.LISTCOLUMNFAMILY)){
- listColumnFamily(req, res, state, i);
- } else if (op.getType().equals(IntraOp.Type.ASSUME)){
- assume(req,res,state,i);
- } else if (op.getType().equals(IntraOp.Type.CREATEPROCESSOR)){
- createProcessor(req,res,state,i,vertx);
- } else if (op.getType().equals(IntraOp.Type.PROCESS)){
- process(req,res,state,i,vertx);
- } else if (op.getType().equals(IntraOp.Type.CREATEFILTER)){
- createFilter(req,res,state,i,vertx);
- } else if (op.getType().equals(IntraOp.Type.FILTERMODE)){
- filterMode(req,res,state,i,vertx);
- } else if (op.getType().equals(IntraOp.Type.CQLQUERY)){
- cqlQuery(req,res,state,i,vertx);
- } else if (op.getType().equals(IntraOp.Type.CLEAR)){
- clear(req,res,state,i,vertx);
- } else if (op.getType().equals(IntraOp.Type.CREATEMULTIPROCESS)){
- createMultiProcess(req,res,state,i,vertx);
- } else if (op.getType().equals(IntraOp.Type.MULTIPROCESS)){
- multiProcess(req,res,state,i,vertx);
- }
+ op.getType().execute(req, res, state, i, vertx);
} catch (Exception ex){
res.setExceptionAndId(ex,i);
ex.printStackTrace();
@@ -136,19 +37,8 @@ protected boolean executeReq(IntraReq req, IntraRes res, IntraState state, Vertx
return true;
}
- private void multiProcess(IntraReq req, IntraRes res, IntraState state,
- int i, Vertx vertx) {
- IntraOp op = req.getE().get(i);
- String name = (String) op.getOp().get("name");
- Map params = (Map) op.getOp().get("params");
- //Processor p = state.processors.get(processorName);
- MultiProcessor p = state.multiProcessors.get(name);
-
- List<Map> mpResults = p.multiProcess(res.getOpsRes(), params);
- res.getOpsRes().put(i, mpResults);
-
- }
- private Object resolveObject(Object o, IntraReq req, IntraRes res,IntraState state, int i){
+
+ static Object resolveObject(Object o, IntraReq req, IntraRes res,IntraState state, int i){
if (o instanceof Object[]){
return o;
} else if (o instanceof Integer){
@@ -170,7 +60,7 @@ private Object resolveObject(Object o, IntraReq req, IntraRes res,IntraState sta
throw new RuntimeException(" do not know what to do with "+o.getClass());
}
}
- ByteBuffer byteBufferForObject(Object o){
+ static ByteBuffer byteBufferForObject(Object o){
if (o instanceof Object[]){
Object [] comp = (Object[]) o;
List<byte[]> b = new ArrayList<byte[]>();
@@ -224,186 +114,11 @@ public boolean verifyReq(IntraReq req, IntraRes res){
}
- private void createColumnFamily(IntraReq req, IntraRes res, IntraState state, int i){
- IntraOp op = req.getE().get(i);
- String cf = (String) op.getOp().get("name");
- CfDef def = new CfDef();
- def.setName(cf);
- def.setKeyspace(state.currentKeyspace);
- def.unsetId();
- CFMetaData cfm = null;
-
- try {
- cfm = CFMetaData.fromThrift(def);
- cfm.addDefaultIndexNames();
- } catch (org.apache.cassandra.exceptions.InvalidRequestException e) {
- res.setExceptionAndId(e.getMessage(), i);
- return;
- } catch (ConfigurationException e) {
- res.setExceptionAndId(e.getMessage(), i);
- return;
- }
- try {
- MigrationManager.announceNewColumnFamily(cfm);
- } catch (ConfigurationException e) {
- res.setExceptionAndId(e.getMessage(), i);
- return;
- }
- res.getOpsRes().put(i, "OK");
- }
-
- private void createKeyspace(IntraReq req, IntraRes res, IntraState state,
- int i) {
- Collection<CFMetaData> cfDefs = new ArrayList<CFMetaData>(0);
- IntraOp op = req.getE().get(i);
- String ks = (String) op.getOp().get("name");
- KsDef def = new KsDef();
- def.setName(ks);
- def.setStrategy_class("SimpleStrategy");
- Map<String, String> strat = new HashMap<String, String>();
- strat.put("replication_factor", "1");
- def.setStrategy_options(strat);
- KSMetaData ksm = null;
- try {
- ksm = KSMetaData.fromThrift(def,
- cfDefs.toArray(new CFMetaData[cfDefs.size()]));
- } catch (ConfigurationException e) {
- res.setExceptionAndId(e.getMessage(), i);
- return;
- }
-
- try {
- MigrationManager.announceNewKeyspace(ksm);
- } catch (ConfigurationException e) {
- res.setExceptionAndId(e.getMessage(), i);
- return;
- }
- res.getOpsRes().put(i, "OK");
- }
-
- public void set(IntraReq req, IntraRes res, IntraState state,int i) {
- IntraOp op = req.getE().get(i);
- RowMutation rm = new RowMutation(state.currentKeyspace,byteBufferForObject(
- resolveObject ( op.getOp().get("rowkey"),req,res,state, i )
- ));
- QueryPath qp = new QueryPath(state.currentColumnFamily,null, byteBufferForObject(
- resolveObject ( op.getOp().get("columnName"),req,res,state, i )
- ) );
- Object val = op.getOp().get("value");
- rm.add(qp, byteBufferForObject(
- resolveObject (val ,req,res,state, i )
- ), (Long) (state.autoTimestamp ? state.nanotime : op.getOp().get("timestamp")));
- Collection<RowMutation> col = new ArrayList<RowMutation>();
- col.add(rm);
- try {
- StorageProxy.mutate(col, state.consistency);
- res.getOpsRes().put(i, "OK");
- } catch (WriteTimeoutException e) {
- res.setExceptionAndId(e.getMessage(), i);
- return;
- } catch (org.apache.cassandra.exceptions.UnavailableException e) {
- res.setExceptionAndId(e.getMessage(), i);
- return;
- } catch (OverloadedException e) {
- res.setExceptionAndId(e.getMessage(), i);
- return;
- }
- }
-
- private void slice(IntraReq req, IntraRes res, IntraState state,int i){
- IntraOp op = req.getE().get(i);
- List<Map> finalResults = new ArrayList<Map>();
- ByteBuffer rowkey = byteBufferForObject(resolveObject(op.getOp().get("rowkey"),req,res,state,i));
- ByteBuffer start = byteBufferForObject(resolveObject(op.getOp().get("start"),req,res,state,i));
- ByteBuffer end = byteBufferForObject(resolveObject(op.getOp().get("end"),req,res,state,i));
- List<ReadCommand> commands = new ArrayList<ReadCommand>(1);
- ColumnPath cp = new ColumnPath();
- cp.setColumn_family(state.currentColumnFamily);
- QueryPath qp = new QueryPath(cp);
- SliceFromReadCommand sr = new SliceFromReadCommand(state.currentKeyspace, rowkey, qp, start, end, false, 100);
- commands.add(sr);
-
- List<Row> results = null;
- try {
- results = StorageProxy.read(commands, state.consistency);
- ColumnFamily cf = results.get(0).cf;
- if (cf == null){ //cf= null is no data
- } else {
- readCf(cf, finalResults,state);
- }
- res.getOpsRes().put(i,finalResults);
- } catch (ReadTimeoutException e) {
- res.setExceptionAndId(e.getMessage(), i);
- return;
- } catch (org.apache.cassandra.exceptions.UnavailableException e) {
- res.setExceptionAndId(e.getMessage(), i);
- return;
- } catch (IsBootstrappingException e) {
- res.setExceptionAndId(e.getMessage(), i);
- return;
- } catch (IOException e) {
- res.setExceptionAndId(e.getMessage(), i);
- return;
- }
- res.getOpsRes().put(i, finalResults);
- }
-
- private void consistency(IntraReq req, IntraRes res, IntraState state,int i){
- IntraOp op = req.getE().get(i);
- ConsistencyLevel level = ConsistencyLevel.valueOf((String) op.getOp().get("level"));
- res.getOpsRes().put(i, "OK");
- state.consistency = level;
- }
- private void listKeyspaces(IntraReq req, IntraRes res, IntraState state, int i) {
- IntraOp op = req.getE().get(i);
- res.getOpsRes().put(i, Schema.instance.getNonSystemTables());
- }
- private void listColumnFamily(IntraReq req, IntraRes res, IntraState state, int i) {
- IntraOp op = req.getE().get(i);
- String keyspace = (String) op.getOp().get("keyspace");
- KSMetaData ks = Schema.instance.getKSMetaData(keyspace);
- res.getOpsRes().put(i, ks.cfMetaData().keySet());
- }
-
- private void get(IntraReq req, IntraRes res, IntraState state, int i) {
- IntraOp op = req.getE().get(i);
- List<Map> finalResults = new ArrayList<Map>();
- ByteBuffer rowkey = byteBufferForObject(resolveObject(
- op.getOp().get("rowkey"), req, res, state, i));
- ByteBuffer column = byteBufferForObject(resolveObject(
- op.getOp().get("column"), req, res, state, i));
- QueryPath path = new QueryPath(state.currentColumnFamily, null);
- List<ByteBuffer> nameAsList = Arrays.asList(column);
- ReadCommand command = new SliceByNamesReadCommand(state.currentKeyspace,
- rowkey, path, nameAsList);
- List<Row> rows = null;
- try {
- rows = StorageProxy.read(Arrays.asList(command), state.consistency);
- ColumnFamily cf = rows.get(0).cf;
- if (cf == null) { // cf= null is no data
- } else {
- readCf(cf, finalResults,state);
- }
- res.getOpsRes().put(i, finalResults);
- } catch (ReadTimeoutException e) {
- res.getOpsRes().put(i, e.getMessage());
- return;
- } catch (UnavailableException e) {
- res.getOpsRes().put(i, e.getMessage());
- return;
- } catch (IsBootstrappingException e) {
- res.getOpsRes().put(i, e.getMessage());
- return;
- } catch (IOException e) {
- res.getOpsRes().put(i, e.getMessage());
- return;
- }
- }
- private void readCf(ColumnFamily cf , List<Map> finalResults, IntraState state){
+ static void readCf(ColumnFamily cf , List<Map> finalResults, IntraState state){
Iterator<IColumn> it = cf.iterator();
while (it.hasNext()) {
IColumn ic = it.next();
@@ -420,141 +135,6 @@ private void readCf(ColumnFamily cf , List<Map> finalResults, IntraState state){
}
}
}
-
- private void assume(IntraReq req, IntraRes res, IntraState state, int i) {
- IntraOp op = req.getE().get(i);
- IntraMetaData imd = new IntraMetaData();
- imd.keyspace = (String) op.getOp().get("keyspace");
- imd.columnfamily = (String) op.getOp().get("columnfamily");
- imd.type = (String) op.getOp().get("type");
- state.meta.put( imd , (String) op.getOp().get("clazz") );
- res.getOpsRes().put(i, "OK");
- }
-
- private void createProcessor(IntraReq req, IntraRes res, IntraState state, int i,Vertx vertx) {
- IntraOp op = req.getE().get(i);
- String name = (String) op.getOp().get("name");
- GroovyClassLoader gc = new GroovyClassLoader();
- Class c = gc.parseClass((String) op.getOp().get("value") );
- Processor p = null;
- try {
- p = (Processor) c.newInstance();
- } catch (InstantiationException e) {
- res.setExceptionAndId(e, i);
- return;
- } catch (IllegalAccessException e) {
- res.setExceptionAndId(e, i);
- return;
- }
- IntraState.processors.put(name,p);
- }
-
- private void createFilter(IntraReq req, IntraRes res, IntraState state, int i,Vertx vertx) {
- IntraOp op = req.getE().get(i);
- String name = (String) op.getOp().get("name");
- GroovyClassLoader gc = new GroovyClassLoader();
- Class c = gc.parseClass((String) op.getOp().get("value") );
- Filter f = null;
- try {
- f = (Filter) c.newInstance();
- } catch (InstantiationException e) {
- res.setExceptionAndId(e, i);
- return;
- } catch (IllegalAccessException e) {
- res.setExceptionAndId(e, i);
- return;
- }
- IntraState.filters.put(name, f);
- }
-
- private void filterMode(IntraReq req, IntraRes res, IntraState state, int i,Vertx vertx) {
- IntraOp op = req.getE().get(i);
- String name = (String) op.getOp().get("name");
- Boolean on = (Boolean) op.getOp().get("on");
- if (on){
- Filter f = state.filters.get(name);
- if (f == null){
- res.setExceptionAndId("filter "+name +" not found", i);
- return;
- } else {
- state.currentFilter=f;
- }
- } else {
- state.currentFilter = null;
- }
- }
-
- private void process(IntraReq req, IntraRes res, IntraState state, int i,Vertx vertx) {
- IntraOp op = req.getE().get(i);
- String processorName = (String) op.getOp().get("processorname");
- Map params = (Map) op.getOp().get("params");
- Processor p = state.processors.get(processorName);
- Integer inputId = (Integer) op.getOp().get("input");
- List<Map> toProcess = (List<Map>)res.getOpsRes().get(inputId);
- List<Map> results = p.process(toProcess);
- res.getOpsRes().put(i, results);
- }
-
- private void cqlQuery(IntraReq req, IntraRes res, IntraState state, int i,Vertx vertx) {
- IntraOp op = req.getE().get(i);
- ClientState clientState = new ClientState();
- try {
- clientState.setCQLVersion((String) op.getOp().get("version"));
- clientState.setKeyspace(state.currentKeyspace);
- } catch (InvalidRequestException e) {
- res.setExceptionAndId(e, i);
- return;
- }
- QueryState queryState = new QueryState(clientState);
- ResultMessage rm = null;
- try {
- rm = QueryProcessor.process((String) op.getOp().get("query"), state.consistency, queryState);
- } catch (RequestExecutionException e) {
- res.setExceptionAndId(e, i);
- return;
- } catch (RequestValidationException e) {
- res.setExceptionAndId(e, i);
- return;
- }
- //ToDo maybe processInternal
- CqlResult result = rm.toThriftResult();
-
- List<CqlRow> rows = result.getRows();
- List<HashMap> returnRows = new ArrayList<HashMap>();
- for (CqlRow row: rows){
- List<Column> columns = row.getColumns();
- for (Column c: columns){
- HashMap m = new HashMap();
- m.put("value", c.value);
- m.put("name", c.name);
- returnRows.add(m);
- }
- }
- res.getOpsRes().put(i,returnRows);
- }
-
- private void clear(IntraReq req, IntraRes res, IntraState state, int i,Vertx vertx) {
- IntraOp op = req.getE().get(i);
- int id = (Integer) op.getOp().get("id");
- res.getOpsRes().put(id, new ArrayList<HashMap>());
-
- }
-
- private void createMultiProcess(IntraReq req, IntraRes res, IntraState state, int i,Vertx vertx) {
- IntraOp op = req.getE().get(i);
- String name = (String) op.getOp().get("name");
- GroovyClassLoader gc = new GroovyClassLoader();
- Class c = gc.parseClass((String) op.getOp().get("value") );
- MultiProcessor p = null;
- try {
- p = (MultiProcessor) c.newInstance();
- } catch (InstantiationException e) {
- res.setExceptionAndId(e, i);
- return;
- } catch (IllegalAccessException e) {
- res.setExceptionAndId(e, i);
- return;
- }
- IntraState.multiProcessors.put(name, p);
- }
+
+
}

0 comments on commit 9c4f706

Please sign in to comment.