Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: Server #54

Merged
merged 3 commits into from
Mar 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions zio-http/src/main/scala/zhttp/core/AliasModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ trait AliasModule {
type JHttpMethod = jHttp.HttpMethod
type JHttpScheme = jHttp.HttpScheme
type JHttpHeaders = jHttp.HttpHeaders
type JHttpVersion = jHttp.HttpVersion
type JHttpRequest = jHttp.HttpRequest
type JDefaultHttpHeaders = jHttp.DefaultHttpHeaders
type JFullHttpRequest = jHttp.FullHttpRequest
Expand Down
21 changes: 0 additions & 21 deletions zio-http/src/main/scala/zhttp/http/Request.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
package zhttp.http

import io.netty.buffer.{Unpooled => JUnpooled}
import io.netty.handler.codec.http.HttpVersion
import zhttp.core.{JDefaultFullHttpRequest, JFullHttpRequest}
import zio.Task

// REQUEST
final case class Request(endpoint: Endpoint, data: Request.Data = Request.Data.empty) { self =>
val headers: List[Header] = data.headers
Expand All @@ -17,27 +12,11 @@ final case class Request(endpoint: Endpoint, data: Request.Data = Request.Data.e
case _ => Option.empty
}

def asJFullHttpRequest: Task[JFullHttpRequest] = Request.asJFullHttpRequest(self)
}

object Request {

final case class Data(headers: List[Header], content: HttpContent[Any, String])
object Data {
val empty: Data = Data(Nil, HttpContent.Empty)
}

def asJFullHttpRequest(req: Request): Task[JFullHttpRequest] = Task {
val method = req.method.asJHttpMethod
val uri = req.url.asString
val content = req.getBodyAsString match {
case Some(text) => JUnpooled.copiedBuffer(text, HTTP_CHARSET)
case None => JUnpooled.EMPTY_BUFFER
}
val headers = Header.disassemble(req.headers)
val jReq = new JDefaultFullHttpRequest(HttpVersion.HTTP_1_1, method, uri, content)
jReq.headers().set(headers)

jReq
}
}
41 changes: 5 additions & 36 deletions zio-http/src/main/scala/zhttp/http/Response.scala
Original file line number Diff line number Diff line change
@@ -1,54 +1,23 @@
package zhttp.http

import io.netty.buffer.{Unpooled => JUnpooled}
import io.netty.handler.codec.http.{HttpHeaderNames => JHttpHeaderNames, HttpVersion => JHttpVersion}
import zhttp.core.{JDefaultFullHttpResponse, JDefaultHttpHeaders, JFullHttpResponse, JHttpHeaders}
import zhttp.core.JFullHttpResponse
import zhttp.socket.WebSocketFrame
import zio.Task
import zio.stream.ZStream

import java.io.{PrintWriter, StringWriter}
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter

// RESPONSE
sealed trait Response extends Product with Serializable { self => }

object Response {
private val defaultStatus = Status.OK
private val defaultHeaders = Nil
private val defaultContent = HttpContent.Empty
private val jTrailingHeaders = new JDefaultHttpHeaders(false)
private val SERVER_NAME: String = "ZIO-Http"
private val defaultStatus = Status.OK
private val defaultHeaders = Nil
private val defaultContent = HttpContent.Empty

// Constructors
final case class HttpResponse(status: Status, headers: List[Header], content: HttpContent[Any, String])
extends Response { res =>

/**
* Encode the [[Response]] to [io.netty.handler.codec.http.FullHttpResponse]
*/
def toJFullHttpResponse: JFullHttpResponse = {
val jHttpHeaders =
res.headers.foldLeft[JHttpHeaders](new JDefaultHttpHeaders()) { (jh, hh) =>
jh.set(hh.name, hh.value)
}
val jStatus = res.status.toJHttpStatus
val jContentBytBuf = res.content match {
case HttpContent.Complete(data) =>
jHttpHeaders.set(JHttpHeaderNames.CONTENT_LENGTH, data.length())
jHttpHeaders.set(JHttpHeaderNames.SERVER, SERVER_NAME)
jHttpHeaders.set(JHttpHeaderNames.DATE, s"${DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now)}")
JUnpooled.copiedBuffer(data, HTTP_CHARSET)

case _ =>
jHttpHeaders.set(JHttpHeaderNames.CONTENT_LENGTH, 0)
JUnpooled.buffer(0)
}

new JDefaultFullHttpResponse(JHttpVersion.HTTP_1_1, jStatus, jContentBytBuf, jHttpHeaders, jTrailingHeaders)
}
}
extends Response

final case class SocketResponse(
socket: WebSocketFrame => ZStream[Any, Nothing, WebSocketFrame],
Expand Down
12 changes: 7 additions & 5 deletions zio-http/src/main/scala/zhttp/service/Client.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package zhttp.service

import io.netty.handler.codec.http.{HttpVersion => JHttpVersion}
import zhttp.core._
import zhttp.http.{Request, Response}
import zhttp.service
Expand All @@ -8,7 +9,8 @@ import zio.{Promise, Task, ZIO}

import java.net.InetSocketAddress

final case class Client(zx: UnsafeChannelExecutor[Any], cf: JChannelFactory[JChannel], el: JEventLoopGroup) {
final case class Client(zx: UnsafeChannelExecutor[Any], cf: JChannelFactory[JChannel], el: JEventLoopGroup)
extends HttpMessageCodec {
private def asyncRequest(
req: Request,
jReq: JFullHttpRequest,
Expand All @@ -29,10 +31,10 @@ final case class Client(zx: UnsafeChannelExecutor[Any], cf: JChannelFactory[JCha

def request(request: Request): Task[Response] = for {
promise <- Promise.make[Throwable, JFullHttpResponse]
jReq <- request.asJFullHttpRequest
_ <- asyncRequest(request, jReq, promise).catchAll(cause => promise.fail(cause)).fork
jRes <- promise.await
res <- Response.fromJFullHttpResponse(jRes)
jReq = encodeRequest(JHttpVersion.HTTP_1_1, request)
_ <- asyncRequest(request, jReq, promise).catchAll(cause => promise.fail(cause)).fork
jRes <- promise.await
res <- Response.fromJFullHttpResponse(jRes)
} yield res
}

Expand Down
67 changes: 67 additions & 0 deletions zio-http/src/main/scala/zhttp/service/HttpMessageCodec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package zhttp.service

import io.netty.buffer.{Unpooled => JUnpooled}
import io.netty.handler.codec.http.{HttpHeaderNames => JHttpHeaderNames, HttpVersion => JHttpVersion}
import zhttp.core._
import zhttp.http._

import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter

trait HttpMessageCodec {
private val jTrailingHeaders = new JDefaultHttpHeaders(false)
private val SERVER_NAME: String = "ZIO-Http"

/**
* Tries to decode the [io.netty.handler.codec.http.FullHttpRequest] to [Request].
*/
def decodeJRequest(jReq: JFullHttpRequest): Request = {
val url = URL(Path(jReq.uri()))
val method = Method.fromJHttpMethod(jReq.method())
val headers = Header.make(jReq.headers())
val endpoint = method -> url
val data = Request.Data(headers, HttpContent.Complete(jReq.content().toString(HTTP_CHARSET)))
Request(endpoint, data)
}

/**
* Encode the [[Response]] to [io.netty.handler.codec.http.FullHttpResponse]
*/
def encodeResponse(jVersion: JHttpVersion, res: Response.HttpResponse): JFullHttpResponse = {
val jHttpHeaders =
res.headers.foldLeft[JHttpHeaders](new JDefaultHttpHeaders()) { (jh, hh) =>
jh.set(hh.name, hh.value)
}
val jStatus = res.status.toJHttpStatus
val jContentBytBuf = res.content match {
case HttpContent.Complete(data) =>
jHttpHeaders.set(JHttpHeaderNames.CONTENT_LENGTH, data.length())
jHttpHeaders.set(JHttpHeaderNames.SERVER, SERVER_NAME)
jHttpHeaders.set(JHttpHeaderNames.DATE, s"${DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now)}")
JUnpooled.copiedBuffer(data, HTTP_CHARSET)

case _ =>
jHttpHeaders.set(JHttpHeaderNames.CONTENT_LENGTH, 0)
JUnpooled.buffer(0)
}

new JDefaultFullHttpResponse(jVersion, jStatus, jContentBytBuf, jHttpHeaders, jTrailingHeaders)
}

/**
* Converts Request to JFullHttpRequest
*/
def encodeRequest(jVersion: JHttpVersion, req: Request): JFullHttpRequest = {
val method = req.method.asJHttpMethod
val uri = req.url.asString
val content = req.getBodyAsString match {
case Some(text) => JUnpooled.copiedBuffer(text, HTTP_CHARSET)
case None => JUnpooled.EMPTY_BUFFER
}
val headers = Header.disassemble(req.headers)
val jReq = new JDefaultFullHttpRequest(jVersion, method, uri, content)
jReq.headers().set(headers)

jReq
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ final case class ServerRequestHandler[R](
zExec: UnsafeChannelExecutor[R],
app: HttpApp[R, Nothing],
) extends JSimpleChannelInboundHandler[JFullHttpRequest](AUTO_RELEASE_REQUEST)
with ServerJHttpRequestDecoder
with HttpMessageCodec
with ServerHttpExceptionHandler {

self =>
Expand Down Expand Up @@ -55,10 +55,27 @@ final case class ServerRequestHandler[R](
()
}

def writeAndFlush(ctx: JChannelHandlerContext, jReq: JFullHttpRequest, res: Response): Unit = {
res match {
/**
* Asynchronously executes the Http app and passes the response to the callback.
*/
private def executeAsync(ctx: JChannelHandlerContext, jReq: JFullHttpRequest)(cb: Response => Unit): Unit =
app.eval(decodeJRequest(jReq)) match {
case HttpResult.Success(a) => cb(a)
case HttpResult.Failure(_) => ()
case HttpResult.Continue(z) =>
zExec.unsafeExecute(ctx, z) {
case Exit.Success(res) => cb(res)
case Exit.Failure(_) => ()
}
}

/**
* Unsafe channel reader for HttpRequest
*/
override def channelRead0(ctx: JChannelHandlerContext, jReq: JFullHttpRequest): Unit = {
executeAsync(ctx, jReq) {
case res @ Response.HttpResponse(_, _, _) =>
ctx.writeAndFlush(res.asInstanceOf[Response.HttpResponse].toJFullHttpResponse, ctx.channel().voidPromise())
ctx.writeAndFlush(encodeResponse(jReq.protocolVersion(), res), ctx.channel().voidPromise())
releaseOrIgnore(jReq)
()
case res @ Response.SocketResponse(_, _) =>
Expand All @@ -68,25 +85,6 @@ final case class ServerRequestHandler[R](
}
}

/**
* Unsafe channel reader for HttpRequest
*/
override def channelRead0(ctx: JChannelHandlerContext, jReq: JFullHttpRequest /* jReq.refCount = 1 */ ): Unit = {
app.eval(unsafelyDecodeJFullHttpRequest(jReq)) match {
case HttpResult.Success(a) =>
self.writeAndFlush(ctx, jReq, a)
()
case HttpResult.Continue(zio) =>
zExec.unsafeExecute(ctx, zio) {
case Exit.Success(res) =>
writeAndFlush(ctx, jReq, res)
()
case _ => ()
}
case _ => ()
}
}

/**
* Handles exceptions that throws
*/
Expand Down