Skip to content

Commit

Permalink
feat: add caller push of binary data (rebase of #953) (#1659)
Browse files Browse the repository at this point in the history
* feat: add caller push of binary data

Allows a caller to push binary data to an output stream rather than
having to provide a byte array or input stream. This gives the caller
more control over buffering strategy and allows explicit cleanup of
off-heap buffers or other non-garbage-collected resources.

* feat: Introduce ByteStreamTarget

* Fix rename in PGStream

* Change copyright dates of added files to 2020

Co-authored-by: Tom Dunstan <tomdcc@users.noreply.github.com>
  • Loading branch information
2 people authored and davecramer committed Dec 26, 2019
1 parent a44ab4b commit db228a4
Show file tree
Hide file tree
Showing 12 changed files with 630 additions and 0 deletions.
@@ -0,0 +1,52 @@
/*
* Copyright (c) 2020, PostgreSQL Global Development Group
* See the LICENSE file in the project root for more information.
*/

package org.postgresql.core;

import java.io.IOException;
import java.io.OutputStream;

/**
* A stream that refuses to write more than a maximum number of bytes.
*/
public class FixedLengthOutputStream extends OutputStream {

private final int size;
private final OutputStream target;
private int written;

public FixedLengthOutputStream(int size, OutputStream target) {
this.size = size;
this.target = target;
}

@Override
public void write(int b) throws IOException {
verifyAllowed(1);
written++;
target.write(b);
}

public void write(byte[] buf, int offset, int len) throws IOException {
if ((offset < 0) || (len < 0) || ((offset + len) > buf.length)) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return;
}
verifyAllowed(len);
target.write(buf, offset, len);
written += len;
}

public int remaining() {
return size - written;
}

private void verifyAllowed(int wanted) throws IOException {
if (remaining() < wanted) {
throw new IOException("Attempt to write more than the specified " + size + " bytes");
}
}
}
27 changes: 27 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/core/PGStream.java
Expand Up @@ -5,6 +5,7 @@

package org.postgresql.core;

import org.postgresql.util.ByteStreamWriter;
import org.postgresql.util.GT;
import org.postgresql.util.HostSpec;
import org.postgresql.util.PSQLException;
Expand Down Expand Up @@ -311,6 +312,32 @@ public void send(byte[] buf, int off, int siz) throws IOException {
}
}

/**
* Send a fixed-size array of bytes to the backend. If {@code length < siz}, pad with zeros. If
* {@code length > siz}, truncate the array.
*
* @param writer the stream writer to invoke to send the bytes
* @throws IOException if an I/O error occurs
*/
public void send(ByteStreamWriter writer) throws IOException {
final FixedLengthOutputStream fixedLengthStream = new FixedLengthOutputStream(writer.getLength(), pgOutput);
try {
writer.writeTo(new ByteStreamWriter.ByteStreamTarget() {
@Override
public OutputStream getOutputStream() {
return fixedLengthStream;
}
});
} catch (IOException ioe) {
throw ioe;
} catch (Exception re) {
throw new IOException("Error writing bytes to stream", re);
}
for (int i = fixedLengthStream.remaining(); i > 0; i--) {
pgOutput.write(0);
}
}

/**
* Receives a single character from the backend, without advancing the current protocol stream
* position.
Expand Down
12 changes: 12 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/core/ParameterList.java
Expand Up @@ -6,6 +6,8 @@

package org.postgresql.core;

import org.postgresql.util.ByteStreamWriter;

import java.io.InputStream;
import java.sql.SQLException;

Expand Down Expand Up @@ -122,6 +124,16 @@ public interface ParameterList {
*/
void setBytea(int index, InputStream stream) throws SQLException;

/**
* Binds a binary bytea value stored as a ByteStreamWriter. The parameter's type is implicitly set to
* 'bytea'. The stream should remain valid until query execution has completed.
*
* @param index the 1-based parameter index to bind.
* @param writer a writer that can write the bytes for the parameter
* @throws SQLException on error or if <code>index</code> is out of range
*/
void setBytea(int index, ByteStreamWriter writer) throws SQLException;

