Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support multipart body with streaming content #9852 #10309

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

smadarasmi
Copy link

@smadarasmi smadarasmi commented May 27, 2020

This PR is not finished. I'm hoping to get some early feedback that I am on the right track before I continue and also to get some questions answered. I will leave the questions in line.

Things left to do aside from the comments I left in this PR:

  1. Add to Scala and Java documentation for uploading files
  2. Integration test

Pull Request Checklist

Helpful things

Fixes

Fixes #xxxx

Purpose

What does this PR do?

Background Context

Why did you take this approach?

References

Are there any relevant issues / PRs / mailing lists discussions?

@lightbend-cla-validator

Hi @smadarasmi,

Thank you for your contribution! We really value the time you've taken to put this together.

Before we proceed with reviewing this pull request, please sign the Lightbend Contributors License Agreement:

http://www.lightbend.com/contribute/cla

@smadarasmi smadarasmi marked this pull request as draft May 27, 2020 14:09
@@ -183,6 +176,15 @@ object Multipart {
}
}

def handleFilePartAsStream: FilePartHandler[Source[ByteString, NotUsed]] = {
case FileInfo(partName, filename, contentType, dispositionType) =>
val sink = Sink.asPublisher[ByteString](fanout = true)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to make this fanout, otherwise my unit test was failing with

Right(MultipartFormData(HashMap(),Vector(FilePart(file1,file1.txt,Some(text/plain),Source(SourceShape(PublisherSource.out(1840397757))),-1,form-data), FilePart(file2,file2.txt,Some(text/plain),Source(SourceShape(PublisherSource.out(276528415))),-1,form-data)),Vector())) is Right but java.lang.IllegalStateException: Sink.asPublisher(fanout = false) only supports one subscriber (which is allowed, see reactive-streams specification, rule 1.12)
java.lang.Exception: Right(MultipartFormData(HashMap(),Vector(FilePart(file1,file1.txt,Some(text/plain),Source(SourceShape(PublisherSource.out(1840397757))),-1,form-data), FilePart(file2,file2.txt,Some(text/plain),Source(SourceShape(PublisherSource.out(276528415))),-1,form-data)),Vector())) is Right but java.lang.IllegalStateException: Sink.asPublisher(fanout = false) only supports one subscriber (which is allowed, see reactive-streams specification, rule 1.12)
	at org.specs2.matcher.MatchResultStackTrace.setStacktrace(Expectations.scala:57)
	at org.specs2.matcher.MatchResultStackTrace.setStacktrace$(Expectations.scala:55)
	at play.api.mvc.MultipartBodyParserSpec.setStacktrace(MultipartBodyParserSpec.scala:21)

I'm not sure where the other subscriber is coming from.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message Sink.asPublisher(fanout = false) only supports one subscriber is not correct, see: akka/akka#30908 In fact, if you say fanout = false, the subscription times out and the created source is cancelled.

val sink = Sink.asPublisher[ByteString](fanout = true)
.mapMaterializedValue(p => Future.successful(Source.fromPublisher(p)))
Accumulator(sink).mapFuture { b =>
Future.successful(FilePart(partName, filename, contentType, b, -1, dispositionType))
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still need to figure out how to determine the file size since this is streaming content. Or could we just leave it as -1?

@@ -132,7 +132,7 @@ object Dependencies {
slf4j ++
Seq("akka-actor", "akka-actor-typed", "akka-slf4j", "akka-serialization-jackson")
.map("com.typesafe.akka" %% _ % akkaVersion) ++
Seq("akka-testkit", "akka-actor-testkit-typed")
Seq("akka-testkit", "akka-actor-testkit-typed", "akka-stream-testkit")
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to add akka-stream-testkit for this test: https://github.com/playframework/playframework/pull/10309/files#diff-353e7858dc8605c961746609570dc2c9R65

When I constructed the test with a Source and running through my parser, which returns FilePartHandler[Source[ByteString, NotUsed]] and I collect and do assertion on that, I get an error similar to this:

Processor actor [Actor[akka://test-server/system/Materializers/StreamSupervisor-1/flow-0-1-fanoutPublisherSink#622867610]] terminated abruptly.

Any ideas what's causing this? I suspect this is an issue with the way my test was constructed.

@smadarasmi
Copy link
Author

smadarasmi commented May 27, 2020

@renatocaval Hey, this is regarding #9852. It's still a WIP and a lot of work left to do but I was hoping to get some help and see if I am on the right track.
Look forward to your reply :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants