Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
[CONJ 418] ResultSet.last() isLast() afterLast() and isAfterLast() co…
…rrection when streaming
  • Loading branch information
rusher committed Jan 30, 2017
1 parent f5dc9ea commit 2f99de9
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 57 deletions.
Expand Up @@ -173,7 +173,7 @@ public MariaSelectResultSet(ColumnInformation[] columnInformation, Results resul
this.rowPointer = -1;
this.callableResult = callableResult;

if (fetchSize == 0 || resultSetScrollType != TYPE_FORWARD_ONLY || callableResult) {
if (fetchSize == 0 || callableResult) {
fetchAllResults();
streaming = false;
} else {
Expand Down Expand Up @@ -350,19 +350,37 @@ public void fetchRemaining() throws SQLException {
}

} catch (IOException ioexception) {
throw new QueryException("Could not close resultset : " + ioexception.getMessage(), -1, CONNECTION_EXCEPTION, ioexception);
throw new QueryException("Could not close resultSet : " + ioexception.getMessage(), -1, CONNECTION_EXCEPTION, ioexception);
}
} catch (QueryException queryException) {
ExceptionMapper.throwException(queryException, null, this.statement);
}

dataFetchTime++;
streaming = false;
}

/**
* This permit to replace current stream results by next ones.
*
* @throws IOException if socket exception occur
* @throws QueryException if server return an unexpected error
*/
private void nextStreamingValue() throws IOException, QueryException {

resultSet.clear();
//if resultSet can be back to some previous value
if (resultSetScrollType == TYPE_FORWARD_ONLY) resultSet.clear();

addStreamingValue();
}