/**
* Binds a text value stored as an InputStream that is a valid UTF-8 byte stream.
* Any byte-order marks (BOM) in the stream are passed to the backend.
Expand Down
Expand Up @@ -7,6 +7,7 @@
package org.postgresql.core.v3;

import org.postgresql.core.ParameterList;
import org.postgresql.util.ByteStreamWriter;
import org.postgresql.util.GT;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
Expand Down Expand Up @@ -107,6 +108,11 @@ public void setBytea(int index, InputStream stream) throws SQLException {
subparams[sub].setBytea(index - offsets[sub], stream);
}

public void setBytea(int index, ByteStreamWriter writer) throws SQLException {
int sub = findSubParam(index);
subparams[sub].setBytea(index - offsets[sub], writer);
}

public void setText(int index, InputStream stream) throws SQLException {
int sub = findSubParam(index);
subparams[sub].setText(index - offsets[sub], stream);
Expand Down
Expand Up @@ -14,6 +14,7 @@
import org.postgresql.geometric.PGpoint;
import org.postgresql.jdbc.UUIDArrayAssistant;
import org.postgresql.util.ByteConverter;
import org.postgresql.util.ByteStreamWriter;
import org.postgresql.util.GT;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
Expand Down Expand Up @@ -146,6 +147,11 @@ public void setBytea(int index, InputStream stream) throws SQLException {
bind(index, new StreamWrapper(stream), Oid.BYTEA, BINARY);
}

@Override
public void setBytea(int index, ByteStreamWriter writer) throws SQLException {
bind(index, writer, Oid.BYTEA, BINARY);
}

@Override
public void setText(int index, InputStream stream) throws SQLException {
bind(index, new StreamWrapper(stream), Oid.TEXT, TEXT);
Expand Down Expand Up @@ -291,6 +297,14 @@ private static void streamBytea(PGStream pgStream, StreamWrapper wrapper) throws
pgStream.sendStream(wrapper.getStream(), wrapper.getLength());
}

//
// byte stream writer support
//

private static void streamBytea(PGStream pgStream, ByteStreamWriter writer) throws IOException {
pgStream.send(writer);
}

public int[] getTypeOIDs() {
return paramTypes;
}
Expand Down Expand Up @@ -352,6 +366,11 @@ int getV3Length(int index) {
return ((StreamWrapper) paramValues[index]).getLength();
}

// Binary-format bytea?
if (paramValues[index] instanceof ByteStreamWriter) {
return ((ByteStreamWriter) paramValues[index]).getLength();
}

// Already encoded?
if (encoded[index] == null) {
// Encode value and compute actual length using UTF-8.
Expand Down Expand Up @@ -381,6 +400,12 @@ void writeV3Value(int index, PGStream pgStream) throws IOException {
return;
}

// Streamed bytea?
if (paramValues[index] instanceof ByteStreamWriter) {
streamBytea(pgStream, (ByteStreamWriter) paramValues[index]);
return;
}

// Encoded string.
if (encoded[index] == null) {
encoded[index] = Utils.encodeUTF8((String) paramValues[index]);
Expand Down
Expand Up @@ -18,6 +18,7 @@
import org.postgresql.largeobject.LargeObject;
import org.postgresql.largeobject.LargeObjectManager;
import org.postgresql.util.ByteConverter;
import org.postgresql.util.ByteStreamWriter;
import org.postgresql.util.GT;
import org.postgresql.util.HStoreConverter;
import org.postgresql.util.PGBinaryObject;
Expand Down Expand Up @@ -363,6 +364,10 @@ public void setBytes(int parameterIndex, byte[] x) throws SQLException {
preparedParameters.setBytea(parameterIndex, copy, 0, x.length);
}

private void setByteStreamWriter(int parameterIndex, ByteStreamWriter x) throws SQLException {
preparedParameters.setBytea(parameterIndex, x);
}

public void setDate(int parameterIndex, java.sql.Date x) throws SQLException {
setDate(parameterIndex, x, null);
}
Expand Down Expand Up @@ -930,6 +935,8 @@ public void setObject(int parameterIndex, Object x) throws SQLException {
setDouble(parameterIndex, (Double) x);
} else if (x instanceof byte[]) {
setBytes(parameterIndex, (byte[]) x);
} else if (x instanceof ByteStreamWriter) {
setByteStreamWriter(parameterIndex, (ByteStreamWriter) x);
} else if (x instanceof java.sql.Date) {
setDate(parameterIndex, (java.sql.Date) x);
} else if (x instanceof Time) {
Expand Down
@@ -0,0 +1,48 @@
/*
* Copyright (c) 2020, PostgreSQL Global Development Group
* See the LICENSE file in the project root for more information.
*/

