Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multipart formdata #114

Closed
wants to merge 1 commit into from
Closed

Multipart formdata #114

wants to merge 1 commit into from

Conversation

DJLemkes
Copy link

@DJLemkes DJLemkes commented Apr 2, 2021

Hi!

I created a multipart/form-data request handler to also support those requests. Although I tried to minimize the amount of changes I couldn't get around changing the Request.Data.content to HttpContent[Any, Byte] instead of HttpContent[Any, String]. You probably have some better ideas about this setup than I do so I just want to put this out here and hear your thoughts on this.

Best,

DJ

val emptyContent: HttpContent.Complete[String] = HttpContent.Complete("")
final case class Data(headers: List[Header], content: HttpContent[Any, String])
val emptyContent: HttpContent.Chunked[Any, Byte] = HttpContent.Chunked(data = ZStream.empty)
final case class Data(headers: List[Header], content: HttpContent[Any, Byte])
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am guessing just changing the content to HttpContent[Any, ByteBuf] should allow us to make any kind of request. The parsing can be left to the developer or specific codecs can be provided as a part of the library.

@tusharmath
Copy link
Collaborator

io.netty.Buffer.Bytebuf is a mutable, unsafe data structure and also exposes implementation details. It would be great if we could come up with a performant immutable indirection.

@DJLemkes
Copy link
Author

DJLemkes commented Apr 6, 2021

@tusharmath io.netty.Buffer.Bytebuf is indeed mutable and pretty tricky to work with. Take the messagecodec for example: you need to use this utility function because just using .content.array() will throw exceptions caused by mixed reader and writer indices.

Personally, I like offering HttpContent[Any, Byte] because it allows for lots of flexibility, also for non-text content-type. With some added utils like asString it still pretty easy to use. I do understand though that this train of thought conflicts with the current API. Also, you probably already have ideas on how to offer streaming request bodies as I saw some discussion on Discord? I am happy to wait for those design details and then adjust this PR to that interface? The handler doesn't have to change that much; just the way the data is offered to the zio-http API.

@DJLemkes
Copy link
Author

DJLemkes commented Apr 9, 2021

@tusharmath what are your thoughts on my comment?

Also another question: I see that headers is part of Request.Data and not of Request directly. In the Response, it's directly part of Response and not of its data. I'm probably missing something; what's the reason for that?

@tusharmath
Copy link
Collaborator

I'll take a look at this PR today.

Also another question: I see that headers is part of Request.Data and not of Request directly. In the Response, it's directly part of Response and not of its data. I'm probably missing something; what's the reason for that?

Yes, I also think we don't need Request.Data anymore.

Comment on lines 19 to 20
final case class FileData(name: String, contentType: String, content: ZStream[Blocking, Throwable, Byte])
extends FormDataContent
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
final case class FileData(name: String, contentType: String, content: ZStream[Blocking, Throwable, Byte])
extends FormDataContent
final case class FileData[R](name: String, contentType: String, content: ZStream[Blocking, Nothing, Chunk[A]])
extends FormDataContent

Why should the stream throw?

// It's set to Blocking for now because we need it when reading from a tmp file.
final case class FileData(name: String, contentType: String, content: ZStream[Blocking, Throwable, Byte])
extends FormDataContent
final case class AttributeData(name: String, content: ZStream[Blocking, Throwable, Byte]) extends FormDataContent
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
final case class AttributeData(name: String, content: ZStream[Blocking, Throwable, Byte]) extends FormDataContent
final case class AttributeData[R](name: String, content: ZStream[R, Nothing, Chunk[A]]) extends FormDataContent

}
final case class MultipartFormData(
attributes: Map[String, AttributeData],
files: Map[String, FileData],
Copy link
Collaborator

@tusharmath tusharmath Apr 12, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the name is there in both FileData and AttributeData do we need a Map here? or vice versa?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thought the Map would be convenient for by-name lookup. Can make it a Seq as well but then you have to do a find. I personally like keeping the name in the Data types as well such that you can pass these around and keep everything together.

@DJLemkes DJLemkes marked this pull request as draft April 28, 2021 06:24
@DJLemkes
Copy link
Author

Rebased and updated. Removing Draft as more structural changes required at the time have landed in main.

I think this Tapir PR is waiting for this.

Note that it seems we're also affected by this Netty bug that should be resolved in the next patch release.

@DJLemkes DJLemkes marked this pull request as ready for review May 16, 2021 21:58
@DJLemkes DJLemkes requested a review from tusharmath May 18, 2021 10:14
@DJLemkes
Copy link
Author

Rebased again such that #220 is included. Netty 4.1.65 resolves the upload speed bug; we're getting proper upload speeds with constant memory usage now.

@tusharmath can you please let me know what you think? There's some shared behaviour with the ServerRequestHandler. I'm reluctant to lift that as you probably have a direction in mind already :).

final case class MultipartFormData(
attributes: Map[String, AttributeData],
files: Map[String, FileData],
) extends HttpData[Any, Nothing]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add Blocking and Throwable to HttpData.

Suggested change
) extends HttpData[Any, Nothing]
) extends HttpData[Blocking, Throwable]

