From a65dca69064ab0cda9a2819730aa3e0fa18fd721 Mon Sep 17 00:00:00 2001 From: Mathias Date: Wed, 16 May 2012 14:49:53 +0200 Subject: [PATCH] base: fix several memory leaks in streamMarshaller --- .../typeconversion/DefaultMarshallers.scala | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/spray-base/src/main/scala/cc/spray/typeconversion/DefaultMarshallers.scala b/spray-base/src/main/scala/cc/spray/typeconversion/DefaultMarshallers.scala index 52af6e9890..78fff40676 100644 --- a/spray-base/src/main/scala/cc/spray/typeconversion/DefaultMarshallers.scala +++ b/spray-base/src/main/scala/cc/spray/typeconversion/DefaultMarshallers.scala @@ -78,32 +78,33 @@ trait DefaultMarshallers extends MultipartMarshallers { marshaller(selector) match { case x: CantMarshal => x case MarshalWith(converter) => MarshalWith { ctx => stream => - refFactory.actorOf(Props(new ChunkingActor(ctx, stream, converter))) ! stream + refFactory.actorOf(Props(new ChunkingActor(ctx, converter))) ! stream } } } - class ChunkingActor(ctx: MarshallingContext, stream: Stream[T], - converter: MarshallingContext => T => Unit) extends Actor { + class ChunkingActor(ctx: MarshallingContext, converter: MarshallingContext => T => Unit) extends Actor { var chunkSender: Option[ChunkSender] = None def receive = { case current #:: remaining => converter { new MarshallingContext { def startChunkedMessage(contentType: ContentType) = sys.error("Cannot marshal a stream of streams") - def handleError(error: Throwable) { ctx.handleError(error) } + def handleError(error: Throwable) { + ctx.handleError(error) + context.stop(self) + } def marshalTo(content: HttpContent) { chunkSender orElse { chunkSender = Some(ctx.startChunkedMessage(content.contentType)) chunkSender } foreach { sender => - sender.sendChunk(MessageChunk(content.buffer)).onSuccess { case () => - // we only send the next chunk when the previous has actually gone out - self ! { + sender.sendChunk(MessageChunk(content.buffer)).onComplete { + case _: Right[_, _] => if (remaining.isEmpty) { sender.close() - PoisonPill - } else remaining - } + context.stop(self) + } else self ! remaining // we only send the next chunk when the previous has actually gone out + case _: Left[_, _] => context.stop(self) } } }