Skip to content

Commit

Permalink
[CONJ-828] new option ensureSocketState
Browse files Browse the repository at this point in the history
When a query is issued, the connector ensures that any streaming result-set will be fully read from the socket before issuing a new command.
This permits to ensure the socket state.

Adding a new option `ensureSocketState` that when enable will ensure that the socket buffer is empty before issuing new command. (this doesn't concern pipelining commands). If data is present in socket. An error will be raised, throwing the content of socket data to permit identification of error. The goal of this option is mainly for debugging.

This functionality add a few overheads so will be disabled by default
  • Loading branch information
rusher committed Sep 10, 2020
1 parent 6f02333 commit 6601971
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 6 deletions.
Expand Up @@ -324,4 +324,8 @@ public void setServerThreadId(long serverThreadId, Boolean isMaster) {
public void setTraceCache(LruTraceCache traceCache) {
this.traceCache = traceCache;
}

public InputStream getInputStream() {
return inputStream;
}
}
Expand Up @@ -53,6 +53,7 @@
package org.mariadb.jdbc.internal.io.input;

import java.io.IOException;
import java.io.InputStream;
import org.mariadb.jdbc.internal.com.read.Buffer;
import org.mariadb.jdbc.internal.io.LruTraceCache;

Expand All @@ -71,4 +72,6 @@ public interface PacketInputStream {
void setServerThreadId(long serverThreadId, Boolean isMaster);

void setTraceCache(LruTraceCache traceCache);

InputStream getInputStream();
}
Expand Up @@ -137,7 +137,7 @@ public synchronized long skip(long n) throws IOException {
}

public synchronized int available() throws IOException {
throw new IOException("available from socket not implemented");
return end - pos + super.available();
}

public synchronized void reset() throws IOException {
Expand Down
Expand Up @@ -55,10 +55,7 @@
import static org.mariadb.jdbc.internal.com.Packet.*;
import static org.mariadb.jdbc.internal.util.SqlStates.*;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.*;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URL;
Expand Down Expand Up @@ -1940,6 +1937,37 @@ private void cmdPrologue() throws SQLException {
throw exceptionFactory.create("Connection is closed", "08000", 1220);
}
interrupted = false;
if (this.options.ensureSocketState) {
// ensure that the socket buffer is empty before issuing new command.
// (this doesn't concern pipelining commands).
// If data is present in socket, an error will be raised, throwing the content of socket data to permit
// identification of error.
try {
int avail = this.reader.getInputStream().available();
if (avail > 0) {
// unexpected data in socket buffer

// reading socket buffer to add content to error.
byte[] data = new byte[Math.min(avail, 16000)];
int remaining = avail;
int off = 0;
do {
int count = this.reader.getInputStream().read(data, off, remaining);
if (count < 0) {
break;
}
remaining -= count;
off += count;
} while (remaining > 0);

throw exceptionFactory.create(
"Unexpected data in socket buffer:" + Utils.hexdump(data), "HY000");
}

} catch (IOException ioe) {
throw exceptionFactory.create("Unexpected socket error", "08000", ioe);
}
}
}

/**
Expand Down
7 changes: 6 additions & 1 deletion src/main/java/org/mariadb/jdbc/util/DefaultOptions.java
Expand Up @@ -685,8 +685,13 @@ public enum DefaultOptions {
Boolean.TRUE,
"2.6.0",
"manage session_track_schema setting when server has CLIENT_SESSION_TRACK capability",
false),
ENSURE_SOCKET_STATE(
"ensureSocketState",
Boolean.FALSE,
"2.7.0",
"ensure socket state before issuing a new command",
false);

private final String optionName;
private final String description;
private final boolean required;
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/mariadb/jdbc/util/Options.java
Expand Up @@ -140,6 +140,7 @@ public class Options implements Cloneable {
// MySQL sha authentication
public String serverRsaPublicKeyFile;
public boolean allowPublicKeyRetrieval;
public boolean ensureSocketState;

@Override
public String toString() {
Expand Down
38 changes: 38 additions & 0 deletions src/test/java/org/mariadb/jdbc/StatementTest.java
Expand Up @@ -743,4 +743,42 @@ public void escaping() throws Exception {
}
}
}

@Test
public void ensureStreamingState() throws Exception {
Assume.assumeTrue(isMariadbServer() && minVersion(10, 1));
createProcedure(
"ensureStreamingState",
"(INOUT p1 INT) BEGIN SELECT * from seq_1_to_3; SELECT * from " + "seq_5_to_7; END");
Statement stmt = sharedConnection.createStatement();
PreparedStatement prep = sharedConnection.prepareCall("CALL ensureStreamingState(?)");
prep.setObject(1, 5);
prep.setFetchSize(1);
prep.execute();

ResultSet rs = stmt.executeQuery("SELECT 50");
assertTrue(rs.next());
assertEquals(50, rs.getInt(1));
rs = prep.getResultSet();
assertTrue(rs.next());
assertEquals(1, rs.getInt(1));
assertTrue(rs.next());
assertEquals(2, rs.getInt(1));
assertTrue(rs.next());
assertEquals(3, rs.getInt(1));
assertFalse(rs.next());

assertTrue(prep.getMoreResults());

rs = prep.getResultSet();
assertTrue(rs.next());
assertEquals(5, rs.getInt(1));
assertTrue(rs.next());
assertEquals(6, rs.getInt(1));
assertTrue(rs.next());
assertEquals(7, rs.getInt(1));
assertFalse(rs.next());

assertFalse(prep.getMoreResults());
}
}

0 comments on commit 6601971

Please sign in to comment.