Permalink
Browse files

Renaming request, response return meta information to "RInfo"

  • Loading branch information...
1 parent e678890 commit 3fc5481cbb78253c5e13ed63322042ac0fe738d3 Anand Kesari committed Jan 28, 2011
@@ -281,10 +281,10 @@ private void dispatchToAllClients(byte[] b) {
private void dispatchResponse(EventWrapper event, byte[] b) {
Response res = (Response) event.getEvent();
- Request.Info rinfo = res.getRequestInfo();
+ Request.RInfo rinfo = res.getRInfo();
- if (rinfo instanceof Request.ClientInfo) {
- UUID uuid = ((Request.ClientInfo) rinfo).getRequesterUUID();
+ if (rinfo instanceof Request.ClientRInfo) {
+ UUID uuid = ((Request.ClientRInfo) rinfo).getRequesterUUID();
ClientConnection c = clients.get(uuid);
@@ -436,6 +436,7 @@ public void run() {
Object event = ew.getEvent();
if (event instanceof Request) {
decorateRequest((Request) event);
+ logger.info("Decorated client request: " + ew.toString());
}
injectEvent(ew);
@@ -450,10 +451,10 @@ public void run() {
private void decorateRequest(Request r) {
// add UUID of client into request.
- Request.Info info = r.getInfo();
+ Request.RInfo info = r.getRInfo();
- if (info != null && info instanceof Request.ClientInfo)
- ((Request.ClientInfo) info).setRequesterUUID(ClientConnection.this.uuid);
+ if (info != null && info instanceof Request.ClientRInfo)
+ ((Request.ClientRInfo) info).setRequesterUUID(ClientConnection.this.uuid);
}
};
@@ -19,13 +19,20 @@
import io.s4.message.SinglePERequest;
import java.lang.reflect.Type;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.InstanceCreator;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonSerializationContext;
+import com.google.gson.JsonSerializer;
import flexjson.JSONDeserializer;
import flexjson.JSONSerializer;
@@ -116,22 +123,24 @@ public static void main(String[] argv) throws Exception {
String[] query = { "name", "count", "freq" };
String target[] = { "ACDW", "11" };
- io.s4.message.Request.ClientInfo rinfo = new io.s4.message.Request.ClientInfo();
+ io.s4.message.Request.ClientRInfo rinfo = new io.s4.message.Request.ClientRInfo();
rinfo.setRequesterUUID(UUID.randomUUID());
Request req = new io.s4.message.SinglePERequest(Arrays.asList(target),
Arrays.asList(query),
rinfo);
System.out.println(req.toString());
- InstanceCreator<io.s4.message.Request.Info> infoCreator = new InstanceCreator<io.s4.message.Request.Info>() {
- public io.s4.message.Request.Info createInstance(Type type) {
- return new io.s4.message.Request.ClientInfo();
+ InstanceCreator<io.s4.message.Request.RInfo> infoCreator = new InstanceCreator<io.s4.message.Request.RInfo>() {
+ public io.s4.message.Request.RInfo createInstance(Type type) {
+ return new io.s4.message.Request.ClientRInfo();
}
};
- Gson gson = (new GsonBuilder()).registerTypeAdapter(io.s4.message.Request.Info.class,
+ Gson gson = (new GsonBuilder()).registerTypeAdapter(io.s4.message.Request.RInfo.class,
infoCreator)
+ .registerTypeAdapter(Object.class,
+ new ObjectTypeAdapter())
.create();
System.out.println("gson: " + gson.toJson(req));
@@ -140,5 +149,52 @@ public static void main(String[] argv) throws Exception {
System.out.println(b.toJson(req));
System.out.println(b.toJson(Arrays.asList(query)));
+
+ System.out.println("----------------------------------------------");
+
+ ArrayList<SSTest> list = new ArrayList<SSTest>();
+
+ SSTest ss1 = new SSTest();
+ ss1.str = "list-element-1";
+ SSTest ss2 = new SSTest();
+ ss2.str = "list-element-2";
+
+ list.add(ss1);
+ list.add(ss2);
+
+ Map<String, Object> listmap = new HashMap<String, Object>();
+ listmap.put("ll", list);
+
+ MapTest mt = new MapTest();
+ mt.map = listmap;
+
+ Object listmapobj = listmap;
+
+ System.out.println("list: " + gson.toJson(list));
+ System.out.println("listmap: " + gson.toJson(listmap));
+ System.out.println("listmapobj: " + gson.toJson(listmapobj));
+ System.out.println("mapobject: " + gson.toJson(mt));
+ }
+
+ private static class SSTest {
+ public String str;
}
+
+ private static class MapTest {
+ Map<String, Object> map;
+ Map gmap;
+ }
+
+ private static class ObjectTypeAdapter implements JsonSerializer<Object> {
+ public JsonElement serialize(Object src, Type typeOfSrc,
+ JsonSerializationContext context) {
+
+ if (src.getClass() != Object.class) {
+ return context.serialize(src, src.getClass());
+ }
+
+ return new JsonObject();
+ }
+ }
+
}
@@ -131,7 +131,7 @@ public void emit(int partitionId, EventWrapper eventWrapper) {
// Add partition id of sender
private void decorateRequest(Request r) {
- Request.Info rinfo = r.getInfo();
+ Request.RInfo rinfo = r.getRInfo();
if (rinfo != null && listener != null)
rinfo.setPartition(listener.getId());
@@ -32,7 +32,7 @@
private final List<String> query;
- public PrototypeRequest(List<String> query, Info info) {
+ public PrototypeRequest(List<String> query, RInfo info) {
this.query = query;
this.rinfo = info;
}
@@ -17,15 +17,25 @@
import io.s4.dispatcher.partitioner.CompoundKeyInfo;
import io.s4.dispatcher.partitioner.Hasher;
+import io.s4.util.GsonUtil;
+import java.lang.reflect.Type;
import java.util.List;
import java.util.UUID;
+import com.google.gson.InstanceCreator;
+
abstract public class Request {
- protected Info rinfo;
+ protected RInfo rinfo = nullRInfo;
+
+ public final static RInfo nullRInfo = new NullRInfo();
+
+ /**
+ * Requester/Return information
+ */
+ abstract public static class RInfo {
- abstract public static class Info {
private long id = 0;
/**
@@ -70,9 +80,20 @@ public void setPartition(int partition) {
this.partition = partition;
}
+ // Tell Gson how to instantiate one of these: create a ClientRInfo
+ static {
+ InstanceCreator<RInfo> creator = new InstanceCreator<RInfo>() {
+ public io.s4.message.Request.RInfo createInstance(Type type) {
+ return new io.s4.message.Request.ClientRInfo();
+ }
+ };
+
+ GsonUtil.registerTypeAdapter(RInfo.class, creator);
+ }
+
}
- public static class ClientInfo extends Info {
+ public static class ClientRInfo extends RInfo {
private UUID requesterUUID = null;
/**
@@ -96,7 +117,7 @@ public String toString() {
}
}
- public static class PEInfo extends Info {
+ public static class PERInfo extends RInfo {
private String requesterKey = null;
/**
@@ -120,15 +141,41 @@ public String toString() {
}
}
+ public static class NullRInfo extends RInfo {
+ public NullRInfo() {
+ super.stream = "@null";
+ super.partition = -1;
+ }
+ }
+
/**
* Query metainformation.
*
* @return Info representing origin of request.
*/
- public Info getInfo() {
+ public RInfo getRInfo() {
return rinfo;
}
+ /**
+ * Query metainformation.
+ */
+ public void setRInfo(RInfo rinfo) {
+ this.rinfo = rinfo;
+ }
+
+ /**
+ * Partition itself. This is used by the default partitioner.
+ *
+ * @param h
+ * hasher
+ * @param delim
+ * delimiter used to concatenate compound key values
+ * @param partCount
+ * number of partitions
+ * @return list of compound keys: one event may have to be sent to multiple
+ * nodes.
+ */
abstract public List<CompoundKeyInfo> partition(Hasher h, String delim,
int partCount);
}
@@ -64,8 +64,8 @@ public Request getRequest() {
return request;
}
- public SinglePERequest.Info getRequestInfo() {
- return (request != null ? request.getInfo() : null);
+ public Request.RInfo getRInfo() {
+ return (request != null ? request.getRInfo() : null);
}
public String toString() {
@@ -75,7 +75,7 @@ public String toString() {
public List<CompoundKeyInfo> partition(int partCount) {
// partition id is available from the request info object
- int p = this.getRequestInfo().getPartition();
+ int p = this.getRInfo().getPartition();
List<CompoundKeyInfo> partitionInfoList = null;
if (p >= 0 && p < partCount) {
@@ -37,7 +37,7 @@
private final List<String> query;
- public SinglePERequest(List<String> target, List<String> query, Info info) {
+ public SinglePERequest(List<String> target, List<String> query, RInfo info) {
this.target = target;
this.query = query;
this.rinfo = info;
@@ -50,7 +50,7 @@ public void process(EventWrapper e, PrototypeWrapper p) {
protected void execute(EventWrapper e, PrototypeWrapper p) {
List<CompoundKeyInfo> keyInfoList = e.getCompoundKeys();
Object event = e.getEvent();
-
+
if (event instanceof SinglePERequest) {
// Handle Requests to individual PEs
if (keyInfoList.isEmpty())
@@ -63,14 +63,14 @@ protected void execute(EventWrapper e, PrototypeWrapper p) {
ProcessingElement pe = p.lookupPE(keyVal);
Response response = ((SinglePERequest) event).evaluate(pe);
- String stream = response.getRequestInfo().getStream();
+ String stream = response.getRInfo().getStream();
dispatcher.dispatchEvent(stream, response);
} else if (event instanceof PrototypeRequest) {
// Or handle aggregate requests to Prototypes.
Response response = ((PrototypeRequest) event).evaluate(p);
- String stream = response.getRequestInfo().getStream();
+ String stream = response.getRInfo().getStream();
dispatcher.dispatchEvent(stream, response);
}

0 comments on commit 3fc5481

Please sign in to comment.