Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: introduce tuple abstraction (rebased) #1701

Merged
merged 4 commits into from Feb 10, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions pgjdbc/src/main/java/org/postgresql/core/BaseStatement.java
Expand Up @@ -24,7 +24,7 @@ public interface BaseStatement extends PGStatement, Statement {
* @return the new ResultSet
* @throws SQLException if something goes wrong
*/
ResultSet createDriverResultSet(Field[] fields, List<byte[][]> tuples) throws SQLException;
ResultSet createDriverResultSet(Field[] fields, List<Tuple> tuples) throws SQLException;

/**
* Create a resultset from data retrieved from the server.
Expand All @@ -38,7 +38,7 @@ public interface BaseStatement extends PGStatement, Statement {
* @return the new ResultSet
* @throws SQLException if something goes wrong
*/
ResultSet createResultSet(Query originalQuery, Field[] fields, List<byte[][]> tuples,
ResultSet createResultSet(Query originalQuery, Field[] fields, List<Tuple> tuples,
ResultCursor cursor) throws SQLException;

/**
Expand Down
4 changes: 2 additions & 2 deletions pgjdbc/src/main/java/org/postgresql/core/PGStream.java
Expand Up @@ -472,7 +472,7 @@ public String receiveString() throws IOException {
* @throws IOException if a data I/O error occurs
* @throws SQLException if read more bytes than set maxResultBuffer
*/
public byte[][] receiveTupleV3() throws IOException, OutOfMemoryError, SQLException {
public Tuple receiveTupleV3() throws IOException, OutOfMemoryError, SQLException {
int messageSize = receiveInteger4(); // MESSAGE SIZE
int nf = receiveInteger2();
//size = messageSize - 4 bytes of message size - 2 bytes of field count - 4 bytes for each column length
Expand All @@ -498,7 +498,7 @@ public byte[][] receiveTupleV3() throws IOException, OutOfMemoryError, SQLExcept
throw oom;
}

return answer;
return new Tuple(answer);
}

/**
Expand Down
3 changes: 1 addition & 2 deletions pgjdbc/src/main/java/org/postgresql/core/ResultHandler.java
Expand Up @@ -36,8 +36,7 @@ public interface ResultHandler {
* @param cursor a cursor to use to fetch additional data; <code>null</code> if no further results
* are present.
*/
void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples,
ResultCursor cursor);
void handleResultRows(Query fromQuery, Field[] fields, List<Tuple> tuples, ResultCursor cursor);

/**
* Called when a query that did not return a resultset completes.
Expand Down
Expand Up @@ -25,7 +25,7 @@ public class ResultHandlerBase implements ResultHandler {
private SQLWarning lastWarning;

@Override
public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples,
public void handleResultRows(Query fromQuery, Field[] fields, List<Tuple> tuples,
ResultCursor cursor) {
}

Expand Down
Expand Up @@ -23,7 +23,7 @@ public ResultHandlerDelegate(ResultHandler delegate) {
}

@Override
public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples,
public void handleResultRows(Query fromQuery, Field[] fields, List<Tuple> tuples,
ResultCursor cursor) {
if (delegate != null) {
delegate.handleResultRows(fromQuery, fields, tuples, cursor);
Expand Down
10 changes: 5 additions & 5 deletions pgjdbc/src/main/java/org/postgresql/core/SetupQueryRunner.java
Expand Up @@ -21,13 +21,13 @@
public class SetupQueryRunner {

private static class SimpleResultHandler extends ResultHandlerBase {
private List<byte[][]> tuples;
private List<Tuple> tuples;

List<byte[][]> getResults() {
List<Tuple> getResults() {
return tuples;
}

public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples,
public void handleResultRows(Query fromQuery, Field[] fields, List<Tuple> tuples,
ResultCursor cursor) {
this.tuples = tuples;
}
Expand All @@ -38,7 +38,7 @@ public void handleWarning(SQLWarning warning) {
}
}

public static byte[][] run(QueryExecutor executor, String queryString,
public static Tuple run(QueryExecutor executor, String queryString,
boolean wantResults) throws SQLException {
Query query = executor.createSimpleQuery(queryString);
SimpleResultHandler handler = new SimpleResultHandler();
Expand All @@ -59,7 +59,7 @@ public static byte[][] run(QueryExecutor executor, String queryString,
return null;
}

List<byte[][]> tuples = handler.getResults();
List<Tuple> tuples = handler.getResults();
if (tuples == null || tuples.size() != 1) {
throw new PSQLException(GT.tr("An unexpected result was returned by a query."),
PSQLState.CONNECTION_UNABLE_TO_CONNECT);
Expand Down
100 changes: 100 additions & 0 deletions pgjdbc/src/main/java/org/postgresql/core/Tuple.java
@@ -0,0 +1,100 @@
/*
* Copyright (c) 2017, PostgreSQL Global Development Group
paplorinc marked this conversation as resolved.
Show resolved Hide resolved
* See the LICENSE file in the project root for more information.
*/

package org.postgresql.core;

/**
* Class representing a row in a {@link java.sql.ResultSet}.
*/
public class Tuple {
private final boolean forUpdate;
final byte[][] data;

/**
* Construct an empty tuple. Used in updatable result sets.
* @param length the number of fields in the tuple.
*/
public Tuple(int length) {
this(new byte[length][], true);
}

/**
* Construct a populated tuple. Used when returning results.
* @param data the tuple data
*/
public Tuple(byte[][] data) {
this(data, false);
}

private Tuple(byte[][] data, boolean forUpdate) {
this.data = data;
this.forUpdate = forUpdate;
}

/**
* Number of fields in the tuple
* @return number of fields
*/
public int fieldCount() {
return data.length;
}

/**
* Total length in bytes of the tuple data.
* @return the number of bytes in this tuple
*/
public int length() {
paplorinc marked this conversation as resolved.
Show resolved Hide resolved
int length = 0;
for (byte[] field : data) {
if (field != null) {
length += field.length;
}
}
return length;
}

/**
* Get the data for the given field
* @param index 0-based field position in the tuple
* @return byte array of the data
*/
public byte[] get(int index) {
return data[index];
}

/**
* Create a copy of the tuple for updating.
* @return a copy of the tuple that allows updates
*/
public Tuple updateableCopy() {
return copy(true);
}

/**
* Create a read-only copy of the tuple
* @return a copy of the tuple that does not allow updates
*/
public Tuple readOnlyCopy() {
return copy(false);
}

private Tuple copy(boolean forUpdate) {
byte[][] dataCopy = new byte[data.length][];
System.arraycopy(data, 0, dataCopy, 0, data.length);
return new Tuple(dataCopy, forUpdate);
}

/**
* Set the given field to the given data.
* @param index 0-based field position
* @param fieldData the data to set
*/
public void set(int index, byte[] fieldData) {
if (!forUpdate) {
throw new IllegalArgumentException("Attempted to write to readonly tuple");
}
data[index] = fieldData;
}
}
Expand Up @@ -13,6 +13,7 @@
import org.postgresql.core.ServerVersion;
import org.postgresql.core.SetupQueryRunner;
import org.postgresql.core.SocketFactoryFactory;
import org.postgresql.core.Tuple;
import org.postgresql.core.Utils;
import org.postgresql.core.Version;
import org.postgresql.hostchooser.CandidateHost;
Expand Down Expand Up @@ -753,8 +754,8 @@ private void runInitialQueries(QueryExecutor queryExecutor, Properties info)
}