.addLast(HTTP_KEEPALIVE_HANDLER, new HttpServerKeepAliveHandler)
// This handler should always be in front of the `JHttpObjectAggregator` or we won't be able to
// decode multipart/form-data requests correctly.
.addLast(MULTIPART_FORMDATA_HANDLER, hhtpMfdH)
Copy link
Collaborator

@tusharmath tusharmath May 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add this handler conditionally ie.

  1. If the request is of multipart type (this is easy to check thru headers)
  2. The user actually wants to parse it as a multipart request. It's possible that the user wants to send a 403 error or something else even before the data is uploaded (There is a way to check this also, see my comment for Response.scala).

Continued #114 (comment)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for reviewing!

I see your points. Regarding 1), we let Netty detect whether it's a multipart because they have a built-in decoder already which probably also covers some edge cases. The multipart handler is then removed as soon as we know it's not a multipart request here and here.

Removing a handler is much easier than adding at exactly the right position in the chain. Especially the latter is important as long as we have the aggregator handler.

Curious to see the details on 2)!

.addLast(
SERVER_CODEC_HANDLER,
new JHttpServerCodec(DEFAULT_MAX_INITIAL_LINE_LENGTH, DEFAULT_MAX_HEADER_SIZE, DEFAULT_MAX_CHUNK_SIZE * 8),
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add this handler also conditionally similar to the MULTIPART_FORMDATA_HANDLER


final case class FileData(name: String, contentType: String, content: ZStream[Blocking, Throwable, Byte])
extends FormDataContent
final case class AttributeData(name: String, content: ZStream[Blocking, Throwable, Byte]) extends FormDataContent
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a better way building a stream without depending on Blocking?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Couldn't find one because we're doing a ZStream.fromFile when decoding the multipart. But please let me know if we can provide the R earlier already.

val files = fd.files.map { case (name, file) => streamSize(file.content).map(size => s"$name: $size") }
ZIO.collectAllParN(3)(attributes ++ files).map(sizes => Response.text(sizes.mkString(",")))
case _ => UIO.succeed(Response.status(Status.BAD_REQUEST))
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you create a separate example to showcase multipart usage.

private val headersKey: AttributeKey[HttpHeaders] = AttributeKey.valueOf("headers")
private val protocolVersionKey: AttributeKey[HttpVersion] = AttributeKey.valueOf("protocolVersion")

// TODO: nearly the samve as the one in `ServerRequestHandler`. Generalize and lift?
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep it as a separate refactor PR and can have this merged first :)

@tusharmath tusharmath added the enhancement New feature or request label May 21, 2021
@tusharmath
Copy link
Collaborator

tusharmath commented May 21, 2021

Response.scala

sealed trait Response[-R, +E]

object Response {
  case class HttpResponse(status: Status, headers: List[Header], content: HttpData[R, E]) extends Response[R, E]
  case class SocketResponse[-R, +E](socket: SocketApp[R, E]) extends Response[R, E]

  // Responses with the prefix `ReadAs` is used to signal the server to change the channel handler.
  // Based on the response various channel handlers can be created.
  case class ReadAsComplete(cb: HttpData.CompleteData[] => HttpApp[R, E]) extends Response[R, E]
  case class ReadAsMultipart(cb: HttpData.MultipartFormData => HttpApp[R, E]) extends Response[R, E]
}

This is how we will respond to a multipart request.
MultipartExample.scala

def size(fd: HttpData.MultipartFormData): Int = ???

val app = Http.collect[Request] {
  case GET -> Root / "upload" => Response.AsMultipart { fd => 
    HttpApp.response(Response.HttpResponse(Status.OK, Nil, size(fd)))
  }
}

This is how we can process the responses
ServerRequestHandler.scala

def channelRead(ctx: ChannelHandlerContext, msg: HttpRequest) = {
  ctx.channel.config.setAutoRead(false) // Stop reading bytes for now.
  executeAsync(app(msg)) {
     case HttpResponse() => ??? /// continue as before
     case ReadAsMultipart() => 
       /// Remove the current handler
       /// Add the multipart handler
       ctx.channel.read() // Read the channel in chunks now.
     case ReadAsComplete() =>
       /// Add HttpObjectAggregator
      ctx.channel.read()
  }
}

We can further cleanup the MultipartExample as follows —

Request.scala

sealed trait Request[R, E]
object Request {
  case class AnyRequest(endpoint: Endpoint, headers: List[Header], data: HttpData[R, E]) extends Request[R, E]
  case class MultipartRequest(endpoint: Endpoint, headers: List[Header], fd: HttpData.MultipartFormData[R, E]) extends Request[R, E]
}
def size(fd: HttpData.MultipartFormData): Int = ???

def asMultipart = 
  Http.collect[Request] {
    case req @ GET -> Root / "upload" => Response.AsMultipart { fd => 
      MultipartRequest(req.status, req.headers, fd)
  }
}

val multipart = Http.collect[MultipartRequest] {
  case req @ GET -> Root / "upload" => Response.HttpResponse(Status.OK, Nil, size(req.fd))
}

val app = multipar.cmap(asMultipart)

@DJLemkes
Copy link
Author

Nice approach! Have been busy but I expect to be able to give this some time later this week.

@tusharmath
Copy link
Collaborator

We will support this via the content-decoder API. Thanks @DJLemkes !

@tusharmath tusharmath closed this Dec 16, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants