Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Really nice that we have factored out the two big readcf methods. I had ... #206

Merged
merged 1 commit into from

2 participants

@edwardcapriolo
Collaborator

Lots of cleanup espcially of the big / mostly duplicate /read cf function

@zznate zznate merged commit 7698211 into master
@zznate zznate deleted the result-mode branch
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
View
1  src/main/java/org/usergrid/vx/experimental/Operations.java
@@ -72,6 +72,7 @@
public static final String TTL = "ttl";
public static final String TIMESTAMP = "timestamp";
public static final String AUTOTIMESTAMP = "autotimestamp";
+ public static final String OK = "OK";
private Operations() {}
View
1  src/main/java/org/usergrid/vx/server/IntravertCassandraServer.java
@@ -111,6 +111,7 @@ public static void registerOperationHandlers(Vertx x) {
x.eventBus().registerHandler("operations.filtermode", new FilterModeHandler());
x.eventBus().registerHandler("operations.createmultiprocess", new CreateMultiProcessHandler(x.eventBus()));
x.eventBus().registerHandler("operations.createserviceprocess", new CreateServiceProcessHandler(x.eventBus()));
+ x.eventBus().registerHandler("operations.resultmode", new ResultModeHandler());
}
}
View
5 src/main/java/org/usergrid/vx/server/operations/BatchHandler.java
@@ -11,10 +11,11 @@
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;
-public class BatchHandler implements Handler<Message<JsonObject>> {
+public class BatchHandler extends AbstractIntravertHandler {
@Override
- public void handle(Message<JsonObject> event) {
+ public void handleUser(Message<JsonObject> event) {
+ System.out.println("I gots da batch "+event.body);
Integer id = event.body.getInteger("id");
JsonObject params = event.body.getObject("op");
JsonObject state = event.body.getObject("state");
View
4 src/main/java/org/usergrid/vx/server/operations/CounterHandler.java
@@ -32,4 +32,6 @@ public void handleUser(Message<JsonObject> event) {
mutations.add(new CounterMutation(rm, HandlerUtils.instance.determineConsistencyLevel(state)));
HandlerUtils.instance.write(mutations, event, id, state);
}
-}
+}
+
+
View
89 src/main/java/org/usergrid/vx/server/operations/HandlerUtils.java
@@ -165,64 +165,11 @@ public String determineKs(JsonObject params, JsonObject state, JsonObject row) {
return ks;
}
- public JsonArray readCf(ColumnFamily columnFamily, JsonObject state, JsonObject params) {
- JsonArray components = state.getArray("components");
- JsonArray array = new JsonArray();
- Iterator<IColumn> it = columnFamily.iterator();
- while (it.hasNext()) {
- IColumn ic = it.next();
- if (ic.isLive()) {
- HashMap m = new HashMap();
- if (components.contains("name")) {
- JsonObject columnMetadata = findMetaData(columnFamily, state, "column");
- if (columnMetadata == null) {
- m.put("name", TypeHelper.getBytes(ic.name()));
- } else {
- String clazz = columnMetadata.getString("clazz");
- Object name = TypeHelper.getTyped(clazz, ic.name());
- if (name instanceof ByteBuffer) {
- m.put("name", TypeHelper.getBytes(ic.name()));
- } else {
- m.put("name", TypeHelper.getTyped(clazz, ic.name()));
- }
- }
- }
- if (components.contains("value")) {
- if (ic instanceof CounterColumn) {
- m.put("value", ((CounterColumn) ic).total());
- } else {
- JsonObject valueMetadata = findColumnMetaData(columnFamily, state, ic.name()
- .duplicate());
- if (valueMetadata == null) {
- valueMetadata = findMetaData(columnFamily, state, "value");
- }
- if (valueMetadata == null) {
- valueMetadata = findRangedMetaData(columnFamily, state, ic.name().duplicate());
- }
- if (valueMetadata == null) {
- m.put("value", TypeHelper.getBytes(ic.value()));
- } else {
- String clazz = valueMetadata.getString("clazz");
- Object value = TypeHelper.getTyped(clazz, ic.value());
- if (value instanceof ByteBuffer) {
- m.put("value", TypeHelper.getBytes(ic.value()));
- } else {
- m.put("value", value);
- }
- }
- }
- }
- if (components.contains("timestamp")) {
- m.put("timestamp", ic.timestamp());
- }
- if (components.contains("markeddelete")) {
- m.put("markeddelete", ic.getMarkedForDeleteAt());
- }
- array.addObject(new JsonObject(m));
- }
- }
- return array;
+
+ public JsonArray readCf(ColumnFamily columnFamily, JsonObject state) {
+ return internalCfRead(columnFamily, state);
}
+
public JsonObject findMetaData(ColumnFamily cf, JsonObject state, String type) {
JsonObject meta = state.getObject("meta");
@@ -286,12 +233,12 @@ public JsonObject findColumnMetaData(ColumnFamily cf, JsonObject state, ByteBuff
}
}
- public void readCf(ColumnFamily columnFamily, JsonObject state, EventBus eb,
- Handler<Message<JsonArray>> filterReplyHandler) {
+ public JsonArray internalCfRead(ColumnFamily columnFamily, JsonObject state){
JsonArray components = state.getArray("components");
JsonArray array = new JsonArray();
-
- for (IColumn column : columnFamily) {
+ Iterator<IColumn> it = columnFamily.iterator();
+ while (it.hasNext()) {
+ IColumn column = it.next();
if (column.isLive()) {
HashMap<String,Object> m = new HashMap<>(4);
if (components.contains("name")) {
@@ -301,6 +248,12 @@ public void readCf(ColumnFamily columnFamily, JsonObject state, EventBus eb,
} else {
String clazz = columnMetadata.getString("clazz");
m.put("name", TypeHelper.getTyped(clazz, column.name()));
+ Object name = TypeHelper.getTyped(clazz, column.name());
+ if (name instanceof ByteBuffer) {
+ m.put("name", TypeHelper.getBytes(column.name()));
+ } else {
+ m.put("name", TypeHelper.getTyped(clazz, column.name()));
+ }
}
}
if (components.contains("value")) {
@@ -319,7 +272,12 @@ public void readCf(ColumnFamily columnFamily, JsonObject state, EventBus eb,
m.put("value", ByteBufferUtil.getArray(column.value()));
} else {
String clazz = valueMetaData.getString("clazz");
- m.put("value", TypeHelper.getTyped(clazz, column.value()));
+ Object value = TypeHelper.getTyped(clazz, column.value());
+ if (value instanceof ByteBuffer) {
+ m.put("value", TypeHelper.getBytes(column.value()));
+ } else {
+ m.put("value", value);
+ }
}
}
}
@@ -332,7 +290,12 @@ public void readCf(ColumnFamily columnFamily, JsonObject state, EventBus eb,
array.addObject(new JsonObject(m));
}
}
-
+ return array;
+ }
+
+ public void readCf(ColumnFamily columnFamily, JsonObject state, EventBus eb,
+ Handler<Message<JsonArray>> filterReplyHandler) {
+ JsonArray array = internalCfRead(columnFamily, state);
String filter = state.getString("currentFilter");
eb.send("filters." + filter, array, filterReplyHandler);
}
View
34 src/main/java/org/usergrid/vx/server/operations/ReadHandler.java
@@ -1,6 +1,9 @@
package org.usergrid.vx.server.operations;
+import java.nio.ByteBuffer;
+
import org.apache.cassandra.db.ColumnFamily;
+import org.usergrid.vx.experimental.Operations;
import org.vertx.java.core.Handler;
import org.vertx.java.core.eventbus.EventBus;
import org.vertx.java.core.eventbus.Message;
@@ -13,6 +16,7 @@
private JsonObject state;
private Message<JsonObject> event;
private EventBus eb;
+ private ByteBuffer rowKey;
public ReadHandler(Message<JsonObject> event, EventBus eb) {
this.event = event;
@@ -24,25 +28,43 @@ public ReadHandler(Message<JsonObject> event, EventBus eb) {
public void handleRead(ColumnFamily cf) {
final Integer id = event.body.getInteger("id");
JsonArray array;
+ final JsonObject resultMode = HandlerUtils.instance.getResultMode(state);
if (cf == null) {
event.reply(new JsonObject().putArray(id.toString(), new JsonArray()));
} else {
String filter = state.getString("currentFilter");
if (filter == null) {
- array = HandlerUtils.instance.readCf(cf, state, params);
- JsonObject resultMode = HandlerUtils.instance.getResultMode(state);
+ array = HandlerUtils.instance.internalCfRead(cf, state);
JsonObject response = new JsonObject();
if (resultMode == null){
+
response.putArray(id.toString(), array);
+ event.reply(response);
} else {
- //send to batch handler?
+ //System.out.println("This will fail fix me ");
+ //eb.send("operations.batchset", array);
}
- event.reply(response);
+
} else {
HandlerUtils.instance.readCf(cf, state, eb, new Handler<Message<JsonArray>>() {
@Override
- public void handle(Message<JsonArray> filterEvent) {
- event.reply(new JsonObject().putArray(id.toString(), filterEvent.body));
+ public void handle(final Message<JsonArray> filterEvent) {
+
+ if (resultMode == null){
+ event.reply(new JsonObject().putArray(id.toString(), filterEvent.body));
+ } else {
+ JsonObject obj = new JsonObject();
+ obj.putObject(Operations.OP, new JsonObject().putArray("rows", filterEvent.body) );
+ obj.putObject(Operations.STATE, state);
+ obj.putNumber(Operations.ID, 999);
+ eb.send("operations.batchset", obj, new Handler<Message<JsonObject>>(){
+ @Override
+ public void handle(Message<JsonObject> arg0) {
+ event.reply(new JsonObject().putArray(id.toString(), filterEvent.body));
+ }
+ });
+ }
+
}
});
}
View
4 src/main/java/org/usergrid/vx/server/operations/ResultModeHandler.java
@@ -13,6 +13,8 @@
*/
@Override
public void handleUser(Message<JsonObject> event) {
+ System.out.println("result mode"+event.body);
+ Integer id = event.body.getInteger("id");
JsonObject params = event.body.getObject(Operations.OP);
JsonObject state = event.body.getObject(Operations.STATE);
boolean on = params.getBoolean(Operations.ON);
@@ -21,6 +23,8 @@ public void handleUser(Message<JsonObject> event) {
} else {
HandlerUtils.instance.deactivateResultMode(state);
}
+ event.reply(new JsonObject().putString(id.toString(), Operations.OK)
+ .putObject(Operations.STATE, state));
}
}
View
5 src/test/java/org/usergrid/vx/experimental/ResultModeITest.java
@@ -4,19 +4,20 @@
import java.util.Map;
import org.junit.Assert;
+import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.usergrid.vx.client.IntraClient2;
+@Ignore
@RunWith(CassandraRunner.class)
@RequiresKeyspace(ksName = "rmks")
@RequiresColumnFamily(ksName = "rmks", cfName = "rmcf")
public class ResultModeITest {
- @Ignore
+
@Test
@RequiresColumnFamily(ksName = "rmks", cfName = "rmcf")
-
public void filterTest() throws Exception {
IntraReq preq = new IntraReq();
Something went wrong with that request. Please try again.