Skip to content

Commit

Permalink
misc - multi host streaming error test
Browse files Browse the repository at this point in the history
  • Loading branch information
rusher committed Apr 16, 2021
1 parent db9de26 commit e33127f
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 29 deletions.
13 changes: 3 additions & 10 deletions src/main/java/org/mariadb/jdbc/client/MultiPrimaryClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -367,8 +367,7 @@ public void readStreamingResults(
completions, fetchSize, maxRows, resultSetConcurrency, resultSetType, closeOnCompletion);
} catch (SQLNonTransientConnectionException e) {
reConnect();
currentClient.readStreamingResults(
completions, fetchSize, maxRows, resultSetConcurrency, resultSetType, closeOnCompletion);
throw getExceptionFactory().create("Socket error during result streaming", "HY000");
}
}

Expand All @@ -390,13 +389,7 @@ public void abort(Executor executor) throws SQLException {
if (closed) {
throw new SQLNonTransientConnectionException("Connection is closed", "08000", 1220);
}

try {
currentClient.abort(executor);
} catch (SQLNonTransientConnectionException e) {
reConnect();
currentClient.abort(executor);
}
currentClient.abort(executor);
}

@Override
Expand Down Expand Up @@ -460,7 +453,7 @@ public HostAddress getHostAddress() {
}

