Permalink
Browse files

feat: add support for PreparedStatement.setCharacterStream(int, Reader)

In pgjdbc 9.4 (1211) and earlier, the PreparedStatement method
setCharacterStream(int, Reader) throws a not implemented
exception. The setCharacterStream(int, Reader, int) method does not
throw an exception, but it materializes the Reader as a String and
then uses the same code path as setString(int, String). Materializing
the readers can cause memory usage issues (OutOfMemoryError) when
working with large text values.
This patch implements the setCharacterStream(int, Reader) method
using in memory buffers for small streams and temp files for large
streams. To do so, it uses code added in pull request #220 that was
used to implement setBinaryStream(int, InputStream). This patch
adds a utility class to convert the Reader's UTF-16 character stream
into a UTF-8 binary stream.
The setCharacterStream(int, Reader) method implemented in this patch
supports both simple and extended query modes. However, in simple
mode, it materializes the reader as a String just like
setCharacterStream(int, Reader, int).

fixes #671
  • Loading branch information...
vlsi committed Nov 24, 2016
1 parent 62e25fb commit ee4c4265aebc1c73a1d1fabac5ba259d1fbfd1e4
@@ -124,6 +124,18 @@
*/
void setBytea(int index, InputStream stream) throws SQLException;

/**
* Binds a text value stored as an InputStream that is a valid UTF-8 byte stream.
* Any byte-order marks (BOM) in the stream are passed to the backend.
* The parameter's type is implicitly set to 'text'.
* The stream should remain valid until query execution has completed.
*
* @param index the 1-based parameter index to bind.
* @param stream a stream containing the parameter data.
* @throws SQLException on error or if <code>index</code> is out of range
*/
void setText(int index, InputStream stream) throws SQLException;

/**
* Binds given byte[] value to a parameter. The bytes must already be in correct format matching
* the OID.
@@ -107,6 +107,11 @@ public void setBytea(int index, InputStream stream) throws SQLException {
subparams[sub].setBytea(index - offsets[sub], stream);
}

public void setText(int index, InputStream stream) throws SQLException {
int sub = findSubParam(index);
subparams[sub].setText(index - offsets[sub], stream);
}

public void setNull(int index, int oid) throws SQLException {
int sub = findSubParam(index);
subparams[sub].setNull(index - offsets[sub], oid);
@@ -147,6 +147,11 @@ public void setBytea(int index, InputStream stream) throws SQLException {
bind(index, new StreamWrapper(stream), Oid.BYTEA, BINARY);
}

@Override
public void setText(int index, InputStream stream) throws SQLException {
bind(index, new StreamWrapper(stream), Oid.TEXT, TEXT);
}

@Override
public void setNull(int index, int oid) throws SQLException {

@@ -25,6 +25,7 @@
import org.postgresql.util.PGobject;
import org.postgresql.util.PSQLException;
import org.postgresql.util.PSQLState;
import org.postgresql.util.ReaderInputStream;

import java.io.IOException;
import java.io.InputStream;
@@ -568,9 +569,15 @@ public void setObject(int parameterIndex, Object in, int targetSqlType, int scal
setString(parameterIndex, castToString(in), Oid.BPCHAR);
break;
case Types.VARCHAR:
case Types.LONGVARCHAR:
setString(parameterIndex, castToString(in), getStringType());
break;
case Types.LONGVARCHAR:
if (in instanceof InputStream) {
preparedParameters.setText(parameterIndex, (InputStream)in);
} else {
setString(parameterIndex, castToString(in), getStringType());
}
break;
case Types.DATE:
if (in instanceof java.sql.Date) {
setDate(parameterIndex, (java.sql.Date) in);
@@ -1192,6 +1199,24 @@ public void setBlob(int i, Blob x) throws SQLException {
}
}

private String readerToString(Reader value, int maxLength) throws SQLException {
try {
int bufferSize = Math.min(maxLength, 1024);
StringBuilder v = new StringBuilder(bufferSize);
char[] buf = new char[bufferSize];
int nRead = 0;
while (nRead > -1 && v.length() < maxLength) {
nRead = value.read(buf, 0, Math.min(bufferSize, maxLength - v.length()));
if (nRead > 0) {
v.append(buf, 0, nRead);
}
}
return v.toString();
} catch (IOException ioe) {
throw new PSQLException(GT.tr("Provided Reader failed."), PSQLState.UNEXPECTED_ERROR, ioe);
}
}

public void setCharacterStream(int i, java.io.Reader x, int length) throws SQLException {
checkClosed();

@@ -1211,26 +1236,7 @@ public void setCharacterStream(int i, java.io.Reader x, int length) throws SQLEx
// long varchar datatype, but with toast all the text datatypes are capable of
// handling very large values. Thus the implementation ends up calling
// setString() since there is no current way to stream the value to the server
char[] l_chars = new char[length];
int l_charsRead = 0;
try {
while (true) {
int n = x.read(l_chars, l_charsRead, length - l_charsRead);
if (n == -1) {
break;
}

l_charsRead += n;

if (l_charsRead == length) {
break;
}
}
} catch (IOException l_ioe) {
throw new PSQLException(GT.tr("Provided Reader failed."), PSQLState.UNEXPECTED_ERROR,
l_ioe);
}
setString(i, new String(l_chars, 0, l_charsRead));
setString(i, readerToString(x, length));
}

public void setClob(int i, Clob x) throws SQLException {
@@ -1466,7 +1472,13 @@ public void setCharacterStream(int parameterIndex, Reader value, long length)
}

public void setCharacterStream(int parameterIndex, Reader value) throws SQLException {
throw Driver.notImplemented(this.getClass(), "setCharacterStream(int, Reader)");
if (connection.getPreferQueryMode() == PreferQueryMode.SIMPLE) {
String s = (value != null) ? readerToString(value, Integer.MAX_VALUE) : null;
setString(parameterIndex, s);
return;
}
InputStream is = (value != null) ? new ReaderInputStream(value) : null;
setObject(parameterIndex, is, Types.LONGVARCHAR);
}

public void setBinaryStream(int parameterIndex, InputStream value, long length)
@@ -0,0 +1,159 @@
/*
* Copyright (c) 2016, PostgreSQL Global Development Group
* See the LICENSE file in the project root for more information.
*/

package org.postgresql.util;

import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CoderResult;

/**
* ReaderInputStream accepts a UTF-16 char stream (Reader) as input and
* converts it to a UTF-8 byte stream (InputStream) as output.
*
* This is the inverse of java.io.InputStreamReader which converts a
* binary stream to a character stream.
*/
public class ReaderInputStream extends InputStream {
private static final int DEFAULT_CHAR_BUFFER_SIZE = 8 * 1024;

private static final Charset UTF_8 = Charset.forName("UTF-8");

private final Reader reader;
private final CharsetEncoder encoder;
private final ByteBuffer bbuf;
private final CharBuffer cbuf;

/**
* true when all of the characters have been read from the reader into inbuf
*/
private boolean endOfInput;
private final byte[] oneByte = new byte[1];

public ReaderInputStream(Reader reader) {
this(reader, DEFAULT_CHAR_BUFFER_SIZE);
}

/**
* Allow ReaderInputStreamTest to use small buffers to force UTF-16
* surrogate pairs to cross buffer boundaries in interesting ways.
* Because this constructor is package-private, the unit test must be in
* the same package.
*/
ReaderInputStream(Reader reader, int charBufferSize) {
if (reader == null) {
throw new IllegalArgumentException("reader cannot be null");
}

// The standard UTF-8 encoder will only encode a UTF-16 surrogate pair
// when both surrogates are available in the CharBuffer.
if (charBufferSize < 2) {
throw new IllegalArgumentException("charBufferSize must be at least 2 chars");
}

this.reader = reader;
this.encoder = UTF_8.newEncoder();
// encoder.maxBytesPerChar() always returns 3.0 for UTF-8
this.bbuf = ByteBuffer.allocate(3 * charBufferSize);
this.bbuf.flip(); // prepare for subsequent write
this.cbuf = CharBuffer.allocate(charBufferSize);
this.cbuf.flip(); // prepare for subsequent write
}

private void advance() throws IOException {
assert !endOfInput;
assert !bbuf.hasRemaining()
: "advance() should be called when output byte buffer is empty. bbuf: " + bbuf + ", as string: " + bbuf.asCharBuffer().toString();
assert cbuf.remaining() < 2;

// given that bbuf.capacity = 3 x cbuf.capacity, the only time that we should have a
// remaining char is if the last char read was the 1st half of a surrogate pair
if (cbuf.remaining() == 0) {
cbuf.clear();
} else {
cbuf.compact();
}

int n = reader.read(cbuf); // read #1
cbuf.flip();

CoderResult result;

endOfInput = n == -1;

bbuf.clear();
result = encoder.encode(cbuf, bbuf, endOfInput);
checkEncodeResult(result);

if (endOfInput) {
result = encoder.flush(bbuf);
checkEncodeResult(result);
}

bbuf.flip();
}

private void checkEncodeResult(CoderResult result) throws CharacterCodingException {
if (result.isError()) {
result.throwException();
}
}

@Override
public int read() throws IOException {
int res = 0;
while (res != -1) {
res = read(oneByte);
if (res != 0) {
return oneByte[0];
}
}
return -1;
}

// The implementation of InputStream.read(byte[], int, int) silently ignores
// an IOException thrown by overrides of the read() method.
@Override
public int read(byte b[], int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
} else if (off < 0 || len < 0 || len > b.length - off) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}
if (endOfInput && !bbuf.hasRemaining()) {
return -1;
}

int totalRead = 0;
while (len > 0 && !endOfInput) {
if (bbuf.hasRemaining()) {
int remaining = Math.min(len, bbuf.remaining());
bbuf.get(b, off, remaining);
totalRead += remaining;
off += remaining;
len -= remaining;
if (len == 0) {
return totalRead;
}
}
advance();
}
return totalRead;
}

@Override
public void close() throws IOException {
endOfInput = true;
reader.close();
}
}
Oops, something went wrong.

0 comments on commit ee4c426

Please sign in to comment.