Skip to content

Commit

Permalink
perf: replace BufferedOutputStream with unsynchronized PgBufferedOutp…
Browse files Browse the repository at this point in the history
…utStream, increase the send buffer size

This fixes two issues:
1) Java 21 uses ReentrantLock for BufferedOutputStream, however, we do not need synchronization
as we synchronize at the level of QueryExecutor.execute

2) The default buffer size of 8192 was small for the modern connections

Fixes #3245
  • Loading branch information
vlsi committed May 19, 2024
1 parent 450488c commit 130457f
Show file tree
Hide file tree
Showing 4 changed files with 286 additions and 65 deletions.
29 changes: 15 additions & 14 deletions pgjdbc/src/main/java/org/postgresql/core/PGStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.postgresql.gss.GSSInputStream;
import org.postgresql.gss.GSSOutputStream;
import org.postgresql.util.internal.PgBufferedOutputStream;

Check failure on line 10 in pgjdbc/src/main/java/org/postgresql/core/PGStream.java

View workflow job for this annotation

GitHub Actions / Code style

Remove 1 line: 10..10
import org.postgresql.util.ByteStreamWriter;
import org.postgresql.util.GT;
import org.postgresql.util.HostSpec;
Expand All @@ -16,9 +17,9 @@

Check failure on line 17 in pgjdbc/src/main/java/org/postgresql/core/PGStream.java

View workflow job for this annotation

GitHub Actions / Code style

Insert 1 line: import org.postgresql.util.internal.PgBufferedOutputStream;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.ietf.jgss.GSSContext;
import org.ietf.jgss.GSSException;
import org.ietf.jgss.MessageProp;

