Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
[CONJ-1015] pipelining now write to socket only one array, permitting…
… faster command execution when on distant server
  • Loading branch information
rusher committed Sep 26, 2022
1 parent 072e412 commit eee5184
Show file tree
Hide file tree
Showing 12 changed files with 364 additions and 40 deletions.
8 changes: 5 additions & 3 deletions src/benchmark/java/org/mariadb/jdbc/Common.java
Expand Up @@ -76,11 +76,13 @@ public void createConnections() throws Exception {
connectionText =
((java.sql.Driver) Class.forName(className).getDeclaredConstructor().newInstance())
.connect(jdbcUrlText, new Properties());
Properties rewriteStmtProperties = new Properties();
rewriteStmtProperties.setProperty("rewriteBatchedStatements", "true");
String jdbcUrlTextRewrite =
String.format(
jdbcBase,
driver, host, port, database, username, password, false, false, "&rewriteBatchedStatements=true&useBulkStmts=false" + other);
connectionTextRewrite =
((java.sql.Driver) Class.forName(className).getDeclaredConstructor().newInstance())
.connect(jdbcUrlText, rewriteStmtProperties);
.connect(jdbcUrlTextRewrite, new Properties());
connectionBinary =
((java.sql.Driver) Class.forName(className).getDeclaredConstructor().newInstance())
.connect(jdbcUrlBinary, new Properties());
Expand Down
8 changes: 4 additions & 4 deletions src/benchmark/java/org/mariadb/jdbc/Do_1000_params.java
Expand Up @@ -26,10 +26,10 @@ public int text(MyState state) throws Throwable {
return run(state.connectionText);
}

@Benchmark
public int binary(MyState state) throws Throwable {
return run(state.connectionBinary);
}
// @Benchmark
// public int binary(MyState state) throws Throwable {
// return run(state.connectionBinary);
// }

private int run(Connection con) throws Throwable {

Expand Down
30 changes: 28 additions & 2 deletions src/main/java/org/mariadb/jdbc/Configuration.java
Expand Up @@ -101,7 +101,7 @@ public class Configuration {
private boolean useCompression = false;
private boolean useAffectedRows = false;
private boolean useBulkStmts = true;

private boolean disablePipeline = false;
// prepare
private boolean cachePrepStmts = true;
private int prepStmtCacheSize = 250;
Expand Down Expand Up @@ -189,6 +189,7 @@ private Configuration(
boolean useCompression,
boolean useAffectedRows,
boolean useBulkStmts,
boolean disablePipeline,
boolean cachePrepStmts,
int prepStmtCacheSize,
boolean useServerPrepStmts,
Expand Down Expand Up @@ -259,6 +260,7 @@ private Configuration(
this.useCompression = useCompression;
this.useAffectedRows = useAffectedRows;
this.useBulkStmts = useBulkStmts;
this.disablePipeline = disablePipeline;
this.cachePrepStmts = cachePrepStmts;
this.prepStmtCacheSize = prepStmtCacheSize;
this.useServerPrepStmts = useServerPrepStmts;
Expand Down Expand Up @@ -327,6 +329,7 @@ private Configuration(
Boolean useServerPrepStmts,
String connectionAttributes,
Boolean useBulkStmts,
Boolean disablePipeline,
Boolean autocommit,
Boolean useMysqlMetadata,
Boolean createDatabaseIfNotExist,
Expand Down Expand Up @@ -406,6 +409,7 @@ private Configuration(
if (useServerPrepStmts != null) this.useServerPrepStmts = useServerPrepStmts;
this.connectionAttributes = connectionAttributes;
if (useBulkStmts != null) this.useBulkStmts = useBulkStmts;
if (disablePipeline != null) this.disablePipeline = disablePipeline;
if (autocommit != null) this.autocommit = autocommit;
if (useMysqlMetadata != null) this.useMysqlMetadata = useMysqlMetadata;
if (createDatabaseIfNotExist != null) this.createDatabaseIfNotExist = createDatabaseIfNotExist;
Expand Down Expand Up @@ -749,6 +753,7 @@ public Configuration clone(String username, String password) {
this.useCompression,
this.useAffectedRows,
this.useBulkStmts,
this.disablePipeline,
this.cachePrepStmts,
this.prepStmtCacheSize,
this.useServerPrepStmts,
Expand Down Expand Up @@ -1160,6 +1165,15 @@ public boolean useBulkStmts() {
return useBulkStmts;
}

/**
* Disable pipeline.
*
* @return is pipeline disabled.
*/
public boolean disablePipeline() {
return disablePipeline;
}

/**
* Force session autocommit on connection creation
*
Expand Down Expand Up @@ -1667,7 +1681,7 @@ public static final class Builder implements Cloneable {
private Boolean useCompression;
private Boolean useAffectedRows;
private Boolean useBulkStmts;

private Boolean disablePipeline;
// prepare
private Boolean cachePrepStmts;
private Integer prepStmtCacheSize;
Expand Down Expand Up @@ -2231,6 +2245,17 @@ public Builder useBulkStmts(Boolean useBulkStmts) {
return this;
}

/**
* Disable pipeline
*
* @param disablePipeline disable pipeline.
* @return this {@link Builder}
*/
public Builder disablePipeline(Boolean disablePipeline) {
this.disablePipeline = disablePipeline;
return this;
}

/**
* Permit to force autocommit connection value
*
Expand Down Expand Up @@ -2574,6 +2599,7 @@ public Configuration build() throws SQLException {
this.useServerPrepStmts,
this.connectionAttributes,
this.useBulkStmts,
this.disablePipeline,
this.autocommit,
this.useMysqlMetadata,
this.createDatabaseIfNotExist,
Expand Down
15 changes: 5 additions & 10 deletions src/main/java/org/mariadb/jdbc/ServerPreparedStatement.java
Expand Up @@ -19,6 +19,7 @@
import org.mariadb.jdbc.message.ClientMessage;
import org.mariadb.jdbc.message.client.BulkExecutePacket;
import org.mariadb.jdbc.message.client.ExecutePacket;
import org.mariadb.jdbc.message.client.PrepareExecutePacket;
import org.mariadb.jdbc.message.client.PreparePacket;
import org.mariadb.jdbc.message.server.OkPacket;
import org.mariadb.jdbc.message.server.PrepareResultPacket;
Expand Down Expand Up @@ -90,12 +91,8 @@ protected void executeInternal() throws SQLException {
if (prepareResult == null)
if (canCachePrepStmts) prepareResult = con.getContext().getPrepareCache().get(cmd, this);
try {
if (prepareResult == null && con.getContext().hasClientCapability(STMT_BULK_OPERATIONS)) {
try {
executePipeline(cmd);
} catch (BatchUpdateException b) {
throw (SQLException) b.getCause();
}
if (prepareResult == null && con.getContext().permitPipeline()) {
executePipeline(cmd);
} else {
executeStandard(cmd);
}
Expand All @@ -114,13 +111,11 @@ protected void executeInternal() throws SQLException {
private void executePipeline(String cmd) throws SQLException {
// server is 10.2+, permitting to execute last prepare with (-1) statement id.
// Server send prepare, followed by execute, in one exchange.
PreparePacket prepare = new PreparePacket(cmd);
ExecutePacket execute = new ExecutePacket(null, parameters, cmd, this, localInfileInputStream);
try {
List<Completion> res =
con.getClient()
.executePipeline(
new ClientMessage[] {prepare, execute},
.execute(
new PrepareExecutePacket(cmd, parameters, this, localInfileInputStream),
this,
fetchSize,
maxRows,
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/org/mariadb/jdbc/client/Context.java
Expand Up @@ -37,6 +37,13 @@ public interface Context {
*/
boolean hasClientCapability(long flag);

/**
* Does server and client permit pipeline
*
* @return true if permitted
*/
boolean permitPipeline();

/**
* Get server connection state
*
Expand Down
Expand Up @@ -4,6 +4,8 @@

package org.mariadb.jdbc.client.context;

import static org.mariadb.jdbc.util.constants.Capabilities.STMT_BULK_OPERATIONS;

import org.mariadb.jdbc.Configuration;
import org.mariadb.jdbc.client.Context;
import org.mariadb.jdbc.client.PrepareCache;
Expand Down Expand Up @@ -90,6 +92,10 @@ public boolean hasClientCapability(long flag) {
return (clientCapabilities & flag) > 0;
}

public boolean permitPipeline() {
return !conf.disablePipeline() && (clientCapabilities & STMT_BULK_OPERATIONS) > 0;
}

public int getServerStatus() {
return serverStatus;
}
Expand Down
53 changes: 34 additions & 19 deletions src/main/java/org/mariadb/jdbc/client/impl/StandardClient.java
Expand Up @@ -4,10 +4,7 @@

package org.mariadb.jdbc.client.impl;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.*;
import java.net.Socket;
import java.net.SocketException;
import java.sql.ResultSet;
Expand Down Expand Up @@ -95,8 +92,7 @@ public StandardClient(
this.lock = lock;
this.hostAddress = hostAddress;
this.exceptionFactory = new ExceptionFactory(conf, hostAddress);
this.disablePipeline =
Boolean.parseBoolean(conf.nonMappedOptions().getProperty("disablePipeline", "false"));
this.disablePipeline = conf.disablePipeline();

String host = hostAddress != null ? hostAddress.host : null;
this.socketTimeout = conf.socketTimeout();
Expand All @@ -106,7 +102,7 @@ public StandardClient(
// **********************************************************************
// creating socket
// **********************************************************************
OutputStream out = socket.getOutputStream();
OutputStream out = new BufferedOutputStream(socket.getOutputStream(), 16384);
InputStream in =
conf.useReadAheadInput()
? new ReadAheadBufferedStream(socket.getInputStream())
Expand Down Expand Up @@ -157,7 +153,7 @@ public StandardClient(
hostAddress, socket, clientCapabilities, exchangeCharset, context, writer);

if (sslSocket != null) {
out = sslSocket.getOutputStream();
out = new BufferedOutputStream(sslSocket.getOutputStream(), 16384);
in =
conf.useReadAheadInput()
? new ReadAheadBufferedStream(sslSocket.getInputStream())
Expand Down Expand Up @@ -652,18 +648,37 @@ public List<Completion> execute(
streamStmt = null;
}
List<Completion> completions = new ArrayList<>();
while (nbResp-- > 0) {
readResults(
stmt,
message,
completions,
fetchSize,
maxRows,
resultSetConcurrency,
resultSetType,
closeOnCompletion);
try {
while (nbResp-- > 0) {
readResults(
stmt,
message,
completions,
fetchSize,
maxRows,
resultSetConcurrency,
resultSetType,
closeOnCompletion);
}
return completions;
} catch (SQLException e) {
while (nbResp-- > 0) {
try {
readResults(
stmt,
message,
completions,
fetchSize,
maxRows,
resultSetConcurrency,
resultSetType,
closeOnCompletion);
} catch (SQLException ee) {
// eat
}
}
throw e;
}
return completions;
}
}

Expand Down
7 changes: 7 additions & 0 deletions src/main/java/org/mariadb/jdbc/client/socket/Writer.java
Expand Up @@ -162,6 +162,13 @@ public interface Writer {
*/
void flush() throws IOException;

/**
* Send packet to buffered outputstream without flushing
*
* @throws IOException if socket error occur.
*/
void flushPipeline() throws IOException;

/**
* must a max allowed length exception be thrown
*
Expand Down
Expand Up @@ -714,6 +714,20 @@ public void flush() throws IOException {
mark = -1;
}

public void flushPipeline() throws IOException {
writeSocket(false);

// if buf is big, and last query doesn't use at least half of it, resize buf to default
// value
if (buf.length > SMALL_BUFFER_SIZE && cmdLength * 2 < buf.length) {
buf = new byte[SMALL_BUFFER_SIZE];
}

pos = 4;
cmdLength = 0;
mark = -1;
}

/**
* Count query size. If query size is greater than max_allowed_packet and nothing has been already
* send, throw an exception to avoid having the connection closed.
Expand Down

0 comments on commit eee5184

Please sign in to comment.