Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

moved service execution logic into enum for implicity execution w/o look... #46

Merged
merged 3 commits into from

2 participants

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
View
419 src/main/java/org/usergrid/vx/experimental/IntraOp.java
@@ -1,11 +1,30 @@
package org.usergrid.vx.experimental;
import com.google.common.base.Preconditions;
+import groovy.lang.GroovyClassLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.Column;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.exceptions.InvalidRequestException;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.MigrationManager;
+import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.transport.messages.ResultMessage;
+import org.vertx.java.core.Vertx;
+import java.io.IOException;
import java.io.Serializable;
-import java.util.Map;
-import java.util.TreeMap;
+import java.nio.ByteBuffer;
+import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
public class IntraOp implements Serializable{
@@ -232,30 +251,378 @@ private static void checkForBlankStr(String arg, String msg, Type type) {
public enum Type {
- LISTCOLUMNFAMILY,
- LISTKEYSPACES,
- CONSISTENCY,
- CREATEKEYSPACE,
- CREATECOLUMNFAMILY,
- FOREACH,
- COLUMNPREDICATE,
- SLICE,
- GETREF,
- GET,
- SET,
- AUTOTIMESTAMP,
- SETKEYSPACE,
- SETCOLUMNFAMILY,
- ASSUME,
- CREATEPROCESSOR,
- PROCESS,
- DROPKEYSPACE,
- CREATEFILTER,
- FILTERMODE,
- CQLQUERY,
- CLEAR,
- CREATEMULTIPROCESS,
- MULTIPROCESS
+ LISTCOLUMNFAMILY {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ String keyspace = (String) op.getOp().get("keyspace");
+ KSMetaData ks = Schema.instance.getKSMetaData(keyspace);
+ res.getOpsRes().put(i, ks.cfMetaData().keySet());
+ }
+ },
+ LISTKEYSPACES {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ res.getOpsRes().put(i, Schema.instance.getNonSystemTables());
+ }
+ },
+ CONSISTENCY {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ ConsistencyLevel level = ConsistencyLevel.valueOf((String) op.getOp().get("level"));
+ res.getOpsRes().put(i, "OK");
+ state.consistency = level;
+ }
+ },
+ CREATEKEYSPACE {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ Collection<CFMetaData> cfDefs = new ArrayList<CFMetaData>(0);
+ IntraOp op = req.getE().get(i);
+ String ks = (String) op.getOp().get("name");
+ KsDef def = new KsDef();
+ def.setName(ks);
+ def.setStrategy_class("SimpleStrategy");
+ Map<String, String> strat = new HashMap<String, String>();
+ strat.put("replication_factor", "1");
+ def.setStrategy_options(strat);
+ KSMetaData ksm = null;
+ try {
+ ksm = KSMetaData.fromThrift(def,
+ cfDefs.toArray(new CFMetaData[cfDefs.size()]));
+ } catch (ConfigurationException e) {
+ res.setExceptionAndId(e.getMessage(), i);
+ return;
+ }
+
+ try {
+ MigrationManager.announceNewKeyspace(ksm);
+ } catch (ConfigurationException e) {
+ res.setExceptionAndId(e.getMessage(), i);
+ return;
+ }
+ res.getOpsRes().put(i, "OK");
+ }
+ },
+ CREATECOLUMNFAMILY {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ String cf = (String) op.getOp().get("name");
+ CfDef def = new CfDef();
+ def.setName(cf);
+ def.setKeyspace(state.currentKeyspace);
+ def.unsetId();
+ CFMetaData cfm = null;
+
+ try {
+ cfm = CFMetaData.fromThrift(def);
+ cfm.addDefaultIndexNames();
+ } catch (org.apache.cassandra.exceptions.InvalidRequestException e) {
+ res.setExceptionAndId(e.getMessage(), i);
+ return;
+ } catch (ConfigurationException e) {
+ res.setExceptionAndId(e.getMessage(), i);
+ return;
+ }
+ try {
+ MigrationManager.announceNewColumnFamily(cfm);
+ } catch (ConfigurationException e) {
+ res.setExceptionAndId(e.getMessage(), i);
+ return;
+ }
+ res.getOpsRes().put(i, "OK");
+ }
+ },
+ FOREACH {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+
+ }
+ },
+ COLUMNPREDICATE {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+
+ }
+ },
+ SLICE {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ List<Map> finalResults = new ArrayList<Map>();
+ ByteBuffer rowkey = IntraService.byteBufferForObject(IntraService.resolveObject(op.getOp().get("rowkey"), req, res, state, i));
+ ByteBuffer start = IntraService.byteBufferForObject(IntraService.resolveObject(op.getOp().get("start"), req, res, state, i));
+ ByteBuffer end = IntraService.byteBufferForObject(IntraService.resolveObject(op.getOp().get("end"), req, res, state, i));
+ List<ReadCommand> commands = new ArrayList<ReadCommand>(1);
+ ColumnPath cp = new ColumnPath();
+ cp.setColumn_family(state.currentColumnFamily);
+ QueryPath qp = new QueryPath(cp);
+ SliceFromReadCommand sr = new SliceFromReadCommand(state.currentKeyspace, rowkey, qp, start, end, false, 100);
+ commands.add(sr);
+
+ List<Row> results = null;
+ try {
+ results = StorageProxy.read(commands, state.consistency);
+ ColumnFamily cf = results.get(0).cf;
+ if (cf == null){ //cf= null is no data
+ } else {
+ IntraService.readCf(cf, finalResults, state);
+ }
+ res.getOpsRes().put(i,finalResults);
+ } catch (ReadTimeoutException | org.apache.cassandra.exceptions.UnavailableException
+ | IsBootstrappingException | IOException e) {
+ res.setExceptionAndId(e.getMessage(), i);
+ return;
+ }
+ res.getOpsRes().put(i, finalResults);
+ }
+ },
+ GETREF {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+
+ }
+ },
+ GET {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ List<Map> finalResults = new ArrayList<Map>();
+ ByteBuffer rowkey = IntraService.byteBufferForObject(IntraService.resolveObject(
+ op.getOp().get("rowkey"), req, res, state, i));
+ ByteBuffer column = IntraService.byteBufferForObject(IntraService.resolveObject(
+ op.getOp().get("column"), req, res, state, i));
+ QueryPath path = new QueryPath(state.currentColumnFamily, null);
+ List<ByteBuffer> nameAsList = Arrays.asList(column);
+ ReadCommand command = new SliceByNamesReadCommand(state.currentKeyspace,
+ rowkey, path, nameAsList);
+ List<Row> rows = null;
+
+ try {
+ rows = StorageProxy.read(Arrays.asList(command), state.consistency);
+ ColumnFamily cf = rows.get(0).cf;
+ if (cf == null) { // cf= null is no data
+ } else {
+ IntraService.readCf(cf, finalResults, state);
+ }
+ res.getOpsRes().put(i, finalResults);
+ } catch (ReadTimeoutException | org.apache.cassandra.exceptions.UnavailableException
+ | IsBootstrappingException | IOException e) {
+ res.getOpsRes().put(i, e.getMessage());
+ return;
+ }
+ }
+ },
+ SET {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ RowMutation rm = new RowMutation(state.currentKeyspace,
+ IntraService.byteBufferForObject(
+ IntraService.resolveObject(op.getOp().get("rowkey"), req, res, state, i)
+ ));
+ QueryPath qp = new QueryPath(state.currentColumnFamily,null, IntraService.byteBufferForObject(
+ IntraService.resolveObject(op.getOp().get("columnName"), req, res, state, i)
+ ) );
+ Object val = op.getOp().get("value");
+ rm.add(qp, IntraService.byteBufferForObject(
+ IntraService.resolveObject(val, req, res, state, i)
+ ), (Long) (state.autoTimestamp ? state.nanotime : op.getOp().get("timestamp")));
+ try {
+ StorageProxy.mutate(Arrays.asList(rm), state.consistency);
+ res.getOpsRes().put(i, "OK");
+ } catch (WriteTimeoutException | org.apache.cassandra.exceptions.UnavailableException | OverloadedException e) {
+ res.setExceptionAndId(e.getMessage(), i);
+ return;}
+ }
+ },
+ AUTOTIMESTAMP {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ state.autoTimestamp = true;
+ res.getOpsRes().put(i, "OK");
+ }
+ },
+ SETKEYSPACE {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ state.currentKeyspace = (String) op.getOp().get("keyspace");
+ res.getOpsRes().put(i, "OK");
+ }
+ },
+ SETCOLUMNFAMILY {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ state.currentColumnFamily = (String) op.getOp().get("columnfamily");
+ res.getOpsRes().put(i, "OK");
+ }
+ },
+ ASSUME {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ IntraMetaData imd = new IntraMetaData();
+ imd.keyspace = (String) op.getOp().get("keyspace");
+ imd.columnfamily = (String) op.getOp().get("columnfamily");
+ imd.type = (String) op.getOp().get("type");
+ state.meta.put( imd , (String) op.getOp().get("clazz") );
+ res.getOpsRes().put(i, "OK");
+ }
+ },
+ CREATEPROCESSOR {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ String name = (String) op.getOp().get("name");
+ GroovyClassLoader gc = new GroovyClassLoader();
+ Class c = gc.parseClass((String) op.getOp().get("value") );
+ Processor p = null;
+ try {
+ p = (Processor) c.newInstance();
+ } catch (InstantiationException | IllegalAccessException e) {
+ res.setExceptionAndId(e, i);
+ return;
+ }
+ IntraState.processors.put(name,p);
+ }
+ },
+ PROCESS {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ String processorName = (String) op.getOp().get("processorname");
+ Map params = (Map) op.getOp().get("params");
+ Processor p = state.processors.get(processorName);
+ Integer inputId = (Integer) op.getOp().get("input");
+ List<Map> toProcess = (List<Map>)res.getOpsRes().get(inputId);
+ List<Map> results = p.process(toProcess);
+ res.getOpsRes().put(i, results);
+ }
+ },
+ DROPKEYSPACE {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+
+ }
+ },
+ CREATEFILTER {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ String name = (String) op.getOp().get("name");
+ GroovyClassLoader gc = new GroovyClassLoader();
+ Class c = gc.parseClass((String) op.getOp().get("value") );
+ Filter f = null;
+ try {
+ f = (Filter) c.newInstance();
+ } catch (InstantiationException | IllegalAccessException e) {
+ res.setExceptionAndId(e, i);
+ return;
+ }
+ IntraState.filters.put(name, f);
+ }
+ },
+ FILTERMODE {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ String name = (String) op.getOp().get("name");
+ Boolean on = (Boolean) op.getOp().get("on");
+ if (on){
+ Filter f = state.filters.get(name);
+ if (f == null){
+ res.setExceptionAndId("filter "+name +" not found", i);
+ return;
+ } else {
+ state.currentFilter=f;
+ }
+ } else {
+ state.currentFilter = null;
+ }
+ }
+ },
+ CQLQUERY {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ ClientState clientState = new ClientState();
+ try {
+ clientState.setCQLVersion((String) op.getOp().get("version"));
+ clientState.setKeyspace(state.currentKeyspace);
+ } catch (InvalidRequestException e) {
+ res.setExceptionAndId(e, i);
+ return;
+ }
+ QueryState queryState = new QueryState(clientState);
+ ResultMessage rm = null;
+ try {
+ rm = QueryProcessor.process((String) op.getOp().get("query"), state.consistency, queryState);
+ } catch (RequestExecutionException | RequestValidationException e) {
+ res.setExceptionAndId(e, i);
+ return;
+ }
+ //ToDo maybe processInternal
+ CqlResult result = rm.toThriftResult();
+
+ List<CqlRow> rows = result.getRows();
+ List<HashMap> returnRows = new ArrayList<HashMap>();
+ for (CqlRow row: rows){
+ List<org.apache.cassandra.thrift.Column> columns = row.getColumns();
+ for (org.apache.cassandra.thrift.Column c: columns){
+ HashMap m = new HashMap();
+ m.put("value", c.value);
+ m.put("name", c.name);
+ returnRows.add(m);
+ }
+ }
+ res.getOpsRes().put(i,returnRows);
+ }
+ },
+ CLEAR {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ int id = (Integer) op.getOp().get("id");
+ res.getOpsRes().put(id, new ArrayList<HashMap>());
+ }
+ },
+ CREATEMULTIPROCESS {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ String name = (String) op.getOp().get("name");
+ GroovyClassLoader gc = new GroovyClassLoader();
+ Class c = gc.parseClass((String) op.getOp().get("value") );
+ MultiProcessor p = null;
+ try {
+ p = (MultiProcessor) c.newInstance();
+ } catch (InstantiationException | IllegalAccessException e) {
+ res.setExceptionAndId(e, i);
+ return;
+ }
+ IntraState.multiProcessors.put(name, p);
+ }
+ },
+ MULTIPROCESS {
+ @Override
+ public void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx) {
+ IntraOp op = req.getE().get(i);
+ String name = (String) op.getOp().get("name");
+ Map params = (Map) op.getOp().get("params");
+ //Processor p = state.processors.get(processorName);
+ MultiProcessor p = state.multiProcessors.get(name);
+
+ List<Map> mpResults = p.multiProcess(res.getOpsRes(), params);
+ res.getOpsRes().put(i, mpResults);
+ }
+ };
+
+ public abstract void execute(IntraReq req, IntraRes res, IntraState state, int i, Vertx vertx);
}
View
438 src/main/java/org/usergrid/vx/experimental/IntraService.java
@@ -1,69 +1,13 @@
package org.usergrid.vx.experimental;
-import groovy.lang.GroovyClassLoader;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.KSMetaData;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.db.ReadCommand;
-import org.apache.cassandra.db.Row;
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.SliceByNamesReadCommand;
-
-import org.apache.cassandra.db.SliceFromReadCommand;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.Int32Type;
-import org.apache.cassandra.db.marshal.IntegerType;
-import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.exceptions.IsBootstrappingException;
-import org.apache.cassandra.exceptions.OverloadedException;
-import org.apache.cassandra.exceptions.ReadTimeoutException;
-import org.apache.cassandra.exceptions.RequestExecutionException;
-import org.apache.cassandra.exceptions.RequestValidationException;
-import org.apache.cassandra.exceptions.UnavailableException;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
-import org.apache.cassandra.service.ClientState;
-import org.apache.cassandra.service.MigrationManager;
-import org.apache.cassandra.service.QueryState;
-import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.thrift.CassandraServer;
-import org.apache.cassandra.thrift.CfDef;
-import org.apache.cassandra.thrift.Column;
-import org.apache.cassandra.thrift.ColumnParent;
-import org.apache.cassandra.thrift.ColumnPath;
-import org.apache.cassandra.thrift.CqlResult;
-import org.apache.cassandra.thrift.CqlRow;
-import org.apache.cassandra.thrift.KsDef;
-import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.vertx.java.core.Handler;
import org.vertx.java.core.Vertx;
-import org.vertx.java.core.buffer.Buffer;
-import org.vertx.java.core.eventbus.impl.MessageFactory;
-import org.vertx.java.core.json.JsonArray;
-import org.vertx.java.core.json.JsonObject;
-import com.hazelcast.core.Message;
-import com.sun.xml.internal.ws.policy.privateutil.PolicyUtils.Collections;
+import java.nio.ByteBuffer;
+import java.util.*;
public class IntraService {
@@ -84,50 +28,7 @@ protected boolean executeReq(IntraReq req, IntraRes res, IntraState state, Vertx
for (int i=0;i<req.getE().size() && res.getException() == null ;i++){
IntraOp op = req.getE().get(i);
try {
- if (op.getType().equals(IntraOp.Type.SETKEYSPACE)) {
- state.currentKeyspace = (String) op.getOp().get("keyspace");
- res.getOpsRes().put(i, "OK");
- } else if (op.getType().equals(IntraOp.Type.CREATEKEYSPACE)) {
- createKeyspace(req, res, state, i);
- } else if (op.getType().equals(IntraOp.Type.CREATECOLUMNFAMILY)) {
- createColumnFamily(req, res, state, i);
- } else if (op.getType().equals(IntraOp.Type.SETCOLUMNFAMILY)){
- state.currentColumnFamily = (String) op.getOp().get("columnfamily");
- res.getOpsRes().put(i, "OK");
- } else if (op.getType().equals(IntraOp.Type.AUTOTIMESTAMP)){
- state.autoTimestamp = true;
- res.getOpsRes().put(i, "OK");
- } else if (op.getType().equals(IntraOp.Type.SET)){
- set(req, res, state, i);
- } else if (op.getType().equals(IntraOp.Type.SLICE)){
- slice(req, res, state, i);
- } else if (op.getType().equals(IntraOp.Type.GET)){
- get(req,res,state,i);
- } else if (op.getType().equals(IntraOp.Type.CONSISTENCY)){
- consistency(req, res, state, i);
- } else if (op.getType().equals(IntraOp.Type.LISTKEYSPACES)){
- listKeyspaces(req, res, state, i);
- } else if (op.getType().equals(IntraOp.Type.LISTCOLUMNFAMILY)){
- listColumnFamily(req, res, state, i);
- } else if (op.getType().equals(IntraOp.Type.ASSUME)){
- assume(req,res,state,i);
- } else if (op.getType().equals(IntraOp.Type.CREATEPROCESSOR)){
- createProcessor(req,res,state,i,vertx);
- } else if (op.getType().equals(IntraOp.Type.PROCESS)){
- process(req,res,state,i,vertx);
- } else if (op.getType().equals(IntraOp.Type.CREATEFILTER)){
- createFilter(req,res,state,i,vertx);
- } else if (op.getType().equals(IntraOp.Type.FILTERMODE)){
- filterMode(req,res,state,i,vertx);
- } else if (op.getType().equals(IntraOp.Type.CQLQUERY)){
- cqlQuery(req,res,state,i,vertx);
- } else if (op.getType().equals(IntraOp.Type.CLEAR)){
- clear(req,res,state,i,vertx);
- } else if (op.getType().equals(IntraOp.Type.CREATEMULTIPROCESS)){
- createMultiProcess(req,res,state,i,vertx);
- } else if (op.getType().equals(IntraOp.Type.MULTIPROCESS)){
- multiProcess(req,res,state,i,vertx);
- }
+ op.getType().execute(req, res, state, i, vertx);
} catch (Exception ex){
res.setExceptionAndId(ex,i);
ex.printStackTrace();
@@ -136,19 +37,8 @@ protected boolean executeReq(IntraReq req, IntraRes res, IntraState state, Vertx
return true;
}
- private void multiProcess(IntraReq req, IntraRes res, IntraState state,
- int i, Vertx vertx) {
- IntraOp op = req.getE().get(i);
- String name = (String) op.getOp().get("name");
- Map params = (Map) op.getOp().get("params");
- //Processor p = state.processors.get(processorName);
- MultiProcessor p = state.multiProcessors.get(name);
-
- List<Map> mpResults = p.multiProcess(res.getOpsRes(), params);
- res.getOpsRes().put(i, mpResults);
-
- }
- private Object resolveObject(Object o, IntraReq req, IntraRes res,IntraState state, int i){
+
+ static Object resolveObject(Object o, IntraReq req, IntraRes res,IntraState state, int i){
if (o instanceof Object[]){
return o;
} else if (o instanceof Integer){
@@ -170,7 +60,7 @@ private Object resolveObject(Object o, IntraReq req, IntraRes res,IntraState sta
throw new RuntimeException(" do not know what to do with "+o.getClass());
}
}
- ByteBuffer byteBufferForObject(Object o){
+ static ByteBuffer byteBufferForObject(Object o){
if (o instanceof Object[]){
Object [] comp = (Object[]) o;
List<byte[]> b = new ArrayList<byte[]>();
@@ -224,186 +114,11 @@ public boolean verifyReq(IntraReq req, IntraRes res){
}
- private void createColumnFamily(IntraReq req, IntraRes res, IntraState state, int i){
- IntraOp op = req.getE().get(i);
- String cf = (String) op.getOp().get("name");
- CfDef def = new CfDef();
- def.setName(cf);
- def.setKeyspace(state.currentKeyspace);
- def.unsetId();
- CFMetaData cfm = null;
-
- try {
- cfm = CFMetaData.fromThrift(def);
- cfm.addDefaultIndexNames();
- } catch (org.apache.cassandra.exceptions.InvalidRequestException e) {
- res.setExceptionAndId(e.getMessage(), i);
- return;
- } catch (ConfigurationException e) {
- res.setExceptionAndId(e.getMessage(), i);
- return;
- }
- try {
- MigrationManager.announceNewColumnFamily(cfm);
- } catch (ConfigurationException e) {
- res.setExceptionAndId(e.getMessage(), i);
- return;
- }
- res.getOpsRes().put(i, "OK");
- }
-
- private void createKeyspace(IntraReq req, IntraRes res, IntraState state,
- int i) {
- Collection<CFMetaData> cfDefs = new ArrayList<CFMetaData>(0);
- IntraOp op = req.getE().get(i);
- String ks = (String) op.getOp().get("name");
- KsDef def = new KsDef();
- def.setName(ks);
- def.setStrategy_class("SimpleStrategy");
- Map<String, String> strat = new HashMap<String, String>();
- strat.put("replication_factor", "1");
- def.setStrategy_options(strat);
- KSMetaData ksm = null;
- try {
- ksm = KSMetaData.fromThrift(def,
- cfDefs.toArray(new CFMetaData[cfDefs.size()]));
- } catch (ConfigurationException e) {
- res.setExceptionAndId(e.getMessage(), i);
- return;
- }
-
- try {
- MigrationManager.announceNewKeyspace(ksm);
- } catch (ConfigurationException e) {
- res.setExceptionAndId(e.getMessage(), i);
- return;
- }
- res.getOpsRes().put(i, "OK");
- }
-
- public void set(IntraReq req, IntraRes res, IntraState state,int i) {
- IntraOp op = req.getE().get(i);
- RowMutation rm = new RowMutation(state.currentKeyspace,byteBufferForObject(
- resolveObject ( op.getOp().get("rowkey"),req,res,state, i )
- ));
- QueryPath qp = new QueryPath(state.currentColumnFamily,null, byteBufferForObject(
- resolveObject ( op.getOp().get("columnName"),req,res,state, i )
- ) );
- Object val = op.getOp().get("value");
- rm.add(qp, byteBufferForObject(
- resolveObject (val ,req,res,state, i )
- ), (Long) (state.autoTimestamp ? state.nanotime : op.getOp().get("timestamp")));
- Collection<RowMutation> col = new ArrayList<RowMutation>();
- col.add(rm);
- try {
- StorageProxy.mutate(col, state.consistency);
- res.getOpsRes().put(i, "OK");
- } catch (WriteTimeoutException e) {
- res.setExceptionAndId(e.getMessage(), i);
- return;
- } catch (org.apache.cassandra.exceptions.UnavailableException e) {
- res.setExceptionAndId(e.getMessage(), i);
- return;
- } catch (OverloadedException e) {
- res.setExceptionAndId(e.getMessage(), i);
- return;
- }
- }
-
- private void slice(IntraReq req, IntraRes res, IntraState state,int i){
- IntraOp op = req.getE().get(i);
- List<Map> finalResults = new ArrayList<Map>();
- ByteBuffer rowkey = byteBufferForObject(resolveObject(op.getOp().get("rowkey"),req,res,state,i));
- ByteBuffer start = byteBufferForObject(resolveObject(op.getOp().get("start"),req,res,state,i));
- ByteBuffer end = byteBufferForObject(resolveObject(op.getOp().get("end"),req,res,state,i));
- List<ReadCommand> commands = new ArrayList<ReadCommand>(1);
- ColumnPath cp = new ColumnPath();
- cp.setColumn_family(state.currentColumnFamily);
- QueryPath qp = new QueryPath(cp);
- SliceFromReadCommand sr = new SliceFromReadCommand(state.currentKeyspace, rowkey, qp, start, end, false, 100);
- commands.add(sr);
-
- List<Row> results = null;
- try {
- results = StorageProxy.read(commands, state.consistency);
- ColumnFamily cf = results.get(0).cf;
- if (cf == null){ //cf= null is no data
- } else {
- readCf(cf, finalResults,state);
- }
- res.getOpsRes().put(i,finalResults);
- } catch (ReadTimeoutException e) {
- res.setExceptionAndId(e.getMessage(), i);
- return;
- } catch (org.apache.cassandra.exceptions.UnavailableException e) {
- res.setExceptionAndId(e.getMessage(), i);
- return;
- } catch (IsBootstrappingException e) {
- res.setExceptionAndId(e.getMessage(), i);
- return;
- } catch (IOException e) {
- res.setExceptionAndId(e.getMessage(), i);
- return;
- }
- res.getOpsRes().put(i, finalResults);
- }
-
- private void consistency(IntraReq req, IntraRes res, IntraState state,int i){
- IntraOp op = req.getE().get(i);
- ConsistencyLevel level = ConsistencyLevel.valueOf((String) op.getOp().get("level"));
- res.getOpsRes().put(i, "OK");
- state.consistency = level;
- }
- private void listKeyspaces(IntraReq req, IntraRes res, IntraState state, int i) {
- IntraOp op = req.getE().get(i);
- res.getOpsRes().put(i, Schema.instance.getNonSystemTables());
- }
- private void listColumnFamily(IntraReq req, IntraRes res, IntraState state, int i) {
- IntraOp op = req.getE().get(i);
- String keyspace = (String) op.getOp().get("keyspace");
- KSMetaData ks = Schema.instance.getKSMetaData(keyspace);
- res.getOpsRes().put(i, ks.cfMetaData().keySet());
- }
-
- private void get(IntraReq req, IntraRes res, IntraState state, int i) {
- IntraOp op = req.getE().get(i);
- List<Map> finalResults = new ArrayList<Map>();
- ByteBuffer rowkey = byteBufferForObject(resolveObject(
- op.getOp().get("rowkey"), req, res, state, i));
- ByteBuffer column = byteBufferForObject(resolveObject(
- op.getOp().get("column"), req, res, state, i));
- QueryPath path = new QueryPath(state.currentColumnFamily, null);
- List<ByteBuffer> nameAsList = Arrays.asList(column);
- ReadCommand command = new SliceByNamesReadCommand(state.currentKeyspace,
- rowkey, path, nameAsList);
- List<Row> rows = null;
- try {
- rows = StorageProxy.read(Arrays.asList(command), state.consistency);
- ColumnFamily cf = rows.get(0).cf;
- if (cf == null) { // cf= null is no data
- } else {
- readCf(cf, finalResults,state);
- }
- res.getOpsRes().put(i, finalResults);
- } catch (ReadTimeoutException e) {
- res.getOpsRes().put(i, e.getMessage());
- return;
- } catch (UnavailableException e) {
- res.getOpsRes().put(i, e.getMessage());
- return;
- } catch (IsBootstrappingException e) {
- res.getOpsRes().put(i, e.getMessage());
- return;
- } catch (IOException e) {
- res.getOpsRes().put(i, e.getMessage());
- return;
- }
- }
- private void readCf(ColumnFamily cf , List<Map> finalResults, IntraState state){
+ static void readCf(ColumnFamily cf , List<Map> finalResults, IntraState state){
Iterator<IColumn> it = cf.iterator();
while (it.hasNext()) {
IColumn ic = it.next();
@@ -420,141 +135,6 @@ private void readCf(ColumnFamily cf , List<Map> finalResults, IntraState state){
}
}
}
-
- private void assume(IntraReq req, IntraRes res, IntraState state, int i) {
- IntraOp op = req.getE().get(i);
- IntraMetaData imd = new IntraMetaData();
- imd.keyspace = (String) op.getOp().get("keyspace");
- imd.columnfamily = (String) op.getOp().get("columnfamily");
- imd.type = (String) op.getOp().get("type");
- state.meta.put( imd , (String) op.getOp().get("clazz") );
- res.getOpsRes().put(i, "OK");
- }
-
- private void createProcessor(IntraReq req, IntraRes res, IntraState state, int i,Vertx vertx) {
- IntraOp op = req.getE().get(i);
- String name = (String) op.getOp().get("name");
- GroovyClassLoader gc = new GroovyClassLoader();
- Class c = gc.parseClass((String) op.getOp().get("value") );
- Processor p = null;
- try {
- p = (Processor) c.newInstance();
- } catch (InstantiationException e) {
- res.setExceptionAndId(e, i);
- return;
- } catch (IllegalAccessException e) {
- res.setExceptionAndId(e, i);
- return;
- }
- IntraState.processors.put(name,p);
- }
-
- private void createFilter(IntraReq req, IntraRes res, IntraState state, int i,Vertx vertx) {
- IntraOp op = req.getE().get(i);
- String name = (String) op.getOp().get("name");
- GroovyClassLoader gc = new GroovyClassLoader();
- Class c = gc.parseClass((String) op.getOp().get("value") );
- Filter f = null;
- try {
- f = (Filter) c.newInstance();
- } catch (InstantiationException e) {
- res.setExceptionAndId(e, i);
- return;
- } catch (IllegalAccessException e) {
- res.setExceptionAndId(e, i);
- return;
- }
- IntraState.filters.put(name, f);
- }
-
- private void filterMode(IntraReq req, IntraRes res, IntraState state, int i,Vertx vertx) {
- IntraOp op = req.getE().get(i);
- String name = (String) op.getOp().get("name");
- Boolean on = (Boolean) op.getOp().get("on");
- if (on){
- Filter f = state.filters.get(name);
- if (f == null){
- res.setExceptionAndId("filter "+name +" not found", i);
- return;
- } else {
- state.currentFilter=f;
- }
- } else {
- state.currentFilter = null;
- }
- }
-
- private void process(IntraReq req, IntraRes res, IntraState state, int i,Vertx vertx) {
- IntraOp op = req.getE().get(i);
- String processorName = (String) op.getOp().get("processorname");
- Map params = (Map) op.getOp().get("params");
- Processor p = state.processors.get(processorName);
- Integer inputId = (Integer) op.getOp().get("input");
- List<Map> toProcess = (List<Map>)res.getOpsRes().get(inputId);
- List<Map> results = p.process(toProcess);
- res.getOpsRes().put(i, results);
- }
-
- private void cqlQuery(IntraReq req, IntraRes res, IntraState state, int i,Vertx vertx) {
- IntraOp op = req.getE().get(i);
- ClientState clientState = new ClientState();
- try {
- clientState.setCQLVersion((String) op.getOp().get("version"));
- clientState.setKeyspace(state.currentKeyspace);
- } catch (InvalidRequestException e) {
- res.setExceptionAndId(e, i);
- return;
- }
- QueryState queryState = new QueryState(clientState);
- ResultMessage rm = null;
- try {
- rm = QueryProcessor.process((String) op.getOp().get("query"), state.consistency, queryState);
- } catch (RequestExecutionException e) {
- res.setExceptionAndId(e, i);
- return;
- } catch (RequestValidationException e) {
- res.setExceptionAndId(e, i);
- return;
- }
- //ToDo maybe processInternal
- CqlResult result = rm.toThriftResult();
-
- List<CqlRow> rows = result.getRows();
- List<HashMap> returnRows = new ArrayList<HashMap>();
- for (CqlRow row: rows){
- List<Column> columns = row.getColumns();
- for (Column c: columns){
- HashMap m = new HashMap();
- m.put("value", c.value);
- m.put("name", c.name);
- returnRows.add(m);
- }
- }
- res.getOpsRes().put(i,returnRows);
- }
-
- private void clear(IntraReq req, IntraRes res, IntraState state, int i,Vertx vertx) {
- IntraOp op = req.getE().get(i);
- int id = (Integer) op.getOp().get("id");
- res.getOpsRes().put(id, new ArrayList<HashMap>());
-
- }
-
- private void createMultiProcess(IntraReq req, IntraRes res, IntraState state, int i,Vertx vertx) {
- IntraOp op = req.getE().get(i);
- String name = (String) op.getOp().get("name");
- GroovyClassLoader gc = new GroovyClassLoader();
- Class c = gc.parseClass((String) op.getOp().get("value") );
- MultiProcessor p = null;
- try {
- p = (MultiProcessor) c.newInstance();
- } catch (InstantiationException e) {
- res.setExceptionAndId(e, i);
- return;
- } catch (IllegalAccessException e) {
- res.setExceptionAndId(e, i);
- return;
- }
- IntraState.multiProcessors.put(name, p);
- }
+
+
}
Something went wrong with that request. Please try again.