Add AAD Support (#17)
# What's supported
AAD authentication support for the connector

# Changes
Support for AAD auth
Tests for AAD
Databricks samples for AAD with SQL
Cleanup of formatting and dependencies

# Tests done
Unit tests run
AAD tests pass on Databricks with Azure SQL using AAD
Tested on BDC with user/password authentication; no regression issues
cchighman committed Jul 22, 2020
1 parent c2fb164 commit 30ba3a5
Showing 10 changed files with 566 additions and 190 deletions.
40 changes: 40 additions & 0 deletions README.md
@@ -127,6 +127,46 @@ jdbcDF = spark.read \
.option("password", password).load()
```

### Azure Active Directory Authentication

**Python Example with Service Principal**
```python
import adal  # pip install adal

# authority, resource_app_id_url, service_principal_id and
# service_principal_secret are configured as in the sample notebooks.
context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(
    resource_app_id_url, service_principal_id, service_principal_secret)
access_token = token["accessToken"]

jdbc_df = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("accessToken", access_token) \
.option("encrypt", "true") \
.option("hostNameInCertificate", "*.database.windows.net") \
.load()
```

**Python Example with Active Directory Password**
```python
jdbc_df = spark.read \
.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", table_name) \
.option("authentication", "ActiveDirectoryPassword") \
.option("user", user_name) \
.option("password", password) \
.option("encrypt", "true") \
.option("hostNameInCertificate", "*.database.windows.net") \
.load()
```
A dependency must be installed in order to authenticate using
Active Directory.

For **Scala**, the _com.microsoft.aad.adal4j_ artifact must be on the classpath.

For **Python**, the _adal_ library must be installed, e.g. `pip install adal`.
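
On Databricks, one way to attach the Python dependency is from a notebook cell; a minimal sketch using the Databricks `dbutils.library` utility (cluster-scoped libraries, or `%pip` on newer runtimes, work as well):

```python
# Install the adal package into the current notebook session.
# dbutils is provided by the Databricks runtime.
dbutils.library.installPyPI("adal")
dbutils.library.restartPython()
```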


Please check the [sample notebooks](samples) for examples.

# Support
13 changes: 10 additions & 3 deletions build.sbt
@@ -6,19 +6,26 @@ version := "1.0.0"

scalaVersion := "2.11.12"

val sparkVersion = "2.4.0"
val sparkVersion = "2.4.6"

javacOptions ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint")

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
// Spark Testing Utilities
"org.apache.spark" %% "spark-core" % sparkVersion % "test" classifier
"tests",
"org.apache.spark" %% "spark-sql" % sparkVersion% "test" classifier
"tests",
"org.apache.spark" %% "spark-catalyst" % sparkVersion % "test" classifier
"tests",
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test",

//SQLServer JDBC jars
"com.microsoft.sqlserver" % "mssql-jdbc" % "7.2.1.jre8",

"com.microsoft.sqlserver" % "mssql-jdbc" % "7.2.1.jre8"
)

scalacOptions := Seq("-unchecked", "-deprecation")

// Exclude scala-library from this fat jar. The scala library is already there in spark package.
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
sbt.version=1.2.3
sbt.version=1.3.13

@@ -0,0 +1,107 @@
# Databricks notebook source
# DBTITLE 1,Import Python ADAL Library
# Microsoft ADAL Library
# The Python ADAL library will need to be installed.
# Example: 'pip install adal'

# Documentation
# https://github.com/AzureAD/azure-activedirectory-library-for-python

# Example
# https://github.com/AzureAD/azure-activedirectory-library-for-python/blob/dev/sample/client_credentials_sample.py

# Note: the Python ADAL library is no longer maintained and its documentation
# recommends MSAL. Unfortunately, MSAL does not yet have feature parity with
# adal and does not support service principal authentication.
import adal

# COMMAND ----------

# DBTITLE 1,Configure Connection
# Located in App Registrations from Azure Portal
tenant_id = "72f988bf-86f1-41af-91ab-2d7cd011db47"

# Authority
authority = "https://login.windows.net/" + tenant_id

# Located in App Registrations from Azure Portal
resource_app_id_url = "https://database.windows.net/"

# In the example below, we use a Databricks utility to fetch secrets from
# a configured Key Vault-backed secret scope.
# Note: dbutils.secrets.get takes a secret scope and a key;
# "key-vault-secrets" is a placeholder scope name.

# Service Principal Client ID - Created in App Registrations from Azure Portal
service_principal_id = dbutils.secrets.get(scope="key-vault-secrets", key="principalClientId")

# Service Principal Secret - Created in App Registrations from Azure Portal
service_principal_secret = dbutils.secrets.get(scope="key-vault-secrets", key="principalSecret")

# SQL Server URL
url = "jdbc:sqlserver://azuresqlserver.database.windows.net"

# Database Name
database_name = "TestDatabase"

# Database Table Name
db_table = "dbo.TestTable"

# Encrypt
encrypt = "true"

# Host Name in Certificate
host_name_in_certificate = "*.database.windows.net"

# COMMAND ----------

# DBTITLE 1,Authentication Options
# SERVICE PRINCIPAL AUTHENTICATION
# You will need to obtain an access token.
# The "accessToken" option on the Spark DataFrame reader selects this
# authentication mode.
context = adal.AuthenticationContext(authority)
token = context.acquire_token_with_client_credentials(resource_app_id_url, service_principal_id, service_principal_secret)

# Set Access Token
access_token = token["accessToken"]
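
# Note: tokens acquired this way expire (the adal response also includes
# an "expiresOn" field), so long-running jobs may need to re-acquire.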

# ACTIVE DIRECTORY PASSWORD AUTHENTICATION
# The "authentication" option with the value "ActiveDirectoryPassword"
# on the Spark DataFrame reader selects this authentication mode.
#
# The "user" and "password" options apply to both SQL Authentication
# and Active Directory Authentication. SQL Authentication is used
# by default and can be switched to Active Directory with the
# "authentication" option above.
user = dbutils.secrets.get(scope="key-vault-secrets", key="adUser")
password = dbutils.secrets.get(scope="key-vault-secrets", key="adPassword")


# COMMAND ----------

# DBTITLE 1,Query SQL using Spark with Service Principal Token
jdbc_df = spark.read.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", db_table) \
.option("accessToken", access_token) \
.option("encrypt", encrypt) \
.option("databaseName", database_name) \
.option("hostNameInCertificate", host_name_in_certificate) \
.load()

display(jdbc_df.select("SourceViewName").limit(1))

# COMMAND ----------

# DBTITLE 1,Query SQL using Spark with Active Directory Password
jdbc_df = spark.read.format("com.microsoft.sqlserver.jdbc.spark") \
.option("url", url) \
.option("dbtable", db_table) \
.option("authentication", "ActiveDirectoryPassword") \
.option("user", user) \
.option("password", password) \
.option("encrypt", encrypt) \
.option("databaseName", database_name) \
.option("hostNameInCertificate", host_name_in_certificate) \
.load()

display(jdbc_df.select("SourceViewName").limit(1))

@@ -0,0 +1,90 @@
// Databricks notebook source
// DBTITLE 1,Import Microsoft ADAL Library
// REQUIREMENT - The com.microsoft.aad.adal4j artifact must be included as a dependency.
import com.microsoft.aad.adal4j.{AuthenticationContext, ClientCredential}

// COMMAND ----------

// DBTITLE 1,Import Dependencies
import org.apache.spark.sql.SparkSession
import java.util.concurrent.Executors

// COMMAND ----------

// DBTITLE 1,Setup Connection Properties
val url = "jdbc:sqlserver://azuresqlserver.database.windows.net:1433;databaseName=TestDatabase"
val dbTable = "dbo.TestTable"

// In the example below, we use a Databricks utility to fetch secrets from
// a configured Key Vault-backed secret scope.
// Note: dbutils.secrets.get takes a secret scope and a key;
// "key-vault-secrets" is a placeholder scope name.

// Service Principal Client ID - Created in App Registrations from Azure Portal
val principalClientId = dbutils.secrets.get(scope = "key-vault-secrets", key = "principalClientId")

// Service Principal Secret - Created in App Registrations from Azure Portal
val principalSecret = dbutils.secrets.get(scope = "key-vault-secrets", key = "principalSecret")

// Located in App Registrations from Azure Portal
val TenantId = "72f988bf-0000-0000-0000-00000000"

val authority = "https://login.windows.net/" + TenantId
val resourceAppIdURI = "https://database.windows.net/"

// COMMAND ----------

// DBTITLE 1,Authentication Options
// SERVICE PRINCIPAL AUTHENTICATION
// You will need to obtain an access token.
// The "accessToken" option on the Spark DataFrame reader selects this
// authentication mode.
val service = Executors.newFixedThreadPool(1)
val context = new AuthenticationContext(authority, true, service)
val clientCred = new ClientCredential(principalClientId, principalSecret)
val authResult = context.acquireToken(resourceAppIdURI, clientCred, null)

val accessToken = authResult.get().getAccessToken
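
// The executor service is only needed while the token is acquired; it
// can be shut down once the result has been read.
service.shutdown()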

// ACTIVE DIRECTORY PASSWORD AUTHENTICATION
// The "authentication" option with the value "ActiveDirectoryPassword"
// on the Spark DataFrame reader selects this authentication mode.
//
// The "user" and "password" options apply to both SQL Authentication
// and Active Directory Authentication. SQL Authentication is used
// by default and can be switched to Active Directory with the
// "authentication" option above.
val user = dbutils.secrets.get(scope = "key-vault-secrets", key = "adUser")
val password = dbutils.secrets.get(scope = "key-vault-secrets", key = "adPassword")


// COMMAND ----------

// DBTITLE 1,Query SQL using Spark with Service Principal

val jdbcDF = spark.read
.format("com.microsoft.sqlserver.jdbc.spark")
.option("url", url)
.option("dbtable", dbTable)
.option("accessToken", accessToken)
.option("encrypt", "true")
.option("hostNameInCertificate", "*.database.windows.net")
.load()

display(jdbcDF.select("SourceViewName").limit(1))

// COMMAND ----------

// DBTITLE 1,Query SQL using Spark with Active Directory Password

val jdbcDF = spark.read
.format("com.microsoft.sqlserver.jdbc.spark")
.option("url", url)
.option("dbtable", dbTable)
.option("authentication", "ActiveDirectoryPassword")
.option("user", user)
.option("password", password)
.option("encrypt", "true")
.option("hostNameInCertificate", "*.database.windows.net")
.load()

display(jdbcDF.select("SourceViewName").limit(1))
@@ -1,59 +1,69 @@
package com.microsoft.sqlserver.jdbc.spark

import java.sql.Connection

import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite

class SQLServerBulkJdbcOptions(val params: CaseInsensitiveMap[String]) extends JdbcOptionsInWrite(params) {

def this(params: Map[String, String]) = this(CaseInsensitiveMap(params))

// Save original parameters for when a JdbcBulkOptions instance is passed
// from the Spark driver to an executor, which loses the reference to the
// params input in memory
override val parameters = params

val user = params.getOrElse("user", null)
val password = params.getOrElse("password", null)
val dbtable = params.getOrElse("dbtable", null)

// If no value is provided, then we write to a single SQL Server instance.
// A non-empty value indicates the name of a data source whose location is
// the data pool that the user wants to write to. This data source will
// contain the user's external table.
val dataPoolDataSource = params.getOrElse("dataPoolDataSource", null)

// In the standard Spark JDBC implementation, the default isolation level is
// "READ_UNCOMMITTED," but for SQL Server, the default is "READ_COMMITTED"
override val isolationLevel = params.getOrElse("mssqlIsolationLevel", "READ_COMMITTED") match {
case "READ_UNCOMMITTED" => Connection.TRANSACTION_READ_UNCOMMITTED
case "READ_COMMITTED" => Connection.TRANSACTION_READ_COMMITTED
case "REPEATABLE_READ" => Connection.TRANSACTION_REPEATABLE_READ
case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE
case "SNAPSHOT" => Connection.TRANSACTION_READ_COMMITTED + 4094
import org.apache.spark.sql.execution.datasources.jdbc.{
JDBCOptions,
JdbcOptionsInWrite
}

class SQLServerBulkJdbcOptions(val params: CaseInsensitiveMap[String])
extends JdbcOptionsInWrite(params) {

def this(params: Map[String, String]) = this(CaseInsensitiveMap(params))

// Save original parameters for when a JdbcBulkOptions instance is passed
// from the Spark driver to an executor, which loses the reference to the
// params input in memory
override val parameters = params

val dbtable = params.getOrElse("dbtable", null)

val user = params.getOrElse("user", null)
val password = params.getOrElse("password", null)

// If no value is provided, then we write to a single SQL Server instance.
// A non-empty value indicates the name of a data source whose location is
// the data pool that the user wants to write to. This data source will
// contain the user's external table.
val dataPoolDataSource = params.getOrElse("dataPoolDataSource", null)

// In the standard Spark JDBC implementation, the default isolation level is
// "READ_UNCOMMITTED," but for SQL Server, the default is "READ_COMMITTED"
override val isolationLevel =
params.getOrElse("mssqlIsolationLevel", "READ_COMMITTED") match {
case "READ_UNCOMMITTED" => Connection.TRANSACTION_READ_UNCOMMITTED
case "READ_COMMITTED" => Connection.TRANSACTION_READ_COMMITTED
case "REPEATABLE_READ" => Connection.TRANSACTION_REPEATABLE_READ
case "SERIALIZABLE" => Connection.TRANSACTION_SERIALIZABLE
case "SNAPSHOT" => Connection.TRANSACTION_READ_COMMITTED + 4094
}

val reliabilityLevel = params.getOrElse("reliabilityLevel", "BEST_EFFORT") match {
case "BEST_EFFORT" => SQLServerBulkJdbcOptions.BEST_EFFORT
case "NO_DUPLICATES" => SQLServerBulkJdbcOptions.NO_DUPLICATES
val reliabilityLevel =
params.getOrElse("reliabilityLevel", "BEST_EFFORT") match {
case "BEST_EFFORT" => SQLServerBulkJdbcOptions.BEST_EFFORT
case "NO_DUPLICATES" => SQLServerBulkJdbcOptions.NO_DUPLICATES
}

// batchSize is already defined in JDBCOptions superclass
val checkConstraints = params.getOrElse("checkConstraints", "false").toBoolean
val fireTriggers = params.getOrElse("fireTriggers", "false").toBoolean
val keepIdentity = params.getOrElse("keepIdentity", "false").toBoolean
val keepNulls = params.getOrElse("keepNulls", "false").toBoolean
val tableLock = params.getOrElse("tableLock", "false").toBoolean
val allowEncryptedValueModifications = params.getOrElse("allowEncryptedValueModifications", "false").toBoolean
// batchSize is already defined in JDBCOptions superclass
val checkConstraints = params.getOrElse("checkConstraints", "false").toBoolean
val fireTriggers = params.getOrElse("fireTriggers", "false").toBoolean
val keepIdentity = params.getOrElse("keepIdentity", "false").toBoolean
val keepNulls = params.getOrElse("keepNulls", "false").toBoolean
val tableLock = params.getOrElse("tableLock", "false").toBoolean
val allowEncryptedValueModifications =
params.getOrElse("allowEncryptedValueModifications", "false").toBoolean

// Not a feature
// Only used for internally testing data idempotency
val testDataIdempotency = params.getOrElse("testDataIdempotency", "false").toBoolean
// Not a feature
// Only used for internally testing data idempotency
val testDataIdempotency =
params.getOrElse("testDataIdempotency", "false").toBoolean

val dataPoolDistPolicy = params.getOrElse("dataPoolDistPolicy", "ROUND_ROBIN")
val dataPoolDistPolicy = params.getOrElse("dataPoolDistPolicy", "ROUND_ROBIN")
}

object SQLServerBulkJdbcOptions {
val BEST_EFFORT = 0
val NO_DUPLICATES = 1
val BEST_EFFORT = 0
val NO_DUPLICATES = 1
}
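
These write-path options are passed like any other data source option. A minimal write sketch, assuming a DataFrame `df` and the placeholder connection variables (`url`, `table_name`, `user_name`, `password`) from the README examples:

```python
# Bulk write using options defined in SQLServerBulkJdbcOptions.
# url, table_name, user_name and password are placeholders.
df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("append") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", user_name) \
    .option("password", password) \
    .option("reliabilityLevel", "BEST_EFFORT") \
    .option("mssqlIsolationLevel", "READ_COMMITTED") \
    .option("tableLock", "true") \
    .save()
```

Since these bulk options extend the standard JDBC write options, the AAD options shown earlier (`accessToken`, or `authentication` with `ActiveDirectoryPassword`) should combine with them in the same way.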