-
Notifications
You must be signed in to change notification settings - Fork 38.7k
Description
Affects: 5.2.0.RELEASE and earlier
TLDR
Issue is related to interrogation with org.synchronoss.cloud.nio.multipart.NioMultipartParser
In particular to code around line
SynchronossPartHttpMessageReader.java#L173
Suggested fix could be
Replace
try {
parser.close();
}
With something like
try {
parser.close();
// ensure that parts-sink will be notified about data-stream completion
listener.onAllPartsFinished();
}
Attached tests code (maven projects)
Steps to reproduce - description
- Prepare valid multipart payload,
- Then truncate few bytes of that payload and post this truncated version (with matching to truncation) content-length to some multipart-accepting controller
Expected behavior
some exception raised/emitted (reporting about corrupted payload)
Actual behavior
Flux<Part>
passed to controller "never completes" (hanged forever). For some combinations of artifacts it is properly canceled when connection reset (termination) occurs, but for code snippet (attached) below - it looks like it doesn't (more likely this flux just "leaks" - on-finally does not executed on it)
Steps to reproduce - code (test case)
Simple single-file example could look like one below (it is also available in attached files)
Run with maven
mvn clean test
File reactor-multipart-issue-brief.zip/reactor-multipart-issue-brief/src/test/kotlin/issue/multipart/WebFluxMultipartTest.kt
package issue.multipart
//import org.junit.jupiter.api.Timeout
import com.fasterxml.jackson.databind.ObjectMapper
import io.netty.buffer.ByteBufAllocator
import io.netty.channel.ChannelOption
import issue.multipart.MainConfig.Companion.SERVER_PORT
import org.junit.jupiter.api.Test
import org.slf4j.LoggerFactory
import org.springframework.context.ApplicationContext
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.ComponentScan
import org.springframework.context.annotation.Configuration
import org.springframework.core.io.buffer.DataBuffer
import org.springframework.core.io.buffer.DataBufferFactory
import org.springframework.core.io.buffer.NettyDataBufferFactory
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.http.codec.multipart.Part
import org.springframework.http.server.reactive.HttpHandler
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController
import org.springframework.web.reactive.config.EnableWebFlux
import org.springframework.web.server.ServerWebExchange
import org.springframework.web.server.adapter.WebHttpHandlerBuilder
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.toMono
import reactor.netty.DisposableServer
import reactor.netty.http.server.HttpServer
import reactor.netty.resources.LoopResources
import reactor.netty.tcp.TcpServer
import java.net.InetSocketAddress
import java.net.URL
import java.nio.file.Files
import java.nio.file.Paths
import java.time.Duration
import kotlin.test.assertNotNull
@SpringJUnitConfig(MainConfig::class)
class WebFluxMultipartTest {
val rootUrl = "http://localhost:$SERVER_PORT"
val logger = LoggerFactory.getLogger(javaClass)
// this will use original (NOT-patched) version of `org.springframework.http.codec.multipart.SynchronossPartHttpMessageReader`
@Test
//@Timeout(15)
fun testFailNotPatchedPostFormDataWithJdkClient() {
val res = runCatching { testPostFormDataWithJdkClient(corruptMultipartData = true) }
logger.info(res.toString(), res.exceptionOrNull())
assertNotNull(res.exceptionOrNull())
}
fun testPostFormDataWithJdkClient(corruptMultipartData: Boolean) {
val url = URL("$rootUrl/io-target/post-form-data")
val con = url.openConnection()
con.doOutput = true
con.addRequestProperty("Content-Type", "multipart/form-data;boundary=NbjrKgjbsaMLdnMxMfDpD6myWomYc0qNX0w;charset=UTF-8")
val body = Files.readAllBytes(Paths.get(javaClass.getResource("post-form-data-valid").toURI()))
con.getOutputStream().use { out ->
if (corruptMultipartData) {
out.write(body, 0, body.size - 8)
} else {
out.write(body)
}
}
logger.info("bytes[0..4]:" + con.getInputStream().readNBytes(4)!!.run { "${contentToString()}:${String(this)}" })
logger.info("bytes[4...]:" + String(con.getInputStream().readAllBytes()))
}
}
@RestController
@RequestMapping("/io-target")
class IoTargetController {
val logger = LoggerFactory.getLogger(javaClass)
@PostMapping("/post-form-data")
fun postFormData(
@RequestBody parts: Flux<Part>,
serverExchange: ServerWebExchange
): Mono<ResponseEntity<Flux<DataBuffer>>> = run {
logger.info("postFormData: request.headers.content-length: ${serverExchange.request.headers["content-length"]}")
ResponseEntity.status(HttpStatus.OK).body(
Mono.defer {
"raw:data:1234567".toDataBuffer().toMono().delayElement(Duration.ofSeconds(1)).log("respond-0")
}.concatWith(
"r01:data:1234567".toDataBuffer().toMono().delayElement(Duration.ofSeconds(1)).log("respond-1")
).concatWith(
parts.log("parts").flatMap {
it.content().log("part-content").then().thenReturn("ok")
}.then(
"r02:data:1234567".toDataBuffer().toMono().delayElement(Duration.ofSeconds(1)).log("respond-2")
)
).concatWith(
"r03:data:1234567".toDataBuffer().toMono().delayElement(Duration.ofSeconds(1)).log("respond-3")
).log("response-body")
)
.toMono().delayElement(Duration.ofSeconds(1)).doFinally { signal ->
logger.info("${::postFormData.name}: doFinally: $signal")
}
}
}
@Configuration
@EnableWebFlux
@Import(IoTargetController::class)
open class MainConfig {
companion object {
const val SERVER_PORT = 9011
}
@Bean
open fun objectMapper(): ObjectMapper = ObjectMapper()
@Bean
open fun httpHandler(applicationContext: ApplicationContext): HttpHandler =
WebHttpHandlerBuilder.applicationContext(applicationContext).build()
@Bean
open fun loopResources(): LoopResources =
LoopResources.create("issue.multipart-LoopResources", 16, false)
@Bean(destroyMethod = "dispose")
open fun httpServer(
loopResources: LoopResources,
httpHandler: HttpHandler
): DisposableServer = run {
val tcpServer = TcpServer.create()
//.wiretap(true)
.option(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT)
.runOn(loopResources)
.addressSupplier { InetSocketAddress(SERVER_PORT) }
HttpServer.from(tcpServer)
.handle(ReactorHttpHandlerAdapter(httpHandler))
.bindNow()
}
}
val byteBufAllocator: ByteBufAllocator = ByteBufAllocator.DEFAULT
val bufferFactory: DataBufferFactory = NettyDataBufferFactory(byteBufAllocator)
fun String.toDataBuffer(): DataBuffer = this.toByteArray().let { bytes ->
bufferFactory.allocateBuffer(bytes.size).apply { write(bytes) }
}
Results of this test execution are following:
2019-10-08 21:35:12.505 INFO org.springframework.test.context.support.DefaultTestContextBootstrapper Loaded default TestExecutionListener class names from location [META-INF/spring.factories]: [org.springframework.test.context.web.ServletTestExecutionListener, org.springframework.test.context.support.DirtiesContextBeforeModesTestExecutionListener, org.springframework.test.context.support.DependencyInjectionTestExecutionListener, org.springframework.test.context.support.DirtiesContextTestExecutionListener, org.springframework.test.context.transaction.TransactionalTestExecutionListener, org.springframework.test.context.jdbc.SqlScriptsTestExecutionListener, org.springframework.test.context.event.EventPublishingTestExecutionListener]
2019-10-08 21:35:12.515 INFO org.springframework.test.context.support.DefaultTestContextBootstrapper Using TestExecutionListeners: [org.springframework.test.context.support.DirtiesContextBeforeModesTestExecutionListener@5db4c359, org.springframework.test.context.support.DependencyInjectionTestExecutionListener@209775a9, org.springframework.test.context.support.DirtiesContextTestExecutionListener@18e7143f, org.springframework.test.context.event.EventPublishingTestExecutionListener@f9b7332]
2019-10-08 21:35:14.083 INFO issue.multipart.IoTargetController postFormData: request.headers.content-length: [257]
2019-10-08 21:35:15.118 INFO response-body onSubscribe(FluxConcatArray.ConcatArraySubscriber)
2019-10-08 21:35:15.119 INFO response-body request(1)
2019-10-08 21:35:15.126 INFO respond-0 onSubscribe([Fuseable] MonoDelayElement.DelayElementSubscriber)
2019-10-08 21:35:15.126 INFO respond-0 request(1)
2019-10-08 21:35:15.127 INFO issue.multipart.IoTargetController postFormData: doFinally: onComplete
2019-10-08 21:35:16.128 INFO respond-0 onNext(PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 16))
2019-10-08 21:35:16.128 INFO response-body onNext(PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 16))
2019-10-08 21:35:16.134 INFO respond-0 onComplete()
2019-10-08 21:35:16.134 INFO respond-1 onSubscribe([Fuseable] MonoDelayElement.DelayElementSubscriber)
2019-10-08 21:35:16.142 INFO response-body request(127)
2019-10-08 21:35:16.142 INFO respond-1 request(127)
2019-10-08 21:35:16.144 INFO issue.multipart.WebFluxMultipartTest bytes[0..4]:[114, 97, 119, 58]:raw:
2019-10-08 21:35:18.014 INFO respond-1 onNext(PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 16))
2019-10-08 21:35:18.015 INFO response-body onNext(PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 16))
2019-10-08 21:35:18.018 INFO respond-1 onComplete()
2019-10-08 21:35:18.023 INFO parts onSubscribe(FluxSwitchIfEmpty.SwitchIfEmptySubscriber)
2019-10-08 21:35:18.023 INFO parts request(256)
... here it hangs "forever" ...
In case when not-corrupted multipart payload is used, execution become valid, and looks like
2019-10-08 21:44:54.316 INFO org.springframework.test.context.support.DefaultTestContextBootstrapper Loaded default TestExecutionListener class names from location [META-INF/spring.factories]: [org.springframework.test.context.web.ServletTestExecutionListener, org.springframework.test.context.support.DirtiesContextBeforeModesTestExecutionListener, org.springframework.test.context.support.DependencyInjectionTestExecutionListener, org.springframework.test.context.support.DirtiesContextTestExecutionListener, org.springframework.test.context.transaction.TransactionalTestExecutionListener, org.springframework.test.context.jdbc.SqlScriptsTestExecutionListener, org.springframework.test.context.event.EventPublishingTestExecutionListener]
2019-10-08 21:44:54.327 INFO org.springframework.test.context.support.DefaultTestContextBootstrapper Using TestExecutionListeners: [org.springframework.test.context.support.DirtiesContextBeforeModesTestExecutionListener@3d08f3f5, org.springframework.test.context.support.DependencyInjectionTestExecutionListener@119f1f2a, org.springframework.test.context.support.DirtiesContextTestExecutionListener@1a1da881, org.springframework.test.context.event.EventPublishingTestExecutionListener@5b970f7]
2019-10-08 21:44:55.978 INFO issue.multipart.IoTargetController postFormData: request.headers.content-length: [265]
2019-10-08 21:44:57.021 INFO response-body onSubscribe(FluxConcatArray.ConcatArraySubscriber)
2019-10-08 21:44:57.022 INFO response-body request(1)
2019-10-08 21:44:57.029 INFO respond-0 onSubscribe([Fuseable] MonoDelayElement.DelayElementSubscriber)
2019-10-08 21:44:57.029 INFO respond-0 request(1)
2019-10-08 21:44:57.030 INFO issue.multipart.IoTargetController postFormData: doFinally: onComplete
2019-10-08 21:44:58.031 INFO respond-0 onNext(PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 16))
2019-10-08 21:44:58.031 INFO response-body onNext(PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 16))
2019-10-08 21:44:58.037 INFO respond-0 onComplete()
2019-10-08 21:44:58.038 INFO respond-1 onSubscribe([Fuseable] MonoDelayElement.DelayElementSubscriber)
2019-10-08 21:44:58.044 INFO response-body request(127)
2019-10-08 21:44:58.045 INFO respond-1 request(127)
2019-10-08 21:44:58.047 INFO issue.multipart.WebFluxMultipartTest bytes[0..4]:[114, 97, 119, 58]:raw:
2019-10-08 21:45:22.997 INFO respond-1 onNext(PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 16))
2019-10-08 21:45:22.998 INFO response-body onNext(PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 16))
2019-10-08 21:45:22.998 INFO respond-1 onComplete()
2019-10-08 21:45:23.003 INFO parts onSubscribe(FluxSwitchIfEmpty.SwitchIfEmptySubscriber)
2019-10-08 21:45:23.003 INFO parts request(256)
2019-10-08 21:45:23.029 INFO parts onNext(Part 'part-00-name=post-payload-text-23456789ABCDEF:post-payload-0001-3456789ABCDEF:post-payload-0002-3456789ABCDEF:post-payload-0003-3456789ABCDEF')
2019-10-08 21:45:23.033 INFO part-content | onSubscribe([Fuseable] FluxJust.WeakScalarSubscription)
2019-10-08 21:45:23.033 INFO part-content | request(unbounded)
2019-10-08 21:45:23.033 INFO part-content | onNext(DefaultDataBuffer (r: 0, w: 128, c: 128))
2019-10-08 21:45:23.033 INFO part-content | onComplete()
2019-10-08 21:45:23.034 INFO parts request(1)
2019-10-08 21:45:23.034 INFO parts onComplete()
2019-10-08 21:45:23.034 INFO respond-2 onSubscribe([Fuseable] MonoDelayElement.DelayElementSubscriber)
2019-10-08 21:45:23.034 INFO respond-2 request(unbounded)
2019-10-08 21:45:24.034 INFO respond-2 onNext(PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 16))
2019-10-08 21:45:24.034 INFO response-body onNext(PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 16))
2019-10-08 21:45:24.034 INFO respond-3 onSubscribe([Fuseable] MonoDelayElement.DelayElementSubscriber)
2019-10-08 21:45:24.034 INFO respond-3 request(125)
2019-10-08 21:45:24.035 INFO respond-2 onComplete()
2019-10-08 21:45:25.036 INFO respond-3 onNext(PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 16))
2019-10-08 21:45:25.036 INFO response-body onNext(PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 16))
2019-10-08 21:45:25.036 INFO respond-3 onComplete()
2019-10-08 21:45:25.036 INFO response-body onComplete()
2019-10-08 21:45:25.039 INFO issue.multipart.WebFluxMultipartTest bytes[4...]:data:1234567r01:data:1234567r02:data:1234567r03:data:1234567
2019-10-08 21:45:25.040 INFO issue.multipart.WebFluxMultipartTest Success(kotlin.Unit)
... here it completes normally ...