Skip to content

Commit

Permalink
Added HTTP proxying service combinator.
Browse files Browse the repository at this point in the history
  • Loading branch information
Kris Nuttycombe committed Apr 23, 2013
1 parent 326ac90 commit 970326a
Show file tree
Hide file tree
Showing 23 changed files with 75 additions and 67 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/blueeyes/core/http/HttpRequest.scala
Expand Up @@ -22,7 +22,7 @@ sealed case class HttpRequest[T] private(method: HttpMethod, uri: URI, parameter
def map[U](f: T => U): HttpRequest[U] = copy(content = content map f)
}

object HttpRequest{
object HttpRequest {
def apply[T](method: HttpMethod, uri: URI, parameters: Map[Symbol, String] = Map(), headers: HttpHeaders = HttpHeaders.Empty, content: Option[T] = None, remoteHost: Option[InetAddress] = None, version: HttpVersion = `HTTP/1.1`): HttpRequest[T] = {
val subpath = uri.path.getOrElse("")
val query = uri.query
Expand Down
12 changes: 1 addition & 11 deletions core/src/main/scala/blueeyes/core/service.scala
Expand Up @@ -11,21 +11,11 @@ import scalaz.syntax.traverse._
import scalaz.std.option._

package object service {
type AsyncHttpService[T] = HttpService[T, Future[HttpResponse[T]]]
type AsyncHttpService[A, B] = HttpService[A, Future[HttpResponse[B]]]
type AsyncHttpTranscoder[A, B] = AsyncTranscoder[HttpRequest, HttpResponse, A, B]

type HttpClientHandler[T] = PartialFunction[HttpRequest[T], Future[HttpResponse[T]]]

type HttpServiceHandler[T, S] = HttpRequest[T] => S

type HttpClientTransformer[T, S] = HttpClient[T] => Future[S]

type ServiceDescriptorFactory[T, S] = ServiceContext => ServiceLifecycle[T, S]

type HttpResponseTransformer[T, S] = HttpResponse[T] => Future[S]

type HttpServiceCombinator[A, B, C, D] = HttpService[A, B] => HttpService[C, D]

implicit def asyncHttpTranscoder[A, B](implicit M: Monad[Future], projection: A => B, surjection: B => Future[A]): AsyncHttpTranscoder[A, B] = {
new AsyncTranscoder[HttpRequest, HttpResponse, A, B] {
def apply(request: HttpRequest[A]) = request.copy(content = request.content.map(projection))
Expand Down
Expand Up @@ -24,7 +24,7 @@ trait ConfigurableHttpClient {

protected def realClient: HttpClientByteChunk = new HttpClientXLightWeb()

private def mockClient(h: AsyncHttpService[ByteChunk]): HttpClientByteChunk = new HttpClientByteChunk {
private def mockClient(h: AsyncHttpService[ByteChunk, ByteChunk]): HttpClientByteChunk = new HttpClientByteChunk {
val executor = executionContext
def isDefinedAt(r: HttpRequest[ByteChunk]) = true
def apply(r: HttpRequest[ByteChunk]): Future[HttpResponse[ByteChunk]] = h.service(r) match {
Expand All @@ -41,7 +41,7 @@ trait ConfigurableHttpClient {
case e => HttpResponse[ByteChunk](HttpStatus(InternalServerError, Option(e.getMessage).getOrElse("")))
}

protected def mockServer: AsyncHttpService[ByteChunk] = new CustomHttpService[ByteChunk, Future[HttpResponse[ByteChunk]]] {
protected def mockServer: AsyncHttpService[ByteChunk, ByteChunk] = new CustomHttpService[ByteChunk, Future[HttpResponse[ByteChunk]]] {
def service = (request: HttpRequest[ByteChunk]) => {
success(Future(HttpResponse[ByteChunk](HttpStatus(NotFound, "Mock server handles no requests."))))
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/blueeyes/core/service/HttpClient.scala
Expand Up @@ -11,7 +11,7 @@ import scalaz.std.option._
import scalaz.std.string._
import scalaz.syntax.semigroup._

trait HttpClient[A] extends HttpClientHandler[A] { self =>
trait HttpClient[A] extends PartialFunction[HttpRequest[A], Future[HttpResponse[A]]] { self =>
def get[B](path: String)(implicit transcoder: AsyncHttpTranscoder[B, A]) = method[B](HttpMethods.GET, path)

def post[B](path: String)(content: B)(implicit f: B => A, transcoder: AsyncHttpTranscoder[B, A]) = method[B](HttpMethods.POST, path, Some(f(content)))
Expand Down Expand Up @@ -149,7 +149,7 @@ trait HttpClient[A] extends HttpClientHandler[A] { self =>
}

object HttpClient extends blueeyes.bkka.AkkaDefaults {
implicit def requestHandlerToHttpClient[A](h: HttpClientHandler[A]): HttpClient[A] = new HttpClient[A] {
implicit def requestHandlerToHttpClient[A](h: PartialFunction[HttpRequest[A], Future[HttpResponse[A]]]): HttpClient[A] = new HttpClient[A] {
def isDefinedAt(r: HttpRequest[A]): Boolean = h.isDefinedAt(r)

def apply(r: HttpRequest[A]): Future[HttpResponse[A]] = h.apply(r)
Expand Down
Expand Up @@ -20,7 +20,7 @@ import scalaz.Monad


trait HttpRequestHandlerCombinators {
implicit def handle[A, B](handler: HttpServiceHandler[A, B]): HttpService[A, B] = new HttpHandlerService(handler)
implicit def handle[A, B](handler: HttpRequest[A] => B): HttpService[A, B] = new HttpHandlerService(handler)

def debug[A, B](logger: Logger): HttpService[A, B] => HttpService[A, B] =
(h: HttpService[A, B]) => new DebugService[A, B](logger, h)
Expand Down Expand Up @@ -231,7 +231,7 @@ trait HttpRequestHandlerCombinators {
/** The aggregate combinator creates a handler that stitches together chunks
* to make a bigger chunk, up to the specified size.
*/
def aggregate(chunkSize: Option[DataSize])(h: HttpService[ByteChunk, Future[HttpResponse[ByteChunk]]])(implicit executor: ExecutionContext) =
def aggregate(chunkSize: Option[DataSize])(h: AsyncHttpService[ByteChunk, ByteChunk])(implicit executor: ExecutionContext) =
new AggregateService(chunkSize, h)

/** The jsonp combinator creates a handler that accepts and produces JSON.
Expand All @@ -240,7 +240,7 @@ trait HttpRequestHandlerCombinators {
* HTTP method and content using the query string parameters "method" and
* "content", respectively.
*/
def jsonp[A](delegate: HttpService[A, Future[HttpResponse[A]]])(implicit fromString: String => A, semigroup: Semigroup[A]) =
def jsonp[A](delegate: AsyncHttpService[A, A])(implicit fromString: String => A, semigroup: Semigroup[A]) =
new JsonpService[A](delegate)

/** The jvalue combinator creates a handler that accepts and produces JSON.
Expand All @@ -255,15 +255,18 @@ trait HttpRequestHandlerCombinators {
def xml[A](h: HttpService[Future[NodeSeq], Future[HttpResponse[NodeSeq]]])(implicit inj: A => Future[NodeSeq], surj: NodeSeq => A, M: Monad[Future]) =
contentType(MimeTypes.text/MimeTypes.xml) { h.contramap(inj) } map { _ map { _ map surj } }

def proxy[A](httpClient: HttpClient[A])(filter: HttpRequest[A] => Boolean = (_: HttpRequest[A]) => true): AsyncHttpService[A, A] =
new ProxyService(httpClient, filter)


def forwarding[A, A0](f: HttpRequest[A] => Option[HttpRequest[A0]], httpClient: HttpClient[A0])(h: HttpService[A, HttpResponse[A]]) =
new ForwardingService[A, A0](f, httpClient, h)

def metadata[A, B](metadata: Metadata*)(h: HttpService[A, Future[HttpResponse[B]]]) = new MetadataService(Some(AndMetadata(metadata: _*)), h)
def metadata[A, B](metadata: Metadata*)(h: AsyncHttpService[A, B]) = new MetadataService(Some(AndMetadata(metadata: _*)), h)

def describe[A, B](description: String)(h: HttpService[A, Future[HttpResponse[B]]]) = new MetadataService(Some(DescriptionMetadata(description)), h)
def describe[A, B](description: String)(h: AsyncHttpService[A, B]) = new MetadataService(Some(DescriptionMetadata(description)), h)

/** The decodeUrl combinator creates a handler that decode HttpRequest URI.
*/
/** A handler that applies an extra pass of HTTP URL decoding to the inbound HttpRequest URI. */
def decodeUrl[A, B](h: HttpService[A, B]) = new DecodeUrlService[A, B](h)
}

Expand Down
Expand Up @@ -103,7 +103,7 @@ trait HttpServerModule extends Logging {
ServiceContext(rootConfig, serviceConfig, service.name, service.version, service.desc, host, port, sslPort)
}

def start: Option[Future[(AsyncHttpService[ByteChunk], Option[Stoppable])]] = {
def start: Option[Future[(AsyncHttpService[ByteChunk, ByteChunk], Option[Stoppable])]] = {
def append[S](lifecycle: ServiceLifecycle[ByteChunk, S], tail: List[Service[ByteChunk, _]]): ServiceLifecycle[ByteChunk, _] = {
tail match {
case x :: xs => append(lifecycle ~ x.lifecycle(context(x)), xs)
Expand All @@ -119,7 +119,7 @@ trait HttpServerModule extends Logging {
lifecycle map { _.run map { (trapErrors _).first } }
}

def trapErrors(delegate: AsyncHttpService[ByteChunk]): AsyncHttpService[ByteChunk] = new CustomHttpService[ByteChunk, Future[HttpResponse[ByteChunk]]] {
def trapErrors(delegate: AsyncHttpService[ByteChunk, ByteChunk]): AsyncHttpService[ByteChunk, ByteChunk] = new CustomHttpService[ByteChunk, Future[HttpResponse[ByteChunk]]] {
private def convertErrorToResponse(th: Throwable): HttpResponse[ByteChunk] = th match {
case e: HttpException => HttpResponse[ByteChunk](HttpStatus(e.failure, e.reason))
case e => {
Expand Down
16 changes: 15 additions & 1 deletion core/src/main/scala/blueeyes/core/service/HttpServices.scala
Expand Up @@ -22,6 +22,7 @@ import java.net.URLDecoder._
import scalaz.{ Unapply => _, _ }
import scalaz.syntax.functor._
import scalaz.syntax.kleisli._
import scalaz.syntax.show._
import scalaz.syntax.semigroup._
import scalaz.syntax.validation._
import scalaz.syntax.std.boolean._
Expand Down Expand Up @@ -129,7 +130,7 @@ object ResponseModifier {
// Handlers that are descendents of the ADT types //
////////////////////////////////////////////////////

class HttpHandlerService[A, B](h: HttpServiceHandler[A, B]) extends CustomHttpService[A, B] {
class HttpHandlerService[A, B](h: HttpRequest[A] => B) extends CustomHttpService[A, B] {
val service = (r: HttpRequest[A]) => h(r).success

val metadata = NoMetadata
Expand Down Expand Up @@ -437,6 +438,19 @@ object JsonpService extends AkkaDefaults {
}
}

class ProxyService[A](httpClient: HttpClient[A], filter: HttpRequest[A] => Boolean)
extends CustomHttpService[A, Future[HttpResponse[A]]] {
def service = { r: HttpRequest[A] =>
if (filter(r) && httpClient.isDefinedAt(r)) {
Success(httpClient(r))
} else {
Failure(inapplicable)
}
}

val metadata = NoMetadata
}


class ForwardingService[T, U](f: HttpRequest[T] => Option[HttpRequest[U]], httpClient: HttpClient[U], val delegate: HttpService[T, HttpResponse[T]])
extends DelegatingService[T, HttpResponse[T], T, HttpResponse[T]] with AkkaDefaults {
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/blueeyes/core/service/Service.scala
Expand Up @@ -22,7 +22,7 @@ trait Service[T, S] {
override def toString = name + "." + version.majorVersion
}

case class ServiceLifecycle[T, S](startup: () => Future[S], runningState: S => (AsyncHttpService[T], Option[Stoppable])) { self =>
case class ServiceLifecycle[T, S](startup: () => Future[S], runningState: S => (AsyncHttpService[T, T], Option[Stoppable])) { self =>
def ~ [S0](that: ServiceLifecycle[T, S0]): ServiceLifecycle[T, (S, S0)] = {
ServiceLifecycle[T, (S, S0)](
startup = () => self.startup() zip that.startup(),
Expand All @@ -36,7 +36,7 @@ case class ServiceLifecycle[T, S](startup: () => Future[S], runningState: S => (
)
}

def ~> (that: AsyncHttpService[T]): ServiceLifecycle[T, S] = {
def ~> (that: AsyncHttpService[T, T]): ServiceLifecycle[T, S] = {
self.copy(
runningState = (s: S) => {
val (service, stoppable) = self.runningState(s)
Expand All @@ -45,7 +45,7 @@ case class ServiceLifecycle[T, S](startup: () => Future[S], runningState: S => (
)
}

def ~ (that: AsyncHttpService[T]): ServiceLifecycle[T, S] = {
def ~ (that: AsyncHttpService[T, T]): ServiceLifecycle[T, S] = {
self.copy(
runningState = (s: S) => {
val (service, stoppable) = self.runningState(s)
Expand Down
Expand Up @@ -32,9 +32,9 @@ trait ServiceBuilder[T] {
StartupDescriptor[T, S](thunk)
}

protected def request[S](request: S => AsyncHttpService[T]): RequestDescriptor[T, S] = RequestDescriptor[T, S](request)
protected def request[S](request: S => AsyncHttpService[T, T]): RequestDescriptor[T, S] = RequestDescriptor[T, S](request)

protected def request(request: => AsyncHttpService[T]): RequestDescriptor[T, Unit] = RequestDescriptor[T, Unit]((u) => request)
protected def request(request: => AsyncHttpService[T, T]): RequestDescriptor[T, Unit] = RequestDescriptor[T, Unit]((u) => request)

def service[S](sname: String, sversion: ServiceVersion, sdesc: Option[String] = None)(slifecycle: ServiceContext => ServiceLifecycle[T, S]): Service[T, S] = {
new Service[T, S] {
Expand Down Expand Up @@ -82,5 +82,5 @@ case class StartupDescriptor[T, S](startup: () => Future[S]) {
}
}

case class RequestDescriptor[T, S](serviceBuilder: S => AsyncHttpService[T])
case class RequestDescriptor[T, S](serviceBuilder: S => AsyncHttpService[T, T])
case class ShutdownDescriptor[-S](shutdownBuilder: S => Option[Stoppable])
Expand Up @@ -283,7 +283,7 @@ trait ServiceDescriptorFactoryCombinators extends HttpRequestHandlerCombinators
}
}

private[service] class HttpRequestLoggerService[T](actor: ActorRef, underlying: AsyncHttpService[T])(implicit executor: ExecutionContext)
private[service] class HttpRequestLoggerService[T](actor: ActorRef, underlying: AsyncHttpService[T, T])(implicit executor: ExecutionContext)
extends CustomHttpService[T, Future[HttpResponse[T]]]{
def service = (request: HttpRequest[T]) => {
try {
Expand Down Expand Up @@ -316,7 +316,7 @@ trait ServiceDescriptorFactoryCombinators extends HttpRequestHandlerCombinators
val metadata = NoMetadata
}

private[service] class MonitorHttpRequestService[T](val delegate: AsyncHttpService[T], healthMonitor: HealthMonitor) extends DelegatingService[T, Future[HttpResponse[T]], T, Future[HttpResponse[T]]] with JPathImplicits{
private[service] class MonitorHttpRequestService[T](val delegate: AsyncHttpService[T, T], healthMonitor: HealthMonitor) extends DelegatingService[T, Future[HttpResponse[T]], T, Future[HttpResponse[T]]] with JPathImplicits{
def service = {request: HttpRequest[T] =>
val methodName = request.method.value
val requestPath = JPathField(methodName)
Expand Down
Expand Up @@ -32,6 +32,6 @@ trait AbstractNettyEngine extends HttpServerModule { self =>
}
}

protected def nettyServers(service: AsyncHttpService[ByteChunk]): List[NettyServer]
protected def nettyServers(service: AsyncHttpService[ByteChunk, ByteChunk]): List[NettyServer]
}
}
Expand Up @@ -14,7 +14,7 @@ import com.weiglewilczek.slf4s.Logging

import HttpServerConfig._

private[engines] class HttpNettyServerProvider(conf: HttpServerConfig, service: AsyncHttpService[ByteChunk], executionContext: ExecutionContext) extends AbstractNettyServerProvider {
private[engines] class HttpNettyServerProvider(conf: HttpServerConfig, service: AsyncHttpService[ByteChunk, ByteChunk], executionContext: ExecutionContext) extends AbstractNettyServerProvider {
def pipelineFactory(channelGroup: ChannelGroup) = {
new HttpPipelineFactory("http", conf.host, conf.port, conf.chunkSize, conf.compressionLevel, service, channelGroup, executionContext)
}
Expand All @@ -27,7 +27,7 @@ private[engines] class HttpNettyServerProvider(conf: HttpServerConfig, service:
}

private[engines] class HttpPipelineFactory(protocol: String, host: String, port: Int, chunkSize: Int, compression: Option[CompressionLevel],
service: AsyncHttpService[ByteChunk], channelGroup: ChannelGroup,
service: AsyncHttpService[ByteChunk, ByteChunk], channelGroup: ChannelGroup,
executionContext: ExecutionContext) extends ChannelPipelineFactory with Logging {
def getPipeline: ChannelPipeline = {
val pipeline = Channels.pipeline()
Expand Down
Expand Up @@ -61,7 +61,7 @@ import scala.collection.mutable.{HashSet, SynchronizedSet}
*
* TODO: Pass health monitor to the request handler to report on Netty errors.
*/
private[engines] class HttpServiceUpstreamHandler(service: AsyncHttpService[ByteChunk], executionContext: ExecutionContext) extends SimpleChannelUpstreamHandler with Logging {
private[engines] class HttpServiceUpstreamHandler(service: AsyncHttpService[ByteChunk, ByteChunk], executionContext: ExecutionContext) extends SimpleChannelUpstreamHandler with Logging {
private val pendingResponses = new HashSet[Future[HttpResponse[ByteChunk]]] with SynchronizedSet[Future[HttpResponse[ByteChunk]]]
private implicit val M: Monad[Future] = new FutureMonad(executionContext)

Expand Down
Expand Up @@ -16,7 +16,7 @@ import org.streum.configrity.Configuration

import HttpServerConfig._

private[engines] class HttpsNettyServerProvider(conf: HttpServerConfig, service: AsyncHttpService[ByteChunk], executionContext: ExecutionContext) extends AbstractNettyServerProvider {
private[engines] class HttpsNettyServerProvider(conf: HttpServerConfig, service: AsyncHttpService[ByteChunk, ByteChunk], executionContext: ExecutionContext) extends AbstractNettyServerProvider {
def pipelineFactory(channelGroup: ChannelGroup) = {
new HttpsPipelineFactory("https", conf.host, conf.sslPort, conf.chunkSize, conf.compressionLevel, service, channelGroup, conf.config, executionContext)
}
Expand All @@ -29,7 +29,7 @@ private[engines] class HttpsNettyServerProvider(conf: HttpServerConfig, service:
}

private[engines] class HttpsPipelineFactory(protocol: String, host: String, port: Int, chunkSize: Int, compression: Option[CompressionLevel],
requestHandler: AsyncHttpService[ByteChunk], channelGroup: ChannelGroup,
requestHandler: AsyncHttpService[ByteChunk, ByteChunk], channelGroup: ChannelGroup,
config: Configuration, //TODO: Use of Configuration here is bogus
executionContext: ExecutionContext)
extends HttpPipelineFactory(protocol: String, host, port, chunkSize, compression, requestHandler, channelGroup, executionContext) {
Expand Down
Expand Up @@ -11,7 +11,7 @@ trait NettyEngine extends AbstractNettyEngine { self =>
type HttpServer <: NettyHttpServer

abstract class NettyHttpServer(rootConfig: Configuration, services: List[Service[ByteChunk, _]], executor: ExecutionContext) extends AbstractNettyHttpServer(rootConfig, services, executor) { self =>
protected def nettyServers(service: AsyncHttpService[ByteChunk]) = {
protected def nettyServers(service: AsyncHttpService[ByteChunk, ByteChunk]) = {
val httpProvider = new HttpNettyServerProvider(self.config, service, executor)
val httpServer = new NettyServer(httpProvider)
if (self.config.sslEnable) {
Expand Down
Expand Up @@ -23,7 +23,7 @@ import scalaz._
import scala.collection.mutable.{HashSet, SynchronizedSet}

abstract class ServletEngine extends HttpServlet with HttpServerModule with HttpServletConverters {
private var service: AsyncHttpService[ByteChunk] = null
private var service: AsyncHttpService[ByteChunk, ByteChunk] = null
private var stopTimeout: Timeout = null
private var stoppable: Option[Stoppable] = null
private var _executionContext: ExecutionContext = null
Expand Down
Expand Up @@ -29,7 +29,7 @@ abstract class BlueEyesServiceSpecification extends Specification with FutureMat
private val NotFound = HttpResponse[ByteChunk](HttpStatus(HttpStatusCodes.NotFound))
private val mockSwitch = sys.props.get(Environment.MockSwitch)

private var _service: AsyncHttpService[ByteChunk] = _
private var _service: AsyncHttpService[ByteChunk, ByteChunk] = _
private var _stoppable: Option[Stoppable] = None

implicit def executionContext: ExecutionContext
Expand Down

0 comments on commit 970326a

Please sign in to comment.