
Commit

HttpServerNettySpec passes.
Kris Nuttycombe committed Nov 1, 2012
1 parent 911b063 commit 5112221
Showing 14 changed files with 274 additions and 382 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/blueeyes/core/data/Bijection.scala
@@ -29,8 +29,8 @@ trait Bijection[A, B] extends Function1[A, B] with Unapply[A, B] { self =>
   def apply(t: A): B

   def inverse: Bijection[B, A] = new Bijection[B, A] {
-    def apply(s: B): A = self.unapply(s)
-    def unapply(t: A): B = self.apply(t)
+    def apply(b: B): A = self.unapply(b)
+    def unapply(a: A): B = self.apply(a)
   }

   def compose[R](that: Bijection[R, A]): Bijection[R, B] = new Bijection[R, B] {
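
The only change in this hunk is renaming the parameters of the inverse so the direction reads clearly. For readers unfamiliar with the type, here is a minimal, self-contained sketch of the Bijection shape; the simplified trait and the stringToBytes value are illustrative, not the repository's definitions (which also extend Function1 and Unapply):

    // Minimal sketch of a Bijection: apply and unapply are mutual inverses,
    // and `inverse` simply swaps the two directions, as in the hunk above.
    trait Bijection[A, B] { self =>
      def apply(a: A): B
      def unapply(b: B): A

      def inverse: Bijection[B, A] = new Bijection[B, A] {
        def apply(b: B): A = self.unapply(b)
        def unapply(a: A): B = self.apply(a)
      }
    }

    object BijectionSketch {
      // Illustrative instance: UTF-8 encoding as a bijection between String and Array[Byte].
      val stringToBytes: Bijection[String, Array[Byte]] = new Bijection[String, Array[Byte]] {
        def apply(s: String): Array[Byte] = s.getBytes("UTF-8")
        def unapply(bytes: Array[Byte]): String = new String(bytes, "UTF-8")
      }

      def main(args: Array[String]): Unit = {
        val bytes = stringToBytes("foo")
        println(stringToBytes.inverse(bytes)) // foo: round trip through the inverse
      }
    }
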
@@ -21,8 +21,8 @@ trait DefaultBijections {
     new Bijection[Future[Array[Byte]], ByteChunk] {
       private implicit val M: Monad[Future] = new FutureMonad(executor)

-      def apply(t: Future[Array[Byte]]): ByteChunk = {
-        Right(t.map(ByteBuffer.wrap _).liftM[StreamT])
+      def apply(f: Future[Array[Byte]]): ByteChunk = {
+        Right(f.map(ByteBuffer.wrap _).liftM[StreamT])
       }

       def unapply(s: ByteChunk): Future[Array[Byte]] = {
@@ -105,13 +105,14 @@ trait DefaultBijections {
   implicit def chunkToFutureJValue(implicit executor: ExecutionContext) = futureJValueToChunk.inverse

   /// XML Bijections ///

+  /*
   implicit val XMLToByteArray = new Bijection[NodeSeq, Array[Byte]] {
     def apply(s: NodeSeq) = s.toString.getBytes("UTF-8")
     def unapply(t: Array[Byte]) = XML.loadString(new String(t, "UTF-8"))
   }
   implicit val ByteArrayToXML = XMLToByteArray.inverse
+  */
 }

 object DefaultBijections extends DefaultBijections
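
In this trait a ByteChunk is an Either: Left carries a single in-memory buffer, Right an asynchronous stream of buffers, which is why the future byte array is lifted into the stream side. A hedged sketch of the same shape using only the standard library; the ByteChunk alias below models the stream side as Future[List[ByteBuffer]] instead of scalaz's StreamT, and toChunk/fromChunk are illustrative names:

    import java.nio.ByteBuffer
    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    object ByteChunkSketch {
      // Simplified stand-in for blueeyes' ByteChunk.
      type ByteChunk = Either[ByteBuffer, Future[List[ByteBuffer]]]

      // Analogue of the bijection's apply: a future byte array becomes a streamed chunk.
      def toChunk(f: Future[Array[Byte]]): ByteChunk =
        Right(f.map(bytes => List(ByteBuffer.wrap(bytes))))

      // Analogue of unapply: collapse either representation back into one byte array.
      def fromChunk(chunk: ByteChunk): Future[Array[Byte]] = chunk match {
        case Left(buffer)  => Future.successful(buffer.array)
        case Right(stream) => stream.map(_.flatMap(_.array.toList).toArray)
      }

      def main(args: Array[String]): Unit = {
        val roundTripped = fromChunk(toChunk(Future.successful("hello".getBytes("UTF-8"))))
        println(new String(Await.result(roundTripped, 5.seconds), "UTF-8")) // hello
      }
    }
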
@@ -42,7 +42,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 import javax.net.ssl.SSLContext

 import scalaz._
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._

 class HttpClientXLightWeb(implicit val executor: ExecutionContext) extends HttpClientByteChunk with Logging {
   implicit val M: Monad[Future] = new FutureMonad(executor)
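
This import swap, together with the asScala call in the next hunk, moves from the wholesale implicit conversions of JavaConversions to the explicit JavaConverters style, and drops the unchecked asInstanceOf cast on the header-name set. A small self-contained sketch of that pattern; the java.util.Set and the header lookup stand in for XLightWeb's response object:

    import scala.collection.JavaConverters._

    object HeaderFoldSketch {
      def main(args: Array[String]): Unit = {
        // Stand-in for the Java collection returned by response.getHeaderNameSet.
        val names: java.util.Set[String] = new java.util.LinkedHashSet[String]()
        names.add("Content-Type")
        names.add("Content-Length")

        // Illustrative lookup in place of response.getHeader(name).
        def header(name: String): String =
          if (name == "Content-Type") "text/plain" else "5"

        // Explicit asScala conversion, then the same foldLeft as in the client code.
        val headers = names.asScala.foldLeft(Map[String, String]()) {
          (acc, name) => acc + (name -> header(name))
        }
        println(headers) // Map(Content-Type -> text/plain, Content-Length -> 5)
      }
    }
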
@@ -99,7 +99,7 @@ class HttpClientXLightWeb(implicit val executor: ExecutionContext) extends HttpC
           case _ => promise.failure(HttpException(HttpStatusCodes.BadRequest, reason))
         }
       } else {
-        val headers = response.getHeaderNameSet.toList.asInstanceOf[List[String]].foldLeft(Map[String, String]()) {
+        val headers = response.getHeaderNameSet.asScala.foldLeft(Map[String, String]()) {
          (acc, name) => acc + (name -> response.getHeader(name))
        }

@@ -125,7 +125,9 @@ class HttpClientXLightWeb(implicit val executor: ExecutionContext) extends HttpC
       }

       xlrequest match {
-        case e: IHttpRequest => clientInstance.send(e, handler)
+        case e: IHttpRequest =>
+          clientInstance.send(e, handler)
+
         case e: IHttpRequestHeader =>
           val bodyDataSink = clientInstance.send(e, handler)
           request.content.map(sendData(_, bodyDataSink)).getOrElse(bodyDataSink.close())
@@ -146,7 +148,7 @@ class HttpClientXLightWeb(implicit val executor: ExecutionContext) extends HttpC
       }

       chunk match {
-        case Left(data) => Future(bodyDataSink.write(data))
+        case Left(data) => Future(bodyDataSink.write(data)).map(_ => bodyDataSink.close())
         case Right(stream) => writeStream(stream)
       }
     }
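
The fix on the Left branch is that a single in-memory chunk was written to the body sink but the sink was never closed, so the request body was never terminated; the new code closes the sink once the asynchronous write completes. A generic sketch of that write-then-close-on-completion pattern; the Sink trait below is illustrative and is not XLightWeb's BodyDataSink API:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    object WriteThenCloseSketch {
      // Illustrative sink with a write/close lifecycle.
      trait Sink {
        def write(data: String): Unit
        def close(): Unit
      }

      // Close only after the asynchronous write has finished, mirroring
      // Future(bodyDataSink.write(data)).map(_ => bodyDataSink.close()) above.
      def send(data: String, sink: Sink): Future[Unit] =
        Future(sink.write(data)).map(_ => sink.close())

      def main(args: Array[String]): Unit = {
        val sink = new Sink {
          def write(data: String): Unit = println("wrote: " + data)
          def close(): Unit = println("closed")
        }
        Await.result(send("payload", sink), 5.seconds)
      }
    }
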
@@ -168,6 +170,7 @@ class HttpClientXLightWeb(implicit val executor: ExecutionContext) extends HttpC
       if (available > 0) {
         val buffer = ByteBuffer.allocate(available)
         source.read(buffer)
+        buffer.flip()

         val current = chain
         chain = Chain.incomplete
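
The added buffer.flip() is the standard java.nio step between filling a buffer and draining it: flip() sets the limit to the current position and resets the position to zero, so the bytes just read from the source can actually be consumed. A minimal standalone illustration using only java.nio:

    import java.nio.ByteBuffer

    object FlipSketch {
      def main(args: Array[String]): Unit = {
        val buffer = ByteBuffer.allocate(16)
        buffer.put("hello".getBytes("UTF-8")) // fill: position advances to 5

        buffer.flip()                          // limit := position, position := 0

        val out = new Array[Byte](buffer.remaining)
        buffer.get(out)                        // drain exactly the bytes written
        println(new String(out, "UTF-8"))      // hello
      }
    }
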
@@ -197,7 +200,18 @@ class HttpClientXLightWeb(implicit val executor: ExecutionContext) extends HttpC
       origURI.getFragment).toString

     val newHeaders = request.headers ++ requestContentLength(request)
-    val xlRequest = createXLRequest(request, uri)
+    val xlRequest = request.method match {
+      case HttpMethods.DELETE => new DeleteRequest(uri)
+      case HttpMethods.GET => new GetRequest(uri)
+      case HttpMethods.HEAD => new HeadRequest(uri)
+      case HttpMethods.OPTIONS => new OptionsRequest(uri)
+      case HttpMethods.POST => postRequest(request, uri)
+      case HttpMethods.PUT => putRequest(request, uri)
+      case HttpMethods.CONNECT => sys.error("CONNECT is not implemented.")
+      case HttpMethods.TRACE => sys.error("TRACE is not implemented.")
+      case HttpMethods.PATCH => sys.error("PATCH is not implemented.")
+      case HttpMethods.CUSTOM(x) => sys.error("CUSTOM is not implemented.")
+    }

     for ((key, value) <- newHeaders.raw) xlRequest.addHeader(key, value)

@@ -212,21 +226,6 @@ class HttpClientXLightWeb(implicit val executor: ExecutionContext) extends HttpC
     }
   }

-  private def createXLRequest(request: HttpRequest[ByteChunk], url: String): IHeader = {
-    request.method match {
-      case HttpMethods.DELETE => new DeleteRequest(url)
-      case HttpMethods.GET => new GetRequest(url)
-      case HttpMethods.HEAD => new HeadRequest(url)
-      case HttpMethods.OPTIONS => new OptionsRequest(url)
-      case HttpMethods.POST => postRequest(request, url)
-      case HttpMethods.PUT => putRequest(request, url)
-      case HttpMethods.CONNECT => sys.error("CONNECT is not implemented.")
-      case HttpMethods.TRACE => sys.error("TRACE is not implemented.")
-      case HttpMethods.PATCH => sys.error("PATCH is not implemented.")
-      case HttpMethods.CUSTOM(x) => sys.error("CUSTOM is not implemented.")
-    }
-  }
-
   private def postRequest(request: HttpRequest[ByteChunk], url: String): IHeader = {
     request.content match {
       case None => new PostRequest(url)
@@ -244,6 +243,7 @@ class HttpClientXLightWeb(implicit val executor: ExecutionContext) extends HttpC
   }

   private def requestContentType(request: HttpRequest[ByteChunk]) = `Content-Type`(request.mimeTypes : _*)
+
   private def requestContentLength(request: HttpRequest[ByteChunk]): Option[HttpHeader] = {
     for (content <- request.content; v <- content.left.toOption) yield {
       `Content-Length`(v.remaining)
@@ -9,6 +9,7 @@ import blueeyes.bkka._
 import blueeyes.concurrent.ReadWriteLock
 import blueeyes.core.http._
 import blueeyes.core.data._
+import blueeyes.util._

 import com.weiglewilczek.slf4s.Logger
 import com.weiglewilczek.slf4s.Logging
@@ -42,10 +43,13 @@ import org.jboss.netty.handler.stream.ChunkedInput
 import org.jboss.netty.handler.stream.ChunkedWriteHandler

 import java.nio.ByteBuffer
+import java.util.concurrent.BlockingQueue
+import java.util.concurrent.LinkedBlockingQueue

-import scala.collection.JavaConversions._
-import scala.collection.mutable.{HashSet, SynchronizedSet}
 import scalaz._
+import scalaz.syntax.monad._
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{HashSet, SynchronizedSet}

 /** This handler is not thread safe, it's assumed a new one will be created
  * for each client connection.
@@ -107,7 +111,7 @@ private[engines] class HttpServiceUpstreamHandler(service: AsyncHttpService[Byte
       case Some(Right(stream)) =>
         nettyResponse.setHeader(NettyHttpHeaders.Names.TRANSFER_ENCODING, "chunked")
         channel.write(nettyResponse)
-        channel.write(new StreamChunkedInput(stream, channel))
+        channel.write(StreamChunkedInput(stream, channel, 2))

       case None =>
         nettyResponse.setHeader(NettyHttpHeaders.Names.CONTENT_LENGTH, "0")
@@ -136,63 +140,53 @@ private[engines] class HttpServiceUpstreamHandler(service: AsyncHttpService[Byte
   }
 }

-private[engines] class StreamChunkedInput(stream: StreamT[Future, ByteBuffer], channel: Channel)(implicit M: Monad[Future])
-    extends ChunkedInput with Logging with AkkaDefaults {
-
-  private val lock = new ReadWriteLock
-
-  private var data: HttpChunk = null
-  @volatile private var isEOF: Boolean = false
-  @volatile private var awaitingRead: Boolean = false
-  private var remaining: StreamT[Future, ByteBuffer] = null
-
-  advance(stream)
-
-  private def advance(stream: StreamT[Future, ByteBuffer]): Future[Unit] = {
-    stream.uncons map {
-      case Some((buffer, tail)) =>
-        //
-        lock.writeLock {
-          data = new DefaultHttpChunk(ChannelBuffers.wrappedBuffer(buffer))
-          remaining = stream
-          awaitingRead = true
-          isEOF = false
-        }
-
-      case None =>
-        lock.writeLock {
-          data = new DefaultHttpChunkTrailer
-          remaining = null
-          awaitingRead = true
-          isEOF = true
-        }
-    } onFailure {
-      case ex =>
-        logger.error("An error was encountered retrieving the next chunk of data: " + ex.getMessage, ex)
-        lock.writeLock {
-          data = null
-          remaining = null
-          awaitingRead = false
-          isEOF = true
-        }
-    }
-  }
-
-  override def close() {
-    // FIXME
-  }
-
-  override def isEndOfInput() = isEOF
-
-  override def hasNextChunk() = awaitingRead && !isEOF
-
-  override def nextChunk() = {
-    lock.writeLock {
-      if (!awaitingRead) throw new IllegalStateException("No data available; nextChunk called when not awaiting read.")
-      val result = data
-      awaitingRead = false
-      if (!isEOF) advance(remaining)
-      result
-    }
-  }
-}
+private[engines] class StreamChunkedInput(queue: BlockingQueue[Option[HttpChunk]], channel: Channel) extends ChunkedInput {
+  override def hasNextChunk() = {
+    val head = queue.peek
+    (head != None && head != null)
+  }
+
+  override def nextChunk() = {
+    queue.poll() match {
+      case None | null => null
+      case Some(data) => data
+    }
+  }
+
+  override def isEndOfInput() = {
+    queue.peek == None
+  }
+
+  override def close() = ()
+}
+
+object StreamChunkedInput extends Logging {
+  def apply(stream: StreamT[Future, ByteBuffer], channel: Channel, maxQueueSize: Int = 1)(implicit M: Monad[Future]): ChunkedInput = {
+    def advance(queue: BlockingQueue[Option[HttpChunk]], stream: StreamT[Future, ByteBuffer]): Future[Unit] = {
+      stream.uncons flatMap {
+        case Some((buffer, tail)) =>
+          queue.put(Some(new DefaultHttpChunk(ChannelBuffers.wrappedBuffer(buffer))))
+          channel.getPipeline.get(classOf[ChunkedWriteHandler]).resumeTransfer()
+          advance(queue, tail)
+
+        case None =>
+          {
+            queue.put(Some(new DefaultHttpChunkTrailer))
+            queue.put(None)
+            channel.getPipeline.get(classOf[ChunkedWriteHandler]).resumeTransfer()
+          }.point[Future]
+      } onFailure {
+        case ex =>
+          logger.error("An error was encountered retrieving the next chunk of data: " + ex.getMessage, ex)
+          queue.put(None)
+      }
+    }
+
+    val queue = new LinkedBlockingQueue[Option[HttpChunk]](maxQueueSize)
+    advance(queue, stream).onSuccess {
+      case _ => logger.debug("Response stream fully consumed by Netty.")
+    }
+
+    new StreamChunkedInput(queue, channel)
+  }
+}
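
The rewrite trades the lock-protected mutable state for a bounded producer/consumer handoff: the asynchronous stream puts Some(chunk) values into a LinkedBlockingQueue (blocking once maxQueueSize chunks are buffered), a None sentinel marks end of input, and Netty's ChunkedInput pulls from the head. A framework-free sketch of that pattern, with a plain thread standing in for the stream and a loop standing in for Netty (all names here are illustrative):

    import java.util.concurrent.LinkedBlockingQueue

    object BoundedQueueSketch {
      def main(args: Array[String]): Unit = {
        // Bounded queue: Some(chunk) carries data, None is the end-of-stream sentinel.
        val queue = new LinkedBlockingQueue[Option[String]](2)

        // Producer: put() blocks once two chunks are buffered, giving natural backpressure.
        val producer = new Thread(new Runnable {
          def run(): Unit = {
            List("chunk-1", "chunk-2", "chunk-3").foreach(c => queue.put(Some(c)))
            queue.put(None)
          }
        })
        producer.start()

        // Consumer: mirrors ChunkedInput.nextChunk/isEndOfInput, taking until the sentinel.
        var done = false
        while (!done) {
          queue.take() match {
            case Some(chunk) => println("wrote " + chunk)
            case None        => done = true
          }
        }
        producer.join()
      }
    }
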
@@ -2,7 +2,7 @@ package blueeyes.core.service
 package engines.netty

 import blueeyes.core.data._
-import engines.security.BlueEyesKeyStoreFactory
+import engines.security._
 import engines.security.BlueEyesKeyStoreFactory._

 import akka.dispatch.ExecutionContext
@@ -1,4 +1,4 @@
-package blueeyes.core.service.engines.netty
+package blueeyes.core.service.engines.security

 import java.security.KeyStore
 import javax.net.ssl.{TrustManager, KeyManagerFactory, SSLContext};
@@ -13,4 +13,4 @@ object SslContextFactory {

     context
   }
 }
9 changes: 5 additions & 4 deletions core/src/test/scala/blueeyes/core/data/BijectionSpec.scala
@@ -19,12 +19,13 @@ class BijectionSpec extends Specification {
       Bijection.identity[String].unapply("foo") mustEqual ("foo")
     }
     "Bijection.inverse: creates inverse Bijection" in {
-      val inversed = ByteArrayToString.inverse
-      inversed.unapply(Array[Byte]('f', 'o', 'o')) mustEqual("foo")
-      inversed("foo").toList mustEqual(List[Byte]('f', 'o', 'o'))
+      val b = ByteArrayToString
+      b(Array[Byte]('f', 'o', 'o')) must_== "foo"
+      b.inverse.unapply(Array[Byte]('f','o','o')) must_== "foo"
+      b.inverse("foo").toList must_== List[Byte]('f', 'o', 'o')
     }
     "Bijection.compose: creates composed Bijection" in {
       val composed = atoi.andThen(itof)
-      composed("1.23") must_== 1.23d
+      composed("2") must_== 2.0d
     }
   }
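
The compose expectation changed because atoi parses the string into an Int before itof widens it, so "1.23" could never survive the first step, while "2" can. A tiny sketch of the composition with plain functions; atoi and itof here are assumed stand-ins for the spec's fixtures:

    object ComposeSketch {
      // Assumed stand-ins for the spec's atoi and itof bijections.
      val atoi: String => Int  = _.toInt
      val itof: Int => Double  = _.toDouble

      def main(args: Array[String]): Unit = {
        val composed = atoi.andThen(itof)
        println(composed("2")) // 2.0
        // composed("1.23") would throw NumberFormatException: "1.23" is not an Int,
        // which is presumably why the expected input in the spec changed.
      }
    }
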
