[SPARK-31021][SQL] Support MariaDB Kerberos login in JDBC connector
### What changes were proposed in this pull request?
When loading DataFrames from a JDBC data source with Kerberos authentication, remote executors (in yarn-client, yarn-cluster, and similar modes) fail to establish a connection because they have no Kerberos ticket and no way to generate one.

This is a real problem when ingesting data from kerberized data sources (SQL Server, Oracle) in enterprise environments, where exposing simple authentication is not an option because of IT policy.

This PR adds MariaDB support (other databases will follow in later PRs).

What this PR contains:
* Introduced `SecureConnectionProvider` with basic secure connection functionality
* Added `MariaDBConnectionProvider`
* Added `MariaDBConnectionProviderSuite`
* Added `MariaDBKrbIntegrationSuite` docker integration test
* Added some missing code documentation

### Why are the changes needed?
The JDBC connector lacks Kerberos support.

### Does this PR introduce any user-facing change?
Yes. Users can now connect to MariaDB using Kerberos.

### How was this patch tested?
* Additional + existing unit tests
* Additional + existing integration tests
* Manual testing on a cluster

Closes apache#28019 from gaborgsomogyi/SPARK-31021.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@apache.org>
gaborgsomogyi authored and Seongjin Cho committed Apr 14, 2020
1 parent d19ea07 commit 8f4fc69
Showing 20 changed files with 457 additions and 189 deletions.
4 changes: 2 additions & 2 deletions external/docker-integration-tests/pom.xml
@@ -121,8 +121,8 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<groupId>org.mariadb.jdbc</groupId>
<artifactId>mariadb-java-client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
@@ -0,0 +1,24 @@
#!/usr/bin/env bash

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

dpkg-divert --add /bin/systemctl && ln -sT /bin/true /bin/systemctl
apt update
apt install -y mariadb-plugin-gssapi-server
echo "gssapi_keytab_path=/docker-entrypoint-initdb.d/mariadb.keytab" >> /etc/mysql/mariadb.conf.d/auth_gssapi.cnf
echo "gssapi_principal_name=mariadb/__IP_ADDRESS_REPLACE_ME__@EXAMPLE.COM" >> /etc/mysql/mariadb.conf.d/auth_gssapi.cnf
docker-entrypoint.sh mysqld
@@ -0,0 +1,20 @@
#!/usr/bin/env bash

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

mysql -u root -p'rootpass' -e 'CREATE USER "mariadb/__IP_ADDRESS_REPLACE_ME__@EXAMPLE.COM" IDENTIFIED WITH gssapi;'
mysql -u root -p'rootpass' -D mysql -e 'GRANT ALL PRIVILEGES ON *.* TO "mariadb/__IP_ADDRESS_REPLACE_ME__@EXAMPLE.COM";'
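
Both scripts above carry the `__IP_ADDRESS_REPLACE_ME__` placeholder, which the test suite replaces with the Docker IP while copying them into a temporary directory (see `replaceIp` and `copyExecutableResource` in this commit). The following is a minimal, self-contained sketch of that idea, not the code from the commit: the object name `TemplateCopy` and its helpers `render` and `writeExecutable` are hypothetical names chosen for illustration.

```scala
import java.io.{File, PrintWriter}

// Hypothetical helper for illustration only; not part of the commit.
object TemplateCopy {
  val Placeholder = "__IP_ADDRESS_REPLACE_ME__"

  // Substitute the placeholder in every line of the template.
  def render(lines: Seq[String], ip: String): Seq[String] =
    lines.map(_.replace(Placeholder, ip))

  // Write the rendered lines to dir/fileName and mark the file
  // executable for all users (ownerOnly = false).
  def writeExecutable(dir: File, fileName: String, lines: Seq[String]): File = {
    val out = new File(dir, fileName)
    val writer = new PrintWriter(out, "UTF-8")
    try lines.foreach(line => writer.println(line)) finally writer.close()
    out.setExecutable(true, false)
    out
  }
}
```

Keeping the substitution as a pure function on lines makes it easy to check separately from the file handling.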
@@ -58,10 +58,19 @@ abstract class DatabaseOnDocker {
*/
def getJdbcUrl(ip: String, port: Int): String

/**
* Optional entry point when container starts
*
* Startup process is a parameter of entry point. This may or may not be considered during
* startup. Prefer entry point to startup process when you need a command always to be executed or
* you want to change the initialization order.
*/
def getEntryPoint: Option[String] = None

/**
* Optional process to run when container starts
*/
def getStartupProcessName: Option[String]
def getStartupProcessName: Option[String] = None

/**
* Optional step before container starts
@@ -77,6 +86,7 @@ abstract class DockerJDBCIntegrationSuite extends SharedSparkSession with Eventu
val db: DatabaseOnDocker

private var docker: DockerClient = _
protected var externalPort: Int = _
private var containerId: String = _
protected var jdbcUrl: String = _

@@ -101,7 +111,7 @@ abstract class DockerJDBCIntegrationSuite extends SharedSparkSession with Eventu
docker.pull(db.imageName)
}
// Configure networking (necessary for boot2docker / Docker Machine)
val externalPort: Int = {
externalPort = {
val sock = new ServerSocket(0)
val port = sock.getLocalPort
sock.close()
@@ -118,9 +128,11 @@ abstract class DockerJDBCIntegrationSuite extends SharedSparkSession with Eventu
.networkDisabled(false)
.env(db.env.map { case (k, v) => s"$k=$v" }.toSeq.asJava)
.exposedPorts(s"${db.jdbcPort}/tcp")
if(db.getStartupProcessName.isDefined) {
containerConfigBuilder
.cmd(db.getStartupProcessName.get)
if (db.getEntryPoint.isDefined) {
containerConfigBuilder.entrypoint(db.getEntryPoint.get)
}
if (db.getStartupProcessName.isDefined) {
containerConfigBuilder.cmd(db.getStartupProcessName.get)
}
db.beforeContainerStart(hostConfigBuilder, containerConfigBuilder)
containerConfigBuilder.hostConfig(hostConfigBuilder.build())
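
The `externalPort` hunk above appears to choose the container's external port by briefly binding a `ServerSocket` to port 0, which asks the operating system for any free port. A minimal sketch of that technique follows. `FreePort` and `findFreePort` are hypothetical names, and the sketch does not guard against another process taking the port between `close()` and its later reuse.

```scala
import java.net.ServerSocket

// Hypothetical helper for illustration only; not part of the commit.
object FreePort {
  // Bind to port 0 so the OS assigns a free ephemeral port,
  // read the assigned port, then release the socket.
  def findFreePort(): Int = {
    val sock = new ServerSocket(0)
    try sock.getLocalPort finally sock.close()
  }
}
```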
@@ -18,17 +18,22 @@
package org.apache.spark.sql.jdbc

import java.io.{File, FileInputStream, FileOutputStream}
import java.sql.Connection
import java.util.Properties
import javax.security.auth.login.Configuration

import scala.io.Source

import org.apache.hadoop.minikdc.MiniKdc

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StringType
import org.apache.spark.util.{SecurityUtils, Utils}

abstract class DockerKrbJDBCIntegrationSuite extends DockerJDBCIntegrationSuite {
private var kdc: MiniKdc = _
protected var workDir: File = _
protected var entryPointDir: File = _
protected var initDbDir: File = _
protected val userName: String
protected var principal: String = _
protected val keytabFileName: String
@@ -46,8 +51,9 @@ abstract class DockerKrbJDBCIntegrationSuite extends DockerJDBCIntegrationSuite

principal = s"$userName@${kdc.getRealm}"

workDir = Utils.createTempDir()
val keytabFile = new File(workDir, keytabFileName)
entryPointDir = Utils.createTempDir()
initDbDir = Utils.createTempDir()
val keytabFile = new File(initDbDir, keytabFileName)
keytabFullPath = keytabFile.getAbsolutePath
kdc.createPrincipal(keytabFile, userName)
logInfo(s"Created keytab file: $keytabFullPath")
@@ -62,6 +68,7 @@ abstract class DockerKrbJDBCIntegrationSuite extends DockerJDBCIntegrationSuite
try {
if (kdc != null) {
kdc.stop()
kdc = null
}
Configuration.setConfiguration(null)
SecurityUtils.setGlobalKrbDebug(false)
@@ -71,7 +78,7 @@ abstract class DockerKrbJDBCIntegrationSuite extends DockerJDBCIntegrationSuite
}

protected def copyExecutableResource(
fileName: String, dir: File, processLine: String => String) = {
fileName: String, dir: File, processLine: String => String = identity) = {
val newEntry = new File(dir.getAbsolutePath, fileName)
newEntry.createNewFile()
Utils.tryWithResource(
@@ -91,4 +98,64 @@ abstract class DockerKrbJDBCIntegrationSuite extends DockerJDBCIntegrationSuite
logInfo(s"Created executable resource file: ${newEntry.getAbsolutePath}")
newEntry
}

override def dataPreparation(conn: Connection): Unit = {
conn.prepareStatement("CREATE TABLE bar (c0 text)").executeUpdate()
conn.prepareStatement("INSERT INTO bar VALUES ('hello')").executeUpdate()
}

test("Basic read test in query option") {
// This makes sure Spark must do authentication
Configuration.setConfiguration(null)

val expectedResult = Set("hello").map(Row(_))

val query = "SELECT c0 FROM bar"
// query option to pass on the query string.
val df = spark.read.format("jdbc")
.option("url", jdbcUrl)
.option("keytab", keytabFullPath)
.option("principal", principal)
.option("query", query)
.load()
assert(df.collect().toSet === expectedResult)
}

test("Basic read test in create table path") {
// This makes sure Spark must do authentication
Configuration.setConfiguration(null)

val expectedResult = Set("hello").map(Row(_))

val query = "SELECT c0 FROM bar"
// query option in the create table path.
sql(
s"""
|CREATE OR REPLACE TEMPORARY VIEW queryOption
|USING org.apache.spark.sql.jdbc
|OPTIONS (url '$jdbcUrl', query '$query', keytab '$keytabFullPath', principal '$principal')
""".stripMargin.replaceAll("\n", " "))
assert(sql("select c0 from queryOption").collect().toSet === expectedResult)
}

test("Basic write test") {
// This makes sure Spark must do authentication
Configuration.setConfiguration(null)

val props = new Properties
props.setProperty("keytab", keytabFullPath)
props.setProperty("principal", principal)

val tableName = "write_test"
sqlContext.createDataFrame(Seq(("foo", "bar")))
.write.jdbc(jdbcUrl, tableName, props)
val df = sqlContext.read.jdbc(jdbcUrl, tableName, props)

val schema = df.schema
assert(schema.map(_.dataType).toSeq === Seq(StringType, StringType))
val rows = df.collect()
assert(rows.length === 1)
assert(rows(0).getString(0) === "foo")
assert(rows(0).getString(1) === "bar")
}
}
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.jdbc

import javax.security.auth.login.Configuration

import com.spotify.docker.client.messages.{ContainerConfig, HostConfig}

import org.apache.spark.sql.execution.datasources.jdbc.connection.SecureConnectionProvider
import org.apache.spark.tags.DockerTest

@DockerTest
class MariaDBKrbIntegrationSuite extends DockerKrbJDBCIntegrationSuite {
override protected val userName = s"mariadb/$dockerIp"
override protected val keytabFileName = "mariadb.keytab"

override val db = new DatabaseOnDocker {
override val imageName = "mariadb:10.4"
override val env = Map(
"MYSQL_ROOT_PASSWORD" -> "rootpass"
)
override val usesIpc = false
override val jdbcPort = 3306

override def getJdbcUrl(ip: String, port: Int): String =
s"jdbc:mysql://$ip:$port/mysql?user=$principal"

override def getEntryPoint: Option[String] =
Some("/docker-entrypoint/mariadb_docker_entrypoint.sh")

override def beforeContainerStart(
hostConfigBuilder: HostConfig.Builder,
containerConfigBuilder: ContainerConfig.Builder): Unit = {
def replaceIp(s: String): String = s.replace("__IP_ADDRESS_REPLACE_ME__", dockerIp)
copyExecutableResource("mariadb_docker_entrypoint.sh", entryPointDir, replaceIp)
copyExecutableResource("mariadb_krb_setup.sh", initDbDir, replaceIp)

hostConfigBuilder.appendBinds(
HostConfig.Bind.from(entryPointDir.getAbsolutePath)
.to("/docker-entrypoint").readOnly(true).build(),
HostConfig.Bind.from(initDbDir.getAbsolutePath)
.to("/docker-entrypoint-initdb.d").readOnly(true).build()
)
}
}

override protected def setAuthentication(keytabFile: String, principal: String): Unit = {
val config = new SecureConnectionProvider.JDBCConfiguration(
Configuration.getConfiguration, "Krb5ConnectorContext", keytabFile, principal)
Configuration.setConfiguration(config)
}
}
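
`setAuthentication` above installs a JAAS `Configuration` built from a keytab file and a principal. The internals of `SecureConnectionProvider.JDBCConfiguration` are not shown in this part of the diff, so the following is only a sketch of how a keytab-based JAAS configuration can be written with the standard JDK API. The class name `KeytabJaasConfiguration` and the chosen `Krb5LoginModule` options are assumptions, not Spark's implementation.

```scala
import java.util.{HashMap => JHashMap}
import javax.security.auth.login.{AppConfigurationEntry, Configuration}
import javax.security.auth.login.AppConfigurationEntry.LoginModuleControlFlag

// Hypothetical class for illustration only; not Spark's JDBCConfiguration.
class KeytabJaasConfiguration(
    parent: Configuration,
    appEntry: String,
    keytab: String,
    principal: String) extends Configuration {

  // Options understood by the JDK's Krb5LoginModule (assumed selection).
  private val options = new JHashMap[String, String]()
  options.put("useKeyTab", "true")
  options.put("keyTab", keytab)
  options.put("principal", principal)
  options.put("useTicketCache", "false")

  private val entry = new AppConfigurationEntry(
    "com.sun.security.auth.module.Krb5LoginModule",
    LoginModuleControlFlag.REQUIRED,
    options)

  // Answer for our own entry name; delegate everything else to the parent, if any.
  override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] =
    if (name == appEntry) Array(entry)
    else if (parent != null) parent.getAppConfigurationEntry(name)
    else null
}
```

Delegating unknown entry names to the parent configuration keeps any previously installed JAAS entries reachable, which matters when other components in the same JVM rely on them.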
@@ -37,8 +37,6 @@ class MsSqlServerIntegrationSuite extends DockerJDBCIntegrationSuite {

override def getJdbcUrl(ip: String, port: Int): String =
s"jdbc:sqlserver://$ip:$port;user=sa;password=Sapass123;"

override def getStartupProcessName: Option[String] = None
}

override def dataPreparation(conn: Connection): Unit = {
@@ -35,7 +35,6 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite {
override val jdbcPort: Int = 3306
override def getJdbcUrl(ip: String, port: Int): String =
s"jdbc:mysql://$ip:$port/mysql?user=root&password=rootpass"
override def getStartupProcessName: Option[String] = None
}

override def dataPreparation(conn: Connection): Unit = {
@@ -66,7 +66,6 @@ class OracleIntegrationSuite extends DockerJDBCIntegrationSuite with SharedSpark
override val jdbcPort: Int = 1521
override def getJdbcUrl(ip: String, port: Int): String =
s"jdbc:oracle:thin:system/oracle@//$ip:$port/xe"
override def getStartupProcessName: Option[String] = None
}

override def dataPreparation(conn: Connection): Unit = {
@@ -37,7 +37,6 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationSuite {
override val jdbcPort = 5432
override def getJdbcUrl(ip: String, port: Int): String =
s"jdbc:postgresql://$ip:$port/postgres?user=postgres&password=rootpass"
override def getStartupProcessName: Option[String] = None
}

override def dataPreparation(conn: Connection): Unit = {