diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index 24016db1..c1729781 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -35,6 +35,7 @@ import com.github.shyiko.mysql.binlog.network.SocketFactory; import com.github.shyiko.mysql.binlog.network.protocol.ErrorPacket; import com.github.shyiko.mysql.binlog.network.protocol.GreetingPacket; +import com.github.shyiko.mysql.binlog.network.protocol.Packet; import com.github.shyiko.mysql.binlog.network.protocol.PacketChannel; import com.github.shyiko.mysql.binlog.network.protocol.ResultSetRowPacket; import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateCommand; @@ -73,6 +74,9 @@ */ public class BinaryLogClient implements BinaryLogClientMXBean { + // https://dev.mysql.com/doc/internals/en/sending-more-than-16mbyte.html + private static final int MAX_PACKET_LENGTH = 16777215; + private final Logger logger = Logger.getLogger(getClass().getName()); private final String hostname; @@ -592,7 +596,9 @@ private void listenForEventPackets() throws IOException { } Event event; try { - event = eventDeserializer.nextEvent(inputStream); + event = eventDeserializer.nextEvent(packetLength == MAX_PACKET_LENGTH ? + new ByteArrayInputStream(readPacketSplitInChunks(inputStream, packetLength - 1)) : + inputStream); } catch (Exception e) { Throwable cause = e instanceof EventDataDeserializationException ? e.getCause() : e; if (cause instanceof EOFException || cause instanceof SocketException) { @@ -628,6 +634,18 @@ private void listenForEventPackets() throws IOException { } } + private byte[] readPacketSplitInChunks(ByteArrayInputStream inputStream, int packetLength) throws IOException { + byte[] result = inputStream.read(packetLength); + int chunkLength; + do { + chunkLength = inputStream.readInteger(3); + inputStream.skip(1); // 1 byte for sequence + result = Arrays.copyOf(result, result.length + chunkLength); + inputStream.fill(result, result.length - chunkLength, chunkLength); + } while (chunkLength == Packet.MAX_LENGTH); + return result; + } + private void updateClientBinlogFilenameAndPosition(Event event) { EventHeader eventHeader = event.getHeader(); if (eventHeader.getEventType() == EventType.ROTATE) { diff --git a/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStream.java b/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStream.java index c40186f9..65ce3127 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStream.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/io/ByteArrayInputStream.java @@ -84,15 +84,20 @@ public String readZeroTerminatedString() throws IOException { return new String(s.toByteArray()); } - /** - * Alias for read(result, 0, length). - */ public byte[] read(int length) throws IOException { byte[] bytes = new byte[length]; - read(bytes, 0, length); + fill(bytes, 0, length); return bytes; } + public void fill(byte[] bytes, int offset, int length) throws IOException { + int remaining = length; + while (remaining != 0) { + int read = read(bytes, offset + length - remaining, remaining); + remaining -= read; + } + } + public BitSet readBitSet(int length, boolean bigEndian) throws IOException { // according to MySQL internals the amount of storage required for N columns is INT((N+7)/8) bytes byte[] bytes = read((length + 7) >> 3); diff --git a/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/Packet.java b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/Packet.java index c2c42ddd..5e0863b3 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/Packet.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/network/protocol/Packet.java @@ -20,4 +20,6 @@ */ public interface Packet { + // https://dev.mysql.com/doc/internals/en/sending-more-than-16mbyte.html + int MAX_LENGTH = 16777215; } diff --git a/supplement/codequality/checkstyle.xml b/supplement/codequality/checkstyle.xml index 69cda6f5..cfa43508 100644 --- a/supplement/codequality/checkstyle.xml +++ b/supplement/codequality/checkstyle.xml @@ -121,7 +121,7 @@ - +