Skip to content
This repository has been archived by the owner on Apr 24, 2024. It is now read-only.

Commit

Permalink
base: fix several memory leaks in streamMarshaller
Browse files Browse the repository at this point in the history
  • Loading branch information
sirthias committed May 16, 2012
1 parent b263a93 commit a65dca6
Showing 1 changed file with 11 additions and 10 deletions.
Expand Up @@ -78,32 +78,33 @@ trait DefaultMarshallers extends MultipartMarshallers {
marshaller(selector) match {
case x: CantMarshal => x
case MarshalWith(converter) => MarshalWith { ctx => stream =>
refFactory.actorOf(Props(new ChunkingActor(ctx, stream, converter))) ! stream
refFactory.actorOf(Props(new ChunkingActor(ctx, converter))) ! stream
}
}
}

class ChunkingActor(ctx: MarshallingContext, stream: Stream[T],
converter: MarshallingContext => T => Unit) extends Actor {
class ChunkingActor(ctx: MarshallingContext, converter: MarshallingContext => T => Unit) extends Actor {
var chunkSender: Option[ChunkSender] = None
def receive = { case current #:: remaining =>
converter {
new MarshallingContext {
def startChunkedMessage(contentType: ContentType) = sys.error("Cannot marshal a stream of streams")
def handleError(error: Throwable) { ctx.handleError(error) }
def handleError(error: Throwable) {
ctx.handleError(error)
context.stop(self)
}
def marshalTo(content: HttpContent) {
chunkSender orElse {
chunkSender = Some(ctx.startChunkedMessage(content.contentType))
chunkSender
} foreach { sender =>
sender.sendChunk(MessageChunk(content.buffer)).onSuccess { case () =>
// we only send the next chunk when the previous has actually gone out
self ! {
sender.sendChunk(MessageChunk(content.buffer)).onComplete {
case _: Right[_, _] =>
if (remaining.isEmpty) {
sender.close()
PoisonPill
} else remaining
}
context.stop(self)
} else self ! remaining // we only send the next chunk when the previous has actually gone out
case _: Left[_, _] => context.stop(self)
}
}
}
Expand Down

0 comments on commit a65dca6

Please sign in to comment.