Skip to content

Commit

Permalink
=htc abstract over HttpEntity.captureTermination and move it to Strea…
Browse files Browse the repository at this point in the history
…mUtils
  • Loading branch information
jrudolph committed Nov 30, 2017
1 parent a365fdc commit 402b71f
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 15 deletions.
Expand Up @@ -8,6 +8,7 @@ import akka.NotUsed
import akka.actor.Cancellable
import akka.annotation.InternalApi
import akka.event.Logging
import akka.http.scaladsl.model.HttpEntity
import akka.stream._
import akka.stream.impl.fusing.GraphInterpreter
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
Expand Down Expand Up @@ -280,6 +281,62 @@ private[http] object StreamUtils {
* fusing is neither supported nor necessary).
*/
def fuseAggressive[S <: Shape, M](g: Graph[S, M]): Graph[S, M] = fuser.aggressive(g)

/**
* INTERNAL API
*/
@InternalApi
object CaptureMaterializationAndTerminationOp extends EntityStreamOp[(Future[Unit], Future[Unit])] {
def strictM: (Future[Unit], Future[Unit]) = (Future.successful(()), Future.successful(()))
def apply[T, Mat](source: Source[T, Mat]): (Source[T, Mat], (Future[Unit], Future[Unit])) = {
val materializationPromise = Promise[Unit]()
val (newSource, completion) =
StreamUtils.captureTermination(source.mapMaterializedValue { mat
materializationPromise.trySuccess(())
mat
})
(newSource, (materializationPromise.future, completion))
}
}

/**
* INTERNAL API
*/
@InternalApi
object CaptureTerminationOp extends EntityStreamOp[Future[Unit]] {
def strictM: Future[Unit] = Future.successful(())
def apply[T, Mat](source: Source[T, Mat]): (Source[T, Mat], Future[Unit]) = StreamUtils.captureTermination(source)
}

/**
* INTERNAL API
*/
@InternalApi
private[http] trait EntityStreamOp[M] {
def strictM: M
def apply[T, Mat](source: Source[T, Mat]): (Source[T, Mat], M)
}

/**
* INTERNAL API
*/
@InternalApi
private[http] def transformEntityStream[T <: HttpEntity, M](entity: T, streamOp: EntityStreamOp[M]): (T, M) =
entity match {
case x: HttpEntity.Strict x.asInstanceOf[T] streamOp.strictM
case x: HttpEntity.Default
val (newData, whenCompleted) = streamOp(x.data)
x.copy(data = newData).asInstanceOf[T] whenCompleted
case x: HttpEntity.Chunked
val (newChunks, whenCompleted) = streamOp(x.chunks)
x.copy(chunks = newChunks).asInstanceOf[T] whenCompleted
case x: HttpEntity.CloseDelimited
val (newData, whenCompleted) = streamOp(x.data)
x.copy(data = newData).asInstanceOf[T] whenCompleted
case x: HttpEntity.IndefiniteLength
val (newData, whenCompleted) = streamOp(x.data)
x.copy(data = newData).asInstanceOf[T] whenCompleted
}
}

/**
Expand Down
Expand Up @@ -646,21 +646,7 @@ object HttpEntity {
*/
@InternalApi
private[http] def captureTermination[T <: HttpEntity](entity: T): (T, Future[Unit]) =
entity match {
case x: HttpEntity.Strict x.asInstanceOf[T] FastFuture.successful(())
case x: HttpEntity.Default
val (newData, whenCompleted) = StreamUtils.captureTermination(x.data)
x.copy(data = newData).asInstanceOf[T] whenCompleted
case x: HttpEntity.Chunked
val (newChunks, whenCompleted) = StreamUtils.captureTermination(x.chunks)
x.copy(chunks = newChunks).asInstanceOf[T] whenCompleted
case x: HttpEntity.CloseDelimited
val (newData, whenCompleted) = StreamUtils.captureTermination(x.data)
x.copy(data = newData).asInstanceOf[T] whenCompleted
case x: HttpEntity.IndefiniteLength
val (newData, whenCompleted) = StreamUtils.captureTermination(x.data)
x.copy(data = newData).asInstanceOf[T] whenCompleted
}
StreamUtils.transformEntityStream(entity, StreamUtils.CaptureTerminationOp)

/**
* Represents the currently being-drained HTTP Entity which triggers completion of the contained
Expand Down

0 comments on commit 402b71f

Please sign in to comment.