Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tvp server cursor fix #234

Merged
merged 20 commits into from
Apr 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 110 additions & 3 deletions src/main/java/com/microsoft/sqlserver/jdbc/IOBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -4535,12 +4535,59 @@ void writeTVP(TVP value) throws SQLServerException {
void writeTVPRows(TVP value) throws SQLServerException {
boolean isShortValue, isNull;
int dataLength;


boolean tdsWritterCached = false;
ByteBuffer cachedTVPHeaders = null;
TDSCommand cachedCommand = null;

boolean cachedRequestComplete = false;
boolean cachedInterruptsEnabled = false;
boolean cachedProcessedResponse = false;

if (!value.isNull()) {

// If the preparedStatement and the ResultSet are created by the same connection, and TVP is set with ResultSet and Server Cursor
// is used, the tdsWriter of the calling preparedStatement is overwritten by the SQLServerResultSet#next() method when fetching new rows.
// Therefore, we need to send TVP data row by row before fetching new row.
if (TVPType.ResultSet == value.tvpType) {
if ((null != value.sourceResultSet) && (value.sourceResultSet instanceof SQLServerResultSet)) {
SQLServerResultSet sourceResultSet = (SQLServerResultSet) value.sourceResultSet;
SQLServerStatement src_stmt = (SQLServerStatement) sourceResultSet.getStatement();
int resultSetServerCursorId = sourceResultSet.getServerCursorId();

if (con.equals(src_stmt.getConnection()) && 0 != resultSetServerCursorId) {
cachedTVPHeaders = ByteBuffer.allocate(stagingBuffer.capacity()).order(stagingBuffer.order());
cachedTVPHeaders.put(stagingBuffer.array(), 0, stagingBuffer.position());

cachedCommand = this.command;

cachedRequestComplete = command.getRequestComplete();
cachedInterruptsEnabled = command.getInterruptsEnabled();
cachedProcessedResponse = command.getProcessedResponse();

tdsWritterCached = true;

if (sourceResultSet.isForwardOnly()) {
sourceResultSet.setFetchSize(1);
}
}
}
}

Map<Integer, SQLServerMetaData> columnMetadata = value.getColumnMetadata();
Iterator<Entry<Integer, SQLServerMetaData>> columnsIterator;

while (value.next()) {

// restore command and TDS header, which have been overwritten by value.next()
if (tdsWritterCached) {
command = cachedCommand;

stagingBuffer.clear();
logBuffer.clear();
writeBytes(cachedTVPHeaders.array(), 0, cachedTVPHeaders.position());
}

Object[] rowData = value.getRowData();

// ROW
Expand Down Expand Up @@ -4749,10 +4796,40 @@ else if (DataTypes.UNKNOWN_STREAM_LENGTH == dataLength)
}
currentColumn++;
}

// send this row, read its response (throw exception in case of errors) and reset command status
if (tdsWritterCached) {
// TVP_END_TOKEN
writeByte((byte) 0x00);

writePacket(TDS.STATUS_BIT_EOM);

TDSReader tdsReader = tdsChannel.getReader(command);
int tokenType = tdsReader.peekTokenType();

if (TDS.TDS_ERR == tokenType) {
StreamError databaseError = new StreamError();
databaseError.setFromTDS(tdsReader);

SQLServerException.makeFromDatabaseError(con, null, databaseError.getMessage(), databaseError, false);
}

command.setInterruptsEnabled(true);
command.setRequestComplete(false);
}
}
}
// TVP_END_TOKEN
writeByte((byte) 0x00);

// reset command status which have been overwritten
if (tdsWritterCached) {
command.setRequestComplete(cachedRequestComplete);
command.setInterruptsEnabled(cachedInterruptsEnabled);
command.setProcessedResponse(cachedProcessedResponse);
}
else {
// TVP_END_TOKEN
writeByte((byte) 0x00);
}
}

