Permalink
Browse files

fix: downstream messages go downstream.

  • Loading branch information...
1 parent 4555f7c commit f2ad44a28b7c9db93556067c3f85cbc65c8ab8be Robey Pointer committed Dec 23, 2010
Showing with 22 additions and 12 deletions.
  1. +2 −2 src/main/scala/com/twitter/naggati/Codec.scala
  2. +20 −10 src/test/scala/com/twitter/naggati/TestCodec.scala
@@ -54,10 +54,10 @@ extends FrameDecoder with ChannelDownstreamHandler {
// turn an Encodable message into a Buffer.
override final def handleDownstream(context: ChannelHandlerContext, event: ChannelEvent) {
event match {
- case message: MessageEvent =>
+ case message: DownstreamMessageEvent =>
val obj = message.getMessage()
if (encoder.isDefinedAt(obj)) {
- Channels.fireMessageReceived(context, encoder(obj), message.getRemoteAddress())
+ Channels.write(context, message.getFuture, encoder(obj), message.getRemoteAddress)
} else {
context.sendDownstream(event)
}
@@ -30,21 +30,31 @@ class TestCodec(codec: Codec) {
val output = new mutable.ListBuffer[AnyRef]
- val fin = new SimpleChannelUpstreamHandler() {
+ def log(e: MessageEvent) {
+ e.getMessage match {
+ case buffer: ChannelBuffer =>
+ val bytes = new Array[Byte](buffer.readableBytes)
+ buffer.readBytes(bytes)
+ output += bytes
+ case x =>
+ output += x
+ }
+ }
+
+ val upstreamTerminus = new SimpleChannelUpstreamHandler() {
override def messageReceived(c: ChannelHandlerContext, e: MessageEvent) {
- e.getMessage match {
- case buffer: ChannelBuffer =>
- val bytes = new Array[Byte](buffer.readableBytes)
- buffer.readBytes(bytes)
- output += bytes
- case x =>
- output += x
- }
+ log(e)
+ }
+ }
+ val downstreamTerminus = new SimpleChannelDownstreamHandler() {
+ override def writeRequested(c: ChannelHandlerContext, e: MessageEvent) {
+ log(e)
}
}
val pipeline = Channels.pipeline()
+ pipeline.addLast("downstreamTerminus", downstreamTerminus)
pipeline.addLast("decoder", codec)
- pipeline.addLast("fin", fin)
+ pipeline.addLast("upstreamTerminus", upstreamTerminus)
val context = pipeline.getContext(codec)
val sink = new AbstractChannelSink() {

0 comments on commit f2ad44a

Please sign in to comment.