Skip to content

Commit

Permalink
Refactor: Server (#54)
Browse files Browse the repository at this point in the history
* refactor(core): update AliasModule

add JHttpVersion

* refactor(service): create a common codec layer for http message

* refactor(service): simplify request handler
  • Loading branch information
Tushar Mathur committed Mar 19, 2021
1 parent b5a6daf commit 00a76d1
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 104 deletions.
1 change: 1 addition & 0 deletions zio-http/src/main/scala/zhttp/core/AliasModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ trait AliasModule {
type JHttpMethod = jHttp.HttpMethod
type JHttpScheme = jHttp.HttpScheme
type JHttpHeaders = jHttp.HttpHeaders
type JHttpVersion = jHttp.HttpVersion
type JHttpRequest = jHttp.HttpRequest
type JDefaultHttpHeaders = jHttp.DefaultHttpHeaders
type JFullHttpRequest = jHttp.FullHttpRequest
Expand Down
21 changes: 0 additions & 21 deletions zio-http/src/main/scala/zhttp/http/Request.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
package zhttp.http

import io.netty.buffer.{Unpooled => JUnpooled}
import io.netty.handler.codec.http.HttpVersion
import zhttp.core.{JDefaultFullHttpRequest, JFullHttpRequest}
import zio.Task

// REQUEST
final case class Request(endpoint: Endpoint, data: Request.Data = Request.Data.empty) { self =>
val headers: List[Header] = data.headers
Expand All @@ -17,27 +12,11 @@ final case class Request(endpoint: Endpoint, data: Request.Data = Request.Data.e
case _ => Option.empty
}

def asJFullHttpRequest: Task[JFullHttpRequest] = Request.asJFullHttpRequest(self)
}

object Request {

final case class Data(headers: List[Header], content: HttpContent[Any, String])
object Data {
val empty: Data = Data(Nil, HttpContent.Empty)
}

def asJFullHttpRequest(req: Request): Task[JFullHttpRequest] = Task {
val method = req.method.asJHttpMethod
val uri = req.url.asString
val content = req.getBodyAsString match {
case Some(text) => JUnpooled.copiedBuffer(text, HTTP_CHARSET)
case None => JUnpooled.EMPTY_BUFFER
}
val headers = Header.disassemble(req.headers)
val jReq = new JDefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, uri, content)
jReq.headers().set(headers)

jReq
}
}
41 changes: 5 additions & 36 deletions zio-http/src/main/scala/zhttp/http/Response.scala
Original file line number Diff line number Diff line change
@@ -1,54 +1,23 @@
package zhttp.http

import io.netty.buffer.{Unpooled => JUnpooled}
import io.netty.handler.codec.http.{HttpHeaderNames => JHttpHeaderNames, HttpVersion => JHttpVersion}
import zhttp.core.{JDefaultFullHttpResponse, JDefaultHttpHeaders, JFullHttpResponse, JHttpHeaders}
import zhttp.core.JFullHttpResponse
import zhttp.socket.WebSocketFrame
import zio.Task
import zio.stream.ZStream

import java.io.{PrintWriter, StringWriter}
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter

// RESPONSE
sealed trait Response extends Product with Serializable { self => }

object Response {
private val defaultStatus = Status.OK
private val defaultHeaders = Nil
private val defaultContent = HttpContent.Empty
private val jTrailingHeaders = new JDefaultHttpHeaders(false)
private val SERVER_NAME: String = "ZIO-Http"
private val defaultStatus = Status.OK
private val defaultHeaders = Nil
private val defaultContent = HttpContent.Empty

// Constructors
final case class HttpResponse(status: Status, headers: List[Header], content: HttpContent[Any, String])
extends Response { res =>

/**
* Encode the [[Response]] to [io.netty.handler.codec.http.FullHttpResponse]
*/
def toJFullHttpResponse: JFullHttpResponse = {
val jHttpHeaders =
res.headers.foldLeft[JHttpHeaders](new JDefaultHttpHeaders()) { (jh, hh) =>
jh.set(hh.name, hh.value)
}
val jStatus = res.status.toJHttpStatus
val jContentBytBuf = res.content match {
case HttpContent.Complete(data) =>
jHttpHeaders.set(JHttpHeaderNames.CONTENT_LENGTH, data.length())
jHttpHeaders.set(JHttpHeaderNames.SERVER, SERVER_NAME)
jHttpHeaders.set(JHttpHeaderNames.DATE, s"${DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now)}")
JUnpooled.copiedBuffer(data, HTTP_CHARSET)

case _ =>
jHttpHeaders.set(JHttpHeaderNames.CONTENT_LENGTH, 0)
JUnpooled.buffer(0)
}

new JDefaultFullHttpResponse(JHttpVersion.HTTP_1_1, jStatus, jContentBytBuf, jHttpHeaders, jTrailingHeaders)
}
}
extends Response

final case class SocketResponse(
socket: WebSocketFrame => ZStream[Any, Nothing, WebSocketFrame],
Expand Down
12 changes: 7 additions & 5 deletions zio-http/src/main/scala/zhttp/service/Client.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package zhttp.service

import io.netty.handler.codec.http.{HttpVersion => JHttpVersion}
import zhttp.core._
import zhttp.http.{Request, Response}
import zhttp.service
Expand All @@ -8,7 +9,8 @@ import zio.{Promise, Task, ZIO}

import java.net.InetSocketAddress

final case class Client(zx: UnsafeChannelExecutor[Any], cf: JChannelFactory[JChannel], el: JEventLoopGroup) {
final case class Client(zx: UnsafeChannelExecutor[Any], cf: JChannelFactory[JChannel], el: JEventLoopGroup)
extends HttpMessageCodec {
private def asyncRequest(
req: Request,
jReq: JFullHttpRequest,
Expand All @@ -29,10 +31,10 @@ final case class Client(zx: UnsafeChannelExecutor[Any], cf: JChannelFactory[JCha

def request(request: Request): Task[Response] = for {
promise <- Promise.make[Throwable, JFullHttpResponse]
jReq <- request.asJFullHttpRequest
_ <- asyncRequest(request, jReq, promise).catchAll(cause => promise.fail(cause)).fork
jRes <- promise.await
res <- Response.fromJFullHttpResponse(jRes)
jReq = encodeRequest(JHttpVersion.HTTP_1_1, request)
_ <- asyncRequest(request, jReq, promise).catchAll(cause => promise.fail(cause)).fork
jRes <- promise.await
res <- Response.fromJFullHttpResponse(jRes)
} yield res
}

Expand Down
67 changes: 67 additions & 0 deletions zio-http/src/main/scala/zhttp/service/HttpMessageCodec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package zhttp.service

import io.netty.buffer.{Unpooled => JUnpooled}
import io.netty.handler.codec.http.{HttpHeaderNames => JHttpHeaderNames, HttpVersion => JHttpVersion}
import zhttp.core._
import zhttp.http._

import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter

trait HttpMessageCodec {
private val jTrailingHeaders = new JDefaultHttpHeaders(false)
private val SERVER_NAME: String = "ZIO-Http"

/**
* Tries to decode the [io.netty.handler.codec.http.FullHttpRequest] to [Request].
*/
def decodeJRequest(jReq: JFullHttpRequest): Request = {
val url = URL(Path(jReq.uri()))
val method = Method.fromJHttpMethod(jReq.method())
val headers = Header.make(jReq.headers())
val endpoint = method -> url
val data = Request.Data(headers, HttpContent.Complete(jReq.content().toString(HTTP_CHARSET)))
Request(endpoint, data)
}

/**
* Encode the [[Response]] to [io.netty.handler.codec.http.FullHttpResponse]
*/
def encodeResponse(jVersion: JHttpVersion, res: Response.HttpResponse): JFullHttpResponse = {
val jHttpHeaders =
res.headers.foldLeft[JHttpHeaders](new JDefaultHttpHeaders()) { (jh, hh) =>
jh.set(hh.name, hh.value)
}
val jStatus = res.status.toJHttpStatus
val jContentBytBuf = res.content match {
case HttpContent.Complete(data) =>
jHttpHeaders.set(JHttpHeaderNames.CONTENT_LENGTH, data.length())
jHttpHeaders.set(JHttpHeaderNames.SERVER, SERVER_NAME)
jHttpHeaders.set(JHttpHeaderNames.DATE, s"${DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now)}")
JUnpooled.copiedBuffer(data, HTTP_CHARSET)

case _ =>
jHttpHeaders.set(JHttpHeaderNames.CONTENT_LENGTH, 0)
JUnpooled.buffer(0)
}

new JDefaultFullHttpResponse(jVersion, jStatus, jContentBytBuf, jHttpHeaders, jTrailingHeaders)
}

/**
* Converts Request to JFullHttpRequest
*/
def encodeRequest(jVersion: JHttpVersion, req: Request): JFullHttpRequest = {
val method = req.method.asJHttpMethod
val uri = req.url.asString
val content = req.getBodyAsString match {
case Some(text) => JUnpooled.copiedBuffer(text, HTTP_CHARSET)
case None => JUnpooled.EMPTY_BUFFER
}
val headers = Header.disassemble(req.headers)
val jReq = new JDefaultFullHttpRequest(jVersion, method, uri, content)
jReq.headers().set(headers)

jReq
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ final case class ServerRequestHandler[R](
zExec: UnsafeChannelExecutor[R],
app: HttpApp[R, Nothing],
) extends JSimpleChannelInboundHandler[JFullHttpRequest](AUTO_RELEASE_REQUEST)
with ServerJHttpRequestDecoder
with HttpMessageCodec
with ServerHttpExceptionHandler {

self =>
Expand Down Expand Up @@ -55,10 +55,27 @@ final case class ServerRequestHandler[R](
()
}

def writeAndFlush(ctx: JChannelHandlerContext, jReq: JFullHttpRequest, res: Response): Unit = {
res match {
/**
* Asynchronously executes the Http app and passes the response to the callback.
*/
private def executeAsync(ctx: JChannelHandlerContext, jReq: JFullHttpRequest)(cb: Response => Unit): Unit =
app.eval(decodeJRequest(jReq)) match {
case HttpResult.Success(a) => cb(a)
case HttpResult.Failure(_) => ()
case HttpResult.Continue(z) =>
zExec.unsafeExecute(ctx, z) {
case Exit.Success(res) => cb(res)
case Exit.Failure(_) => ()
}
}

/**
* Unsafe channel reader for HttpRequest
*/
override def channelRead0(ctx: JChannelHandlerContext, jReq: JFullHttpRequest): Unit = {
executeAsync(ctx, jReq) {
case res @ Response.HttpResponse(_, _, _) =>
ctx.writeAndFlush(res.asInstanceOf[Response.HttpResponse].toJFullHttpResponse, ctx.channel().voidPromise())
ctx.writeAndFlush(encodeResponse(jReq.protocolVersion(), res), ctx.channel().voidPromise())
releaseOrIgnore(jReq)
()
case res @ Response.SocketResponse(_, _) =>
Expand All @@ -68,25 +85,6 @@ final case class ServerRequestHandler[R](
}
}

/**
* Unsafe channel reader for HttpRequest
*/
override def channelRead0(ctx: JChannelHandlerContext, jReq: JFullHttpRequest /* jReq.refCount = 1 */ ): Unit = {
app.eval(unsafelyDecodeJFullHttpRequest(jReq)) match {
case HttpResult.Success(a) =>
self.writeAndFlush(ctx, jReq, a)
()
case HttpResult.Continue(zio) =>
zExec.unsafeExecute(ctx, zio) {
case Exit.Success(res) =>
writeAndFlush(ctx, jReq, res)
()
case _ => ()
}
case _ => ()
}
}

/**
* Handles exceptions that throws
*/
Expand Down

0 comments on commit 00a76d1

Please sign in to comment.