public boolean isPrimary() {
return getHostAddress().primary;
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ private SQLException createException(
case "40":
returnEx = new SQLTransactionRollbackException(msg, sqlState, errorCode, cause);
break;
case "HY":
returnEx = new SQLException(msg, sqlState, errorCode, cause);
break;
default:
returnEx = new SQLTransientConnectionException(msg, sqlState, errorCode, cause);
break;
Expand Down
4 changes: 3 additions & 1 deletion src/test/java/org/mariadb/jdbc/Common.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class Common {
public static Connection sharedConn;
public static Connection sharedConnBinary;
public static String hostname;
public static int port;
public static String user;
public static String password;
public static TcpProxy proxy;
Expand All @@ -48,12 +49,13 @@ public class Common {
}
hostname = get("DB_HOST", prop);
user = get("DB_USER", prop);
port = Integer.parseInt(get("DB_PORT", prop));
password = get("DB_PASSWORD", prop);
mDefUrl =
String.format(
"jdbc:mariadb://%s:%s/%s?user=%s&password=%s&restrictedAuth=false&%s",
hostname,
get("DB_PORT", prop),
port,
get("DB_DATABASE", prop),
user,
password,
Expand Down
156 changes: 138 additions & 18 deletions src/test/java/org/mariadb/jdbc/integration/MultiHostTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
import static org.junit.jupiter.api.Assertions.*;

import java.io.IOException;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.*;

import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.mariadb.jdbc.*;
import org.mariadb.jdbc.Connection;
import org.mariadb.jdbc.Statement;
import org.mariadb.jdbc.integration.tools.TcpProxy;

public class MultiHostTest extends Common {
Expand Down Expand Up @@ -106,7 +106,7 @@ public void replicaNotSet() throws Exception {
@Test
public void masterFailover() throws Exception {
Assumptions.assumeTrue(
!"skysql".equals(System.getenv("srv")) && !"skysql-ha".equals(System.getenv("srv")));
!"skysql".equals(System.getenv("srv")) && !"skysql-ha".equals(System.getenv("srv")));

Configuration conf = Configuration.parse(mDefUrl);
HostAddress hostAddress = conf.addresses().get(0);
Expand All @@ -117,21 +117,21 @@ public void masterFailover() throws Exception {
}

String url =
mDefUrl.replaceAll(
"//([^/]*)/",
String.format(
"//address=(host=localhost)(port=9999)(type=master),address=(host=localhost)(port=%s)(type=master),address=(host=%s)(port=%s)(type=master)/",
proxy.getLocalPort(), hostAddress.host, hostAddress.port));
mDefUrl.replaceAll(
"//([^/]*)/",
String.format(
"//address=(host=localhost)(port=9999)(type=master),address=(host=localhost)(port=%s)(type=master),address=(host=%s)(port=%s)(type=master)/",
proxy.getLocalPort(), hostAddress.host, hostAddress.port));
url = url.replaceAll("jdbc:mariadb:", "jdbc:mariadb:sequential:");
if (conf.sslMode() == SslMode.VERIFY_FULL) {
url = url.replaceAll("sslMode=verify-full", "sslMode=verify-ca");
}

try (Connection con =
(Connection)
DriverManager.getConnection(
url
+ "waitReconnectTimeout=300&deniedListTimeout=300&retriesAllDown=4&connectTimeout=500")) {
(Connection)
DriverManager.getConnection(
url
+ "waitReconnectTimeout=300&deniedListTimeout=300&retriesAllDown=4&connectTimeout=500")) {
Statement stmt = con.createStatement();
stmt.execute("SET @con=1");
proxy.restart(50);
Expand All @@ -144,10 +144,10 @@ public void masterFailover() throws Exception {

// with transaction replay
try (Connection con =
(Connection)
DriverManager.getConnection(
url
+ "transactionReplay=true&waitReconnectTimeout=300&deniedListTimeout=300&retriesAllDown=4&connectTimeout=500")) {
(Connection)
DriverManager.getConnection(
url
+ "transactionReplay=true&waitReconnectTimeout=300&deniedListTimeout=300&retriesAllDown=4&connectTimeout=500")) {
Statement stmt = con.createStatement();
stmt.execute("DROP TABLE IF EXISTS testReplay");
stmt.execute("CREATE TABLE testReplay(id INT)");
Expand Down Expand Up @@ -183,6 +183,65 @@ public void masterFailover() throws Exception {
}
}

@Test
public void masterStreamingFailover() throws Exception {
Assumptions.assumeTrue(
isMariaDBServer() &&
!"skysql".equals(System.getenv("srv")) && !"skysql-ha".equals(System.getenv("srv")));

Configuration conf = Configuration.parse(mDefUrl);
HostAddress hostAddress = conf.addresses().get(0);
try {
proxy = new TcpProxy(hostAddress.host, hostAddress.port);
} catch (IOException i) {
throw new SQLException("proxy error", i);
}

String url =
mDefUrl.replaceAll(
"//([^/]*)/",
String.format(
"//address=(host=localhost)(port=%s)(type=master)/",
proxy.getLocalPort(), hostAddress.host, hostAddress.port));
url = url.replaceAll("jdbc:mariadb:", "jdbc:mariadb:sequential:");
if (conf.sslMode() == SslMode.VERIFY_FULL) {
url = url.replaceAll("sslMode=verify-full", "sslMode=verify-ca");
}

Connection con =
(Connection)
DriverManager.getConnection(
url
+ "allowMultiQueries&transactionReplay=true&waitReconnectTimeout=300&deniedListTimeout=300&retriesAllDown=40&connectTimeout=500&useReadAheadInput=false");
long threadId = con.getThreadId();
Statement stmt = con.createStatement();
stmt.setFetchSize(2);
ResultSet rs = stmt.executeQuery("SELECT * FROM seq_1_to_50; SELECT * FROM seq_1_to_50000");
rs.next();
assertEquals(1, rs.getInt(1));
proxy.restart(50);
Statement stmt2 = con.createStatement();
assertThrowsContains(SQLException.class, () -> stmt2.executeQuery("SELECT * from mysql.user"), "Socket error during result streaming");
assertNotEquals(threadId, con.getThreadId());

// additional small test
assertEquals(0, con.getNetworkTimeout());
con.setNetworkTimeout(Runnable::run,10);
assertEquals(10, con.getNetworkTimeout());

con.setReadOnly(true);
con.close();
assertThrowsContains(SQLNonTransientConnectionException.class, () -> con.setReadOnly(false), "Connection is closed");
assertThrowsContains(SQLNonTransientConnectionException.class, () -> con.abort(Runnable::run), "Connection is closed");

Connection con2 =
(Connection)
DriverManager.getConnection(
url
+ "allowMultiQueries&transactionReplay=true&waitReconnectTimeout=300&deniedListTimeout=300&retriesAllDown=40&connectTimeout=500&useReadAheadInput=false");
con2.abort(Runnable::run);
}

@Test
public void masterReplicationFailover() throws Exception {
Assumptions.assumeTrue(
Expand Down Expand Up @@ -242,6 +301,67 @@ public void masterReplicationFailover() throws Exception {
}
}


@Test
public void masterReplicationStreamingFailover() throws Exception {
Assumptions.assumeTrue(
isMariaDBServer() &&
!"skysql".equals(System.getenv("srv")) && !"skysql-ha".equals(System.getenv("srv")));

Configuration conf = Configuration.parse(mDefUrl);
HostAddress hostAddress = conf.addresses().get(0);
try {
proxy = new TcpProxy(hostAddress.host, hostAddress.port);
} catch (IOException i) {
throw new SQLException("proxy error", i);
}

String url =
mDefUrl.replaceAll(
"//([^/]*)/",
String.format(
"//address=(host=localhost)(port=%s)(type=primary),address=(host=%s)(port=%s)(type=replica)/",
proxy.getLocalPort(), hostAddress.host, hostAddress.port, hostname, port));
url = url.replaceAll("jdbc:mariadb:", "jdbc:mariadb:replication:");
if (conf.sslMode() == SslMode.VERIFY_FULL) {
url = url.replaceAll("sslMode=verify-full", "sslMode=verify-ca");
}

Connection con =
(Connection)
DriverManager.getConnection(
url
+ "allowMultiQueries&transactionReplay=true&waitReconnectTimeout=300&deniedListTimeout=300&retriesAllDown=40&connectTimeout=500&useReadAheadInput=false");
long threadId = con.getThreadId();
Statement stmt = con.createStatement();
stmt.setFetchSize(2);
ResultSet rs = stmt.executeQuery("SELECT * FROM seq_1_to_50; SELECT * FROM seq_1_to_50000");
rs.next();
assertEquals(1, rs.getInt(1));
proxy.restart(50);
Statement stmt2 = con.createStatement();
assertThrowsContains(SQLException.class, () -> stmt2.executeQuery("SELECT * from mysql.user"), "Socket error during result streaming");
assertNotEquals(threadId, con.getThreadId());

// additional small test
assertEquals(0, con.getNetworkTimeout());
con.setNetworkTimeout(Runnable::run,10);
assertEquals(10, con.getNetworkTimeout());

con.setReadOnly(true);
con.close();
assertThrowsContains(SQLNonTransientConnectionException.class, () -> con.setReadOnly(false), "Connection is closed");
assertThrowsContains(SQLNonTransientConnectionException.class, () -> con.abort(Runnable::run), "Connection is closed");

Connection con2 =
(Connection)
DriverManager.getConnection(
url
+ "allowMultiQueries&transactionReplay=true&waitReconnectTimeout=300&deniedListTimeout=300&retriesAllDown=40&connectTimeout=500&useReadAheadInput=false");
con2.abort(Runnable::run);
}


public Connection createProxyConKeep(String opts) throws SQLException {
Configuration conf = Configuration.parse(mDefUrl);
HostAddress hostAddress = conf.addresses().get(0);
Expand Down

0 comments on commit e33127f

Please sign in to comment.