package org.postgresql.util;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

/**
* A {@link ByteStreamWriter} that writes a {@link ByteBuffer java.nio.ByteBuffer} to a byte array
* parameter.
*/
public class ByteBufferByteStreamWriter implements ByteStreamWriter {

private final ByteBuffer buf;
private final int length;

/**
* Construct the writer with the given {@link ByteBuffer}
*
* @param buf the buffer to use.
*/
public ByteBufferByteStreamWriter(ByteBuffer buf) {
this.buf = buf;
this.length = buf.remaining();
}

@Override
public int getLength() {
return length;
}

@Override
public void writeTo(ByteStreamTarget target) throws IOException {
// this _does_ involve some copying to a temporary buffer, but that's unavoidable
// as OutputStream itself only accepts single bytes or heap allocated byte arrays
WritableByteChannel c = Channels.newChannel(target.getOutputStream());
try {
c.write(buf);
} finally {
c.close();
}
}
}
66 changes: 66 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/util/ByteStreamWriter.java
@@ -0,0 +1,66 @@
/*
* Copyright (c) 2020, PostgreSQL Global Development Group
* See the LICENSE file in the project root for more information.
*/

package org.postgresql.util;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
* A class that can be used to set a byte array parameter by writing to an OutputStream.
*
* <p>The intended use case is wanting to write data to a byte array parameter that is stored off
* heap in a direct memory pool or in some other form that is inconvenient to assemble into a single
* heap-allocated buffer.</p>
* <p> Users should write their own implementation depending on the
* original data source. The driver provides a built-in implementation supporting the {@link
* java.nio.ByteBuffer} class, see {@link ByteBufferByteStreamWriter}.</p>
* <p> Intended usage is to simply pass in an instance using
* {@link java.sql.PreparedStatement#setObject(int, Object)}:</p>
* <pre>
* int bufLength = someBufferObject.length();
* preparedStatement.setObject(1, new MyByteStreamWriter(bufLength, someBufferObject));
* </pre>
* <p>The length must be known ahead of the stream being written to. </p>
* <p>This provides the application more control over memory management than calling
* {@link java.sql.PreparedStatement#setBinaryStream(int, InputStream)} as with the latter the
* caller has no control over the buffering strategy. </p>
*/
public interface ByteStreamWriter {

/**
* Returns the length of the stream.
*
* <p> This must be known ahead of calling {@link #writeTo(ByteStreamTarget)}. </p>
*
* @return the number of bytes in the stream.
*/
int getLength();

/**
* Write the data to the provided {@link OutputStream}.
*
* <p> Should not write more than {@link #getLength()} bytes. If attempted, the provided stream
* will throw an {@link java.io.IOException}. </p>
*
* @param target the stream to write the data to
* @throws IOException if the underlying stream throws or there is some other error.
*/
void writeTo(ByteStreamTarget target) throws IOException;

/**
* Provides a target to write bytes to.
*/
interface ByteStreamTarget {

/**
* Provides an output stream to write bytes to.
*
* @return an output stream
*/
OutputStream getOutputStream();
}
}

0 comments on commit db228a4

Please sign in to comment.