Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: replace BufferedOutputStream with unsynchronized PgBufferedOutputStream, allow configuring different Java and SO_SNDBUF buffer sizes #3248

Merged
merged 4 commits into from
Jun 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ In addition to the standard connection parameters the driver supports a number o
| sslpasswordcallback | String | null | The name of a class (for use in [Class.forName(String)](https://docs.oracle.com/javase/6/docs/api/java/lang/Class.html#forName%28java.lang.String%29)) that implements javax.security.auth.callback.CallbackHandler and can handle PasswordCallback for the ssl password. |
| sslpassword | String | null | The password for the client's ssl key (ignored if sslpasswordcallback is set) |
| sendBufferSize | Integer | -1 | Socket write buffer size |
| maxSendBufferSize | Integer | 65536 | Maximum amount of bytes buffered before sending to the backend. pgjdbc uses `least(maxSendBufferSize, greatest(8192, SO_SNDBUF))` to determine the buffer size. |
| receiveBufferSize | Integer | -1 | Socket read buffer size |
| logServerErrorDetail | Boolean | true | Allows server error detail (such as sql statements and values) to be logged and passed on in exceptions. Setting to false will mask these errors so they won't be exposed to users, or logs. |
| allowEncodingChanges | Boolean | false | Allow for changes in client_encoding |
Expand Down
9 changes: 7 additions & 2 deletions docs/content/documentation/use.md
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,15 @@ This parameter is ignored on non-Windows platforms.
* **`useSpnego (`*boolean*`)`** *Default `false`*\
Use SPNEGO in SSPI authentication requests

* **`sendBufferSize (`*int*`)`** *Default* `-1`*\
* **`sendBufferSize (`*int*`)`** *Default `-1`*\
Sets SO_SNDBUF on the connection stream

* **`receiveBufferSize (`*int*`)`** *Default* `-1`*\
* **`maxSendBufferSize (`*int*`)`** *Default `8192`*\
Configures the maximum amount of bytes buffered before sending to the backend. pgjdbc uses
`least(maxSendBufferSize, greatest(8192, SO_SNDBUF))` to determine the buffer size.
Since: 42.7.4

* **`receiveBufferSize (`*int*`)`** *Default `-1`*\
Sets SO_RCVBUF on the connection stream

* **`readOnly (`*boolean*`)`** *Default `false`*\
Expand Down
8 changes: 8 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/PGProperty.java
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,14 @@ public enum PGProperty {
null,
"Specifies size of buffer during fetching result set. Can be specified as specified size or percent of heap memory."),

/**
* Maximum amount of bytes buffered before sending to the backend, default is 8192.
*/
MAX_SEND_BUFFER_SIZE(
"maxSendBufferSize",
"8192",
"Maximum amount of bytes buffered before sending to the backend"),

/**
* Specify 'options' connection initialization parameter.
* The value of this parameter may contain spaces and other special characters or their URL representation.
Expand Down
105 changes: 54 additions & 51 deletions pgjdbc/src/main/java/org/postgresql/core/PGStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
import org.postgresql.util.PGPropertyMaxResultBufferParser;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
import org.postgresql.util.internal.PgBufferedOutputStream;
import org.postgresql.util.internal.SourceStreamIOException;

import org.checkerframework.checker.nullness.qual.Nullable;
import org.ietf.jgss.GSSContext;
import org.ietf.jgss.GSSException;
import org.ietf.jgss.MessageProp;

import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.EOFException;
import java.io.FilterOutputStream;
Expand Down Expand Up @@ -46,25 +48,32 @@
public class PGStream implements Closeable, Flushable {
private final SocketFactory socketFactory;
private final HostSpec hostSpec;
private final int maxSendBufferSize;

private final byte[] int4Buf;
private final byte[] int2Buf;

private Socket connection;
private VisibleBufferedInputStream pgInput;
private OutputStream pgOutput;
private byte @Nullable [] streamBuffer;
private PgBufferedOutputStream pgOutput;

public boolean isGssEncrypted() {
return gssEncrypted;
}

boolean gssEncrypted;

public void setSecContext(GSSContext secContext) {
public void setSecContext(GSSContext secContext) throws GSSException {
MessageProp messageProp = new MessageProp(0, true);
pgInput = new VisibleBufferedInputStream(new GSSInputStream(pgInput.getWrapped(), secContext, messageProp ), 8192);
pgOutput = new GSSOutputStream(pgOutput, secContext, messageProp, 16384);
// See https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-GSSAPI
// Note that the server will only accept encrypted packets from the client which are less than
// 16kB; gss_wrap_size_limit() should be used by the client to determine the size of
// the unencrypted message which will fit within this limit and larger messages should be
// broken up into multiple gss_wrap() calls
// See https://github.com/postgres/postgres/blob/acecd6746cdc2df5ba8dcc2c2307c6560c7c2492/src/backend/libpq/be-secure-gssapi.c#L348
// Backend includes "int4 messageSize" into 16384 limit, so we subtract 4.
pgOutput = new GSSOutputStream(pgOutput, secContext, messageProp, 16384 - 4);
gssEncrypted = true;

}
Expand All @@ -89,11 +98,29 @@ public void setSecContext(GSSContext secContext) {
* @param hostSpec the host and port to connect to
* @param timeout timeout in milliseconds, or 0 if no timeout set
* @throws IOException if an IOException occurs below it.
* @deprecated use {@link #PGStream(SocketFactory, org.postgresql.util.HostSpec, int, int)}
*/
@Deprecated
@SuppressWarnings({"method.invocation", "initialization.fields.uninitialized"})
public PGStream(SocketFactory socketFactory, HostSpec hostSpec, int timeout) throws IOException {
this(socketFactory, hostSpec, timeout, 8192);
}

/**
* Constructor: Connect to the PostgreSQL back end and return a stream connection.
*
* @param socketFactory socket factory to use when creating sockets
* @param hostSpec the host and port to connect to
* @param timeout timeout in milliseconds, or 0 if no timeout set
* @param maxSendBufferSize maximum amount of bytes buffered before sending to the backend
* @throws IOException if an IOException occurs below it.
*/
@SuppressWarnings({"method.invocation", "initialization.fields.uninitialized"})
public PGStream(SocketFactory socketFactory, HostSpec hostSpec, int timeout,
int maxSendBufferSize) throws IOException {
this.socketFactory = socketFactory;
this.hostSpec = hostSpec;
this.maxSendBufferSize = maxSendBufferSize;

Socket socket = createSocket(timeout);
changeSocket(socket);
Expand Down Expand Up @@ -133,6 +160,7 @@ public PGStream(PGStream pgStream, int timeout) throws IOException {

this.socketFactory = pgStream.socketFactory;
this.hostSpec = pgStream.hostSpec;
this.maxSendBufferSize = pgStream.maxSendBufferSize;

Socket socket = createSocket(timeout);
changeSocket(socket);
Expand All @@ -155,7 +183,7 @@ public PGStream(PGStream pgStream, int timeout) throws IOException {
* @param socketFactory socket factory
* @param hostSpec the host and port to connect to
* @throws IOException if an IOException occurs below it.
* @deprecated use {@link #PGStream(SocketFactory, org.postgresql.util.HostSpec, int)}
* @deprecated use {@link #PGStream(SocketFactory, org.postgresql.util.HostSpec, int, int)}
*/
@Deprecated
public PGStream(SocketFactory socketFactory, HostSpec hostSpec) throws IOException {
Expand Down Expand Up @@ -274,9 +302,9 @@ public void changeSocket(Socket socket) throws IOException {
// really need to.
connection.setTcpNoDelay(true);

// Buffer sizes submitted by Sverre H Huseby <sverrehu@online.no>
pgInput = new VisibleBufferedInputStream(connection.getInputStream(), 8192);
pgOutput = new BufferedOutputStream(connection.getOutputStream(), 8192);
int sendBufferSize = Math.min(maxSendBufferSize, Math.max(8192, socket.getSendBufferSize()));
pgOutput = new PgBufferedOutputStream(connection.getOutputStream(), sendBufferSize);

if (encoding != null) {
setEncoding(encoding);
Expand Down Expand Up @@ -355,11 +383,7 @@ public void sendChar(int val) throws IOException {
* @throws IOException if an I/O error occurs
*/
public void sendInteger4(int val) throws IOException {
int4Buf[0] = (byte) (val >>> 24);
int4Buf[1] = (byte) (val >>> 16);
int4Buf[2] = (byte) (val >>> 8);
int4Buf[3] = (byte) (val);
pgOutput.write(int4Buf);
pgOutput.writeInt4(val);
}

/**
Expand All @@ -372,9 +396,7 @@ public void sendInteger2(int val) throws IOException {
if (val < 0 || val > 65535) {
throw new IllegalArgumentException("Tried to send an out-of-range integer as a 2-byte unsigned int value: " + val);
}
int2Buf[0] = (byte) (val >>> 8);
int2Buf[1] = (byte) val;
pgOutput.write(int2Buf);
pgOutput.writeInt2(val);
}

/**
Expand Down Expand Up @@ -410,9 +432,9 @@ public void send(byte[] buf, int siz) throws IOException {
*/
public void send(byte[] buf, int off, int siz) throws IOException {
int bufamt = buf.length - off;
pgOutput.write(buf, off, bufamt < siz ? bufamt : siz);
for (int i = bufamt; i < siz; i++) {
pgOutput.write(0);
pgOutput.write(buf, off, Math.min(bufamt, siz));
if (siz > bufamt) {
pgOutput.writeZeros(siz - bufamt);
}
}

Expand All @@ -437,9 +459,7 @@ public OutputStream getOutputStream() {
} catch (Exception re) {
throw new IOException("Error writing bytes to stream", re);
}
for (int i = fixedLengthStream.remaining(); i > 0; i--) {
pgOutput.write(0);
}
pgOutput.writeZeros(fixedLengthStream.remaining());
}

/**
Expand Down Expand Up @@ -677,38 +697,21 @@ public void skip(int size) throws IOException {
*
* @param inStream the stream to read data from
* @param remaining the number of bytes to copy
* @throws IOException if a data I/O error occurs
* @throws IOException if error occurs when writing the data to the output stream
* @throws SourceStreamIOException if error occurs when reading the data from the input stream
*/
public void sendStream(InputStream inStream, int remaining) throws IOException {
int expectedLength = remaining;
byte[] streamBuffer = this.streamBuffer;
if (streamBuffer == null) {
this.streamBuffer = streamBuffer = new byte[8192];
}

while (remaining > 0) {
int count = remaining > streamBuffer.length ? streamBuffer.length : remaining;
int readCount;

try {
readCount = inStream.read(streamBuffer, 0, count);
if (readCount < 0) {
throw new EOFException(
GT.tr("Premature end of input stream, expected {0} bytes, but only read {1}.",
expectedLength, expectedLength - remaining));
}
} catch (IOException ioe) {
while (remaining > 0) {
send(streamBuffer, count);
remaining -= count;
count = remaining > streamBuffer.length ? streamBuffer.length : remaining;
}
throw new PGBindException(ioe);
}
pgOutput.write(inStream, remaining);
}

send(streamBuffer, readCount);
remaining -= readCount;
}
/**
* Writes the given amount of zero bytes to the output stream
* @param length the number of zeros to write
* @throws IOException in case writing to the output stream fails
* @throws SourceStreamIOException in case reading from the source stream fails
*/
public void sendZeros(int length) throws IOException {
pgOutput.writeZeros(length);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,9 @@ public void sendQueryCancel() throws SQLException {
LOGGER.log(Level.FINEST, " FE=> CancelRequest(pid={0},ckey={1})", new Object[]{cancelPid, cancelKey});
}

// Cancel signal is 16 bytes only, so we use 16 as the max send buffer size
cancelStream =
new PGStream(pgStream.getSocketFactory(), pgStream.getHostSpec(), cancelSignalTimeout);
new PGStream(pgStream.getSocketFactory(), pgStream.getHostSpec(), cancelSignalTimeout, 16);
Comment on lines +193 to +195
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this is the only place where we can justify the buffer size :)

if (cancelSignalTimeout > 0) {
cancelStream.setNetworkTimeout(cancelSignalTimeout);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ private PGStream tryConnect(Properties info, SocketFactory socketFactory, HostSp
throw new PSQLException(GT.tr("Database cannot be null"), PSQLState.INVALID_NAME);
}

PGStream newStream = new PGStream(socketFactory, hostSpec, connectTimeout);
int maxSendBufferSize = PGProperty.MAX_SEND_BUFFER_SIZE.getInt(info);
PGStream newStream = new PGStream(socketFactory, hostSpec, connectTimeout, maxSendBufferSize);
try {
// Set the socket timeout if the "socketTimeout" property has been set.
int socketTimeout = PGProperty.SOCKET_TIMEOUT.getInt(info);
Expand Down Expand Up @@ -520,7 +521,9 @@ private PGStream enableGSSEncrypted(PGStream pgStream, GSSEncMode gssEncMode, St

// We have to reconnect to continue.
pgStream.close();
return new PGStream(pgStream.getSocketFactory(), pgStream.getHostSpec(), connectTimeout);
int maxSendBufferSize = PGProperty.MAX_SEND_BUFFER_SIZE.getInt(info);
return new PGStream(pgStream.getSocketFactory(), pgStream.getHostSpec(), connectTimeout,
maxSendBufferSize);

case 'N':
LOGGER.log(Level.FINEST, " <=BE GSSEncrypted Refused");
Expand Down
12 changes: 10 additions & 2 deletions pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.postgresql.util.PSQLState;
import org.postgresql.util.PSQLWarning;
import org.postgresql.util.ServerErrorMessage;
import org.postgresql.util.internal.SourceStreamIOException;

import org.checkerframework.checker.nullness.qual.Nullable;

Expand Down Expand Up @@ -1771,8 +1772,15 @@ private void sendBind(SimpleQuery query, SimpleParameterList params, @Nullable P
pgStream.sendInteger4(params.getV3Length(i)); // Parameter size
try {
params.writeV3Value(i, pgStream); // Parameter value
} catch (PGBindException be) {
bindException = be;
} catch (SourceStreamIOException sse) {
// Remember the error for rethrow later
if (bindException == null) {
bindException = new PGBindException(sse.getCause());
} else {
bindException.addSuppressed(sse.getCause());
}
// Write out the missing bytes so the stream does not corrupt
pgStream.sendZeros(sse.getBytesRemaining());
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/ds/common/BaseDataSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,22 @@ public void setSendBufferSize(int nbytes) {
PGProperty.SEND_BUFFER_SIZE.set(properties, nbytes);
}

/**
* @return send max buffer size
* @see PGProperty#MAX_SEND_BUFFER_SIZE
*/
public int getMaxSendBufferSize() {
return PGProperty.MAX_SEND_BUFFER_SIZE.getIntNoCheck(properties);
}

/**
* @param nbytes send max buffer size
* @see PGProperty#MAX_SEND_BUFFER_SIZE
*/
public void setMaxSendBufferSize(int nbytes) {
PGProperty.MAX_SEND_BUFFER_SIZE.set(properties, nbytes);
}

/**
* @param count prepare threshold
* @see PGProperty#PREPARE_THRESHOLD
Expand Down
Loading
Loading