
[QUESTION] Exception is not thrown as expected when the session is killed in SQL database #1846

Closed
DorisTao77 opened this issue Jun 20, 2022 · 6 comments · Fixed by #1857



DorisTao77 commented Jun 20, 2022

Question

When using this library from Spark to run a SQL query and load the data into a Spark DataFrame, if the session is accidentally killed from the SQL database side, no connection-reset error is thrown as expected, yet the count result indicates that the data was not read completely.

Steps to reproduce:

(1) Use this library from Spark to run the SQL query and load the data into a Spark DataFrame, as shown below:

%scala
val token = "<token>"

val jdbcHostname = "xinrandatabseserver.database.windows.net"
val jdbcDatabase = "xinranSQLDatabase"
val jdbcPort = 1433
val jdbcUrl = "jdbc:sqlserver://%s:%s;databaseName=%s;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net".format(jdbcHostname, jdbcPort, jdbcDatabase) + ";accessToken=" + token

import java.util.Properties
val connectionProperties = new Properties()
val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
connectionProperties.setProperty("Driver", driverClass)
connectionProperties.setProperty("accesstoken", token)

val sql_pushdown = "(select UNITS from payment_balance_new) emp_alias"
val df_stripe_dispute = spark.read.option("connectRetryCount", 200).option("numPartitions", 1).jdbc(url = jdbcUrl, table = sql_pushdown, properties = connectionProperties)
df_stripe_dispute.count()

(2) Kill the session from the SQL database side with "KILL [SPID]".

(3) Check the count() result: it is less than the actual row count, but no exception is thrown and the command completes successfully.
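
For step (2), the session can also be killed programmatically from a second JDBC connection. A minimal sketch, reusing jdbcUrl and connectionProperties from the snippet above; the DMV filter is illustrative and would kill every other user session on the database, so narrow it as needed:

// Open a second connection and kill the reader's session(s).
val admin = java.sql.DriverManager.getConnection(jdbcUrl, connectionProperties)
val sessions = admin.createStatement().executeQuery(
  "SELECT session_id FROM sys.dm_exec_sessions " +
  "WHERE is_user_process = 1 AND session_id <> @@SPID")
val spids = scala.collection.mutable.ListBuffer[Int]()
while (sessions.next()) spids += sessions.getInt(1)
val killer = admin.createStatement()
// KILL does not accept parameter markers, so the SPID is inlined.
spids.foreach(spid => killer.execute(s"KILL $spid"))
admin.close()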

Need help on

In general, Spark connects to SQL Server and reads the data using the code below, where rs is a com.microsoft.sqlserver.jdbc.SQLServerResultSet. So whenever you kill the session, rs.next() returns false but does not throw an exception.

override protected def getNext(): InternalRow = {
  if (rs.next()) {
    inputMetrics.incRecordsRead(1)
    var i = 0
    while (i < getters.length) {
      getters(i).apply(rs, mutableRow, i)
      if (rs.wasNull) mutableRow.setNullAt(i)
      i = i + 1
    }
    mutableRow
  } else {
    finished = true
    null.asInstanceOf[InternalRow]
  }
}

Based on a code analysis of com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(), there is a possibility that it returns false when the connection is broken. You can refer to the code here: https://github.com/microsoft/mssql-jdbc/blob/main/src/main/java/com/microsoft/sqlserver/jdbc/SQLServerResultSet.java#L1011-L1107
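
Until the root cause is clear, one way to surface the truncation at the application level is to validate the connection once next() starts returning false. A sketch, assuming only standard JDBC; whether Connection.isValid detects this particular remote kill is untested:

import java.sql.{Connection, ResultSet, SQLException}

// Drain a ResultSet, then verify the connection is still alive; if it is
// not, the apparent end-of-results may really be a dropped session.
def drainOrFail(conn: Connection, rs: ResultSet): Long = {
  var count = 0L
  while (rs.next()) { count += 1 }
  if (!conn.isValid(5)) // seconds to wait for the validation round trip
    throw new SQLException(s"Connection lost after $count rows; result may be incomplete")
  count
}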

Could you help check why the exception is not thrown as expected? Thanks!


tkyc commented Jun 21, 2022

Hi, we'll do some investigating on our end and get back to you.

tkyc added the Under Investigation label on Jun 21, 2022
lilgreenbird commented

I cannot repro this issue against a local SQL Server: there I do see a TDS_ERR reported, and the driver throws an exception saying the session is in the kill state.

However, when running against an Azure SQL Database or a remote SQL Server, no error is reported to the driver at all, so there is nothing the driver can do; it does not know there was a problem. It is no different from the network going down and the server going away: eventually the call will time out if there is no response from the server, and that is all.

In the code that is referenced, when the driver tries to fetch the next fetch buffer it checks whether a database error was reported. If one was, the driver reports the exception; if no error was reported, there is nothing for the driver to do.
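
In other words, the decision looks roughly like the sketch below. This is a simplified illustration of the behaviour described, not the actual mssql-jdbc source, and all names are hypothetical stand-ins:

// Hypothetical sketch of the fetch decision described above.
class FetchSketch(var rowsLeftInBuffer: Int, var serverError: Option[Exception]) {
  def next(): Boolean = {
    if (rowsLeftInBuffer > 0) {
      rowsLeftInBuffer -= 1          // a row is still available in the current buffer
      true
    } else serverError match {
      case Some(err) => throw err    // local kill: the server sent a TDS error token
      case None      => false        // remote kill, no error token: looks like end-of-results
    }
  }
}

If the concern is the client waiting indefinitely when the server silently disappears, the socketTimeout connection property (in milliseconds; 0, the default, means wait forever) bounds how long a read blocks.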


DorisTao77 commented Jun 27, 2022

Thanks for your analysis! I'm still a little confused: if I enable debug logging for the JDBC jar, which slows down the read operation, the error is thrown in this case:
[screenshot: the exception thrown when debug logging is enabled]
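
For reference, the driver logs through java.util.logging, so verbose logging can be enabled roughly like this (a sketch; the exact configuration used for the screenshot is not shown in the issue):

import java.util.logging.{ConsoleHandler, Level, Logger}

// FINEST is very verbose and is what slows the read down noticeably.
val handler = new ConsoleHandler()
handler.setLevel(Level.FINEST)
val driverLogger = Logger.getLogger("com.microsoft.sqlserver.jdbc")
driverLogger.setLevel(Level.FINEST)
driverLogger.addHandler(handler)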

Also, if I run the Scala code below, which uses the driver directly, the error is thrown:

%scala
import java.sql.DriverManager
import java.sql.Connection
import java.util.Properties
import com.microsoft.sqlserver.jdbc.SQLServerResultSet

val jdbcHostname = "xinrandatabseserver.database.windows.net"
val jdbcDatabase = "xinranSQLDatabase"
val jdbcPort = "1433"
val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

// token is defined in the earlier notebook cell
val jdbcUrl = "jdbc:sqlserver://%s:%s;databaseName=%s;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net".format(jdbcHostname, jdbcPort, jdbcDatabase) + ";accessToken=" + token

val info: Properties = new Properties()
info.setProperty("user", token)

// make the connection
Class.forName(driver)
val connection: Connection = DriverManager.getConnection(jdbcUrl, info)

// create the statement, and run the select query
val statement = connection.createStatement()
statement.setFetchSize(100)

val resultSet = statement.executeQuery("select UNITS from payment_balance_new").asInstanceOf[SQLServerResultSet]

while (resultSet.next()) {
  println("__________________________" + resultSet.getString(1))
  Thread.sleep(10)
}

[screenshot: the connection-reset error thrown by the standalone Scala loop]

Could you help check what the difference is here? Thanks in advance!

lilgreenbird commented

It depends on timing: if the connection has died, you will see a connection reset.


DorisTao77 commented Jun 30, 2022

Could you help explain the reasons in more detail? I'm a little confused about why the error is not thrown when using Spark but is thrown in certain scenarios, such as when debug logging for the JDBC jar is enabled.
Thanks for your help in advance!

DorisTao77 commented

Hi @lilgreenbird, I have investigated the issue with the Databricks side. From the current findings, the issue may still persist with the newly released 11.2.0 version: https://github.com/microsoft/mssql-jdbc/releases/tag/v11.2.0

We have also tried plain Scala code that does not involve Spark and simply calls the library the same way Spark does. After killing the session, only part of the records is returned, but the error does not show up, so this reproduces the scenario in which the error fails to appear.

%scala
import java.sql.DriverManager
import java.util.Properties
import java.sql.{Connection, PreparedStatement, ResultSet}

val jdbcHostname = "xinrandatabseserver.database.windows.net"
val jdbcDatabase = "xinranSQLDatabase"
val jdbcPort = "1433"
val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
val token = "xxxxxxxxx"

val jdbcUrl = "jdbc:sqlserver://%s:%s;databaseName=%s;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net".format(jdbcHostname, jdbcPort, jdbcDatabase) + ";accessToken=" + token

val info: Properties = new Properties()
info.setProperty("accesstoken", token)

// make the connection
Class.forName(driver)
val connection: Connection = DriverManager.getConnection(jdbcUrl, info)

val sqlText = "SELECT 1 FROM (select UNITS from payment_balance_new) emp_alias"
val stmt = connection.prepareStatement(sqlText,
  ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
stmt.setFetchSize(0)
stmt.setQueryTimeout(0)
val resultSet = stmt.executeQuery()
var count = 0
while (resultSet.next()) {
  // println("__________________________" + resultSet.getString(1))
  count = count + 1
}

print(count)
// the actual result should be 2673440
// assert(count == 2673440)

[screenshot: the printed count is less than 2673440, with no exception thrown]

Could you help further look into it? Thanks in advance!
