Permalink
Browse files

Refactor the ThriftClientCodec to encode wire protocol to ThriftReply…

…[(ThriftGeneratedClass).(call_name)_result] instead of (ThriftGeneratedClass).(call_name)_result
  • Loading branch information...
Wilhelm Bierbaum
Wilhelm Bierbaum committed Dec 28, 2010
1 parent f323377 commit 92e9112f660f83c18c95d638f4ff970b921effb7
@@ -54,8 +54,9 @@ class ThriftClientDecoder extends ReplayingDecoder[VoidEnum] {
null
case TMessageType.REPLY =>
val call = ThriftTypes(message.name).newInstance()
- val reply = call.readResponse(protocol)
- reply.asInstanceOf[AnyRef] // Note reply may not be a success
+ val result = call.readResponse(protocol)
+ val reply = call.reply(result.asInstanceOf[AnyRef])
+ reply
case _ =>
Channels.fireExceptionCaught(ctx, new TApplicationException(
TApplicationException.INVALID_MESSAGE_TYPE))
@@ -54,8 +54,8 @@ class ThriftCall[A <: TBase[_, _], R <: TBase[_, _]](
/**
* Wrap a ReplyClass in a ThriftReply.
*/
- def reply(reply: R) =
- new ThriftReply[R](reply, this)
+ def reply(reply: AnyRef) =
+ new ThriftReply[R](reply.asInstanceOf[R], this)
/**
* Read the argument list
@@ -44,7 +44,7 @@ object AsyncServerEndToEndSpec extends Specification {
}
})
- val callResults = new Promise[Silly.bleep_result]
+ val callResults = new Promise[ThriftReply[Silly.bleep_result]]
// ** Set up the client.
val clientBootstrap = new ClientBootstrap(new DefaultLocalClientChannelFactory)
@@ -56,7 +56,7 @@ object AsyncServerEndToEndSpec extends Specification {
pipeline.addLast("encode", new ThriftClientEncoder)
pipeline.addLast("handler", new SimpleChannelUpstreamHandler {
override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
- callResults() = Return(e.getMessage.asInstanceOf[Silly.bleep_result])
+ callResults() = Return(e.getMessage.asInstanceOf[ThriftReply[Silly.bleep_result]])
}
})
@@ -78,7 +78,7 @@ object AsyncServerEndToEndSpec extends Specification {
val result = callResults.within(1.second)
result.isReturn must beTrue
- result().success must be_==("yehyeh")
+ result().response.success must be_==("yehyeh")
serverChannel.close().awaitUninterruptibly()
serverBootstrap.getFactory.releaseExternalResources()
@@ -40,7 +40,7 @@ object DuplexChannelBufferTransportSpec extends Specification with Mockito {
val t = new DuplexChannelBufferTransport(in, out)
val bb = "hello".getBytes
- "writes to the output ChannelBuffe" in {
+ "writes to the output ChannelBuffer" in {
t.write(bb, 0, 1)
there was one(out).writeBytes(bb, 0, 1)
@@ -55,7 +55,7 @@ object EndToEndSpec extends Specification {
}
})
- val callResults = new Promise[Silly.bleep_result]
+ val callResults = new Promise[ThriftReply[Silly.bleep_result]]
// ** Set up the client.
val clientBootstrap = new ClientBootstrap(new DefaultLocalClientChannelFactory)
@@ -67,7 +67,7 @@ object EndToEndSpec extends Specification {
pipeline.addLast("decoder", new ThriftClientDecoder)
pipeline.addLast("handler", new SimpleChannelUpstreamHandler {
override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
- callResults() = Return(e.getMessage.asInstanceOf[Silly.bleep_result])
+ callResults() = Return(e.getMessage.asInstanceOf[ThriftReply[Silly.bleep_result]])
Channels.close(ctx.getChannel)
}
})
@@ -89,7 +89,7 @@ object EndToEndSpec extends Specification {
val result = callResults.within(1.second)
result.isReturn must beTrue
- result().success must be_==("yehyeh")
+ result().response.success must be_==("yehyeh")
// ** Shutdown
serverChannel.close().awaitUninterruptibly()
@@ -117,7 +117,7 @@ object EndToEndSpec extends Specification {
processor, serverSocket,
transportFactory, protocolFactory)
- val callResults = new Promise[Silly.bleep_result]
+ val callResults = new Promise[ThriftReply[Silly.bleep_result]]
val cf = new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
@@ -132,7 +132,7 @@ object EndToEndSpec extends Specification {
pipeline.addLast("decoder", new ThriftClientDecoder)
pipeline.addLast("handler", new SimpleChannelUpstreamHandler {
override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
- callResults() = Return(e.getMessage.asInstanceOf[Silly.bleep_result])
+ callResults() = Return(e.getMessage.asInstanceOf[ThriftReply[Silly.bleep_result]])
Channels.close(ctx.getChannel)
}
})
@@ -158,7 +158,7 @@ object EndToEndSpec extends Specification {
val result = callResults.within(1.second)
result.isReturn must beTrue
- result().success must be_==("raboof")
+ result().response.success must be_==("raboof")
thriftServer.stop()
serverThread.join()
@@ -63,8 +63,8 @@ object ServiceEndToEndSpec extends Specification {
val result = promise.within(1.second)
result.isReturn must beTrue
- val reply = result().asInstanceOf[Silly.bleep_result]
- reply.success mustEqual "olleh"
+ val reply = result().asInstanceOf[ThriftReply[Silly.bleep_result]]
+ reply().response.success mustEqual "olleh"
server.close().awaitUninterruptibly()
}
@@ -149,10 +149,9 @@ object ThriftCodecSpec extends Specification {
// verify decode
val message = channel.upstreamEvents(0).asInstanceOf[MessageEvent].getMessage()
- val result = message.asInstanceOf[Silly.bleep_result]
+ val result = message.asInstanceOf[ThriftReply[Silly.bleep_result]]
result mustNot beNull
- result.isSetSuccess must beTrue
- result.success must be_==("result")
+ result.response.success must be_==("result")
}
"decode replys broken in two" in {
@@ -177,10 +176,10 @@ object ThriftCodecSpec extends Specification {
channel.upstreamEvents must haveSize(1)
channel.downstreamEvents must haveSize(0)
val message = channel.upstreamEvents(0).asInstanceOf[MessageEvent].getMessage()
- val result = message.asInstanceOf[Silly.bleep_result]
+ val result = message.asInstanceOf[ThriftReply[Silly.bleep_result]]
result mustNot beNull
- result.isSetSuccess must beTrue
- result.success must be_==("result")
+ // result.re must beTrue
+ result.response.success must be_==("result")
}
}

0 comments on commit 92e9112

Please sign in to comment.