Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement dynamic resizing of StreamingForm.Buffer #2882

Merged
merged 1 commit into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions zio-http/jvm/src/test/scala/zio/http/FormSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,33 @@ object FormSpec extends ZIOHttpSpec {
collected.get("file").get.asInstanceOf[FormField.Binary].data == bytes,
)
},
test("StreamingForm dynamically resizes") {
val N = 1000
val expected = Chunk.fromArray(Array.fill(N)(scala.util.Random.nextInt()).map(_.toByte))
val form =
Form(
Chunk(
FormField.binaryField(
name = "identifier",
data = Chunk(10.toByte),
mediaType = MediaType.application.`octet-stream`,
),
FormField.StreamingBinary(
name = "blob",
data = ZStream.fromChunk(expected),
contentType = MediaType.application.`octet-stream`,
),
),
)
val boundary = Boundary("X-INSOMNIA-BOUNDARY")
for {
formBytes <- form.multipartBytes(boundary).runCollect
formByteStream = ZStream.fromChunk(formBytes)
streamingForm = StreamingForm(formByteStream, boundary, 16)
out <- streamingForm.collectAll
res = out.get("blob").get.asInstanceOf[FormField.Binary].data
} yield assertTrue(res == expected)
} @@ timeout(3.seconds),
test("decoding random form") {
check(Gen.chunkOfBounded(2, 8)(formField)) { fields =>
for {
Expand Down
23 changes: 21 additions & 2 deletions zio-http/shared/src/main/scala/zio/http/StreamingForm.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package zio.http

import java.nio.charset.Charset

import scala.annotation.tailrec

import zio._
import zio.stacktracer.TracingImplicits.disableAutoTrace

Expand Down Expand Up @@ -172,14 +174,31 @@ object StreamingForm {
new State(FormState.fromBoundary(boundary), None, _inNonStreamingPart = false)
}

private final class Buffer(bufferSize: Int) {
private val buffer: Array[Byte] = new Array[Byte](bufferSize)
private final class Buffer(initialSize: Int) {
private var buffer: Array[Byte] = new Array[Byte](initialSize)
private var length: Int = 0

private def ensureHasCapacity(requiredCapacity: Int): Unit = {
@tailrec
def calculateNewCapacity(existing: Int, required: Int): Int = {
val newCap = existing * 2
if (newCap < required) calculateNewCapacity(newCap, required)
else newCap
}

val l = buffer.length
if (l <= requiredCapacity) {
val newArray = Array.ofDim[Byte](calculateNewCapacity(l, requiredCapacity))
java.lang.System.arraycopy(buffer, 0, newArray, 0, l)
buffer = newArray
} else ()
}

def addByte(
crlfBoundary: Chunk[Byte],
byte: Byte,
): Chunk[Take[Nothing, Byte]] = {
ensureHasCapacity(length + crlfBoundary.length)
buffer(length) = byte
if (length < (crlfBoundary.length - 1)) {
// Not enough bytes to check if we have the boundary
Expand Down
Loading