From 6601971b1d0252662475c443dd00219332138dd7 Mon Sep 17 00:00:00 2001 From: rusher Date: Thu, 10 Sep 2020 17:29:42 +0200 Subject: [PATCH] [CONJ-828] new option `ensureSocketState` When a query is issued, the connector ensures that any streaming result-set will be fully read from the socket before issuing a new command. This permits to ensure the socket state. Adding a new option `ensureSocketState` that when enable will ensure that the socket buffer is empty before issuing new command. (this doesn't concern pipelining commands). If data is present in socket. An error will be raised, throwing the content of socket data to permit identification of error. The goal of this option is mainly for debugging. This functionality add a few overheads so will be disabled by default --- .../io/input/DecompressPacketInputStream.java | 4 ++ .../internal/io/input/PacketInputStream.java | 3 ++ .../io/input/ReadAheadBufferedStream.java | 2 +- .../protocol/AbstractQueryProtocol.java | 36 ++++++++++++++++-- .../org/mariadb/jdbc/util/DefaultOptions.java | 7 +++- .../java/org/mariadb/jdbc/util/Options.java | 1 + .../java/org/mariadb/jdbc/StatementTest.java | 38 +++++++++++++++++++ 7 files changed, 85 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/mariadb/jdbc/internal/io/input/DecompressPacketInputStream.java b/src/main/java/org/mariadb/jdbc/internal/io/input/DecompressPacketInputStream.java index 9703b665d..88c85ee53 100644 --- a/src/main/java/org/mariadb/jdbc/internal/io/input/DecompressPacketInputStream.java +++ b/src/main/java/org/mariadb/jdbc/internal/io/input/DecompressPacketInputStream.java @@ -324,4 +324,8 @@ public void setServerThreadId(long serverThreadId, Boolean isMaster) { public void setTraceCache(LruTraceCache traceCache) { this.traceCache = traceCache; } + + public InputStream getInputStream() { + return inputStream; + } } diff --git a/src/main/java/org/mariadb/jdbc/internal/io/input/PacketInputStream.java b/src/main/java/org/mariadb/jdbc/internal/io/input/PacketInputStream.java index c34b04372..71b23570e 100644 --- a/src/main/java/org/mariadb/jdbc/internal/io/input/PacketInputStream.java +++ b/src/main/java/org/mariadb/jdbc/internal/io/input/PacketInputStream.java @@ -53,6 +53,7 @@ package org.mariadb.jdbc.internal.io.input; import java.io.IOException; +import java.io.InputStream; import org.mariadb.jdbc.internal.com.read.Buffer; import org.mariadb.jdbc.internal.io.LruTraceCache; @@ -71,4 +72,6 @@ public interface PacketInputStream { void setServerThreadId(long serverThreadId, Boolean isMaster); void setTraceCache(LruTraceCache traceCache); + + InputStream getInputStream(); } diff --git a/src/main/java/org/mariadb/jdbc/internal/io/input/ReadAheadBufferedStream.java b/src/main/java/org/mariadb/jdbc/internal/io/input/ReadAheadBufferedStream.java index 94bfe02cc..547f5d5cf 100644 --- a/src/main/java/org/mariadb/jdbc/internal/io/input/ReadAheadBufferedStream.java +++ b/src/main/java/org/mariadb/jdbc/internal/io/input/ReadAheadBufferedStream.java @@ -137,7 +137,7 @@ public synchronized long skip(long n) throws IOException { } public synchronized int available() throws IOException { - throw new IOException("available from socket not implemented"); + return end - pos + super.available(); } public synchronized void reset() throws IOException { diff --git a/src/main/java/org/mariadb/jdbc/internal/protocol/AbstractQueryProtocol.java b/src/main/java/org/mariadb/jdbc/internal/protocol/AbstractQueryProtocol.java index 3ab9f0ceb..0f84fd3fe 100644 --- a/src/main/java/org/mariadb/jdbc/internal/protocol/AbstractQueryProtocol.java +++ b/src/main/java/org/mariadb/jdbc/internal/protocol/AbstractQueryProtocol.java @@ -55,10 +55,7 @@ import static org.mariadb.jdbc.internal.com.Packet.*; import static org.mariadb.jdbc.internal.util.SqlStates.*; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; +import java.io.*; import java.net.SocketException; import java.net.SocketTimeoutException; import java.net.URL; @@ -1940,6 +1937,37 @@ private void cmdPrologue() throws SQLException { throw exceptionFactory.create("Connection is closed", "08000", 1220); } interrupted = false; + if (this.options.ensureSocketState) { + // ensure that the socket buffer is empty before issuing new command. + // (this doesn't concern pipelining commands). + // If data is present in socket, an error will be raised, throwing the content of socket data to permit + // identification of error. + try { + int avail = this.reader.getInputStream().available(); + if (avail > 0) { + // unexpected data in socket buffer + + // reading socket buffer to add content to error. + byte[] data = new byte[Math.min(avail, 16000)]; + int remaining = avail; + int off = 0; + do { + int count = this.reader.getInputStream().read(data, off, remaining); + if (count < 0) { + break; + } + remaining -= count; + off += count; + } while (remaining > 0); + + throw exceptionFactory.create( + "Unexpected data in socket buffer:" + Utils.hexdump(data), "HY000"); + } + + } catch (IOException ioe) { + throw exceptionFactory.create("Unexpected socket error", "08000", ioe); + } + } } /** diff --git a/src/main/java/org/mariadb/jdbc/util/DefaultOptions.java b/src/main/java/org/mariadb/jdbc/util/DefaultOptions.java index c370e85ab..a74a039b6 100644 --- a/src/main/java/org/mariadb/jdbc/util/DefaultOptions.java +++ b/src/main/java/org/mariadb/jdbc/util/DefaultOptions.java @@ -685,8 +685,13 @@ public enum DefaultOptions { Boolean.TRUE, "2.6.0", "manage session_track_schema setting when server has CLIENT_SESSION_TRACK capability", + false), + ENSURE_SOCKET_STATE( + "ensureSocketState", + Boolean.FALSE, + "2.7.0", + "ensure socket state before issuing a new command", false); - private final String optionName; private final String description; private final boolean required; diff --git a/src/main/java/org/mariadb/jdbc/util/Options.java b/src/main/java/org/mariadb/jdbc/util/Options.java index 26b14e7ab..8e7980943 100644 --- a/src/main/java/org/mariadb/jdbc/util/Options.java +++ b/src/main/java/org/mariadb/jdbc/util/Options.java @@ -140,6 +140,7 @@ public class Options implements Cloneable { // MySQL sha authentication public String serverRsaPublicKeyFile; public boolean allowPublicKeyRetrieval; + public boolean ensureSocketState; @Override public String toString() { diff --git a/src/test/java/org/mariadb/jdbc/StatementTest.java b/src/test/java/org/mariadb/jdbc/StatementTest.java index c7ff57201..d956c2603 100644 --- a/src/test/java/org/mariadb/jdbc/StatementTest.java +++ b/src/test/java/org/mariadb/jdbc/StatementTest.java @@ -743,4 +743,42 @@ public void escaping() throws Exception { } } } + + @Test + public void ensureStreamingState() throws Exception { + Assume.assumeTrue(isMariadbServer() && minVersion(10, 1)); + createProcedure( + "ensureStreamingState", + "(INOUT p1 INT) BEGIN SELECT * from seq_1_to_3; SELECT * from " + "seq_5_to_7; END"); + Statement stmt = sharedConnection.createStatement(); + PreparedStatement prep = sharedConnection.prepareCall("CALL ensureStreamingState(?)"); + prep.setObject(1, 5); + prep.setFetchSize(1); + prep.execute(); + + ResultSet rs = stmt.executeQuery("SELECT 50"); + assertTrue(rs.next()); + assertEquals(50, rs.getInt(1)); + rs = prep.getResultSet(); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(3, rs.getInt(1)); + assertFalse(rs.next()); + + assertTrue(prep.getMoreResults()); + + rs = prep.getResultSet(); + assertTrue(rs.next()); + assertEquals(5, rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(6, rs.getInt(1)); + assertTrue(rs.next()); + assertEquals(7, rs.getInt(1)); + assertFalse(rs.next()); + + assertFalse(prep.getMoreResults()); + } }