From 402b71fd31150f3bf6c4a193e8c201c3174e6651 Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Thu, 30 Nov 2017 16:35:42 +0100 Subject: [PATCH] =htc abstract over HttpEntity.captureTermination and move it to StreamUtils --- .../akka/http/impl/util/StreamUtils.scala | 57 +++++++++++++++++++ .../akka/http/scaladsl/model/HttpEntity.scala | 16 +----- 2 files changed, 58 insertions(+), 15 deletions(-) diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala index 9b6130cd4d2..952d38fbd70 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala @@ -8,6 +8,7 @@ import akka.NotUsed import akka.actor.Cancellable import akka.annotation.InternalApi import akka.event.Logging +import akka.http.scaladsl.model.HttpEntity import akka.stream._ import akka.stream.impl.fusing.GraphInterpreter import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage @@ -280,6 +281,62 @@ private[http] object StreamUtils { * fusing is neither supported nor necessary). */ def fuseAggressive[S <: Shape, M](g: Graph[S, M]): Graph[S, M] = fuser.aggressive(g) + + /** + * INTERNAL API + */ + @InternalApi + object CaptureMaterializationAndTerminationOp extends EntityStreamOp[(Future[Unit], Future[Unit])] { + def strictM: (Future[Unit], Future[Unit]) = (Future.successful(()), Future.successful(())) + def apply[T, Mat](source: Source[T, Mat]): (Source[T, Mat], (Future[Unit], Future[Unit])) = { + val materializationPromise = Promise[Unit]() + val (newSource, completion) = + StreamUtils.captureTermination(source.mapMaterializedValue { mat ⇒ + materializationPromise.trySuccess(()) + mat + }) + (newSource, (materializationPromise.future, completion)) + } + } + + /** + * INTERNAL API + */ + @InternalApi + object CaptureTerminationOp extends EntityStreamOp[Future[Unit]] { + def strictM: Future[Unit] = Future.successful(()) + def apply[T, Mat](source: Source[T, Mat]): (Source[T, Mat], Future[Unit]) = StreamUtils.captureTermination(source) + } + + /** + * INTERNAL API + */ + @InternalApi + private[http] trait EntityStreamOp[M] { + def strictM: M + def apply[T, Mat](source: Source[T, Mat]): (Source[T, Mat], M) + } + + /** + * INTERNAL API + */ + @InternalApi + private[http] def transformEntityStream[T <: HttpEntity, M](entity: T, streamOp: EntityStreamOp[M]): (T, M) = + entity match { + case x: HttpEntity.Strict ⇒ x.asInstanceOf[T] → streamOp.strictM + case x: HttpEntity.Default ⇒ + val (newData, whenCompleted) = streamOp(x.data) + x.copy(data = newData).asInstanceOf[T] → whenCompleted + case x: HttpEntity.Chunked ⇒ + val (newChunks, whenCompleted) = streamOp(x.chunks) + x.copy(chunks = newChunks).asInstanceOf[T] → whenCompleted + case x: HttpEntity.CloseDelimited ⇒ + val (newData, whenCompleted) = streamOp(x.data) + x.copy(data = newData).asInstanceOf[T] → whenCompleted + case x: HttpEntity.IndefiniteLength ⇒ + val (newData, whenCompleted) = streamOp(x.data) + x.copy(data = newData).asInstanceOf[T] → whenCompleted + } } /** diff --git a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala index c6ddaf3ab8a..eb2dece66cc 100644 --- a/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala +++ b/akka-http-core/src/main/scala/akka/http/scaladsl/model/HttpEntity.scala @@ -646,21 +646,7 @@ object HttpEntity { */ @InternalApi private[http] def captureTermination[T <: HttpEntity](entity: T): (T, Future[Unit]) = - entity match { - case x: HttpEntity.Strict ⇒ x.asInstanceOf[T] → FastFuture.successful(()) - case x: HttpEntity.Default ⇒ - val (newData, whenCompleted) = StreamUtils.captureTermination(x.data) - x.copy(data = newData).asInstanceOf[T] → whenCompleted - case x: HttpEntity.Chunked ⇒ - val (newChunks, whenCompleted) = StreamUtils.captureTermination(x.chunks) - x.copy(chunks = newChunks).asInstanceOf[T] → whenCompleted - case x: HttpEntity.CloseDelimited ⇒ - val (newData, whenCompleted) = StreamUtils.captureTermination(x.data) - x.copy(data = newData).asInstanceOf[T] → whenCompleted - case x: HttpEntity.IndefiniteLength ⇒ - val (newData, whenCompleted) = StreamUtils.captureTermination(x.data) - x.copy(data = newData).asInstanceOf[T] → whenCompleted - } + StreamUtils.transformEntityStream(entity, StreamUtils.CaptureTerminationOp) /** * Represents the currently being-drained HTTP Entity which triggers completion of the contained