/**
* This permit to add next streaming values to existing resultSet.
*
* @throws IOException if socket exception occur
* @throws QueryException if server return an unexpected error
*/
private void addStreamingValue() throws IOException, QueryException {

//fetch maximum fetchSize results
int fetchSizeTmp = fetchSize;
while (fetchSizeTmp > 0 && readNextValue(resultSet)) {
Expand All @@ -372,6 +390,7 @@ private void nextStreamingValue() throws IOException, QueryException {
this.resultSetSize = resultSet.size();
}


/**
* Read next value.
*
Expand Down Expand Up @@ -537,31 +556,28 @@ public boolean next() throws SQLException {
rowPointer++;
return true;
} else {
if (streaming) {
if (isEof) {
return false;
} else {
ReentrantLock lock = protocol.getLock();
lock.lock();
try {
nextStreamingValue();
} catch (IOException ioe) {
throw new SQLException("Server has closed the connection. If result set contain huge amount of data, Server expects client to"
+ " read off the result set relatively fast. "
+ "In this case, please consider increasing net_wait_timeout session variable."
+ " / processing your result set faster (check Streaming result sets documentation for more information)", ioe);
} catch (QueryException queryException) {
throw new SQLException(queryException);
} finally {
lock.unlock();
}
rowPointer = 0;
return resultSetSize > 0;
if (streaming && !isEof) {
ReentrantLock lock = protocol.getLock();
lock.lock();
try {
nextStreamingValue();
} catch (IOException ioe) {
throw new SQLException("Server has closed the connection. If result set contain huge amount of data, Server expects client to"
+ " read off the result set relatively fast. "
+ "In this case, please consider increasing net_wait_timeout session variable."
+ " / processing your result set faster (check Streaming result sets documentation for more information)", ioe);
} catch (QueryException queryException) {
throw new SQLException(queryException);
} finally {
lock.unlock();
}
} else {
rowPointer = resultSetSize;
return false;
rowPointer = 0;
return resultSetSize > 0;
}

//all data are reads and pointer is after last
rowPointer = resultSetSize;
return false;
}
}

Expand Down Expand Up @@ -616,7 +632,40 @@ public boolean isBeforeFirst() throws SQLException {
@Override
public boolean isAfterLast() throws SQLException {
checkClose();
return dataFetchTime > 0 && rowPointer >= resultSetSize && resultSetSize > 0;
if (rowPointer < resultSetSize) {

//has remaining results
return false;

} else {

if (streaming && !isEof) {

//has to read more result to know if it's finished or not
//(next packet may be new data or an EOF packet indicating that there is no more data)
ReentrantLock lock = protocol.getLock();
lock.lock();
try {
nextStreamingValue();
} catch (IOException ioe) {
throw new SQLException("Server has closed the connection. If result set contain huge amount of data, Server expects client to"
+ " read off the result set relatively fast. "
+ "In this case, please consider increasing net_wait_timeout session variable."
+ " / processing your result set faster (check Streaming result sets documentation for more information)", ioe);
} catch (QueryException queryException) {
throw new SQLException(queryException);
} finally {
lock.unlock();
}
rowPointer = 0;
return resultSetSize == 0;
}

//has read all data and pointer is after last result
//so result would have to always to be true,
//but when result contain no row at all jdbc say that must return false
return resultSetSize > 0 || dataFetchTime > 1;
}
}

@Override
Expand All @@ -628,29 +677,42 @@ public boolean isFirst() throws SQLException {
@Override
public boolean isLast() throws SQLException {
checkClose();
if (dataFetchTime > 0 && isEof) {
if (rowPointer < resultSetSize - 1) {
return false;
} else if (isEof) {
return rowPointer == resultSetSize - 1 && resultSetSize > 0;
} else if (streaming) {
} else {
//when streaming and not having read all results,
//must read next packet to know if next packet is an EOF packet or some additional data
ReentrantLock lock = protocol.getLock();
lock.lock();
try {
nextStreamingValue();
addStreamingValue();
} catch (IOException ioe) {
throw new SQLException(ioe);
throw new SQLException("Server has closed the connection. If result set contain huge amount of data, Server expects client to"
+ " read off the result set relatively fast. "
+ "In this case, please consider increasing net_wait_timeout session variable."
+ " / processing your result set faster (check Streaming result sets documentation for more information)", ioe);
} catch (QueryException queryException) {
throw new SQLException(queryException);
} finally {
lock.unlock();
}
return rowPointer == resultSetSize - 1 && resultSetSize > 0;

if (isEof) {
//now driver is sure when data ends.
return rowPointer == resultSetSize - 1 && resultSetSize > 0;
}

//There is data remaining
return false;
}
return false;
}

@Override
public void beforeFirst() throws SQLException {
checkClose();
if (streaming && resultSetScrollType == TYPE_FORWARD_ONLY) {
if (resultSetScrollType == TYPE_FORWARD_ONLY) {
throw new SQLException("Invalid operation for result set type TYPE_FORWARD_ONLY");
} else {
rowPointer = -1;
Expand All @@ -660,17 +722,28 @@ public void beforeFirst() throws SQLException {
@Override
public void afterLast() throws SQLException {
checkClose();
if (streaming && resultSetScrollType == TYPE_FORWARD_ONLY) {
throw new SQLException("Invalid operation for result set type TYPE_FORWARD_ONLY");
} else {
rowPointer = resultSetSize;
if (!isEof) {
//load remaining results
ReentrantLock lock = protocol.getLock();
lock.lock();
try {
fetchRemaining();
} catch (SQLException ioe) {
throw new SQLException("Server has closed the connection. If result set contain huge amount of data, Server expects client to"
+ " read off the result set relatively fast. "
+ "In this case, please consider increasing net_wait_timeout session variable."
+ " / processing your result set faster (check Streaming result sets documentation for more information)", ioe);
} finally {
lock.unlock();
}
}
rowPointer = resultSetSize;
}

@Override
public boolean first() throws SQLException {
checkClose();
if (streaming && resultSetScrollType == TYPE_FORWARD_ONLY) {
if (resultSetScrollType == TYPE_FORWARD_ONLY) {
throw new SQLException("Invalid operation for result set type TYPE_FORWARD_ONLY");
} else {
rowPointer = 0;
Expand All @@ -681,12 +754,23 @@ public boolean first() throws SQLException {
@Override
public boolean last() throws SQLException {
checkClose();
if (streaming && resultSetScrollType == TYPE_FORWARD_ONLY) {
throw new SQLException("Invalid operation for result set type TYPE_FORWARD_ONLY");
} else {
rowPointer = resultSetSize - 1;
return rowPointer > 0;
if (!isEof) {
//load remaining results
ReentrantLock lock = protocol.getLock();
lock.lock();
try {
fetchRemaining();
} catch (SQLException ioe) {
throw new SQLException("Server has closed the connection. If result set contain huge amount of data, Server expects client to"
+ " read off the result set relatively fast. "
+ "In this case, please consider increasing net_wait_timeout session variable."
+ " / processing your result set faster (check Streaming result sets documentation for more information)", ioe);
} finally {
lock.unlock();
}
}
rowPointer = resultSetSize - 1;
return rowPointer > 0;
}

@Override
Expand All @@ -701,7 +785,7 @@ public int getRow() throws SQLException {
@Override
public boolean absolute(int row) throws SQLException {
checkClose();
if (streaming && resultSetScrollType == TYPE_FORWARD_ONLY) {
if (resultSetScrollType == TYPE_FORWARD_ONLY) {
throw new SQLException("Invalid operation for result set type TYPE_FORWARD_ONLY");
} else {
if (row >= 0 && row <= resultSetSize) {
Expand All @@ -717,7 +801,7 @@ public boolean absolute(int row) throws SQLException {
@Override
public boolean relative(int rows) throws SQLException {
checkClose();
if (streaming && resultSetScrollType == TYPE_FORWARD_ONLY) {
if (resultSetScrollType == TYPE_FORWARD_ONLY) {
throw new SQLException("Invalid operation for result set type TYPE_FORWARD_ONLY");
} else {
int newPos = rowPointer + rows;
Expand All @@ -732,7 +816,7 @@ public boolean relative(int rows) throws SQLException {
@Override
public boolean previous() throws SQLException {
checkClose();
if (streaming && resultSetScrollType == TYPE_FORWARD_ONLY) {
if (resultSetScrollType == TYPE_FORWARD_ONLY) {
throw new SQLException("Invalid operation for result set type TYPE_FORWARD_ONLY");
} else {
if (rowPointer > -1) {
Expand Down Expand Up @@ -763,6 +847,7 @@ public int getFetchSize() throws SQLException {
@Override
public void setFetchSize(int fetchSize) throws SQLException {
if (streaming && this.fetchSize == 0) {

try {
while (readNextValue(resultSet)) {
//fetch all results
Expand All @@ -773,8 +858,8 @@ public void setFetchSize(int fetchSize) throws SQLException {
throw new SQLException(queryException);
}

streaming = dataFetchTime == 1;
dataFetchTime++;
streaming = false;

}
this.fetchSize = fetchSize;
Expand All @@ -792,7 +877,7 @@ public int getConcurrency() throws SQLException {

private void checkClose() throws SQLException {
if (isClosed) {
throw new SQLException("Operation not permit on a closed resultset", "HY000");
throw new SQLException("Operation not permit on a closed resultSet", "HY000");
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/test/java/org/mariadb/jdbc/DriverTest.java
Expand Up @@ -515,8 +515,8 @@ public void testPreparedStatementsWithQuotes() throws SQLException {
@Test
public void testResultSetPositions() throws SQLException {
sharedConnection.createStatement().execute("insert into ressetpos values (1),(2),(3),(4)");

ResultSet rs = sharedConnection.createStatement().executeQuery("select * from ressetpos");
Statement stmt = sharedConnection.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
ResultSet rs = stmt.executeQuery("select * from ressetpos");
assertTrue(rs.isBeforeFirst());
rs.next();
assertTrue(!rs.isBeforeFirst());
Expand Down

0 comments on commit 2f99de9

Please sign in to comment.