Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Continuation frame handling for WebSockets #2683

Merged

Conversation

Projects
None yet
3 participants
@jroper
Copy link
Member

commented Apr 17, 2014

Also implemented backpressure.

@@ -76,7 +76,7 @@ object WebSocketClient {

private class DefaultWebSocketClient extends WebSocketClient {
val bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newSingleThreadExecutor(),
Executors.newSingleThreadExecutor()))
Executors.newSingleThreadExecutor(), 1, 1))

This comment has been minimized.

Copy link
@jroper

jroper Apr 17, 2014

Author Member

Source of WebSocketSpec taking so long...

This comment has been minimized.

Copy link
@richdougherty

richdougherty Apr 17, 2014

Member

Glad to have it fixed!

@@ -220,7 +220,9 @@ private[play] class PlayDefaultUpstreamHandler(server: Server, allChannels: Defa
val a = EssentialAction(_ => Done(result, Input.Empty))
handleAction(a, Some(app))
case Right(socket) =>
val enumerator = websocketHandshake(ctx, nettyHttpRequest, e)(ws.inFormatter)
val bufferLimit = app.configuration.getBytes("play.websocket.buffer.limit").getOrElse(65536L)

This comment has been minimized.

Copy link
@jroper

jroper Apr 17, 2014

Author Member

We now have a configurable buffer limit for WebSocket frames.


val binaryFrame = FrameFormatter[Array[Byte]](
bytes => new BinaryWebSocketFrame(true, 0, org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer(bytes)),
{ case frame: BinaryWebSocketFrame => frame.getBinaryData().array() })
{
case frame: BinaryWebSocketFrame => channelBufferToArray(frame.getBinaryData)

This comment has been minimized.

Copy link
@jroper

jroper Apr 17, 2014

Author Member

This used to get the entire array from the buffer, even if the buffer wasn't full...

This comment has been minimized.

Copy link
@richdougherty

richdougherty Apr 17, 2014

Member

Nice work!

This comment has been minimized.

Copy link
@richdougherty

richdougherty Apr 17, 2014

Member

Did that result in corrupted frames?

val factory = new WebSocketServerHandshakerFactory(getWebSocketLocation(req),
"*", /* wildcard to accept all subprotocols */
true /* allowExtensions */ )
true /* allowExtensions */ ,
bufferLimit

This comment has been minimized.

Copy link
@jroper

jroper Apr 17, 2014

Author Member

So previously this was Integer.MAX_INT.

This comment has been minimized.

Copy link
@richdougherty

richdougherty Apr 17, 2014

Member

Hmm (again)

@@ -350,7 +352,7 @@ private[play] class PlayDefaultUpstreamHandler(server: Server, allChannels: Defa
case e @ EOF =>
if (channel.isOpen) {
Iteratee.flatten(for {
_ <- channel.write(new CloseWebSocketFrame()).toScala
_ <- channel.write(new CloseWebSocketFrame(1000, "")).toScala

This comment has been minimized.

Copy link
@richdougherty

richdougherty Apr 17, 2014

Member

Can we use a constant for the status code?

For those reading along, from RFC6455:

   1000

      1000 indicates a normal closure, meaning that the purpose for
      which the connection was established has been fulfilled.
*/
private val MaxInFlight = 3

def newWebSocketInHandler[A](frameFormatter: play.api.mvc.WebSocket.FrameFormatter[A], bufferLimit: Long) = {

This comment has been minimized.

Copy link
@richdougherty

richdougherty Apr 17, 2014

Member

I know you've just added an argument, but would you mind adding return type too? I think this method maybe returns something like an (Enumerator[?], ChannelHandler) but I'm a bit confused. :)

@@ -73,23 +105,64 @@ private[server] trait WebSocketHandler {
(enumerator,
new SimpleChannelUpstreamHandler {

type FrameCreator = ChannelBuffer => WebSocketFrame

This comment has been minimized.

Copy link
@richdougherty

richdougherty Apr 17, 2014

Member

What about making FrameCreator a trait and having TextFrameCreator and BinaryFrameCreator classes rather than anonymous functions?

You could also consider giving the FrameCreator some of the responsibility for doing the buffering.

e.g.

abstract class FrameBuilder(bufferSize: Int) {
  protected val buffer: ChannelBuffer = ChannelBuffers.dynamicBuffer(bufferSize)
  def append(buffer: ChannelBuffer): Boolean = { ... } // return false or throw exception on overflow
  def createFrame: WebSocketFrame
}
class TextFrameBuilder(rsv: Int) {
  def createFrame = new TextWebSocketFrame(true, rsv, buffer)
}
class BinaryFrameBuilder(rsv: Int) {
  def createFrame = new BinaryWebSocketFrame(true, rsv, buffer)
}

This comment has been minimized.

Copy link
@richdougherty

richdougherty Apr 17, 2014

Member

On second thoughts, a "BufferBuilder" seems like it should be a separate thing, if we want to bother. But I like the idea of naming the FrameCreators.

This comment has been minimized.

Copy link
@jroper

jroper Apr 17, 2014

Author Member

It seems like overkill - I was just going to use a function, but I aliased the function type to make it a bit clearer.

This comment has been minimized.

Copy link
@richdougherty

richdougherty Apr 17, 2014

Member

OK that's fine


// fragmented text
case (frame: TextWebSocketFrame, None) if !frame.isFinalFragment && nettyFrameFormatter.fromFrame.isDefinedAt(frame) =>
val buffer = ChannelBuffers.dynamicBuffer(frame.getBinaryData.readableBytes() * 2)

This comment has been minimized.

Copy link
@richdougherty

richdougherty Apr 17, 2014

Member

What's the rationale behind allocating a buffer of this size?

This comment has been minimized.

Copy link
@jroper

jroper Apr 17, 2014

Author Member

Well, I know that I'm going to get at least one more frame, so no point in allocating a buffer the same size as the current one.

This comment has been minimized.

Copy link
@richdougherty

richdougherty Apr 17, 2014

Member
  1. Oh right, this is a dynamic buffer. Perhaps you could put a min(… , bufferLimit) around the allocation size too.
  2. I think we should also fail early for non-final frames when frame.getBinaryData.readableBytes() > bufferLimit, so maybe you could put another guard case up at the top of the match expressions?
case (frame, _) if !frame.isFinalFragment && frame.getBinaryData.readableBytes() > bufferSize =>
  closeWebSocket(ctx, WebSocketMessageTooLong, "Fragmented message too long, configured limit is " + bufferLimit)

This comment has been minimized.

Copy link
@jroper

jroper Apr 17, 2014

Author Member

Netty already enforces that a single frame is not greater than the buffer limit (by the time you've buffered it, there is no value in doing any checks, you're already out of memory).

This comment has been minimized.

Copy link
@richdougherty

richdougherty Apr 17, 2014

Member

OK, that sounds good for point 2. (I edited my comment to make the 2 points clearer.)

For point 1, I think you could do this:

val buffer = ChannelBuffers.dynamicBuffer(min(frame.getBinaryData.readableBytes() * 2, bufferLimit))
@@ -132,12 +132,13 @@ object WebSocketClient {

override def channelDisconnected(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
disconnected.trySuccess(())
ctx.sendUpstream(e)
ctx.sendDownstream(e)

This comment has been minimized.

Copy link
@richdougherty

This comment has been minimized.

Copy link
@jroper

jroper Apr 17, 2014

Author Member

Yeah, this is just the test client, I'm not even sure what the right thing should be - this is just to ensure the right events bubble to the right places when things aren't working - but there's some demons in the code somewhere still and I gave up trying to find them.

This comment has been minimized.

Copy link
@richdougherty

richdougherty Apr 17, 2014

Member

Fair enough!

buffer.writeBytes(frame.getBinaryData)
continuationBuffer = None
val finalFrame = creator(buffer)
enumerator.frameReceived(ctx, El(nettyFrameFormatter.fromFrame(finalFrame)))

This comment has been minimized.

Copy link
@richdougherty

richdougherty Apr 17, 2014

Member

Do we need to check nettyFrameFormatter.fromFrame.isDefinedAt(finalFrame)?

This comment has been minimized.

Copy link
@richdougherty

richdougherty Apr 17, 2014

Member

I see, it should be OK to check the initial frame but the final frame, since isDefinedAt only checks the frame class and since the final class should the same as for the initial class.

BTW, even though it won't make a difference, I feel like it would be more correct to check the final combined frame rather than the initial frames, since the final frame is what we're planning on formatting.

This comment has been minimized.

Copy link
@jroper

jroper Apr 17, 2014

Author Member

No, we already checked that it was defined for the right type when we got the first frame. The partial function is completely controlled by us too - we don't expose this to users, and it only checks whether it's Text or Binary.

This comment has been minimized.

Copy link
@jroper

jroper Apr 17, 2014

Author Member

Perhaps it would be more correct, but it would be more practical to fail sooner rather than letting the client send a big frame only to find out after sending the whole thing that it was going to be rejected.

This comment has been minimized.

Copy link
@richdougherty

richdougherty Apr 17, 2014

Member

OK that's fine with me

promise.asInstanceOf[scala.concurrent.Promise[Iteratee[A, R]]].future
}

def frameReceived(ctx: ChannelHandlerContext, input: Input[A]) {

if (inFlight.incrementAndGet() > MaxInFlight) {
ctx.getChannel.setReadable(false)
}

This comment has been minimized.

Copy link
@richdougherty

richdougherty Apr 17, 2014

Member

Should be >= MaxInFlight to ensure we max out at MaxInFlight? (barring races)

k => {
if (inFlight.decrementAndGet() < MaxInFlight) {
ctx.getChannel.setReadable(true)
}

This comment has been minimized.

Copy link
@richdougherty

richdougherty Apr 17, 2014

Member

Maybe a comment, e.g. // Consumed message, can resume reading

private def channelBufferToArray(buffer: ChannelBuffer) = {
val bytes = new Array[Byte](buffer.readableBytes())
buffer.readBytes(bytes)
bytes

This comment has been minimized.

Copy link
@richdougherty

richdougherty Apr 17, 2014

Member

What about saving some copies when the buffers are full?

if (buffer.readableBytes() == buffer.capacity()) {
  // Use entire backing array
  buffer.array()
} else {
  // Copy relevant bytes only
  val bytes = new Array[Byte](buffer.readableBytes())
  buffer.readBytes(bytes)
  bytes
}

This comment has been minimized.

Copy link
@hepin1989

hepin1989 Apr 17, 2014

Contributor

maybe hasArray() should be called,if not?then the channelbuffer is not direct channelbuffer?

@richdougherty

This comment has been minimized.

Copy link
Member

commented Apr 17, 2014

Just one last comment from me, then 👍 to merge.

@jroper jroper reopened this Apr 17, 2014

Continuation frame handling for WebSockets
Also implemented backpressure.

richdougherty added a commit that referenced this pull request Apr 18, 2014

Merge pull request #2683 from jroper/websocket-text-continuations
Continuation frame handling for WebSockets

@richdougherty richdougherty merged commit 5369d65 into playframework:master Apr 18, 2014

1 check passed

default Merged build finished.
Details

@jroper jroper added this to the 2.2.3 milestone May 1, 2014

@jroper jroper deleted the jroper:websocket-text-continuations branch Sep 23, 2014

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.