[SPARK-33084][CORE][SQL] Add jar support ivy path
Support `add jar` with an Ivy path.

Since spark-submit already supports Ivy coordinates, `add jar` can now support them as well.

Users can add a jar with SQL like:
```
add jar ivy://group:artifact:version?exclude=xxx,xxx&transitive=true
add jar ivy://group:artifact:version?exclude=xxx,xxx&transitive=false
```

Core API:
```
sparkContext.addJar("ivy://group:artifact:version?exclude=xxx,xxx&transitive=true")
sparkContext.addJar("ivy://group:artifact:version?exclude=xxx,xxx&transitive=false")
```
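
For reference, the URI query handling behaves roughly like the sketch below. This is a re-statement of the patch's `parseQueryParams` logic, not the code itself; the `org.example:demo:1.0` coordinate and the `org.slf4j:slf4j-api` exclusion are placeholders. Note that on this branch `transitive` defaults to `false` and the value check is case-sensitive:

```scala
import java.net.URI

// Sketch of the parseQueryParams semantics in this patch (placeholder
// coordinates; error handling for malformed queries is omitted).
def parseIvyQuery(uri: URI): (Boolean, String) = {
  val query = Option(uri.getQuery).getOrElse("")
  val pairs = query.split("&").map(_.split("=", 2)).collect { case Array(k, v) => (k, v) }
  val grouped = pairs.groupBy(_._1)
  // The last `transitive` wins; the comparison is case-sensitive; absent means false.
  val transitive = grouped.get("transitive").flatMap(_.lastOption).exists(_._2 == "true")
  // `exclude` is a comma separated list of group:module pairs.
  val exclusions =
    grouped.getOrElse("exclude", Array.empty[(String, String)]).map(_._2).mkString(",")
  (transitive, exclusions)
}

parseIvyQuery(new URI("ivy://org.example:demo:1.0?exclude=org.slf4j:slf4j-api&transitive=true"))
// => (true, "org.slf4j:slf4j-api")
parseIvyQuery(new URI("ivy://org.example:demo:1.0"))                  // => (false, "")
parseIvyQuery(new URI("ivy://org.example:demo:1.0?transitive=TRUE"))  // => (false, "")
```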

![image](https://user-images.githubusercontent.com/46485123/101227738-de451200-36d3-11eb-813d-78a8b879da4f.png)

Added unit tests.

Closes apache#29966 from AngersZhuuuu/support-add-jar-ivy.

Lead-authored-by: angerszhu <angers.zhu@gmail.com>
Co-authored-by: AngersZhuuuu <angers.zhu@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
AngersZhuuuu authored and xuanyuanking committed Sep 29, 2021
1 parent 1a788e0 commit 474d3b8
Showing 10 changed files with 134 additions and 506 deletions.
core/src/main/scala/org/apache/spark/util/DependencyUtils.scala (27 additions, 24 deletions)

```diff
@@ -24,10 +24,9 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config._
 
 case class IvyProperties(
     packagesExclusions: String,
@@ -40,11 +39,11 @@ private[spark] object DependencyUtils extends Logging {
 
   def getIvyProperties(): IvyProperties = {
     val Seq(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath) = Seq(
-      JAR_PACKAGES_EXCLUSIONS.key,
-      JAR_PACKAGES.key,
-      JAR_REPOSITORIES.key,
-      JAR_IVY_REPO_PATH.key,
-      JAR_IVY_SETTING_PATH.key
+      "spark.jars.excludes",
+      "spark.jars.packages",
+      "spark.jars.repositories",
+      "spark.jars.ivy",
+      "spark.jars.ivySettings"
     ).map(sys.props.get(_).orNull)
     IvyProperties(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath)
   }
@@ -60,9 +59,8 @@
    * @param uri Ivy URI need to be downloaded.
    * @return Tuple value of parameter `transitive` and `exclude` value.
    *
-   * 1. transitive: whether to download dependency jar of Ivy URI, default value is true
-   *    and this parameter value is case-insensitive. This mimics Hive's behaviour for
-   *    parsing the transitive parameter. Invalid value will be treat as false.
+   * 1. transitive: whether to download dependency jar of Ivy URI, default value is false
+   *    and this parameter value is case-sensitive. Invalid value will be treat as false.
    *    Example: Input:  exclude=org.mortbay.jetty:jetty&transitive=true
    *    Output:  true
    *
@@ -74,7 +72,7 @@
   private def parseQueryParams(uri: URI): (Boolean, String) = {
     val uriQuery = uri.getQuery
     if (uriQuery == null) {
-      (true, "")
+      (false, "")
     } else {
       val mapTokens = uriQuery.split("&").map(_.split("="))
       if (mapTokens.exists(isInvalidQueryString)) {
@@ -83,15 +81,14 @@
       }
       val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)
 
-      // Parse transitive parameters (e.g., transitive=true) in an Ivy URI, default value is true
+      // Parse transitive parameters (e.g., transitive=true) in an Ivy URI, default value is false
       val transitiveParams = groupedParams.get("transitive")
       if (transitiveParams.map(_.size).getOrElse(0) > 1) {
         logWarning("It's best to specify `transitive` parameter in ivy URI query only once." +
           " If there are multiple `transitive` parameter, we will select the last one")
       }
       val transitive =
-        transitiveParams.flatMap(_.takeRight(1).map(_._2.equalsIgnoreCase("true")).headOption)
-          .getOrElse(true)
+        transitiveParams.flatMap(_.takeRight(1).map(_._2 == "true").headOption).getOrElse(false)
 
       // Parse an excluded list (e.g., exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http)
       // in an Ivy URI. When download Ivy URI jar, Spark won't download transitive jar
@@ -128,11 +125,11 @@
    *    `parameter=value&parameter=value...`
    *    Note that currently Ivy URI query part support two parameters:
    *     1. transitive: whether to download dependent jars related to your Ivy URI.
-   *        transitive=false or `transitive=true`, if not set, the default value is true.
+   *        transitive=false or `transitive=true`, if not set, the default value is false.
    *     2. exclude: exclusion list when download Ivy URI jar and dependency jars.
    *        The `exclude` parameter content is a ',' separated `group:module` pair string :
    *        `exclude=group:module,group:module...`
-   * @return List of jars downloaded.
+   * @return Comma separated string list of jars downloaded.
    */
   def resolveMavenDependencies(uri: URI): Seq[String] = {
     val ivyProperties = DependencyUtils.getIvyProperties()
@@ -157,7 +154,7 @@
       ivyProperties.repositories,
       ivyProperties.ivyRepoPath,
       Option(ivyProperties.ivySettingsPath)
-    )
+    ).split(",")
   }
 
   def resolveMavenDependencies(
@@ -166,7 +163,7 @@
       packages: String,
       repositories: String,
       ivyRepoPath: String,
-      ivySettingsPath: Option[String]): Seq[String] = {
+      ivySettingsPath: Option[String]): String = {
     val exclusions: Seq[String] =
       if (!StringUtils.isBlank(packagesExclusions)) {
         packagesExclusions.split(",")
@@ -190,7 +187,8 @@
       jars: String,
       userJar: String,
       sparkConf: SparkConf,
-      hadoopConf: Configuration): String = {
+      hadoopConf: Configuration,
+      secMgr: SecurityManager): String = {
     val targetDir = Utils.createTempDir()
     val userJarName = userJar.split(File.separatorChar).last
     Option(jars)
@@ -201,7 +199,7 @@
           .mkString(",")
       }
       .filterNot(_ == "")
-      .map(downloadFileList(_, targetDir, sparkConf, hadoopConf))
+      .map(downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr))
       .orNull
   }
 
@@ -221,16 +219,18 @@
    * @param targetDir A temporary directory for which downloaded files.
    * @param sparkConf Spark configuration.
    * @param hadoopConf Hadoop configuration.
+   * @param secMgr Spark security manager.
    * @return A comma separated local files list.
    */
   def downloadFileList(
       fileList: String,
       targetDir: File,
       sparkConf: SparkConf,
-      hadoopConf: Configuration): String = {
+      hadoopConf: Configuration,
+      secMgr: SecurityManager): String = {
     require(fileList != null, "fileList cannot be null.")
     Utils.stringToSeq(fileList)
-      .map(downloadFile(_, targetDir, sparkConf, hadoopConf))
+      .map(downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr))
       .mkString(",")
   }
 
@@ -242,13 +242,15 @@
    * @param targetDir A temporary directory for which downloaded files.
    * @param sparkConf Spark configuration.
    * @param hadoopConf Hadoop configuration.
+   * @param secMgr Spark security manager.
    * @return Path to the local file.
    */
   def downloadFile(
       path: String,
       targetDir: File,
       sparkConf: SparkConf,
-      hadoopConf: Configuration): String = {
+      hadoopConf: Configuration,
+      secMgr: SecurityManager): String = {
     require(path != null, "path cannot be null.")
     val uri = Utils.resolveURI(path)
 
@@ -261,7 +263,8 @@
         new File(targetDir, file.getName).toURI.toString
       case _ =>
         val fname = new Path(uri).getName()
-        val localFile = Utils.doFetchFile(uri.toString(), targetDir, fname, sparkConf, hadoopConf)
+        val localFile = Utils.doFetchFile(uri.toString(), targetDir, fname, sparkConf, secMgr,
+          hadoopConf)
         localFile.toURI().toString()
     }
   }
```
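
As a rough usage sketch of the entry point above: `DependencyUtils` is `private[spark]`, so code like this only compiles inside an `org.apache.spark` package, and the package name and coordinate below are placeholders:

```scala
package org.apache.spark.examples.internal // hypothetical package inside the private[spark] scope

import java.net.URI

import org.apache.spark.util.DependencyUtils

object ResolveIvyJarExample {
  def main(args: Array[String]): Unit = {
    // Resolves the artifact (and, with transitive=true, its dependencies)
    // through the same Ivy machinery spark-submit uses for --packages.
    val jars: Seq[String] = DependencyUtils.resolveMavenDependencies(
      new URI("ivy://org.example:demo:1.0?transitive=true"))
    jars.foreach(println) // local paths of the downloaded jars
  }
}
```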