private boolean isMaster(QueryExecutor queryExecutor) throws SQLException, IOException {
byte[][] results = SetupQueryRunner.run(queryExecutor, "show transaction_read_only", true);
String value = queryExecutor.getEncoding().decode(results[0]);
Tuple results = SetupQueryRunner.run(queryExecutor, "show transaction_read_only", true);
String value = queryExecutor.getEncoding().decode(results.get(0));
return value.equalsIgnoreCase("off");
}
}
27 changes: 11 additions & 16 deletions pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java
Expand Up @@ -31,6 +31,7 @@
import org.postgresql.core.SqlCommand;
import org.postgresql.core.SqlCommandType;
import org.postgresql.core.TransactionState;
import org.postgresql.core.Tuple;
import org.postgresql.core.Utils;
import org.postgresql.core.v3.replication.V3ReplicationProtocol;
import org.postgresql.jdbc.AutoSave;
Expand Down Expand Up @@ -555,7 +556,7 @@ private ResultHandler sendQueryPreamble(final ResultHandler delegateHandler, int
return new ResultHandlerDelegate(delegateHandler) {
private boolean sawBegin = false;

public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples,
public void handleResultRows(Query fromQuery, Field[] fields, List<Tuple> tuples,
ResultCursor cursor) {
if (sawBegin) {
super.handleResultRows(fromQuery, fields, tuples, cursor);
Expand Down Expand Up @@ -1999,7 +2000,7 @@ protected void processResults(ResultHandler handler, int flags) throws IOExcepti
boolean noResults = (flags & QueryExecutor.QUERY_NO_RESULTS) != 0;
boolean bothRowsAndStatus = (flags & QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS) != 0;

List<byte[][]> tuples = null;
List<Tuple> tuples = null;

int c;
boolean endQuery = false;
Expand Down Expand Up @@ -2093,7 +2094,7 @@ protected void processResults(ResultHandler handler, int flags) throws IOExcepti
Field[] fields = currentQuery.getFields();

if (fields != null) { // There was a resultset.
tuples = new ArrayList<byte[][]>();
tuples = new ArrayList<Tuple>();
handler.handleResultRows(currentQuery, fields, tuples, null);
tuples = null;
}
Expand All @@ -2115,7 +2116,7 @@ protected void processResults(ResultHandler handler, int flags) throws IOExcepti
if (fields != null && tuples == null) {
// When no results expected, pretend an empty resultset was returned
// Not sure if new ArrayList can be always replaced with emptyList
tuples = noResults ? Collections.<byte[][]>emptyList() : new ArrayList<byte[][]>();
tuples = noResults ? Collections.<byte[][]>emptyList() : new ArrayList<Tuple>();
}

handler.handleResultRows(currentQuery, fields, tuples, currentPortal);
Expand Down Expand Up @@ -2166,7 +2167,7 @@ protected void processResults(ResultHandler handler, int flags) throws IOExcepti
if (fields != null && tuples == null) {
// When no results expected, pretend an empty resultset was returned
// Not sure if new ArrayList can be always replaced with emptyList
tuples = noResults ? Collections.<byte[][]>emptyList() : new ArrayList<byte[][]>();
tuples = noResults ? Collections.<byte[][]>emptyList() : new ArrayList<Tuple>();
}

// If we received tuples we must know the structure of the
Expand Down Expand Up @@ -2203,7 +2204,7 @@ protected void processResults(ResultHandler handler, int flags) throws IOExcepti
}

case 'D': // Data Transfer (ongoing Execute response)
byte[][] tuple = null;
Tuple tuple = null;
try {
tuple = pgStream.receiveTupleV3();
} catch (OutOfMemoryError oome) {
Expand All @@ -2217,7 +2218,7 @@ protected void processResults(ResultHandler handler, int flags) throws IOExcepti
}
if (!noResults) {
if (tuples == null) {
tuples = new ArrayList<byte[][]>();
tuples = new ArrayList<Tuple>();
}
tuples.add(tuple);
}
Expand All @@ -2227,13 +2228,7 @@ protected void processResults(ResultHandler handler, int flags) throws IOExcepti
if (tuple == null) {
length = -1;
} else {
length = 0;
for (byte[] aTuple : tuple) {
if (aTuple == null) {
continue;
}
length += aTuple.length;
}
length = tuple.length();
}
LOGGER.log(Level.FINEST, " <=BE DataRow(len={0})", length);
}
Expand Down Expand Up @@ -2287,7 +2282,7 @@ protected void processResults(ResultHandler handler, int flags) throws IOExcepti

case 'T': // Row Description (response to Describe)
Field[] fields = receiveFields();
tuples = new ArrayList<byte[][]>();
tuples = new ArrayList<Tuple>();

SimpleQuery query = pendingDescribePortalQueue.peekFirst();
if (!pendingExecuteQueue.isEmpty() && !pendingExecuteQueue.peekFirst().asSimple) {
Expand Down Expand Up @@ -2422,7 +2417,7 @@ public synchronized void fetch(ResultCursor cursor, ResultHandler handler, int f
handler = new ResultHandlerDelegate(delegateHandler) {
@Override
public void handleCommandStatus(String status, long updateCount, long insertOID) {
handleResultRows(portal.getQuery(), null, new ArrayList<byte[][]>(), null);
handleResultRows(portal.getQuery(), null, new ArrayList<Tuple>(), null);
}
};

Expand Down
13 changes: 7 additions & 6 deletions pgjdbc/src/main/java/org/postgresql/jdbc/BatchResultHandler.java
Expand Up @@ -10,6 +10,7 @@
import org.postgresql.core.Query;
import org.postgresql.core.ResultCursor;
import org.postgresql.core.ResultHandlerBase;
import org.postgresql.core.Tuple;
import org.postgresql.core.v3.BatchedQuery;
import org.postgresql.util.GT;
import org.postgresql.util.PSQLException;
Expand Down Expand Up @@ -38,8 +39,8 @@ public class BatchResultHandler extends ResultHandlerBase {
private final boolean expectGeneratedKeys;
private PgResultSet generatedKeys;
private int committedRows; // 0 means no rows committed. 1 means row 0 was committed, and so on
private final List<List<byte[][]>> allGeneratedRows;
private List<byte[][]> latestGeneratedRows;
private final List<List<Tuple>> allGeneratedRows;
private List<Tuple> latestGeneratedRows;
private PgResultSet latestGeneratedKeysRs;

BatchResultHandler(PgStatement pgStatement, Query[] queries, ParameterList[] parameterLists,
Expand All @@ -49,11 +50,11 @@ public class BatchResultHandler extends ResultHandlerBase {
this.parameterLists = parameterLists;
this.longUpdateCounts = new long[queries.length];
this.expectGeneratedKeys = expectGeneratedKeys;
this.allGeneratedRows = !expectGeneratedKeys ? null : new ArrayList<List<byte[][]>>();
this.allGeneratedRows = !expectGeneratedKeys ? null : new ArrayList<List<Tuple>>();
}

@Override
public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples,
public void handleResultRows(Query fromQuery, Field[] fields, List<Tuple> tuples,
ResultCursor cursor) {
// If SELECT, then handleCommandStatus call would just be missing
resultIndex++;
Expand All @@ -66,7 +67,7 @@ public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tup
// If SELECT, the resulting ResultSet is not valid
// Thus it is up to handleCommandStatus to decide if resultSet is good enough
latestGeneratedKeysRs = (PgResultSet) pgStatement.createResultSet(fromQuery, fields,
new ArrayList<byte[][]>(), cursor);
new ArrayList<Tuple>(), cursor);
} catch (SQLException e) {
handleError(e);
}
Expand Down Expand Up @@ -121,7 +122,7 @@ private void updateGeneratedKeys() {
if (allGeneratedRows == null || allGeneratedRows.isEmpty()) {
return;
}
for (List<byte[][]> rows : allGeneratedRows) {
for (List<Tuple> rows : allGeneratedRows) {
generatedKeys.addRows(rows);
}
allGeneratedRows.clear();
Expand Down
Expand Up @@ -9,6 +9,7 @@
import org.postgresql.core.ParameterList;
import org.postgresql.core.Query;
import org.postgresql.core.ResultCursor;
import org.postgresql.core.Tuple;

import java.util.List;

Expand All @@ -17,7 +18,7 @@ class CallableBatchResultHandler extends BatchResultHandler {
super(statement, queries, parameterLists, false);
}

public void handleResultRows(Query fromQuery, Field[] fields, List<byte[][]> tuples, ResultCursor cursor) {
public void handleResultRows(Query fromQuery, Field[] fields, List<Tuple> tuples, ResultCursor cursor) {
/* ignore */
}
}