private static byte[] toByteArray(String s) {
Expand Down Expand Up @@ -6954,6 +7031,16 @@ final void log(Level level,
// interrupt is ignored.
private volatile boolean interruptsEnabled = false;

protected boolean getInterruptsEnabled() {
return interruptsEnabled;
}

protected void setInterruptsEnabled(boolean interruptsEnabled) {
synchronized (interruptLock) {
this.interruptsEnabled = interruptsEnabled;
}
}

// Flag set to indicate that an interrupt has happened.
private volatile boolean wasInterrupted = false;

Expand All @@ -6970,6 +7057,16 @@ private boolean wasInterrupted() {
// After the request is complete, the interrupting thread must send the attention signal.
private volatile boolean requestComplete;

protected boolean getRequestComplete() {
return requestComplete;
}

protected void setRequestComplete(boolean requestComplete) {
synchronized (interruptLock) {
this.requestComplete = requestComplete;
}
}

// Flag set when an attention signal has been sent to the server, indicating that a
// TDS packet containing the attention ack message is to be expected in the response.
// This flag is cleared after the attention ack message has been received and processed.
Expand All @@ -6984,6 +7081,16 @@ boolean attentionPending() {
// ENVCHANGE notifications.
private volatile boolean processedResponse;

protected boolean getProcessedResponse() {
return processedResponse;
}

protected void setProcessedResponse(boolean processedResponse) {
synchronized (interruptLock) {
this.processedResponse = processedResponse;
}
}

// Flag set when this command's response is ready to be read from the server and cleared
// after its response has been received, but not necessarily processed, up to and including
// any attention ack. The command's response is read either on demand as it is processed,
Expand Down
10 changes: 0 additions & 10 deletions src/main/java/com/microsoft/sqlserver/jdbc/Parameter.java
Original file line number Diff line number Diff line change
Expand Up @@ -331,16 +331,6 @@ else if (value instanceof SQLServerDataTable) {
tvpValue = new TVP(tvpName, (SQLServerDataTable) value);
}
else if (value instanceof ResultSet) {
// if ResultSet and PreparedStatemet/CallableStatement are created from same connection object
// with property SelectMethod=cursor, TVP is not supported
if (con.getSelectMethod().equalsIgnoreCase("cursor") && (value instanceof SQLServerResultSet)) {
SQLServerStatement stmt = (SQLServerStatement) ((SQLServerResultSet) value).getStatement();

if (con.equals(stmt.connection)) {
throw new SQLServerException(SQLServerException.getErrString("R_invalidServerCursorForTVP"), null);
}
}

tvpValue = new TVP(tvpName, (ResultSet) value);
}
else if (value instanceof ISQLServerDataRecord) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,6 @@ protected Object[][] getContents() {
{"R_invalidKeyStoreFile", "Cannot parse \"{0}\". Either the file format is not valid or the password is not correct."}, // for JKS/PKCS
{"R_invalidCEKCacheTtl", "Invalid column encryption key cache time-to-live specified. The columnEncryptionKeyCacheTtl value cannot be negative and timeUnit can only be DAYS, HOURS, MINUTES or SECONDS."},
{"R_sendTimeAsDateTimeForAE", "Use sendTimeAsDateTime=false with Always Encrypted."},
{"R_invalidServerCursorForTVP" , "Use different Connection for source ResultSet and prepared query, if selectMethod is set to cursor for Table-Valued Parameter."},
{"R_TVPnotWorkWithSetObjectResultSet" , "setObject() with ResultSet is not supported for Table-Valued Parameter. Please use setStructured()"},
{"R_invalidQueryTimeout", "The queryTimeout {0} is not valid."},
{"R_invalidSocketTimeout", "The socketTimeout {0} is not valid."},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ String getClassNameLogging() {
private boolean isClosed = false;

private final int serverCursorId;

protected int getServerCursorId() {
return serverCursorId;
}

/** the intended fetch direction to optimize cursor performance */
private int fetchDirection;
Expand Down Expand Up @@ -448,7 +452,7 @@ private void throwNotScrollable() throws SQLServerException {
true);
}

private boolean isForwardOnly() {
protected boolean isForwardOnly() {
return TYPE_SS_DIRECT_FORWARD_ONLY == stmt.getSQLResultSetType() || TYPE_SS_SERVER_CURSOR_FORWARD_ONLY == stmt.getSQLResultSetType();
}

Expand Down
Loading