From b1448bdc60a518d4c7dedf61addd89900fe19476 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Sat, 16 Feb 2013 09:20:51 -0500 Subject: [PATCH] INTEXT-45 Add Work Around for INT-2936 Need to check for closed stream with NIO. --- .../x/ip/websocket/WebSocketSerializer.java | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketSerializer.java b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketSerializer.java index 809f405f..2acf77ce 100644 --- a/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketSerializer.java +++ b/spring-integration-ip-extensions/src/main/java/org/springframework/integration/x/ip/websocket/WebSocketSerializer.java @@ -28,6 +28,7 @@ import java.util.Set; import org.apache.commons.codec.binary.Base64; +import org.springframework.beans.DirectFieldAccessor; import org.springframework.core.serializer.Serializer; import org.springframework.integration.MessagingException; import org.springframework.integration.ip.tcp.serializer.SoftEndOfStreamException; @@ -202,6 +203,7 @@ private DataFrame doDeserialize(InputStream inputStream, DataFrame protoFrame) t while (!done ) { bite = inputStream.read() & 0xff; // logger.debug("Read:" + Integer.toHexString(bite)); + bite = checkclosed(bite, inputStream); if (bite < 0 && n == 0) { throw new SoftEndOfStreamException("Stream closed between payloads"); } @@ -416,6 +418,31 @@ else if (close) { return frame; } + /** + * TODO: workaround for INT-2936 + */ + private int checkclosed(int bite, InputStream inputStream) { + int theBite = bite; + if (theBite == 0xff) { // possibly a closed stream + String streamClass = inputStream.getClass().getName(); + if (streamClass.endsWith("TcpNioConnection$ChannelInputStream")) { + DirectFieldAccessor dfa = new DirectFieldAccessor(inputStream); + try { + if ((Boolean) dfa.getPropertyValue("isClosed") && + inputStream.available() == 0) { + theBite = -1; + } + } + catch (Exception e) { + if (logger.isDebugEnabled()) { + logger.debug("Failed to check closed", e); + } + } + } + } + return theBite; + } + private boolean validateUtf8IfNecessary(byte[] buffer, int offset, String data) { if (this.validateUtf8) { try {