Permalink
Browse files

Merge pull request #206 from zznate/result-mode

Really nice that we have factored out the two big readcf methods. I had ...
  • Loading branch information...
2 parents 7c84acc + ad50de5 commit 7698211feb304f030aa29daa4acfde36fcad305f @zznate committed May 24, 2013
@@ -72,6 +72,7 @@
public static final String TTL = "ttl";
public static final String TIMESTAMP = "timestamp";
public static final String AUTOTIMESTAMP = "autotimestamp";
+ public static final String OK = "OK";
private Operations() {}
@@ -111,6 +111,7 @@ public static void registerOperationHandlers(Vertx x) {
x.eventBus().registerHandler("operations.filtermode", new FilterModeHandler());
x.eventBus().registerHandler("operations.createmultiprocess", new CreateMultiProcessHandler(x.eventBus()));
x.eventBus().registerHandler("operations.createserviceprocess", new CreateServiceProcessHandler(x.eventBus()));
+ x.eventBus().registerHandler("operations.resultmode", new ResultModeHandler());
}
}
@@ -11,10 +11,11 @@
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;
-public class BatchHandler implements Handler<Message<JsonObject>> {
+public class BatchHandler extends AbstractIntravertHandler {
@Override
- public void handle(Message<JsonObject> event) {
+ public void handleUser(Message<JsonObject> event) {
+ System.out.println("I gots da batch "+event.body);
Integer id = event.body.getInteger("id");
JsonObject params = event.body.getObject("op");
JsonObject state = event.body.getObject("state");
@@ -32,4 +32,6 @@ public void handleUser(Message<JsonObject> event) {
mutations.add(new CounterMutation(rm, HandlerUtils.instance.determineConsistencyLevel(state)));
HandlerUtils.instance.write(mutations, event, id, state);
}
-}
+}
+
+
@@ -165,64 +165,11 @@ public String determineKs(JsonObject params, JsonObject state, JsonObject row) {
return ks;
}
- public JsonArray readCf(ColumnFamily columnFamily, JsonObject state, JsonObject params) {
- JsonArray components = state.getArray("components");
- JsonArray array = new JsonArray();
- Iterator<IColumn> it = columnFamily.iterator();
- while (it.hasNext()) {
- IColumn ic = it.next();
- if (ic.isLive()) {
- HashMap m = new HashMap();
- if (components.contains("name")) {
- JsonObject columnMetadata = findMetaData(columnFamily, state, "column");
- if (columnMetadata == null) {
- m.put("name", TypeHelper.getBytes(ic.name()));
- } else {
- String clazz = columnMetadata.getString("clazz");
- Object name = TypeHelper.getTyped(clazz, ic.name());
- if (name instanceof ByteBuffer) {
- m.put("name", TypeHelper.getBytes(ic.name()));
- } else {
- m.put("name", TypeHelper.getTyped(clazz, ic.name()));
- }
- }
- }
- if (components.contains("value")) {
- if (ic instanceof CounterColumn) {
- m.put("value", ((CounterColumn) ic).total());
- } else {
- JsonObject valueMetadata = findColumnMetaData(columnFamily, state, ic.name()
- .duplicate());
- if (valueMetadata == null) {
- valueMetadata = findMetaData(columnFamily, state, "value");
- }
- if (valueMetadata == null) {
- valueMetadata = findRangedMetaData(columnFamily, state, ic.name().duplicate());
- }
- if (valueMetadata == null) {
- m.put("value", TypeHelper.getBytes(ic.value()));
- } else {
- String clazz = valueMetadata.getString("clazz");
- Object value = TypeHelper.getTyped(clazz, ic.value());
- if (value instanceof ByteBuffer) {
- m.put("value", TypeHelper.getBytes(ic.value()));
- } else {
- m.put("value", value);
- }
- }
- }
- }
- if (components.contains("timestamp")) {
- m.put("timestamp", ic.timestamp());
- }
- if (components.contains("markeddelete")) {
- m.put("markeddelete", ic.getMarkedForDeleteAt());
- }
- array.addObject(new JsonObject(m));
- }
- }
- return array;
+
+ public JsonArray readCf(ColumnFamily columnFamily, JsonObject state) {
+ return internalCfRead(columnFamily, state);
}
+
public JsonObject findMetaData(ColumnFamily cf, JsonObject state, String type) {
JsonObject meta = state.getObject("meta");
@@ -286,12 +233,12 @@ public JsonObject findColumnMetaData(ColumnFamily cf, JsonObject state, ByteBuff
}
}
- public void readCf(ColumnFamily columnFamily, JsonObject state, EventBus eb,
- Handler<Message<JsonArray>> filterReplyHandler) {
+ public JsonArray internalCfRead(ColumnFamily columnFamily, JsonObject state){
JsonArray components = state.getArray("components");
JsonArray array = new JsonArray();
-
- for (IColumn column : columnFamily) {
+ Iterator<IColumn> it = columnFamily.iterator();
+ while (it.hasNext()) {
+ IColumn column = it.next();
if (column.isLive()) {
HashMap<String,Object> m = new HashMap<>(4);
if (components.contains("name")) {
@@ -301,6 +248,12 @@ public void readCf(ColumnFamily columnFamily, JsonObject state, EventBus eb,
} else {
String clazz = columnMetadata.getString("clazz");
m.put("name", TypeHelper.getTyped(clazz, column.name()));
+ Object name = TypeHelper.getTyped(clazz, column.name());
+ if (name instanceof ByteBuffer) {
+ m.put("name", TypeHelper.getBytes(column.name()));
+ } else {
+ m.put("name", TypeHelper.getTyped(clazz, column.name()));
+ }
}
}
if (components.contains("value")) {
@@ -319,7 +272,12 @@ public void readCf(ColumnFamily columnFamily, JsonObject state, EventBus eb,
m.put("value", ByteBufferUtil.getArray(column.value()));
} else {
String clazz = valueMetaData.getString("clazz");
- m.put("value", TypeHelper.getTyped(clazz, column.value()));
+ Object value = TypeHelper.getTyped(clazz, column.value());
+ if (value instanceof ByteBuffer) {
+ m.put("value", TypeHelper.getBytes(column.value()));
+ } else {
+ m.put("value", value);
+ }
}
}
}
@@ -332,7 +290,12 @@ public void readCf(ColumnFamily columnFamily, JsonObject state, EventBus eb,
array.addObject(new JsonObject(m));
}
}
-
+ return array;
+ }
+
+ public void readCf(ColumnFamily columnFamily, JsonObject state, EventBus eb,
+ Handler<Message<JsonArray>> filterReplyHandler) {
+ JsonArray array = internalCfRead(columnFamily, state);
String filter = state.getString("currentFilter");
eb.send("filters." + filter, array, filterReplyHandler);
}
@@ -1,6 +1,9 @@
package org.usergrid.vx.server.operations;
+import java.nio.ByteBuffer;
+
import org.apache.cassandra.db.ColumnFamily;
+import org.usergrid.vx.experimental.Operations;
import org.vertx.java.core.Handler;
import org.vertx.java.core.eventbus.EventBus;
import org.vertx.java.core.eventbus.Message;
@@ -13,6 +16,7 @@
private JsonObject state;
private Message<JsonObject> event;
private EventBus eb;
+ private ByteBuffer rowKey;
public ReadHandler(Message<JsonObject> event, EventBus eb) {
this.event = event;
@@ -24,25 +28,43 @@ public ReadHandler(Message<JsonObject> event, EventBus eb) {
public void handleRead(ColumnFamily cf) {
final Integer id = event.body.getInteger("id");
JsonArray array;
+ final JsonObject resultMode = HandlerUtils.instance.getResultMode(state);
if (cf == null) {
event.reply(new JsonObject().putArray(id.toString(), new JsonArray()));
} else {
String filter = state.getString("currentFilter");
if (filter == null) {
- array = HandlerUtils.instance.readCf(cf, state, params);
- JsonObject resultMode = HandlerUtils.instance.getResultMode(state);
+ array = HandlerUtils.instance.internalCfRead(cf, state);
JsonObject response = new JsonObject();
if (resultMode == null){
+
response.putArray(id.toString(), array);
+ event.reply(response);
} else {
- //send to batch handler?
+ //System.out.println("This will fail fix me ");
+ //eb.send("operations.batchset", array);
}
- event.reply(response);
+
} else {
HandlerUtils.instance.readCf(cf, state, eb, new Handler<Message<JsonArray>>() {
@Override
- public void handle(Message<JsonArray> filterEvent) {
- event.reply(new JsonObject().putArray(id.toString(), filterEvent.body));
+ public void handle(final Message<JsonArray> filterEvent) {
+
+ if (resultMode == null){
+ event.reply(new JsonObject().putArray(id.toString(), filterEvent.body));
+ } else {
+ JsonObject obj = new JsonObject();
+ obj.putObject(Operations.OP, new JsonObject().putArray("rows", filterEvent.body) );
+ obj.putObject(Operations.STATE, state);
+ obj.putNumber(Operations.ID, 999);
+ eb.send("operations.batchset", obj, new Handler<Message<JsonObject>>(){
+ @Override
+ public void handle(Message<JsonObject> arg0) {
+ event.reply(new JsonObject().putArray(id.toString(), filterEvent.body));
+ }
+ });
+ }
+
}
});
}
@@ -13,6 +13,8 @@
*/
@Override
public void handleUser(Message<JsonObject> event) {
+ System.out.println("result mode"+event.body);
+ Integer id = event.body.getInteger("id");
JsonObject params = event.body.getObject(Operations.OP);
JsonObject state = event.body.getObject(Operations.STATE);
boolean on = params.getBoolean(Operations.ON);
@@ -21,6 +23,8 @@ public void handleUser(Message<JsonObject> event) {
} else {
HandlerUtils.instance.deactivateResultMode(state);
}
+ event.reply(new JsonObject().putString(id.toString(), Operations.OK)
+ .putObject(Operations.STATE, state));
}
}
@@ -4,19 +4,20 @@
import java.util.Map;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.usergrid.vx.client.IntraClient2;
+@Ignore
@RunWith(CassandraRunner.class)
@RequiresKeyspace(ksName = "rmks")
@RequiresColumnFamily(ksName = "rmks", cfName = "rmcf")
public class ResultModeITest {
- @Ignore
+
@Test
@RequiresColumnFamily(ksName = "rmks", cfName = "rmcf")
-
public void filterTest() throws Exception {
IntraReq preq = new IntraReq();

0 comments on commit 7698211

Please sign in to comment.