Skip to content
This repository has been archived by the owner on Mar 29, 2020. It is now read-only.

Keep strict entities strict #64

Merged
merged 1 commit into from Sep 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -4,7 +4,7 @@ import java.time.Duration
import java.util.concurrent.atomic.AtomicLong

import akka.NotUsed
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.model.{HttpEntity, HttpRequest, HttpResponse}
import akka.stream.scaladsl.{BidiFlow, Flow, Keep}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import akka.stream.{Attributes, BidiShape, Inlet, Outlet}
Expand Down Expand Up @@ -110,24 +110,30 @@ object ServerFlowWrapper {
responseWithContext.entity
} else {

val responseSizeCounter = new AtomicLong(0L)
requestSpan.mark("http.response.ready")

responseWithContext.entity.transformDataBytes(
Flow[ByteString]
.watchTermination()(Keep.right)
.wireTap(bs => responseSizeCounter.addAndGet(bs.size))
.mapMaterializedValue { f =>
f.andThen {
case Success(_) =>
requestHandler.responseSent(responseSizeCounter.get())
case Failure(e) =>
requestSpan.fail("Response entity stream failed", e)
requestHandler.responseSent(responseSizeCounter.get())

}(CallingThreadExecutionContext)
}
)
responseWithContext.entity match {
case strict@HttpEntity.Strict(_, bs) =>
requestHandler.responseSent(bs.size)
strict
case _ =>
val responseSizeCounter = new AtomicLong(0L)
responseWithContext.entity.transformDataBytes(
Flow[ByteString]
.watchTermination()(Keep.right)
.wireTap(bs => responseSizeCounter.addAndGet(bs.size))
.mapMaterializedValue { f =>
f.andThen {
case Success(_) =>
requestHandler.responseSent(responseSizeCounter.get())
case Failure(e) =>
requestSpan.fail("Response entity stream failed", e)
requestHandler.responseSent(responseSizeCounter.get())

}(CallingThreadExecutionContext)
}
)
}
}

_completedRequests += 1
Expand Down
@@ -0,0 +1,34 @@
package kamon.akka.http

import akka.actor.ActorSystem
import akka.http.scaladsl.model.{HttpEntity, HttpRequest, HttpResponse, StatusCodes}
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import kamon.instrumentation.akka.http.ServerFlowWrapper
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{Matchers, WordSpecLike}

class ServerFlowWrapperSpec extends WordSpecLike with Matchers with ScalaFutures {

implicit private val system = ActorSystem("http-client-instrumentation-spec")
implicit private val executor = system.dispatcher
implicit private val materializer = ActorMaterializer()

private val okReturningFlow = Flow[HttpRequest].map { _ =>
HttpResponse(status = StatusCodes.OK, entity = HttpEntity("OK"))
}

"the server flow wrapper" should {
"keep strict entities strict" in {
val flow = ServerFlowWrapper(okReturningFlow, "localhost", 8080)
val request = HttpRequest()
val response = Source.single(request)
.via(flow)
.runWith(Sink.head)
.futureValue
response.entity should matchPattern {
case HttpEntity.Strict(_, _) =>
}
}
}
}