Skip to content

Commit

Permalink
interim commit for adding JsonArrays as message types on event bus
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox committed Jun 14, 2012
1 parent eaf5524 commit a534e34
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 11 deletions.
23 changes: 23 additions & 0 deletions src/main/java/org/vertx/java/core/eventbus/EventBus.java
Expand Up @@ -19,6 +19,7 @@
import org.vertx.java.core.AsyncResultHandler;
import org.vertx.java.core.Handler;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;

/**
Expand Down Expand Up @@ -72,6 +73,21 @@ public interface EventBus {
*/
void send(String address, JsonObject message);

/**
* Send a JSON array as a message
* @param address The address to send it to
* @param message The message
* @param replyHandler Reply handler will be called when any reply from the recipient is received
*/
void send(String address, JsonArray message, Handler<Message<JsonArray>> replyHandler);

/**
* Send a JSON array as a message
* @param address The address to send it to
* @param message The message
*/
void send(String address, JsonArray message);

/**
* Send a Buffer as a message
* @param address The address to send it to
Expand Down Expand Up @@ -244,6 +260,13 @@ public interface EventBus {
*/
void publish(String address, JsonObject message);

/**
* Publish a JSON array as a message
* @param address The address to publish it to
* @param message The message
*/
void publish(String address, JsonArray message);

/**
* Publish a Buffer as a message
* @param address The address to publish it to
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.vertx.java.core.eventbus.impl.hazelcast.HazelcastClusterManager;
import org.vertx.java.core.impl.Context;
import org.vertx.java.core.impl.VertxInternal;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.core.logging.Logger;
import org.vertx.java.core.logging.impl.LoggerFactory;
Expand Down Expand Up @@ -87,13 +88,21 @@ public DefaultEventBus(VertxInternal vertx, int port, String hostname) {
}

public void send(String address, JsonObject message, final Handler<Message<JsonObject>> replyHandler) {
sendOrPub(new JsonMessage(true, address, message), replyHandler);
sendOrPub(new JsonObjectMessage(true, address, message), replyHandler);
}

public void send(String address, JsonObject message) {
send(address, message, null);
}

public void send(String address, JsonArray message, final Handler<Message<JsonArray>> replyHandler) {
sendOrPub(new JsonArrayMessage(true, address, message), replyHandler);
}

public void send(String address, JsonArray message) {
send(address, message, null);
}

public void send(String address, Buffer message, final Handler<Message<Buffer>> replyHandler) {
sendOrPub(new BufferMessage(true, address, message), replyHandler);
}
Expand Down Expand Up @@ -183,7 +192,11 @@ public void send(String address, Byte message) {
}

public void publish(String address, JsonObject message) {
sendOrPub(new JsonMessage(false, address, message), null);
sendOrPub(new JsonObjectMessage(false, address, message), null);
}

public void publish(String address, JsonArray message) {
sendOrPub(new JsonArrayMessage(false, address, message), null);
}

public void publish(String address, Buffer message) {
Expand Down
@@ -0,0 +1,95 @@
/*
* Copyright 2011-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.vertx.java.core.eventbus.impl;

import org.jboss.netty.util.CharsetUtil;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.json.JsonArray;
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.core.logging.Logger;
import org.vertx.java.core.logging.impl.LoggerFactory;

/**
* @author <a href="http://tfox.org">Tim Fox</a>
*/
class JsonArrayMessage extends BaseMessage<JsonArray> {

private static final Logger log = LoggerFactory.getLogger(JsonArrayMessage.class);

private byte[] encoded;

JsonArrayMessage(boolean send, String address, JsonArray body) {
super(send, address, body);
}

private JsonArrayMessage(JsonArrayMessage other) {
super(other.send, other.address, other.body == null ? null : other.body.copy());
this.replyAddress = other.replyAddress;
this.bus = other.bus;
this.sender = other.sender;
}

public JsonArrayMessage(Buffer readBuff) {
super(readBuff);
}

protected void readBody(int pos, Buffer readBuff) {
boolean isNull = readBuff.getByte(pos) == (byte)0;
if (!isNull) {
pos++;
int strLength = readBuff.getInt(pos);
pos += 4;
byte[] bytes = readBuff.getBytes(pos, pos + strLength);
String str = new String(bytes, CharsetUtil.UTF_8);
body = new JsonArray(str);
}
}

protected void writeBody(Buffer buff) {
if (body == null) {
buff.appendByte((byte)0);
} else {
buff.appendByte((byte)1);
buff.appendInt(encoded.length);
buff.appendBytes(encoded);
}
}

protected int getBodyLength() {
if (body == null) {
return 1;
} else {
String strJson = body.encode();
encoded = strJson.getBytes(CharsetUtil.UTF_8);
return 1 + 4 + encoded.length;
}
}

protected Message copy() {
return new JsonArrayMessage(this);
}

protected byte type() {
return MessageFactory.TYPE_JSON;
}

protected BaseMessage createReplyMessage(JsonArray reply) {
return new JsonArrayMessage(true, replyAddress, reply);
}

}
Expand Up @@ -17,7 +17,6 @@
package org.vertx.java.core.eventbus.impl;

import org.jboss.netty.util.CharsetUtil;
import org.vertx.java.core.Handler;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.json.JsonObject;
Expand All @@ -27,24 +26,24 @@
/**
* @author <a href="http://tfox.org">Tim Fox</a>
*/
class JsonMessage extends BaseMessage<JsonObject> {
class JsonObjectMessage extends BaseMessage<JsonObject> {

private static final Logger log = LoggerFactory.getLogger(JsonMessage.class);
private static final Logger log = LoggerFactory.getLogger(JsonObjectMessage.class);

private byte[] encoded;

JsonMessage(boolean send, String address, JsonObject body) {
JsonObjectMessage(boolean send, String address, JsonObject body) {
super(send, address, body);
}

private JsonMessage(JsonMessage other) {
private JsonObjectMessage(JsonObjectMessage other) {
super(other.send, other.address, other.body == null ? null : other.body.copy());
this.replyAddress = other.replyAddress;
this.bus = other.bus;
this.sender = other.sender;
}

public JsonMessage(Buffer readBuff) {
public JsonObjectMessage(Buffer readBuff) {
super(readBuff);
}

Expand Down Expand Up @@ -81,15 +80,15 @@ protected int getBodyLength() {
}

protected Message copy() {
return new JsonMessage(this);
return new JsonObjectMessage(this);
}

protected byte type() {
return MessageFactory.TYPE_JSON;
}

protected BaseMessage createReplyMessage(JsonObject reply) {
return new JsonMessage(true, replyAddress, reply);
return new JsonObjectMessage(true, replyAddress, reply);
}

}
Expand Up @@ -65,7 +65,7 @@ static BaseMessage read(Buffer buff) {
case TYPE_STRING:
return new StringMessage(buff);
case TYPE_JSON:
return new JsonMessage(buff);
return new JsonObjectMessage(buff);
default:
throw new IllegalStateException("Invalid type " + type);
}
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/vertx/java/core/json/JsonArray.java
Expand Up @@ -135,6 +135,10 @@ public String encode() throws EncodeException {
return Json.encode(this.list);
}

public JsonArray copy() {
return new JsonArray(encode());
}

public boolean equals(Object o) {
if (this == o) return true;

Expand Down

0 comments on commit a534e34

Please sign in to comment.