import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.EOFException;
import java.io.FilterOutputStream;
Expand Down Expand Up @@ -52,7 +53,7 @@ public class PGStream implements Closeable, Flushable {

private Socket connection;
private VisibleBufferedInputStream pgInput;
private OutputStream pgOutput;
private PgBufferedOutputStream pgOutput;
private byte @Nullable [] streamBuffer;

public boolean isGssEncrypted() {
Expand All @@ -61,9 +62,14 @@ public boolean isGssEncrypted() {

boolean gssEncrypted;

public void setSecContext(GSSContext secContext) {
public void setSecContext(GSSContext secContext) throws GSSException {
MessageProp messageProp = new MessageProp(0, true);
pgInput = new VisibleBufferedInputStream(new GSSInputStream(pgInput.getWrapped(), secContext, messageProp ), 8192);
// See https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-GSSAPI
// Note that the server will only accept encrypted packets from the client which are less than
// 16kB; gss_wrap_size_limit() should be used by the client to determine the size of
// the unencrypted message which will fit within this limit and larger messages should be
// broken up into multiple gss_wrap() calls
pgOutput = new GSSOutputStream(pgOutput, secContext, messageProp, 16384);
gssEncrypted = true;

Expand Down Expand Up @@ -274,9 +280,10 @@ public void changeSocket(Socket socket) throws IOException {
// really need to.
connection.setTcpNoDelay(true);

// Buffer sizes submitted by Sverre H Huseby <sverrehu@online.no>
pgInput = new VisibleBufferedInputStream(connection.getInputStream(), 8192);
pgOutput = new BufferedOutputStream(connection.getOutputStream(), 8192);
int receiveBufferSize = Math.max(8192, socket.getReceiveBufferSize());
pgInput = new VisibleBufferedInputStream(connection.getInputStream(), receiveBufferSize);
int sendBufferSize = Math.max(8192, socket.getSendBufferSize());
pgOutput = new PgBufferedOutputStream(connection.getOutputStream(), sendBufferSize);

if (encoding != null) {
setEncoding(encoding);
Expand Down Expand Up @@ -355,11 +362,7 @@ public void sendChar(int val) throws IOException {
* @throws IOException if an I/O error occurs
*/
public void sendInteger4(int val) throws IOException {
int4Buf[0] = (byte) (val >>> 24);
int4Buf[1] = (byte) (val >>> 16);
int4Buf[2] = (byte) (val >>> 8);
int4Buf[3] = (byte) (val);
pgOutput.write(int4Buf);
pgOutput.writeInt4(val);
}

/**
Expand All @@ -372,9 +375,7 @@ public void sendInteger2(int val) throws IOException {
if (val < 0 || val > 65535) {
throw new IllegalArgumentException("Tried to send an out-of-range integer as a 2-byte unsigned int value: " + val);
}
int2Buf[0] = (byte) (val >>> 8);
int2Buf[1] = (byte) val;
pgOutput.write(int2Buf);
pgOutput.writeInt2(val);
}

/**
Expand Down
104 changes: 53 additions & 51 deletions pgjdbc/src/main/java/org/postgresql/gss/GSSOutputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,81 +5,83 @@

package org.postgresql.gss;

import org.postgresql.util.internal.PgBufferedOutputStream;

import org.ietf.jgss.GSSContext;
import org.ietf.jgss.GSSException;
import org.ietf.jgss.MessageProp;

import java.io.IOException;
import java.io.OutputStream;

public class GSSOutputStream extends OutputStream {
/**
* Output stream that wraps each packed with GSS encryption.
*/
public class GSSOutputStream extends PgBufferedOutputStream {
private final PgBufferedOutputStream pgOut;
private final GSSContext gssContext;
private final MessageProp messageProp;
private final byte[] buffer;
private final byte[] int4Buf = new byte[4];
private int index;
private final OutputStream wrapped;

public GSSOutputStream(OutputStream out, GSSContext gssContext, MessageProp messageProp, int bufferSize) {
wrapped = out;
/**
* Creates GSS output stream.
* @param out output stream for the encrypted data
* @param gssContext gss context
* @param messageProp message properties
* @param maxTokenSize maximum length of the encrypted messages
*/
public GSSOutputStream(PgBufferedOutputStream out, GSSContext gssContext, MessageProp messageProp, int maxTokenSize) throws GSSException {
super(out, getBufferSize(gssContext, messageProp, maxTokenSize));
this.pgOut = out;
this.gssContext = gssContext;
this.messageProp = messageProp;
buffer = new byte[bufferSize];
}

@Override
public void write(int b) throws IOException {
buffer[index++] = (byte) b;
if (index >= buffer.length) {
flush();
}
}

@Override
public void write(byte[] buf) throws IOException {
write(buf, 0, buf.length);
private static int getBufferSize(GSSContext gssContext, MessageProp messageProp, int maxTokenSize) throws GSSException {
return gssContext.getWrapSizeLimit(messageProp.getQOP(), messageProp.getPrivacy(), maxTokenSize);
}

@Override
public void write(byte[] b, int pos, int len) throws IOException {
int max;

while ( len > 0 ) {
int roomToWrite = buffer.length - index;
if ( len < roomToWrite ) {
System.arraycopy(b, pos, buffer, index, len);
index += len;
len -= roomToWrite;
} else {
System.arraycopy(b, pos, buffer, index, roomToWrite);
index += roomToWrite;
len -= roomToWrite;
}
if (roomToWrite == 0) {
flush();
}
protected void flushBuffer() throws IOException {
if (count > 0) {
writeWrapped(buf, 0, count);
count = 0;
}
}

@Override
public void flush() throws IOException {
private void writeWrapped(byte[] b, int off, int len) throws IOException {
try {
byte[] token = gssContext.wrap(buffer, 0, index, messageProp);
sendInteger4Raw(token.length);
wrapped.write(token, 0, token.length);
index = 0;
} catch ( GSSException ex ) {
byte[] token = gssContext.wrap(buf, off, len, messageProp);
pgOut.writeInt4(token.length);
pgOut.write(token, 0, token.length);
} catch (GSSException ex) {
throw new IOException(ex);
}
wrapped.flush();
}

private void sendInteger4Raw(int val) throws IOException {
int4Buf[0] = (byte) (val >>> 24);
int4Buf[1] = (byte) (val >>> 16);
int4Buf[2] = (byte) (val >>> 8);
int4Buf[3] = (byte) (val);
wrapped.write(int4Buf);
@Override
public void write(byte[] b, int off, int len) throws IOException {
if (count > 0) {
// If there's some data in the buffer, combine both
int avail = buf.length - count;
int prefixLength = Math.min(len, avail);
System.arraycopy(b, off, buf, count, prefixLength);
count += prefixLength;
off += prefixLength;
len -= prefixLength;
if (count == buf.length) {
flushBuffer();
}
}
// Write out the rest, chunk the writes, so we do not exceed the maximum encrypted message size
while (len >= buf.length) {
writeWrapped(b, off, buf.length);
off += buf.length;
len -= buf.length;
}
if (len == 0) {
return;
}
System.arraycopy(b, off, buf, 0, len);
count += len;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright (c) 2024, PostgreSQL Global Development Group
* See the LICENSE file in the project root for more information.
*/

package org.postgresql.util.internal;

import org.postgresql.util.ByteConverter;

import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/**
* Buffered output stream. The key difference from {@link java.io.BufferedOutputStream} is that
* {@code PgBufferedOutputStream} does not perform synchronization.
* This is an internal class, and it is not meant to be used as a public API.
*/
public class PgBufferedOutputStream extends FilterOutputStream {
/**
* Buffer for the data
*/
protected final byte[] buf;

/**
* Number of bytes stored in the buffer
*/
protected int count;

public PgBufferedOutputStream(OutputStream out, int bufferSize) {
super(out);
buf = new byte[bufferSize];
}

public void writeInt4(int val) throws IOException {
byte[] buf = this.buf;
if (buf.length - count < 4) {
flushBuffer();
}
int count = this.count;
ByteConverter.int4(buf, count, val);
this.count = count + 4;
}

public void writeInt2(int val) throws IOException {
byte[] buf = this.buf;
if (buf.length - count < 2) {
flushBuffer();
}
int count = this.count;
ByteConverter.int2(buf, count, val);
this.count = count + 2;
}

protected void flushBuffer() throws IOException {
if (count > 0) {
out.write(buf, 0, count);
count = 0;
}
}

@Override
public void flush() throws IOException {
flushBuffer();
super.flush();
}

@Override
public void write(int b) throws IOException {
if (count >= buf.length) {
flushBuffer();
}
buf[count++] = (byte) b;
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
if (len == 0) {
return;
}

if (count > 0) {
int avail = buf.length - count;
if (avail + buf.length <= len) {
// We have data in the buffer, however, even if we copy as much as it is possible
// the leftover will exceed buffer size, so we issue two write calls
// Sample test to trigger the branch:
// BatchedInsertReWriteEnabledTest.test32767Binds
flushBuffer();
out.write(b, off, len);
return;
}
int prefixLength = Math.min(len, avail);
System.arraycopy(b, off, buf, count, prefixLength);
count += prefixLength;
off += prefixLength;
len -= prefixLength;
if (count == buf.length) {
flushBuffer();
}
if (len == 0) {
return;
}
}

// At this point, the buffer is empty
if (len >= buf.length) {
// Write big chunk
// Sample tests to trigger the branch:
// LargeObjectManagerTest.objectWriteThenRead,
// BlobTransactionTest.concurrentReplace
out.write(b, off, len);
return;
}
// Buffer small one
System.arraycopy(b, off, buf, 0, len);
count = len;
}
}
Loading

0 comments on commit 130457f

Please sign in to comment.