Skip to content

Commit

Permalink
Merge pull request #595 from rene-ye/blobStream
Browse files Browse the repository at this point in the history
Fix to the persistence of blobs, and a small adjustment on how the blob retrieves its length when streaming.
  • Loading branch information
rene-ye authored Mar 6, 2018
2 parents 923a6f4 + 152e71b commit db0f41a
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 0 deletions.
13 changes: 13 additions & 0 deletions src/main/java/com/microsoft/sqlserver/jdbc/SQLServerBlob.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,23 @@ public byte[] getBytes(long pos,
*/
public long length() throws SQLException {
checkClosed();
if (value == null && activeStreams.get(0) instanceof PLPInputStream) {
return (long)((PLPInputStream)activeStreams.get(0)).payloadLength;
}
getBytesFromStream();
return value.length;
}

/**
* Function for the result set to maintain blobs it has created
* @throws SQLException
*/
void fillByteArray() throws SQLException {
if(!isClosed) {
getBytesFromStream();
}
}

/**
* Converts stream to byte[]
* @throws SQLServerException
Expand Down
28 changes: 28 additions & 0 deletions src/main/java/com/microsoft/sqlserver/jdbc/SQLServerResultSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ final void setCurrentRowType(RowType rowType) {
* occurs
*/
private Closeable activeStream;
private Blob activeBlob;

/**
* A window of fetchSize quickly accessible rows for scrollable result sets
Expand Down Expand Up @@ -669,6 +670,7 @@ final Column getColumn(int columnIndex) throws SQLServerException {
// before moving to another one.
if (null != activeStream) {
try {
fillBlobs();
activeStream.close();
}
catch (IOException e) {
Expand Down Expand Up @@ -747,6 +749,7 @@ private Column loadColumn(int index) throws SQLServerException {
/* ----------------- JDBC API methods ------------------ */

private void moverInit() throws SQLServerException {
fillBlobs();
cancelInsert();
cancelUpdates();
}
Expand Down Expand Up @@ -1035,6 +1038,7 @@ public boolean next() throws SQLServerException {
public boolean wasNull() throws SQLServerException {
loggerExternal.entering(getClassNameLogging(), "wasNull");
checkClosed();
fillBlobs();
loggerExternal.exiting(getClassNameLogging(), "wasNull", lastValueWasNull);
return lastValueWasNull;
}
Expand Down Expand Up @@ -1892,6 +1896,7 @@ Column getterGetColumn(int index) throws SQLServerException {
if (logger.isLoggable(java.util.logging.Level.FINER))
logger.finer(toString() + " Getting Column:" + index);

fillBlobs();
return loadColumn(index);
}

Expand Down Expand Up @@ -2657,6 +2662,7 @@ public Blob getBlob(int i) throws SQLServerException {
checkClosed();
Blob value = (Blob) getValue(i, JDBCType.BLOB);
loggerExternal.exiting(getClassNameLogging(), "getBlob", value);
activeBlob = value;
return value;
}

Expand All @@ -2665,6 +2671,7 @@ public Blob getBlob(String colName) throws SQLServerException {
checkClosed();
Blob value = (Blob) getValue(findColumn(colName), JDBCType.BLOB);
loggerExternal.exiting(getClassNameLogging(), "getBlob", value);
activeBlob = value;
return value;
}

Expand Down Expand Up @@ -6507,6 +6514,24 @@ final void doServerFetch(int fetchType,
scrollWindow.reset();
}
}

/*
* Iterates through the list of objects which rely on the stream that's about to be closed, filling them with their data
* Will skip over closed blobs, implemented in SQLServerBlob
*/
private void fillBlobs() {
if (null != activeBlob && activeBlob instanceof SQLServerBlob) {
try {
((SQLServerBlob)activeBlob).fillByteArray();
} catch (SQLException e) {
if (logger.isLoggable(java.util.logging.Level.FINER)) {
logger.finer(toString() + "Filling blobs before closing: " + e.getMessage());
}
} finally {
activeBlob = null;
}
}
}

/**
* Discards the contents of the current fetch buffer.
Expand All @@ -6519,6 +6544,9 @@ final void doServerFetch(int fetchType,
* fetch buffer is considered to be discarded.
*/
private void discardFetchBuffer() {
//fills blobs before discarding anything
fillBlobs();

// Clear the TDSReader mark at the start of the fetch buffer
fetchBuffer.clearStartMark();

Expand Down
139 changes: 139 additions & 0 deletions src/test/java/com/microsoft/sqlserver/jdbc/unit/lobs/lobsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,59 @@ else if (lobClass == Blob.class)
}
}

