
Need Support for Stored Procedures, Updates, Deletes #21

Closed
furlong46 opened this issue Jul 9, 2020 · 7 comments
Labels
wontfix This will not be worked on

Comments

@furlong46

The previous, unrelated version of this solution supported DDL and DML operations. We need the ability to execute Update, Delete, and Stored Procedures from within a Spark notebook.

https://github.com/Azure/azure-sqldb-spark

@furlong46 furlong46 changed the title Need Support for Stored Procedures, Updates Need Support for Stored Procedures, Updates, Deletes Jul 9, 2020
@rajmera3 rajmera3 added the enhancement New feature or request label Jul 9, 2020
@B4PJS

B4PJS commented Jul 22, 2020

@furlong46 You can use the query option for all operations except sprocs. You would first have to stage data into another table before issuing updates etc:

articleQuery = """SELECT
    DimArticleSK
    ,hope_number
    ,article_number
FROM dim.Article"""

try:
    article = (
        spark.read
            .format("com.microsoft.sqlserver.jdbc.spark")
            .option("url", url)
            .option("query", articleQuery)
            .option("user", jdbcUsername)
            .option("password", jdbcPassword)
            .load()
    )
except ValueError as error:
    print("Connector read failed", error)
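For the staging step itself, a write through the connector could look like the following sketch, reusing the article DataFrame from the read above (Stage.Article is a hypothetical target table):

# Sketch: stage the DataFrame into a SQL table first; UPDATE/DELETE statements against it
# then have to be issued separately (e.g. via JDBC, as in the next comment).
try:
    (article.write
        .format("com.microsoft.sqlserver.jdbc.spark")
        .mode("overwrite")
        .option("url", url)
        .option("dbtable", "Stage.Article")
        .option("user", jdbcUsername)
        .option("password", jdbcPassword)
        .save())
except ValueError as error:
    print("Connector write failed", error)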

@B4PJS

B4PJS commented Jul 23, 2020

After playing around further I see the issue; I had assumed this wrapper behaved similarly to the Scala version. To execute a non-query you can do something like this:

jdbcUsername = "RelationalLayerLogin"
jdbcPassword = dbutils.secrets.get(scope = "AzureKeyVault", key = "RelationalLayerLogin")
server = dbutils.secrets.get(scope = "AzureKeyVault", key = "SqlServerNameSemanticLayer")
database = dbutils.secrets.get(scope = "AzureKeyVault", key = "RelationalLayerDatabase")

url = "jdbc:sqlserver://{SERVER_ADDR};databaseName={DATABASE_NAME};".format(SERVER_ADDR = server, DATABASE_NAME = database)

# For executing non-queries, go through the JVM's java.sql.DriverManager via the py4j gateway
driver_manager = spark._sc._gateway.jvm.java.sql.DriverManager
con = driver_manager.getConnection(url, jdbcUsername, jdbcPassword)

drop = "DROP TABLE IF EXISTS Stage.SalesByDateStoreArticle"

stmt = con.createStatement()
stmt.executeUpdate(drop)
stmt.close()
con.close()
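Calling a stored procedure that returns no result set can be done the same way; a minimal sketch, where dbo.RefreshSalesAggregate is a hypothetical procedure name:

# Sketch (dbo.RefreshSalesAggregate is a hypothetical procedure): reuse the py4j
# DriverManager connection and invoke the procedure through JDBC call syntax.
con = driver_manager.getConnection(url, jdbcUsername, jdbcPassword)
call = con.prepareCall("{call dbo.RefreshSalesAggregate}")
call.execute()
call.close()
con.close()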

@burma69

burma69 commented Aug 8, 2020

With AAD authentication this can look like:

  import java.sql.DriverManager
  import java.util.Properties

  val properties = new Properties()

  properties.setProperty("databaseName", "***")
  properties.setProperty("accessToken", sqlAzureToken)
  properties.setProperty("encrypt", "true")
  properties.setProperty("hostNameInCertificate", "*.database.windows.net")
  properties.setProperty("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")

  val conn = DriverManager.getConnection("jdbc:sqlserver://***.database.windows.net", properties)
  conn.setAutoCommit(false) // commit() below throws if the connection is left in auto-commit mode
  val rowsAffected = conn.prepareStatement(Query).executeUpdate()
  conn.commit()
  conn.close()

@rajmera3 rajmera3 added wontfix This will not be worked on and removed enhancement New feature or request labels Aug 28, 2020
@rajmera3
Contributor

Hi all,

While the older Azure SQL DB Spark connector did include this functionality, this new connector is based on the Spark DataSource APIs, so that functionality is out of scope.

This functionality is provided by libraries like pyodbc.
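A minimal pyodbc sketch for running a stored procedure from a notebook might look like this (server, database, credentials, and procedure name are placeholders; the Microsoft ODBC driver has to be installed on the cluster):

import pyodbc

# Sketch with placeholder values; requires the msodbcsql ODBC driver on the cluster.
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=<yourServer>.database.windows.net,1433;"
    "DATABASE=<yourDatabase>;"
    "UID=<yourUser>;PWD=<yourPassword>",
    autocommit=True,
)
cursor = conn.cursor()
cursor.execute("EXEC <nameOfStoredProcedure>")
cursor.close()
conn.close()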

@bellowman

Hello, can anyone please give me a Scala version of B4PJS's code for the DROP TABLE example? I can't get his version to work in Scala.

@pitscher

pitscher commented Mar 11, 2021

Hi, we faced a similar issue today.
This issue ranks pretty high on Google if you search for "databricks scala execute stored procedure".

What we wanted to do:
From inside a Databricks Scala notebook, call a stored procedure (SP) on an Azure SQL Database and return the SP's result as a DataFrame.
For the connection between SQL Server and Databricks we used the Apache Spark Connector for SQL Server and Azure SQL, and for authorization we used Azure AD.

Here is a fully working example of a Databricks Scala Notebook, accessing an Azure SQL DB with Azure AD and running a Stored Procedure:

If you want to use it, don't forget to change the values of <...> appropriately and install the required libraries!

// Import libraries
import com.microsoft.aad.adal4j.{AuthenticationContext, ClientCredential}
import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet
import java.sql.Statement
import java.util.concurrent.Executors
import java.util.Properties
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val secretScope = "<yourSecretScope>"
val clientIdKey = "<servicePrincipalClientId>"
val clientSecretKey = "<servicePrincipalClientSecret>"

val tenantId = "<yourTenantId>"
val authority = s"https://login.windows.net/${tenantId}"
val resourceAppIdUri = "https://database.windows.net/"

val sqlServerName = "<yourSqlServersName>"
val sqlDatabaseName = "<yourSqlDatabase>"
val url = s"jdbc:sqlserver://${sqlServerName}.database.windows.net:1433;databaseName=${sqlDatabaseName}"

// --- Get AccessToken for ServicePrincipal ---
val service = Executors.newFixedThreadPool(1)
val authenticationContext = new AuthenticationContext(authority, true, service)

val principalClientId = dbutils.secrets.get(scope=secretScope, key=clientIdKey)
val principalSecret = dbutils.secrets.get(scope=secretScope, key=clientSecretKey)
val clientCredential = new ClientCredential(principalClientId, principalSecret)

val authResult = authenticationContext.acquireToken(resourceAppIdUri, clientCredential, null)

val accessToken = authResult.get().getAccessToken

// --- Prepare ResultSet to DataFrame conversion ---
// Define columns & prepare schema
val columns = Seq ("<myColumn1>", "<myColumn2>", "<myColumn3>", "<myColumn4>")

val schema = StructType(List(
    StructField("<myColumn1>", StringType, nullable = true),
    StructField("<myColumn2>", StringType, nullable = true),
    StructField("<myColumn3>", StringType, nullable = true),
    StructField("<myColumn4>", StringType, nullable = true)
))

// Define how each record in ResultSet will be converted to a Row at each iteration
def parseResultSet(rs: ResultSet): Row = {
    val resultSetRecord = columns.map(c => rs.getString(c))
    Row(resultSetRecord:_*)
}

// Define a function to convert the ResultSet to an Iterator[Row] (It will use the function of the previous step)
def resultSetToIter(rs: ResultSet)(f: ResultSet => Row): Iterator[Row] =
  new Iterator[Row] {
    def hasNext: Boolean = rs.next()
    def next(): Row = f(rs)
}

// Define a function that creates an RDD out of the Iterator[Row] (using the functions defined above)
// and uses the schema created earlier to build a DataFrame
def parallelizeResultSet(rs: ResultSet, spark: SparkSession): DataFrame = {
  val rdd = spark.sparkContext.parallelize(resultSetToIter(rs)(parseResultSet).toSeq)
  spark.createDataFrame(rdd, schema)
}

// --- Execute StoredProcedure ---
// Configure connection properties
val connectionProperties = new Properties()
connectionProperties.setProperty("Driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
connectionProperties.put("accessToken", accessToken)
connectionProperties.put("encrypt", "true")
connectionProperties.put("hostNameInCertificate", "*.database.windows.net")

// Execute the Stored Procedure with "EXEC <nameOfStoredProcedure> <parameterIfNeeded>"
val connection = DriverManager.getConnection(url, connectionProperties)
val resultSet = connection.createStatement.executeQuery("EXEC <nameOfStoredProcedure> <parameterIfNeeded>")
val df = parallelizeResultSet(resultSet, spark)
display(df)

val columns and val schema must reflect the output of your stored procedure - they determine what the resulting DataFrame will look like. So adjust the values of both vars!

Keep in mind:
If you want to use Azure AD for auth, you have to install the following library on the cluster you want to run the above notebook on:
com.microsoft.aad:adal4j:0.0.2 (Maven coordinate)
Further reference:
https://github.com/microsoft/sql-spark-connector/blob/master/samples/Databricks-AzureSQL/DatabricksNotebooks/SQL%20Spark%20Connector%20-%20Scala%20AAD%20Auth.scala#L78

The new SQL Spark Connector should already be installed; if not, install it as well.
Source of the Stored Procedure part: https://stackoverflow.com/questions/41117750/scala-resultset-to-spark-dataframe

Hope this is helpful. 🤓

@yyipkei

yyipkei commented Jun 7, 2021

spark._sc._gateway.jvm.java.sql.DriverManager

May I know how to pass "accessToken" as a connection property to this in Python?

Thanks.
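One way this can be done (a sketch, assuming a valid AAD token is already available in access_token and that url is the JDBC URL from the earlier comment) is to build a java.util.Properties object through the same py4j gateway and pass it to DriverManager.getConnection:

# Sketch: pass the AAD access token as a JDBC connection property via the py4j gateway.
# access_token is assumed to already hold a valid token for https://database.windows.net/.
jvm = spark._sc._gateway.jvm
properties = jvm.java.util.Properties()
properties.setProperty("accessToken", access_token)
properties.setProperty("encrypt", "true")
properties.setProperty("hostNameInCertificate", "*.database.windows.net")

con = jvm.java.sql.DriverManager.getConnection(url, properties)
stmt = con.createStatement()
stmt.executeUpdate("DROP TABLE IF EXISTS Stage.SalesByDateStoreArticle")
stmt.close()
con.close()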
