Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streaming Response #143

Merged
merged 15 commits into from
Apr 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ You can checkout more examples in the examples project —
- [Simple Server](https://github.com/dream11/zio-http/blob/main/example/src/main/scala/HelloWorld.scala)
- [Advanced Server](https://github.com/dream11/zio-http/blob/main/example/src/main/scala/HelloWorldAdvanced.scala)
- [WebSocket Server](https://github.com/dream11/zio-http/blob/main/example/src/main/scala/SocketEchoServer.scala)
- [Streaming Response](https://github.com/dream11/zio-http/blob/main/example/src/main/scala/StreamingResponse.scala)
- [Simple Client](https://github.com/dream11/zio-http/blob/main/example/src/main/scala/SimpleClient.scala)

# Installation
Expand Down
7 changes: 4 additions & 3 deletions example/src/main/scala/SimpleClient.scala
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import zhttp.http.HttpContent
import zhttp.http.HttpData
import zhttp.service.{ChannelFactory, Client, EventLoopGroup}
import zio._

Expand All @@ -9,8 +9,9 @@ object SimpleClient extends App {
res <- Client.request("https://api.github.com/users/zio/repos")
_ <- console.putStrLn {
res.content match {
case HttpContent.Complete(data) => data.map(_.toChar).mkString
case HttpContent.Chunked(_) => "<Chunked>"
case HttpData.CompleteData(data) => data.map(_.toChar).mkString
case HttpData.StreamData(_) => "<Chunked>"
case HttpData.Empty => ""
}
}
} yield ()
Expand Down
33 changes: 33 additions & 0 deletions example/src/main/scala/StreamingResponse.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import zhttp.http._
import zhttp.service.Server
import zio._
import zio.stream.ZStream

/**
* Example to encode content using a ZStream
*/
object StreamingResponse extends App {
// Create a message as a Chunk[Byte]
val message = Chunk.fromArray("Hello world !\r\n".getBytes(HTTP_CHARSET))

// Use `Http.collect` to match on route
val app = Http.collect {

// Simple (non-stream) based route
case Method.GET -> Root / "health" => Response.ok

// ZStream powered response
case Method.GET -> Root / "stream" =>
Response.http(
status = Status.OK,
headers = List(Header.contentLength(message.length.toLong)),
content = HttpData.fromStream(ZStream.succeed(message)), // Encoding content using a ZStream
)

}
override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {

// Starting the server (for more advanced startup configuration checkout `HelloWorldAdvanced`)
Server.start(8090, app.silent).exitCode
}
}
tusharmath marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions zio-http/src/main/scala/zhttp/http/Header.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ object Header {
val contentTypeXml: Header = Header(JHttpHeaderNames.CONTENT_TYPE, JHttpHeaderValues.APPLICATION_XML)
val contentTypeXhtmlXml: Header = Header(JHttpHeaderNames.CONTENT_TYPE, JHttpHeaderValues.APPLICATION_XHTML)
val contentTypeTextPlain: Header = Header(JHttpHeaderNames.CONTENT_TYPE, JHttpHeaderValues.TEXT_PLAIN)
val transferEncodingChunked: Header = Header(JHttpHeaderNames.TRANSFER_ENCODING, JHttpHeaderValues.CHUNKED)
def contentLength(size: Long): Header = Header(JHttpHeaderNames.CONTENT_LENGTH, size.toString)
val contentTypeFormUrlEncoded: Header =
Header(JHttpHeaderNames.CONTENT_TYPE, JHttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED)

Expand Down
21 changes: 0 additions & 21 deletions zio-http/src/main/scala/zhttp/http/HttpContent.scala

This file was deleted.

35 changes: 35 additions & 0 deletions zio-http/src/main/scala/zhttp/http/HttpData.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package zhttp.http

import io.netty.buffer.ByteBuf
import zio.Chunk
import zio.stream.ZStream

/**
* Content holder for Requests and Responses
*/
sealed trait HttpData[-R, +E] extends Product with Serializable

object HttpData {
case object Empty extends HttpData[Any, Nothing]
final case class CompleteData(data: Chunk[Byte]) extends HttpData[Any, Nothing]
final case class StreamData[R, E](data: ZStream[R, E, Chunk[Byte]]) extends HttpData[R, E]

/**
* Helper to create CompleteData from ByteBuf
*/
def fromByteBuf(byteBuf: ByteBuf): HttpData[Any, Nothing] = {
val bytes = new Array[Byte](byteBuf.readableBytes)
byteBuf.readBytes(bytes)
HttpData.CompleteData(Chunk.fromArray(bytes))
}

/**
* Helper to create StreamData from Stream of Chunks
*/
def fromStream[R, E](data: ZStream[R, E, Chunk[Byte]]): HttpData[R, E] = HttpData.StreamData(data)

/**
* Helper to create Empty HttpData
*/
def empty: HttpData[Any, Nothing] = Empty
}
11 changes: 4 additions & 7 deletions zio-http/src/main/scala/zhttp/http/Request.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package zhttp.http

import zio.Chunk

// REQUEST
final case class Request(endpoint: Endpoint, data: Request.Data = Request.Data.empty) { self =>
val headers: List[Header] = data.headers
Expand All @@ -10,16 +8,15 @@ final case class Request(endpoint: Endpoint, data: Request.Data = Request.Data.e
val route: Route = method -> url.path

def getBodyAsString: Option[String] = data.content match {
case HttpContent.Complete(data) => Option(data.map(_.toChar).mkString)
case _ => Option.empty
case HttpData.CompleteData(data) => Option(data.map(_.toChar).mkString)
case _ => Option.empty
}

}

object Request {
val emptyContent: HttpContent.Complete[Byte] = HttpContent.Complete(Chunk.empty)
final case class Data(headers: List[Header], content: HttpContent[Any, Byte])
final case class Data(headers: List[Header], content: HttpData[Any, Nothing])
object Data {
val empty: Data = Data(Nil, emptyContent)
val empty: Data = Data(Nil, HttpData.empty)
}
}
4 changes: 2 additions & 2 deletions zio-http/src/main/scala/zhttp/http/Response.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ sealed trait Response[-R, +E] extends Product with Serializable { self => }

object Response extends ResponseOps {
// Constructors
final case class HttpResponse[R](status: Status, headers: List[Header], content: HttpContent[R, Byte])
extends Response[R, Nothing]
final case class HttpResponse[-R, +E](status: Status, headers: List[Header], content: HttpData[R, E])
extends Response[R, E]
final case class SocketResponse[-R, +E](socket: Socket[R, E]) extends Response[R, E]
}
15 changes: 7 additions & 8 deletions zio-http/src/main/scala/zhttp/http/ResponseOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,17 @@ import java.io.{PrintWriter, StringWriter}
trait ResponseOps {
private val defaultStatus = Status.OK
private val defaultHeaders = Nil
private val emptyContent = HttpContent.Complete(Chunk.empty)

// Helpers

/**
* Creates a new Http Response
*/
def http[R](
def http[R, E](
status: Status = defaultStatus,
headers: List[Header] = defaultHeaders,
content: HttpContent[R, Byte] = emptyContent,
): Response.HttpResponse[R] =
content: HttpData[R, E] = HttpData.empty,
): Response.HttpResponse[R, E] =
HttpResponse(status, headers, content)

/**
Expand All @@ -34,15 +33,15 @@ trait ResponseOps {
http(
error.status,
Nil,
HttpContent.Complete(cause.cause match {
HttpData.CompleteData(cause.cause match {
case Some(throwable) =>
val sw = new StringWriter
throwable.printStackTrace(new PrintWriter(sw))
Chunk.fromArray(s"${cause.message}:\n${sw.toString}".getBytes(HTTP_CHARSET))
case None => Chunk.fromArray(s"${cause.message}".getBytes(HTTP_CHARSET))
}),
)
case _ => http(error.status, Nil, HttpContent.Complete(Chunk.fromArray(error.message.getBytes(HTTP_CHARSET))))
case _ => http(error.status, Nil, HttpData.CompleteData(Chunk.fromArray(error.message.getBytes(HTTP_CHARSET))))
}

}
Expand All @@ -51,13 +50,13 @@ trait ResponseOps {

def text(text: String): UResponse =
http(
content = HttpContent.Complete(Chunk.fromArray(text.getBytes(HTTP_CHARSET))),
content = HttpData.CompleteData(Chunk.fromArray(text.getBytes(HTTP_CHARSET))),
headers = List(Header.contentTypeTextPlain),
)

def jsonString(data: String): UResponse =
http(
content = HttpContent.Complete(Chunk.fromArray(data.getBytes(HTTP_CHARSET))),
content = HttpData.CompleteData(Chunk.fromArray(data.getBytes(HTTP_CHARSET))),
headers = List(Header.contentTypeJson),
)

Expand Down
2 changes: 1 addition & 1 deletion zio-http/src/main/scala/zhttp/http/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ package object http extends PathModule with RequestSyntax {
type Route = (Method, Path)
type SilentResponse[-E] = CanBeSilenced[E, UResponse]
type UResponse = Response[Any, Nothing]
type UHttpResponse = Response.HttpResponse[Any]
type UHttpResponse = Response.HttpResponse[Any, Nothing]
type ResponseM[-R, +E] = ZIO[R, E, Response[R, E]]
type PartialRequest[+E] = CanSupportPartial[Request, E]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ trait DecodeJRequest {
endpoint = method -> url
data = Request.Data(
headers,
HttpContent.complete(jReq.content()),
HttpData.fromByteBuf(jReq.content()),
)
} yield Request(endpoint, data)
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ trait DecodeJResponse {
def decodeJResponse(jRes: JFullHttpResponse): Either[Throwable, UHttpResponse] = Try {
val status = Status.fromJHttpResponseStatus(jRes.status())
val headers = Header.parse(jRes.headers())
val content = HttpContent.complete(jRes.content())
val content = HttpData.fromByteBuf(jRes.content())

Response.http(status, headers, content): UHttpResponse
}.toEither
Expand Down
35 changes: 18 additions & 17 deletions zio-http/src/main/scala/zhttp/service/EncodeResponse.scala
Original file line number Diff line number Diff line change
@@ -1,38 +1,39 @@
package zhttp.service

import io.netty.buffer.{Unpooled => JUnpooled}
import io.netty.handler.codec.http.{HttpHeaderNames => JHttpHeaderNames, HttpVersion => JHttpVersion}
import zhttp.core.{JDefaultFullHttpResponse, JDefaultHttpHeaders, JFullHttpResponse, JHttpHeaders}
import zhttp.http.{HttpContent, Response}
import io.netty.handler.codec.http.{
DefaultHttpResponse => JDefaultHttpResponse,
HttpHeaderNames => JHttpHeaderNames,
HttpVersion => JHttpVersion,
}
import zhttp.core.{JDefaultHttpHeaders, JHttpHeaders}
import zhttp.http.{HttpData, Response}

import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter

trait EncodeResponse {
private val SERVER_NAME: String = "ZIO-Http"
private val jTrailingHeaders = new JDefaultHttpHeaders(false)

/**
* Encode the [[zhttp.http.UHttpResponse]] to [io.netty.handler.codec.http.FullHttpResponse]
*/
def encodeResponse[R](jVersion: JHttpVersion, res: Response.HttpResponse[R]): JFullHttpResponse = {
val jHttpHeaders =
def encodeResponse[R, E](jVersion: JHttpVersion, res: Response.HttpResponse[R, E]): JDefaultHttpResponse = {
val jHttpHeaders =
res.headers.foldLeft[JHttpHeaders](new JDefaultHttpHeaders()) { (jh, hh) =>
jh.set(hh.name, hh.value)
}
val jStatus = res.status.toJHttpStatus
val jContentBytBuf = res.content match {
case HttpContent.Complete(data) =>
jHttpHeaders.set(JHttpHeaderNames.SERVER, SERVER_NAME)
jHttpHeaders.set(JHttpHeaderNames.DATE, s"${DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now)}")
val jStatus = res.status.toJHttpStatus
res.content match {
case HttpData.CompleteData(data) =>
jHttpHeaders.set(JHttpHeaderNames.CONTENT_LENGTH, data.length)
jHttpHeaders.set(JHttpHeaderNames.SERVER, SERVER_NAME)
jHttpHeaders.set(JHttpHeaderNames.DATE, s"${DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now)}")
JUnpooled.copiedBuffer(data.toArray)

case _ =>
case HttpData.StreamData(_) => ()

case HttpData.Empty =>
jHttpHeaders.set(JHttpHeaderNames.CONTENT_LENGTH, 0)
JUnpooled.buffer(0)
}

new JDefaultFullHttpResponse(jVersion, jStatus, jContentBytBuf, jHttpHeaders, jTrailingHeaders)
new JDefaultHttpResponse(jVersion, jStatus, jHttpHeaders)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,16 @@ import zio.{Exit, Fiber, URIO, ZIO}
*/
final class UnsafeChannelExecutor[R](runtime: zio.Runtime[R]) {
def unsafeExecute_(ctx: JChannelHandlerContext)(program: ZIO[R, Throwable, Unit]): Unit = {
unsafeExecute(ctx, program)(_ => ())
unsafeExecute(ctx, program)({
case Exit.Success(_) => ()
case Exit.Failure(cause) =>
cause.failureOption match {
case Some(error: Throwable) => ctx.fireExceptionCaught(error)
case _ => ()
}
ctx.close()
()
})
}

def unsafeExecute[A](ctx: JChannelHandlerContext, program: ZIO[R, Throwable, A])(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package zhttp.service.server

import io.netty.buffer.{Unpooled => JUnpooled}
import io.netty.handler.codec.http.websocketx.{WebSocketServerProtocolHandler => JWebSocketServerProtocolHandler}
import io.netty.handler.codec.http.{LastHttpContent => JLastHttpContent}
import zhttp.core._
import zhttp.http._
import zhttp.service._
import zio.Exit
import zio.stream.ZStream

/**
* Helper class with channel methods
Expand Down Expand Up @@ -53,11 +56,23 @@ final case class ServerRequestHandler[R](
*/
override def channelRead0(ctx: JChannelHandlerContext, jReq: JFullHttpRequest): Unit = {
executeAsync(ctx, jReq) {
case res @ Response.HttpResponse(_, _, _) =>
ctx.writeAndFlush(encodeResponse(jReq.protocolVersion(), res), ctx.channel().voidPromise())
case res @ Response.HttpResponse(_, _, content) =>
ctx.write(encodeResponse(jReq.protocolVersion(), res), ctx.channel().voidPromise())
releaseOrIgnore(jReq)
content match {
tusharmath marked this conversation as resolved.
Show resolved Hide resolved
case HttpData.StreamData(data) =>
zExec.unsafeExecute_(ctx) {
(data.map(c => JUnpooled.copiedBuffer(c.toArray)) ++ ZStream(JLastHttpContent.EMPTY_LAST_CONTENT))
.mapM(t => ChannelFuture.unit(ctx.writeAndFlush(t)))
.runDrain
}
case HttpData.CompleteData(data) =>
ctx.writeAndFlush(JUnpooled.copiedBuffer(data.toArray), ctx.channel().voidPromise())
case HttpData.Empty => ctx.flush()
}
()
case res @ Response.SocketResponse(_) =>

case res @ Response.SocketResponse(_) =>
ctx
.channel()
.pipeline()
Expand Down