Skip to content

Commit

Permalink
Update deprecated akka calls (#6460)
Browse files Browse the repository at this point in the history
  • Loading branch information
wsargent authored and cchantep committed Aug 27, 2016
1 parent eb27481 commit 605d58c
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ package play.it.http.websocket

import java.util.concurrent.atomic.AtomicBoolean

import akka.stream.FlowShape
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import akka.stream.scaladsl._
import akka.stream.stage.{ Context, PushStage }
import akka.stream.stage._
import akka.util.ByteString
import com.typesafe.netty.{ HandlerPublisher, HandlerSubscriber }
import io.netty.bootstrap.Bootstrap
Expand All @@ -24,13 +24,13 @@ import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.http._
import io.netty.handler.codec.http.websocketx._

import java.net.URI

import io.netty.util.ReferenceCountUtil
import play.api.http.websocket._
import play.it.http.websocket.WebSocketClient.ExtendedMessage

import scala.concurrent.{ Promise, Future }
import scala.concurrent.{ Future, Promise }
import scala.concurrent.ExecutionContext.Implicits.global

/**
Expand Down Expand Up @@ -58,6 +58,7 @@ object WebSocketClient {
def finalFragment: Boolean
}
object ExtendedMessage {
import scala.language.implicitConversions
implicit def messageToExtendedMessage(message: Message): ExtendedMessage =
SimpleMessage(message, finalFragment = true)
}
Expand Down Expand Up @@ -166,14 +167,25 @@ object WebSocketClient {
def webSocketProtocol(clientConnection: Flow[WebSocketFrame, WebSocketFrame, _]): Flow[ExtendedMessage, ExtendedMessage, _] = {
val clientInitiatedClose = new AtomicBoolean

val captureClientClose = Flow[WebSocketFrame].transform(() => new PushStage[WebSocketFrame, WebSocketFrame] {
def onPush(elem: WebSocketFrame, ctx: Context[WebSocketFrame]) = elem match {
case close: CloseWebSocketFrame =>
clientInitiatedClose.set(true)
ctx.push(close)
case other =>
ctx.push(other)
val captureClientClose = Flow[WebSocketFrame].via(new GraphStage[FlowShape[WebSocketFrame, WebSocketFrame]] {
val in = Inlet[WebSocketFrame]("WebSocketClose.in")
val out = Outlet[WebSocketFrame]("WebSocketClose.out")
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = {
val elem = grab(in)
elem match {
case close: CloseWebSocketFrame =>
clientInitiatedClose.set(true)
push(out, close)
case other =>
push(out, other)
}
}

override def onPull(): Unit = pull(in)
}

override def shape: FlowShape[WebSocketFrame, WebSocketFrame] = FlowShape(in, out)
})

val messagesToFrames = Flow[ExtendedMessage].map {
Expand Down Expand Up @@ -215,21 +227,33 @@ object WebSocketClient {
}
}

val handleConnectionTerminated = Flow[WebSocketFrame].transform(() => new PushStage[WebSocketFrame, WebSocketFrame] {
def onPush(elem: WebSocketFrame, ctx: Context[WebSocketFrame]) = ctx.push(elem)
override def onUpstreamFinish(ctx: Context[WebSocketFrame]) = {
disconnected.trySuccess(())
super.onUpstreamFinish(ctx)
}
override def onUpstreamFailure(cause: Throwable, ctx: Context[WebSocketFrame]) = {
if (serverInitiatedClose.get()) {
// http://doc.akka.io/docs/akka/current/scala/stream/stream-customize.html#graphstage-scala
val handleConnectionTerminated = Flow[WebSocketFrame].via(new GraphStage[FlowShape[WebSocketFrame, WebSocketFrame]] {
val in = Inlet[WebSocketFrame]("WebSocketTerminated.in")
val out = Outlet[WebSocketFrame]("WebSocketTerminated.out")

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
override def onPush(): Unit = {
val elem = grab(in)
push(out, elem)
}
override def onUpstreamFinish(): Unit = {
disconnected.trySuccess(())
ctx.finish()
} else {
disconnected.tryFailure(cause)
ctx.fail(cause)
super.onUpstreamFinish()
}
override def onUpstreamFailure(cause: Throwable): Unit = {
if (serverInitiatedClose.get()) {
disconnected.trySuccess(())
complete(out)
} else {
disconnected.tryFailure(cause)
fail(out, cause)
}
}
override def onPull(): Unit = pull(in)
}

override def shape: FlowShape[WebSocketFrame, WebSocketFrame] = FlowShape(in, out)
})

/**
Expand Down
47 changes: 24 additions & 23 deletions framework/src/play/src/main/scala/play/api/mvc/RangeResult.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package play.api.mvc
import java.nio.charset.StandardCharsets

import akka.NotUsed
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.{ Attributes, FlowShape, SourceShape }
import akka.stream.scaladsl.{ FileIO, Flow, Source, StreamConverters }
import akka.stream.stage._
import akka.util.ByteString
Expand Down Expand Up @@ -326,7 +328,7 @@ object RangeResult {
* @param contentType The HTTP Content Type header for the response.
*/
def ofPath(path: java.nio.file.Path, rangeHeader: Option[String], fileName: String, contentType: Option[String]): Result = {
val source = FileIO.fromFile(path.toFile)
val source = FileIO.fromPath(path)
ofSource(path.toFile.length(), source, rangeHeader, Option(fileName), contentType)
}

Expand All @@ -350,7 +352,7 @@ object RangeResult {
* @param contentType The HTTP Content Type header for the response.
*/
def ofFile(file: java.io.File, rangeHeader: Option[String], fileName: String, contentType: Option[String]): Result = {
val source = FileIO.fromFile(file)
val source = FileIO.fromPath(file.toPath)
ofSource(file.length(), source, rangeHeader, Option(fileName), contentType)
}

Expand Down Expand Up @@ -413,38 +415,37 @@ object RangeResult {
}
}

// See https://github.com/akka/akka/blob/v2.4.2/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala#L83-L115
// See https://github.com/akka/akka/blob/release-2.4/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala#L76
private def sliceBytesTransformer(start: Long, length: Option[Long]): Flow[ByteString, ByteString, NotUsed] = {
val transformer = new StatefulStage[ByteString, ByteString] {
val transformer = new SimpleLinearGraphStage[ByteString] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) with InHandler with OutHandler {
override def onPull() = pull(in)

def skipping = new State {
var toSkip = start

override def onPush(element: ByteString, ctx: Context[ByteString]): SyncDirective =
if (element.length < toSkip) {
// Akka implementation takes Long rather than Option[Long] -- original StatefulStage impl
// used Int.MaxValue as the initial value, so carrying through here:
// https://github.com/playframework/playframework/blob/2.5.x/framework/src/play/src/main/scala/play/api/mvc/RangeResult.scala#L436
var remaining: Long = length.getOrElse(Int.MaxValue)

override def onPush(): Unit = {
val element = grab(in)
if (toSkip > 0 && element.length < toSkip) {
// keep skipping
toSkip -= element.length
ctx.pull()
pull(in)
} else {
become(taking(length))
// toSkip <= element.length <= Int.MaxValue
current.onPush(element.drop(toSkip.toInt), ctx)
val data = element.drop(toSkip.toInt).take(math.min(remaining, Int.MaxValue).toInt)
remaining -= data.size
push(out, data)
if (remaining <= 0) completeStage()
}
}

def taking(initiallyRemaining: Option[Long]) = new State {
var remaining: Long = initiallyRemaining.getOrElse(Int.MaxValue)

override def onPush(element: ByteString, ctx: Context[ByteString]): SyncDirective = {
val data = element.take(math.min(remaining, Int.MaxValue).toInt)
remaining -= data.size
if (remaining <= 0) ctx.pushAndFinish(data)
else ctx.push(data)
}
}

override def initial: State = if (start > 0) skipping else taking(length)
setHandlers(in, out, this)
}
}
Flow[ByteString].transform(() transformer).named("sliceBytes")
Flow[ByteString].via(transformer).named("sliceBytes")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ object Multipart {
def handleFilePartAsTemporaryFile: FilePartHandler[TemporaryFile] = {
case FileInfo(partName, filename, contentType) =>
val tempFile = TemporaryFile("multipartBody", "asTemporaryFile")
Accumulator(FileIO.toFile(tempFile.file)).map { _ =>
Accumulator(FileIO.toPath(tempFile.file.toPath)).map { _ =>
FilePart(partName, filename, contentType, tempFile)
}
}
Expand Down

0 comments on commit 605d58c

Please sign in to comment.