Permalink
Browse files

added counterhandler and coverage for such. closes #114

  • Loading branch information...
1 parent 209b4cc commit 06e7d57523fcd73d16ced04b0a3ed8fe8a07da8e @zznate committed Feb 26, 2013
@@ -79,4 +79,8 @@ public int openScanner(ScanContext context) {
openedScanners.put(id, context);
return id;
}
+
+ public ConsistencyLevel getConsistency() {
+ return consistency;
+ }
}
@@ -24,16 +24,7 @@
import org.usergrid.vx.experimental.IntraHandlerJsonSmile;
import org.usergrid.vx.handler.http.HelloHandler;
import org.usergrid.vx.handler.http.NoMatchHandler;
-import org.usergrid.vx.server.operations.AssumeHandler;
-import org.usergrid.vx.server.operations.CqlQueryHandler;
-import org.usergrid.vx.server.operations.CreateColumnFamilyHandler;
-import org.usergrid.vx.server.operations.CreateKeyspaceHandler;
-import org.usergrid.vx.server.operations.GetHandler;
-import org.usergrid.vx.server.operations.ListKeyspacesHandler;
-import org.usergrid.vx.server.operations.SetColumnFamilyHandler;
-import org.usergrid.vx.server.operations.SetHandler;
-import org.usergrid.vx.server.operations.SetKeyspaceHandler;
-import org.usergrid.vx.server.operations.SliceHandler;
+import org.usergrid.vx.server.operations.*;
import org.vertx.java.core.Vertx;
import org.vertx.java.core.http.RouteMatcher;
@@ -79,17 +70,18 @@ public boolean isRunning() {
return running.get();
}
- private void registerOperationHandlers() {
- vertx.eventBus().registerHandler("request.createkeyspace", new CreateKeyspaceHandler());
- vertx.eventBus().registerHandler("request.setkeyspace", new SetKeyspaceHandler());
- vertx.eventBus().registerHandler("request.createcolumnfamily", new CreateColumnFamilyHandler());
- vertx.eventBus().registerHandler("request.listkeyspaces", new ListKeyspacesHandler());
- vertx.eventBus().registerHandler("request.set", new SetHandler());
- vertx.eventBus().registerHandler("request.setcolumnfamily", new SetColumnFamilyHandler());
- vertx.eventBus().registerHandler("request.assume", new AssumeHandler());
- vertx.eventBus().registerHandler("request.get", new GetHandler());
- vertx.eventBus().registerHandler("request.slice", new SliceHandler());
- vertx.eventBus().registerHandler("request.cqlquery", new CqlQueryHandler());
- }
+ private void registerOperationHandlers() {
+ vertx.eventBus().registerHandler("request.createkeyspace", new CreateKeyspaceHandler());
+ vertx.eventBus().registerHandler("request.setkeyspace", new SetKeyspaceHandler());
+ vertx.eventBus().registerHandler("request.createcolumnfamily", new CreateColumnFamilyHandler());
+ vertx.eventBus().registerHandler("request.listkeyspaces", new ListKeyspacesHandler());
+ vertx.eventBus().registerHandler("request.set", new SetHandler());
+ vertx.eventBus().registerHandler("request.setcolumnfamily", new SetColumnFamilyHandler());
+ vertx.eventBus().registerHandler("request.assume", new AssumeHandler());
+ vertx.eventBus().registerHandler("request.get", new GetHandler());
+ vertx.eventBus().registerHandler("request.slice", new SliceHandler());
+ vertx.eventBus().registerHandler("request.cqlquery", new CqlQueryHandler());
+ vertx.eventBus().registerHandler("request.counter", new CounterHandler());
+ }
}
@@ -0,0 +1,43 @@
+package org.usergrid.vx.server.operations;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.CounterMutation;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.filter.QueryPath;
+import org.usergrid.vx.experimental.IntraOp;
+import org.usergrid.vx.experimental.IntraService;
+import org.vertx.java.core.Handler;
+import org.vertx.java.core.eventbus.Message;
+import org.vertx.java.core.json.JsonObject;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Handler class for counter writes
+ *
+ * @author zznate
+ */
+public class CounterHandler implements Handler<Message<JsonObject>> {
+
+ @Override
+ public void handle(Message<JsonObject> event) {
+ Integer id = event.body.getInteger("id");
+ JsonObject params = event.body.getObject("op");
+ JsonObject state = event.body.getObject("state");
+
+ RowMutation rm = new RowMutation(HandlerUtils.determineKs(params,state),
+ IntraService.byteBufferForObject(params.getString("rowkey")));
+
+ rm.addCounter(new QueryPath(
+ HandlerUtils.determineCf(params,state),
+ null,
+ IntraService.byteBufferForObject(params.getString("name"))),
+ Long.parseLong(params.toMap().get("value").toString()));
+ List<IMutation> mutations = new ArrayList(1);
+ // TODO fix hard-coded consistency
+ mutations.add(new CounterMutation(rm, ConsistencyLevel.ONE));
+ HandlerUtils.write(mutations, event, id);
+ }
+}
@@ -1,13 +1,19 @@
package org.usergrid.vx.server.operations;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.exceptions.OverloadedException;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.service.StorageProxy;
import org.usergrid.vx.experimental.TypeHelper;
+import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
/**
* @author zznate
@@ -49,7 +55,11 @@ public static JsonArray readCf(ColumnFamily columnFamily, JsonObject state, Json
}
if (components.contains("value")) {
String clazz = state.getObject("meta").getObject("value").getString("clazz");
- m.put("value", TypeHelper.getTyped(clazz, ic.value()));
+ if ( ic instanceof CounterColumn ) {
+ m.put("value", ((CounterColumn)ic).total());
+ } else {
+ m.put("value", TypeHelper.getTyped(clazz, ic.value()));
+ }
}
if (components.contains("timestamp")) {
m.put("timestamp", ic.timestamp());
@@ -63,4 +73,17 @@ public static JsonArray readCf(ColumnFamily columnFamily, JsonObject state, Json
return array;
}
+ public static void write(List<IMutation> mutations, Message<JsonObject> event, Integer id) {
+ try {
+ // We don't want to hard code the consistency level but letting it slide for
+ // since it is also hard coded in IntraState
+ StorageProxy.mutate(mutations, ConsistencyLevel.ONE);
+
+ event.reply(new JsonObject().putString(id.toString(), "OK"));
+ } catch (WriteTimeoutException | UnavailableException | OverloadedException e) {
+ event.reply(new JsonObject()
+ .putString("exception", e.getMessage())
+ .putString("exceptionId", id.toString()));
+ }
+ }
}
@@ -1,8 +1,11 @@
package org.usergrid.vx.server.operations;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.IMutation;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.exceptions.OverloadedException;
@@ -39,17 +42,8 @@ public void handle(Message<JsonObject> event) {
rm.add(qp, IntraService.byteBufferForObject(IntraService.resolveObject(val, null, null, null, id)),
System.nanoTime(), ttl);
}
-
- try {
- // We don't want to hard code the consistency level but letting it slide for
- // since it is also hard coded in IntraState
- StorageProxy.mutate(Arrays.asList(rm), ConsistencyLevel.ONE);
-
- event.reply(new JsonObject().putString(id.toString(), "OK"));
- } catch (WriteTimeoutException | UnavailableException | OverloadedException e) {
- event.reply(new JsonObject()
- .putString("exception", e.getMessage())
- .putString("exceptionId", id.toString()));
- }
+ List<IMutation> mutations = new ArrayList<IMutation>();
+ mutations.add(rm);
+ HandlerUtils.write(mutations, event, id);
}
}
@@ -93,6 +93,19 @@ public void setAndGetColumn() throws Exception {
assertJSONEquals("The response was incorrect", expectedResponse, actualResponse);
}
+ @Test
+ @RequiresColumnFamily(ksName = "myks", cfName = "mycountercf", isCounter = true)
+ public void setAndGetCounter() throws Exception {
+ String setCounterJSON = loadJSON("counter_set.json");
+ submitRequest(setCounterJSON);
+
+ String getCounterJSON = loadJSON("counter_get.json");
+ String actualResponse = submitRequest(getCounterJSON);
+ String expectedResponse = loadJSON("counter_get_response.json");
+
+ assertJSONEquals("The counter response was incorrect", expectedResponse, actualResponse);
+ }
+
@Test
public void executeColumnSliceQuery() throws Exception {
String insertBeersJSON = loadJSON("insert_beers.json");
@@ -0,0 +1,39 @@
+{"e": [
+ {
+ "type": "SETKEYSPACE",
+ "op": {
+ "keyspace": "myks"
+ }
+ },
+ {
+ "type": "SETCOLUMNFAMILY",
+ "op": {
+ "columnfamily": "mycountercf"
+ }
+ },
+ {
+ "type": "ASSUME",
+ "op": {
+ "keyspace": "myks",
+ "columnfamily": "mycountercf",
+ "type": "column",
+ "clazz": "UTF8Type"
+ }
+ },
+ {
+ "type": "ASSUME",
+ "op": {
+ "keyspace": "myks",
+ "columnfamily": "mycountercf",
+ "type": "value",
+ "clazz": "LongType"
+ }
+ },
+ {
+ "type": "GET",
+ "op": {
+ "rowkey": "row_key1",
+ "name": "column1"
+ }
+ }
+]}
@@ -0,0 +1,14 @@
+{
+ "opsRes":{
+ "0":"OK",
+ "1":"OK",
+ "2":"OK",
+ "3":"OK",
+ "4":[
+ {
+ "name":"column1",
+ "value":4
+ }
+ ]},
+ "exception":null, "exceptionId":null
+}
@@ -0,0 +1,22 @@
+{"e": [
+ {
+ "type": "SETKEYSPACE",
+ "op": {
+ "keyspace": "myks"
+ }
+ },
+ {
+ "type": "SETCOLUMNFAMILY",
+ "op": {
+ "columnfamily": "mycountercf"
+ }
+ },
+ {
+ "type": "COUNTER",
+ "op": {
+ "rowkey": "row_key1",
+ "name": "column1",
+ "value": "4"
+ }
+ }
+]}

0 comments on commit 06e7d57

Please sign in to comment.