Skip to content

Commit

Permalink
Remove ContinuationWebSocketFrame.aggregatedText()
Browse files Browse the repository at this point in the history
Motivation:
Before we aggregated the full text in the WebSocket08FrameDecoder just to fill in the ContinuationWebSocketFrame.aggregatedText(). The problem was that there was no upper-limit and so it would be possible to see an OOME if the remote peer sends a TextWebSocketFrame + a never ending stream of ContinuationWebSocketFrames. Furthermore the aggregation does not really belong in the WebSocket08FrameDecoder, as we provide an extra ChannelHandler for this anyway (WebSocketFrameAggregator).

Modification:
Remove the ContinuationWebSocketFrame.aggregatedText() method and corresponding constructor. Also refactored WebSocket08FrameDecoder a bit to me more efficient which is now possible as we not need to aggregate here.

Result:
No more risk of OOME because of frames.
  • Loading branch information
Norman Maurer committed Apr 30, 2014
1 parent 76355a2 commit 787a85f
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 127 deletions.
Expand Up @@ -25,13 +25,11 @@
*/
public class ContinuationWebSocketFrame extends WebSocketFrame {

private String aggregatedText;

/**
* Creates a new empty continuation frame.
*/
public ContinuationWebSocketFrame() {
super(Unpooled.buffer(0));
this(Unpooled.buffer(0));
}

/**
Expand All @@ -58,25 +56,6 @@ public ContinuationWebSocketFrame(boolean finalFragment, int rsv, ByteBuf binary
super(finalFragment, rsv, binaryData);
}

/**
* Creates a new continuation frame with the specified binary data
*
* @param finalFragment
* flag indicating if this frame is the final fragment
* @param rsv
* reserved bits used for protocol extensions
* @param binaryData
* the content of the frame.
* @param aggregatedText
* Aggregated text set by decoder on the final continuation frame of a fragmented
* text message
*/
public ContinuationWebSocketFrame(
boolean finalFragment, int rsv, ByteBuf binaryData, String aggregatedText) {
super(finalFragment, rsv, binaryData);
this.aggregatedText = aggregatedText;
}

/**
* Creates a new continuation frame with the specified text data
*
Expand All @@ -88,7 +67,7 @@ public ContinuationWebSocketFrame(
* text content of the frame.
*/
public ContinuationWebSocketFrame(boolean finalFragment, int rsv, String text) {
this(finalFragment, rsv, fromText(text), null);
this(finalFragment, rsv, fromText(text));
}

/**
Expand All @@ -112,21 +91,14 @@ private static ByteBuf fromText(String text) {
}
}

/**
* Aggregated text returned by decoder on the final continuation frame of a fragmented text message
*/
public String aggregatedText() {
return aggregatedText;
}

@Override
public ContinuationWebSocketFrame copy() {
return new ContinuationWebSocketFrame(isFinalFragment(), rsv(), content().copy(), aggregatedText());
return new ContinuationWebSocketFrame(isFinalFragment(), rsv(), content().copy());
}

@Override
public ContinuationWebSocketFrame duplicate() {
return new ContinuationWebSocketFrame(isFinalFragment(), rsv(), content().duplicate(), aggregatedText());
return new ContinuationWebSocketFrame(isFinalFragment(), rsv(), content().duplicate());
}

@Override
Expand Down

This file was deleted.

Expand Up @@ -36,11 +36,13 @@
package io.netty.handler.codec.http.websocketx;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufProcessor;
import io.netty.handler.codec.CorruptedFrameException;

/**
* Checks UTF8 bytes for validity before converting it into a string
* Checks UTF8 bytes for validity
*/
final class UTF8Output {
final class Utf8Validator implements ByteBufProcessor {
private static final int UTF8_ACCEPT = 0;
private static final int UTF8_REJECT = 12;

Expand All @@ -65,45 +67,38 @@ final class UTF8Output {
@SuppressWarnings("RedundantFieldInitialization")
private int state = UTF8_ACCEPT;
private int codep;
private boolean checking;

private final StringBuilder stringBuilder;

UTF8Output(ByteBuf buffer) {
stringBuilder = new StringBuilder(buffer.readableBytes());
write(buffer);
public void check(ByteBuf buffer) {
checking = true;
buffer.forEachByte(this);
}

public void write(ByteBuf buffer) {
for (int i = buffer.readerIndex(); i < buffer.writerIndex(); i++) {
write(buffer.getByte(i));
}
}

public void write(byte[] bytes) {
for (byte b : bytes) {
write(b);
public void finish() {
checking = false;
codep = 0;
if (state != UTF8_ACCEPT) {
state = UTF8_ACCEPT;
throw new CorruptedFrameException("bytes are not UTF-8");
}
}

public void write(int b) {
@Override
public boolean process(byte b) throws Exception {
byte type = TYPES[b & 0xFF];

codep = state != UTF8_ACCEPT ? b & 0x3f | codep << 6 : 0xff >> type & b;

state = STATES[state + type];

if (state == UTF8_ACCEPT) {
stringBuilder.append((char) codep);
} else if (state == UTF8_REJECT) {
throw new UTF8Exception("bytes are not UTF-8");
if (state == UTF8_REJECT) {
checking = false;
throw new CorruptedFrameException("bytes are not UTF-8");
}
return true;
}

@Override
public String toString() {
if (state != UTF8_ACCEPT) {
throw new UTF8Exception("bytes are not UTF-8");
}
return stringBuilder.toString();
public boolean isChecking() {
return checking;
}
}
Expand Up @@ -81,9 +81,7 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
private static final byte OPCODE_PING = 0x9;
private static final byte OPCODE_PONG = 0xA;

private UTF8Output fragmentedFramesText;
private int fragmentedFramesCount;

private final long maxFramePayloadLength;
private boolean frameFinalFlag;
private int frameRsv;
Expand All @@ -93,10 +91,10 @@ public class WebSocket08FrameDecoder extends ReplayingDecoder<WebSocket08FrameDe
private int framePayloadBytesRead;
private byte[] maskingKey;
private ByteBuf payloadBuffer;

private final boolean allowExtensions;
private final boolean maskedPayload;
private boolean receivedClosingHandshake;
private Utf8Validator utf8Validator;

enum State {
FRAME_START, MASKING_KEY, PAYLOAD, CORRUPT
Expand Down Expand Up @@ -325,37 +323,34 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t

// Processing for possible fragmented messages for text and binary
// frames
String aggregatedText = null;
if (frameFinalFlag) {
// Final frame of the sequence. Apparently ping frames are
// allowed in the middle of a fragmented message
if (frameOpcode != OPCODE_PING) {
fragmentedFramesCount = 0;

// Check text for UTF8 correctness
if (frameOpcode == OPCODE_TEXT || fragmentedFramesText != null) {
if (frameOpcode == OPCODE_TEXT ||
(utf8Validator != null && utf8Validator.isChecking())) {
// Check UTF-8 correctness for this payload
checkUTF8String(ctx, framePayload);

// This does a second check to make sure UTF-8
// correctness for entire text message
aggregatedText = fragmentedFramesText.toString();

fragmentedFramesText = null;
utf8Validator.finish();
}
}
} else {
// Not final frame so we can expect more frames in the
// fragmented sequence
if (fragmentedFramesCount == 0) {
// First text or binary frame for a fragmented set
fragmentedFramesText = null;
if (frameOpcode == OPCODE_TEXT) {
checkUTF8String(ctx, framePayload);
}
} else {
// Subsequent frames - only check if init frame is text
if (fragmentedFramesText != null) {
if (utf8Validator != null && utf8Validator.isChecking()) {
checkUTF8String(ctx, framePayload);
}
}
Expand All @@ -374,7 +369,7 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) t
framePayload = null;
return;
} else if (frameOpcode == OPCODE_CONT) {
out.add(new ContinuationWebSocketFrame(frameFinalFlag, frameRsv, framePayload, aggregatedText));
out.add(new ContinuationWebSocketFrame(frameFinalFlag, frameRsv, framePayload));
framePayload = null;
return;
} else {
Expand Down Expand Up @@ -413,11 +408,15 @@ private void unmask(ByteBuf frame) {
}

private void protocolViolation(ChannelHandlerContext ctx, String reason) {
protocolViolation(ctx, new CorruptedFrameException(reason));
}

private void protocolViolation(ChannelHandlerContext ctx, CorruptedFrameException ex) {
checkpoint(State.CORRUPT);
if (ctx.channel().isActive()) {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
throw new CorruptedFrameException(reason);
throw ex;
}

private static int toFrameLength(long l) {
Expand All @@ -430,13 +429,12 @@ private static int toFrameLength(long l) {

private void checkUTF8String(ChannelHandlerContext ctx, ByteBuf buffer) {
try {
if (fragmentedFramesText == null) {
fragmentedFramesText = new UTF8Output(buffer);
} else {
fragmentedFramesText.write(buffer);
if (utf8Validator == null) {
utf8Validator = new Utf8Validator();
}
} catch (UTF8Exception ex) {
protocolViolation(ctx, "invalid UTF-8 bytes");
utf8Validator.check(buffer);
} catch (CorruptedFrameException ex) {
protocolViolation(ctx, ex);
}
}

Expand Down Expand Up @@ -464,9 +462,9 @@ protected void checkCloseFrameBody(
// May have UTF-8 message
if (buffer.isReadable()) {
try {
new UTF8Output(buffer);
} catch (UTF8Exception ex) {
protocolViolation(ctx, "Invalid close frame reason text. Invalid UTF-8 bytes");
new Utf8Validator().check(buffer);
} catch (CorruptedFrameException ex) {
protocolViolation(ctx, ex);
}
}

Expand Down

0 comments on commit 787a85f

Please sign in to comment.