diff --git a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java index ce8a1abd..2fe57f53 100644 --- a/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java +++ b/src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java @@ -647,14 +647,19 @@ public Object call() throws Exception { }; } - private GreetingPacket receiveGreeting() throws IOException { - byte[] initialHandshakePacket = channel.read(); - if (initialHandshakePacket[0] == (byte) 0xFF /* error */) { - byte[] bytes = Arrays.copyOfRange(initialHandshakePacket, 1, initialHandshakePacket.length); + private void checkError(byte[] packet) throws IOException { + if (packet[0] == (byte) 0xFF /* error */) { + byte[] bytes = Arrays.copyOfRange(packet, 1, packet.length); ErrorPacket errorPacket = new ErrorPacket(bytes); throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(), - errorPacket.getSqlState()); + errorPacket.getSqlState()); } + } + + private GreetingPacket receiveGreeting() throws IOException { + byte[] initialHandshakePacket = channel.read(); + checkError(initialHandshakePacket); + return new GreetingPacket(initialHandshakePacket); } @@ -690,12 +695,7 @@ private boolean tryUpgradeToSSL(GreetingPacket greetingPacket) throws IOExceptio private void enableHeartbeat() throws IOException { channel.write(new QueryCommand("set @master_heartbeat_period=" + heartbeatInterval * 1000000)); byte[] statementResult = channel.read(); - if (statementResult[0] == (byte) 0xFF /* error */) { - byte[] bytes = Arrays.copyOfRange(statementResult, 1, statementResult.length); - ErrorPacket errorPacket = new ErrorPacket(bytes); - throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(), - errorPacket.getSqlState()); - } + checkError(statementResult); } private void setMasterServerId() throws IOException { @@ -904,12 +904,7 @@ private ChecksumType fetchBinlogChecksum() throws IOException { private void confirmSupportOfChecksum(ChecksumType checksumType) throws IOException { channel.write(new QueryCommand("set @master_binlog_checksum= @@global.binlog_checksum")); byte[] statementResult = channel.read(); - if (statementResult[0] == (byte) 0xFF /* error */) { - byte[] bytes = Arrays.copyOfRange(statementResult, 1, statementResult.length); - ErrorPacket errorPacket = new ErrorPacket(bytes); - throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(), - errorPacket.getSqlState()); - } + checkError(statementResult); eventDeserializer.setChecksumType(checksumType); } @@ -1051,16 +1046,13 @@ private void commitGtid() { } private ResultSetRowPacket[] readResultSet() throws IOException { - List resultSet = new LinkedList(); + List resultSet = new LinkedList<>(); byte[] statementResult = channel.read(); - if (statementResult[0] == (byte) 0xFF /* error */) { - byte[] bytes = Arrays.copyOfRange(statementResult, 1, statementResult.length); - ErrorPacket errorPacket = new ErrorPacket(bytes); - throw new ServerException(errorPacket.getErrorMessage(), errorPacket.getErrorCode(), - errorPacket.getSqlState()); - } + checkError(statementResult); + while ((channel.read())[0] != (byte) 0xFE /* eof */) { /* skip */ } for (byte[] bytes; (bytes = channel.read())[0] != (byte) 0xFE /* eof */; ) { + checkError(bytes); resultSet.add(new ResultSetRowPacket(bytes)); } return resultSet.toArray(new ResultSetRowPacket[resultSet.size()]);