Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

impling pp

  • Loading branch information...
commit aa06a0bc57a101f75583bcc8eb684cdb6f483d89 1 parent caa47ae
@leonlee authored
View
26 protocol/src/main/groovy/org/riderzen/ogs/protocol/ProtocolParser.groovy
@@ -1,5 +1,8 @@
package org.riderzen.ogs.protocol
+import org.msgpack.MessagePack
+import org.msgpack.unpacker.Unpacker
+import org.vertx.groovy.core.buffer.Buffer
import org.vertx.groovy.core.parsetools.RecordParser
import org.vertx.java.busmods.BusModBase
@@ -20,9 +23,26 @@ class ProtocolParser extends BusModBase {
def init() {
address = getOptionalStringConfig("address", "org.riderzen.ogs.protocol")
- eb.registerHandler(address) { message ->
- logger.debug("received message $message")
-
+ eb.registerHandler(address) {Buffer message ->
+ logger.debug("received message length $message.length")
+
+ MessagePack messagePack = new MessagePack()
+ Unpacker unpacker = messagePack.createUnpacker(new ByteArrayInputStream(message.getBytes()))
+ int size = unpacker.readArrayBegin()
+ String operation;
+ Object[] params;
+ for (i in 0..size) {
+ operation = unpacker.readString()
+ int pSize = unpacker.readArrayBegin()
+ params = new Object[pSize]
+ for (j in 0..pSize) {
+ params[j] = unpacker.read(Object.class)
+ }
+ unpacker.readArrayEnd()
+ }
+ unpacker.readArrayEnd()
+
+ logger.debug("operation: $operation, params: $params")
}
}
}
View
80 tcp-server/src/main/groovy/org/riderzen/ogs/tcp/TcpServer.groovy
@@ -0,0 +1,80 @@
+package org.riderzen.ogs.tcp
+
+import org.vertx.java.busmods.BusModBase
+
+/**
+ * User: Leon Lee <mail.lgq@gmail.com>
+ * Date: 12-12-7
+ */
+class TcpServer extends BusModBase {
+ def server = null
+ def host
+ def port
+ def tcpNoDelay
+ def sendBufferSize
+ def receiveBufferSize
+ def tcpKeepAlive
+ def reuseAddress
+ def soLinger
+ def trafficClass
+ def backlog
+
+ def onConnected = { sock ->
+ sock.dataHandler { buffer ->
+ logger.debug("received ${buffer.lenght} bytes of data")
+ eb.publish("app.protocol", buffer)
+ }
+ sock.exceptionHandler { e ->
+ logger.error("caught error", e)
+ }
+ }
+
+ @Override
+ void start() {
+ super.start()
+
+ initConfig()
+ initServer()
+ initEvent()
+ server.listen(port, host)
+ }
+
+ def initConfig() {
+ host = getOptionalStringConfig("host", "0.0.0.0")
+ port = getOptionalIntConfig("port", 6543)
+ tcpNoDelay = getOptionalBooleanConfig("tcpNoDelay", false)
+ sendBufferSize = getOptionalIntConfig("sendBufferSize", 8192)
+ receiveBufferSize = getOptionalIntConfig("receiveBufferSize", 8192)
+ tcpKeepAlive = getOptionalBooleanConfig("tcpKeepAlive", true)
+ reuseAddress = getOptionalBooleanConfig("reuseAddress", true)
+ soLinger = getOptionalIntConfig("soLinger", -1)
+ trafficClass = getOptionalIntConfig("trafficClass", -1)
+ backlog = getOptionalIntConfig("backlog", -1)
+ }
+
+ def initServer() {
+ logger.info("initializing server with config: $config")
+
+ server = vertx.createNetServer()
+ server.setTCPNoDelay(tcpNoDelay)
+ server.setSendBufferSize(sendBufferSize)
+ server.setReceiveBufferSize(receiveBufferSize)
+ server.setTCPKeepAlive(tcpKeepAlive)
+ server.setReuseAddress(reuseAddress)
+ if (soLinger > 0)
+ server.setSoLinger(soLinger)
+ if (trafficClass >= 0 && trafficClass <= 255)
+ server.setTrafficClass(trafficClass)
+ if (backlog > 0)
+ server.setAcceptBacklog(backlog)
+
+ server.connectHandler(onConnected)
+ }
+
+ def initEvent() {
+ eb.registerHandler("sys.tcp.stop") { message ->
+ logger.info("received stop message, starting stop tcp server...")
+ server.close { logger.info("tcp server was stopped") }
+ }
+ }
+}
View
25 tcp-server/src/main/scripts/tcpServer.groovy
@@ -3,8 +3,27 @@
* User: Leon Lee <mail.lgq@gmail.com>
* Created On: 12-12-5
*/
+import org.vertx.java.core.net.NetServer
+
import static org.vertx.groovy.core.streams.Pump.createPump
-vertx.createNetServer().connectHandler { socket ->
- createPump(socket, socket).start()
-}.listen(1234)
+def config = container.config
+def logger = container.logger
+
+logger.info("initializing tcp server with config: ${config}")
+
+def server = vertx.createNetServer()
+
+server.tcpNoDelay = config.tcpNoDelay
+server.sendBufferSize = config.
+
+server.connectHandler { sock ->
+ logger.debug("client was connected")
+ sock.dataHandler { buffer ->
+
+ }
+ sock.exceptionHandler { e ->
+ logger.error("caught errors on socket", e)
+ }
+}.listen(config.port, config.host)
+
Please sign in to comment.
Something went wrong with that request. Please try again.