@Test
@DisplayName("testFreedBlobs")
private void testFreedBlobs(Class lobClass,
boolean isResultSet) throws SQLException {
String types[] = {"varbinary(max)"};
try {
table = createTable(table, types, false); // create empty table
int size = 10000;

byte[] data = new byte[size];
ThreadLocalRandom.current().nextBytes(data);

Blob blob = null;
InputStream stream = null;
for (int i = 0; i < 5; i++)
{
PreparedStatement ps = conn.prepareStatement("INSERT INTO " + table.getEscapedTableName() + " VALUES(?)");
blob = conn.createBlob();
blob.setBytes(1, data);
ps.setBlob(1, blob);
ps.executeUpdate();
}

byte[] chunk = new byte[size];
ResultSet rs = stmt.executeQuery("select * from " + table.getEscapedTableName());
for (int i = 0; i < 5; i++)
{
rs.next();

blob = rs.getBlob(1);
stream = blob.getBinaryStream();
while (stream.available() > 0)
stream.read();
blob.free();
try {
stream = blob.getBinaryStream();
} catch (SQLException e) {
assertTrue(e.getMessage().contains("This Blob object has been freed."));
}
}
rs.close();
try {
stream = blob.getBinaryStream();
} catch (SQLException e) {
assertTrue(e.getMessage().contains("This Blob object has been freed."));
}
}
catch (Exception e) {
this.dropTables(table);
e.printStackTrace();
}
}

@Test
@DisplayName("testMultipleCloseCharacterStream")
public void testMultipleCloseCharacterStream() throws Exception {
Expand Down Expand Up @@ -413,6 +466,92 @@ public void testUpdatorClob() throws Exception {
String types[] = {"varchar(max)"};
testUpdateLobs(types, Clob.class);
}

@Test
@DisplayName("readBlobStreamAfterClosingRS")
public void readBlobStreamAfterClosingRS() throws Exception {
String types[] = {"varbinary(max)"};
table = createTable(table, types, false); // create empty table
int size = 10000;

byte[] data = new byte[size];
ThreadLocalRandom.current().nextBytes(data);

Blob blob = null;
InputStream stream = null;
PreparedStatement ps = conn.prepareStatement("INSERT INTO " + table.getEscapedTableName() + " VALUES(?)");
blob = conn.createBlob();
blob.setBytes(1, data);
ps.setBlob(1, blob);
ps.executeUpdate();

byte[] chunk = new byte[size];
ResultSet rs = stmt.executeQuery("select * from " + table.getEscapedTableName());
rs.next();

blob = rs.getBlob(1);
stream = blob.getBinaryStream();
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
int read = 0;
while ((read = stream.read(chunk)) > 0)
buffer.write(chunk, 0, read);
assertEquals(chunk.length, size);
rs.close();
stream = blob.getBinaryStream();
buffer = new ByteArrayOutputStream();
read = 0;
while ((read = stream.read(chunk)) > 0)
buffer.write(chunk, 0, read);
assertEquals(chunk.length, size);

if (null != blob)
blob.free();
dropTables(table);
}

@Test
@DisplayName("readMultipleBlobStreamsThenCloseRS")
public void readMultipleBlobStreamsThenCloseRS() throws Exception {
String types[] = {"varbinary(max)"};
table = createTable(table, types, false);
int size = 10000;

byte[] data = new byte[size];
Blob[] blobs = {null, null, null, null, null};
InputStream stream = null;
for (int i = 0; i < 5; i++)//create 5 blobs
{
PreparedStatement ps = conn.prepareStatement("INSERT INTO " + table.getEscapedTableName() + " VALUES(?)");
blobs[i] = conn.createBlob();
ThreadLocalRandom.current().nextBytes(data);
blobs[i].setBytes(1, data);
ps.setBlob(1, blobs[i]);
ps.executeUpdate();
}
byte[] chunk = new byte[size];
ResultSet rs = stmt.executeQuery("select * from " + table.getEscapedTableName());
for (int i = 0; i < 5; i++)
{
rs.next();
blobs[i] = rs.getBlob(1);
stream = blobs[i].getBinaryStream();
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
int read = 0;
while ((read = stream.read(chunk)) > 0)
buffer.write(chunk, 0, read);
assertEquals(chunk.length, size);
}
rs.close();
for (int i = 0; i < 5; i++)
{
stream = blobs[i].getBinaryStream();
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
int read = 0;
while ((read = stream.read(chunk)) > 0)
buffer.write(chunk, 0, read);
assertEquals(chunk.length, size);
}
}

private void testUpdateLobs(String types[],
Class lobClass) throws Exception {
Expand Down

0 comments on commit db0f41a

Please sign in to comment.