Skip to content

Commit

Permalink
Streaming Response (#143)
Browse files Browse the repository at this point in the history
  • Loading branch information
d11-amitsingh committed Apr 19, 2021
1 parent 6f2b83c commit ec9dc28
Show file tree
Hide file tree
Showing 17 changed files with 142 additions and 85 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ You can checkout more examples in the examples project —
- [Simple Server](https://github.com/dream11/zio-http/blob/main/example/src/main/scala/HelloWorld.scala)
- [Advanced Server](https://github.com/dream11/zio-http/blob/main/example/src/main/scala/HelloWorldAdvanced.scala)
- [WebSocket Server](https://github.com/dream11/zio-http/blob/main/example/src/main/scala/SocketEchoServer.scala)
- [Streaming Response](https://github.com/dream11/zio-http/blob/main/example/src/main/scala/StreamingResponse.scala)
- [Simple Client](https://github.com/dream11/zio-http/blob/main/example/src/main/scala/SimpleClient.scala)

# Installation
Expand Down
7 changes: 4 additions & 3 deletions example/src/main/scala/SimpleClient.scala
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import zhttp.http.HttpContent
import zhttp.http.HttpData
import zhttp.service.{ChannelFactory, Client, EventLoopGroup}
import zio._

Expand All @@ -9,8 +9,9 @@ object SimpleClient extends App {
res <- Client.request("https://api.github.com/users/zio/repos")
_ <- console.putStrLn {
res.content match {
case HttpContent.Complete(data) => data.map(_.toChar).mkString
case HttpContent.Chunked(_) => "<Chunked>"
case HttpData.CompleteData(data) => data.map(_.toChar).mkString
case HttpData.StreamData(_) => "<Chunked>"
case HttpData.Empty => ""
}
}
} yield ()
Expand Down
33 changes: 33 additions & 0 deletions example/src/main/scala/StreamingResponse.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import zhttp.http._
import zhttp.service.Server
import zio._
import zio.stream.ZStream

/**
* Example to encode content using a ZStream
*/
object StreamingResponse extends App {
// Create a message as a Chunk[Byte]
val message = Chunk.fromArray("Hello world !\r\n".getBytes(HTTP_CHARSET))

// Use `Http.collect` to match on route
val app = Http.collect {

// Simple (non-stream) based route
case Method.GET -> Root / "health" => Response.ok

// ZStream powered response
case Method.GET -> Root / "stream" =>
Response.http(
status = Status.OK,
headers = List(Header.contentLength(message.length.toLong)),
content = HttpData.fromStream(ZStream.succeed(message)), // Encoding content using a ZStream
)

}
override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {

// Starting the server (for more advanced startup configuration checkout `HelloWorldAdvanced`)
Server.start(8090, app.silent).exitCode
}
}
2 changes: 2 additions & 0 deletions zio-http/src/main/scala/zhttp/http/Header.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ object Header {
val contentTypeXml: Header = Header(JHttpHeaderNames.CONTENT_TYPE, JHttpHeaderValues.APPLICATION_XML)
val contentTypeXhtmlXml: Header = Header(JHttpHeaderNames.CONTENT_TYPE, JHttpHeaderValues.APPLICATION_XHTML)
val contentTypeTextPlain: Header = Header(JHttpHeaderNames.CONTENT_TYPE, JHttpHeaderValues.TEXT_PLAIN)
val transferEncodingChunked: Header = Header(JHttpHeaderNames.TRANSFER_ENCODING, JHttpHeaderValues.CHUNKED)
def contentLength(size: Long): Header = Header(JHttpHeaderNames.CONTENT_LENGTH, size.toString)
val contentTypeFormUrlEncoded: Header =
Header(JHttpHeaderNames.CONTENT_TYPE, JHttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED)

Expand Down
21 changes: 0 additions & 21 deletions zio-http/src/main/scala/zhttp/http/HttpContent.scala

This file was deleted.

35 changes: 35 additions & 0 deletions zio-http/src/main/scala/zhttp/http/HttpData.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package zhttp.http

import io.netty.buffer.ByteBuf
import zio.Chunk
import zio.stream.ZStream

/**
* Content holder for Requests and Responses
*/
sealed trait HttpData[-R, +E] extends Product with Serializable

object HttpData {
case object Empty extends HttpData[Any, Nothing]
final case class CompleteData(data: Chunk[Byte]) extends HttpData[Any, Nothing]
final case class StreamData[R, E](data: ZStream[R, E, Chunk[Byte]]) extends HttpData[R, E]

/**
* Helper to create CompleteData from ByteBuf
*/
def fromByteBuf(byteBuf: ByteBuf): HttpData[Any, Nothing] = {
val bytes = new Array[Byte](byteBuf.readableBytes)
byteBuf.readBytes(bytes)
HttpData.CompleteData(Chunk.fromArray(bytes))
}

/**
* Helper to create StreamData from Stream of Chunks
*/
def fromStream[R, E](data: ZStream[R, E, Chunk[Byte]]): HttpData[R, E] = HttpData.StreamData(data)

/**
* Helper to create Empty HttpData
*/
def empty: HttpData[Any, Nothing] = Empty
}
11 changes: 4 additions & 7 deletions zio-http/src/main/scala/zhttp/http/Request.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package zhttp.http

import zio.Chunk

// REQUEST
final case class Request(endpoint: Endpoint, data: Request.Data = Request.Data.empty) { self =>
val headers: List[Header] = data.headers
Expand All @@ -10,16 +8,15 @@ final case class Request(endpoint: Endpoint, data: Request.Data = Request.Data.e
val route: Route = method -> url.path

def getBodyAsString: Option[String] = data.content match {
case HttpContent.Complete(data) => Option(data.map(_.toChar).mkString)
case _ => Option.empty
case HttpData.CompleteData(data) => Option(data.map(_.toChar).mkString)
case _ => Option.empty
}

}

object Request {
val emptyContent: HttpContent.Complete[Byte] = HttpContent.Complete(Chunk.empty)
final case class Data(headers: List[Header], content: HttpContent[Any, Byte])
final case class Data(headers: List[Header], content: HttpData[Any, Nothing])
object Data {
val empty: Data = Data(Nil, emptyContent)
val empty: Data = Data(Nil, HttpData.empty)
}
}
4 changes: 2 additions & 2 deletions zio-http/src/main/scala/zhttp/http/Response.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ sealed trait Response[-R, +E] extends Product with Serializable { self => }

object Response extends ResponseOps {
// Constructors
final case class HttpResponse[R](status: Status, headers: List[Header], content: HttpContent[R, Byte])
extends Response[R, Nothing]
final case class HttpResponse[-R, +E](status: Status, headers: List[Header], content: HttpData[R, E])
extends Response[R, E]
final case class SocketResponse[-R, +E](socket: Socket[R, E]) extends Response[R, E]
}
15 changes: 7 additions & 8 deletions zio-http/src/main/scala/zhttp/http/ResponseOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,17 @@ import java.io.{PrintWriter, StringWriter}
trait ResponseOps {
private val defaultStatus = Status.OK
private val defaultHeaders = Nil
private val emptyContent = HttpContent.Complete(Chunk.empty)

// Helpers

/**
* Creates a new Http Response
*/
def http[R](
def http[R, E](
status: Status = defaultStatus,
headers: List[Header] = defaultHeaders,
content: HttpContent[R, Byte] = emptyContent,
): Response.HttpResponse[R] =
content: HttpData[R, E] = HttpData.empty,
): Response.HttpResponse[R, E] =
HttpResponse(status, headers, content)

/**
Expand All @@ -34,15 +33,15 @@ trait ResponseOps {
http(
error.status,
Nil,
HttpContent.Complete(cause.cause match {
HttpData.CompleteData(cause.cause match {
case Some(throwable) =>
val sw = new StringWriter
throwable.printStackTrace(new PrintWriter(sw))
Chunk.fromArray(s"${cause.message}:\n${sw.toString}".getBytes(HTTP_CHARSET))
case None => Chunk.fromArray(s"${cause.message}".getBytes(HTTP_CHARSET))
}),
)
case _ => http(error.status, Nil, HttpContent.Complete(Chunk.fromArray(error.message.getBytes(HTTP_CHARSET))))
case _ => http(error.status, Nil, HttpData.CompleteData(Chunk.fromArray(error.message.getBytes(HTTP_CHARSET))))
}

}
Expand All @@ -51,13 +50,13 @@ trait ResponseOps {

def text(text: String): UResponse =
http(
content = HttpContent.Complete(Chunk.fromArray(text.getBytes(HTTP_CHARSET))),
content = HttpData.CompleteData(Chunk.fromArray(text.getBytes(HTTP_CHARSET))),
headers = List(Header.contentTypeTextPlain),
)

def jsonString(data: String): UResponse =
http(
content = HttpContent.Complete(Chunk.fromArray(data.getBytes(HTTP_CHARSET))),
content = HttpData.CompleteData(Chunk.fromArray(data.getBytes(HTTP_CHARSET))),
headers = List(Header.contentTypeJson),
)

Expand Down
2 changes: 1 addition & 1 deletion zio-http/src/main/scala/zhttp/http/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ package object http extends PathModule with RequestSyntax {
type Route = (Method, Path)
type SilentResponse[-E] = CanBeSilenced[E, UResponse]
type UResponse = Response[Any, Nothing]
type UHttpResponse = Response.HttpResponse[Any]
type UHttpResponse = Response.HttpResponse[Any, Nothing]
type ResponseM[-R, +E] = ZIO[R, E, Response[R, E]]
type PartialRequest[+E] = CanSupportPartial[Request, E]

Expand Down
2 changes: 1 addition & 1 deletion zio-http/src/main/scala/zhttp/service/DecodeJRequest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ trait DecodeJRequest {
endpoint = method -> url
data = Request.Data(
headers,
HttpContent.complete(jReq.content()),
HttpData.fromByteBuf(jReq.content()),
)
} yield Request(endpoint, data)
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ trait DecodeJResponse {
def decodeJResponse(jRes: JFullHttpResponse): Either[Throwable, UHttpResponse] = Try {
val status = Status.fromJHttpResponseStatus(jRes.status())
val headers = Header.parse(jRes.headers())
val content = HttpContent.complete(jRes.content())
val content = HttpData.fromByteBuf(jRes.content())

Response.http(status, headers, content): UHttpResponse
}.toEither
Expand Down
35 changes: 18 additions & 17 deletions zio-http/src/main/scala/zhttp/service/EncodeResponse.scala
Original file line number Diff line number Diff line change
@@ -1,38 +1,39 @@
package zhttp.service

import io.netty.buffer.{Unpooled => JUnpooled}
import io.netty.handler.codec.http.{HttpHeaderNames => JHttpHeaderNames, HttpVersion => JHttpVersion}
import zhttp.core.{JDefaultFullHttpResponse, JDefaultHttpHeaders, JFullHttpResponse, JHttpHeaders}
import zhttp.http.{HttpContent, Response}
import io.netty.handler.codec.http.{
DefaultHttpResponse => JDefaultHttpResponse,
HttpHeaderNames => JHttpHeaderNames,
HttpVersion => JHttpVersion,
}
import zhttp.core.{JDefaultHttpHeaders, JHttpHeaders}
import zhttp.http.{HttpData, Response}

import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter

trait EncodeResponse {
private val SERVER_NAME: String = "ZIO-Http"
private val jTrailingHeaders = new JDefaultHttpHeaders(false)

/**
* Encode the [[zhttp.http.UHttpResponse]] to [io.netty.handler.codec.http.FullHttpResponse]
*/
def encodeResponse[R](jVersion: JHttpVersion, res: Response.HttpResponse[R]): JFullHttpResponse = {
val jHttpHeaders =
def encodeResponse[R, E](jVersion: JHttpVersion, res: Response.HttpResponse[R, E]): JDefaultHttpResponse = {
val jHttpHeaders =
res.headers.foldLeft[JHttpHeaders](new JDefaultHttpHeaders()) { (jh, hh) =>
jh.set(hh.name, hh.value)
}
val jStatus = res.status.toJHttpStatus
val jContentBytBuf = res.content match {
case HttpContent.Complete(data) =>
jHttpHeaders.set(JHttpHeaderNames.SERVER, SERVER_NAME)
jHttpHeaders.set(JHttpHeaderNames.DATE, s"${DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now)}")
val jStatus = res.status.toJHttpStatus
res.content match {
case HttpData.CompleteData(data) =>
jHttpHeaders.set(JHttpHeaderNames.CONTENT_LENGTH, data.length)
jHttpHeaders.set(JHttpHeaderNames.SERVER, SERVER_NAME)
jHttpHeaders.set(JHttpHeaderNames.DATE, s"${DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now)}")
JUnpooled.copiedBuffer(data.toArray)

case _ =>
case HttpData.StreamData(_) => ()

case HttpData.Empty =>
jHttpHeaders.set(JHttpHeaderNames.CONTENT_LENGTH, 0)
JUnpooled.buffer(0)
}

new JDefaultFullHttpResponse(jVersion, jStatus, jContentBytBuf, jHttpHeaders, jTrailingHeaders)
new JDefaultHttpResponse(jVersion, jStatus, jHttpHeaders)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,16 @@ import zio.{Exit, Fiber, URIO, ZIO}
*/
final class UnsafeChannelExecutor[R](runtime: zio.Runtime[R]) {
def unsafeExecute_(ctx: JChannelHandlerContext)(program: ZIO[R, Throwable, Unit]): Unit = {
unsafeExecute(ctx, program)(_ => ())
unsafeExecute(ctx, program)({
case Exit.Success(_) => ()
case Exit.Failure(cause) =>
cause.failureOption match {
case Some(error: Throwable) => ctx.fireExceptionCaught(error)
case _ => ()
}
ctx.close()
()
})
}

def unsafeExecute[A](ctx: JChannelHandlerContext, program: ZIO[R, Throwable, A])(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package zhttp.service.server

import io.netty.buffer.{Unpooled => JUnpooled}
import io.netty.handler.codec.http.websocketx.{WebSocketServerProtocolHandler => JWebSocketServerProtocolHandler}
import io.netty.handler.codec.http.{LastHttpContent => JLastHttpContent}
import zhttp.core._
import zhttp.http._
import zhttp.service._
import zio.Exit
import zio.stream.ZStream

/**
* Helper class with channel methods
Expand Down Expand Up @@ -53,11 +56,23 @@ final case class ServerRequestHandler[R](
*/
override def channelRead0(ctx: JChannelHandlerContext, jReq: JFullHttpRequest): Unit = {
executeAsync(ctx, jReq) {
case res @ Response.HttpResponse(_, _, _) =>
ctx.writeAndFlush(encodeResponse(jReq.protocolVersion(), res), ctx.channel().voidPromise())
case res @ Response.HttpResponse(_, _, content) =>
ctx.write(encodeResponse(jReq.protocolVersion(), res), ctx.channel().voidPromise())
releaseOrIgnore(jReq)
content match {
case HttpData.StreamData(data) =>
zExec.unsafeExecute_(ctx) {
(data.map(c => JUnpooled.copiedBuffer(c.toArray)) ++ ZStream(JLastHttpContent.EMPTY_LAST_CONTENT))
.mapM(t => ChannelFuture.unit(ctx.writeAndFlush(t)))
.runDrain
}
case HttpData.CompleteData(data) =>
ctx.writeAndFlush(JUnpooled.copiedBuffer(data.toArray), ctx.channel().voidPromise())
case HttpData.Empty => ctx.flush()
}
()
case res @ Response.SocketResponse(_) =>

case res @ Response.SocketResponse(_) =>
ctx
.channel()
.pipeline()
Expand Down

0 comments on commit ec9dc28

Please sign in to comment.