Commit fd53b00

added python support for crosstab
brkyvz committed May 1, 2015
2 parents 27a5a81 + b1f4ca8 commit fd53b00
Showing 47 changed files with 1,426 additions and 638 deletions.
bin/spark-shell2.cmd (2 changes: 1 addition & 1 deletion)
@@ -19,7 +19,7 @@ rem

set SPARK_HOME=%~dp0..

echo "%*" | findstr " --help -h" >nul
echo "%*" | findstr " \<--help\> \<-h\>" >nul
if %ERRORLEVEL% equ 0 (
call :usage
exit /b 0
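
A note on the findstr change above: findstr matches bare substrings by default, so the old pattern also fired on any argument that merely contains -h. The \< and \> regular-expression anchors restrict each match to a whole word, so the usage text is now printed only when --help or -h appears as a standalone argument.
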
core/src/main/scala/org/apache/spark/TestUtils.scala (27 changes: 9 additions & 18 deletions)
@@ -105,18 +105,23 @@ private[spark] object TestUtils {
URI.create(s"string:///${name.replace(".", "/")}${SOURCE.extension}")
}

private[spark] class JavaSourceFromString(val name: String, val code: String)
private class JavaSourceFromString(val name: String, val code: String)
extends SimpleJavaFileObject(createURI(name), SOURCE) {
override def getCharContent(ignoreEncodingErrors: Boolean): String = code
}

/** Creates a compiled class with the source file. Class file will be placed in destDir. */
/** Creates a compiled class with the given name. Class file will be placed in destDir. */
def createCompiledClass(
className: String,
destDir: File,
sourceFile: JavaSourceFromString,
classpathUrls: Seq[URL]): File = {
toStringValue: String = "",
baseClass: String = null,
classpathUrls: Seq[URL] = Seq()): File = {
val compiler = ToolProvider.getSystemJavaCompiler
val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
val sourceFile = new JavaSourceFromString(className,
"public class " + className + extendsText + " implements java.io.Serializable {" +
" @Override public String toString() { return \"" + toStringValue + "\"; }}")

// Calling this outputs a class file in pwd. It's easier to just rename the file than
// build a custom FileManager that controls the output location.
@@ -139,18 +144,4 @@ private[spark] object TestUtils {
assert(out.exists(), "Destination file not moved: " + out.getAbsolutePath())
out
}

/** Creates a compiled class with the given name. Class file will be placed in destDir. */
def createCompiledClass(
className: String,
destDir: File,
toStringValue: String = "",
baseClass: String = null,
classpathUrls: Seq[URL] = Seq()): File = {
val extendsText = Option(baseClass).map { c => s" extends ${c}" }.getOrElse("")
val sourceFile = new JavaSourceFromString(className,
"public class " + className + extendsText + " implements java.io.Serializable {" +
" @Override public String toString() { return \"" + toStringValue + "\"; }}")
createCompiledClass(className, destDir, sourceFile, classpathUrls)
}
}
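
For context, a hedged usage sketch of the consolidated createCompiledClass helper (it assumes caller code living under the org.apache.spark package, since TestUtils is private[spark]; the class name and output directory here are hypothetical):

    import java.io.File
    import java.net.URLClassLoader

    // Compile a tiny serializable class into a scratch directory, then load it
    // back and check the generated toString() body.
    val destDir = new File(System.getProperty("java.io.tmpdir"))
    TestUtils.createCompiledClass("HelloSpark", destDir, toStringValue = "hello")
    val loader = new URLClassLoader(Array(destDir.toURI.toURL))
    assert(loader.loadClass("HelloSpark").newInstance().toString == "hello")
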
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala (81 changes: 79 additions & 2 deletions)
@@ -17,12 +17,16 @@

package org.apache.spark.deploy

import java.io.{ByteArrayInputStream, DataInputStream}
import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction
import java.util.{Arrays, Comparator}

import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.fs.FileSystem.Statistics
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
@@ -32,14 +36,17 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils

import scala.collection.JavaConversions._
import scala.concurrent.duration._
import scala.language.postfixOps

/**
* :: DeveloperApi ::
* Contains util methods to interact with Hadoop from Spark.
*/
@DeveloperApi
class SparkHadoopUtil extends Logging {
val conf: Configuration = newConfiguration(new SparkConf())
private val sparkConf = new SparkConf()
val conf: Configuration = newConfiguration(sparkConf)
UserGroupInformation.setConfiguration(conf)

/**
@@ -201,6 +208,61 @@ class SparkHadoopUtil extends Logging {
if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
}

/**
* Lists all files in the given directory whose names start with the specified prefix and do
* not end with the given suffix. The returned {{FileStatus}} instances are sorted by the
* modification times of the respective files.
*/
def listFilesSorted(
remoteFs: FileSystem,
dir: Path,
prefix: String,
exclusionSuffix: String): Array[FileStatus] = {
val fileStatuses = remoteFs.listStatus(dir,
new PathFilter {
override def accept(path: Path): Boolean = {
val name = path.getName
name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
}
})
Arrays.sort(fileStatuses, new Comparator[FileStatus] {
override def compare(o1: FileStatus, o2: FileStatus): Int = {
Longs.compare(o1.getModificationTime, o2.getModificationTime)
}
})
fileStatuses
}
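// Hedged usage sketch (illustrative only, not part of this commit): pick up
// credential files oldest-first while skipping half-written ".tmp" files,
// where fs is a hypothetical FileSystem handle:
//   val sorted = listFilesSorted(fs, new Path("/user/alice/creds"),
//     prefix = "credentials", exclusionSuffix = ".tmp")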

/**
* How much time (in millis) remains from now until (fraction * renewal interval) past the
* issue date of the token that stays valid the longest?
* Returns zero or a negative value if that fraction of the validity period has already expired.
*/
def getTimeFromNowToRenewal(
sparkConf: SparkConf,
fraction: Double,
credentials: Credentials): Long = {
val now = System.currentTimeMillis()

val renewalInterval =
sparkConf.getLong("spark.yarn.token.renewal.interval", (24 hours).toMillis)

credentials.getAllTokens.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
.map { t =>
val identifier = new DelegationTokenIdentifier()
identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
(identifier.getIssueDate + fraction * renewalInterval).toLong - now
}.foldLeft(0L)(math.max)
}
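// Worked example (hedged, not part of this commit): with the default 24-hour
// interval and fraction = 0.75, a token issued 6 hours ago yields
// (issueDate + 0.75 * 86400000L) - now = 18h - 6h = 12h, i.e. 43200000 millis
// until renewal work should be scheduled.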


private[spark] def getSuffixForCredentialsPath(credentialsPath: Path): Int = {
val fileName = credentialsPath.getName
fileName.substring(
fileName.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM) + 1).toInt
}


private val HADOOP_CONF_PATTERN = "(\\$\\{hadoopconf-[^\\}\\$\\s]+\\})".r.unanchored

/**
@@ -231,6 +293,17 @@
}
}
}

/**
* Start a thread to periodically update the current user's credentials with new delegation
* tokens so that writes to HDFS do not fail.
*/
private[spark] def startExecutorDelegationTokenRenewer(conf: SparkConf) {}

/**
* Stop the thread that does the delegation token updates.
*/
private[spark] def stopExecutorDelegationTokenRenewer() {}
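// These are no-ops here; a YARN-specific SparkHadoopUtil subclass is expected
// to override them with a real token-renewal thread (an assumption based on
// the naming and private[spark] visibility; that code is not shown in this diff).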
}

object SparkHadoopUtil {
@@ -251,6 +324,10 @@
}
}

val SPARK_YARN_CREDS_TEMP_EXTENSION = ".tmp"

val SPARK_YARN_CREDS_COUNTER_DELIM = "-"

def get: SparkHadoopUtil = {
hadoop
}
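
To make the credentials-file naming concrete, a minimal sketch of the counter convention (the file name is hypothetical; the parsing mirrors getSuffixForCredentialsPath above):

    // Credential files carry an increasing counter after the last "-" delimiter;
    // half-written files carry the ".tmp" extension and are excluded by readers.
    val SPARK_YARN_CREDS_COUNTER_DELIM = "-"

    def suffixFor(fileName: String): Int =
      fileName.substring(fileName.lastIndexOf(SPARK_YARN_CREDS_COUNTER_DELIM) + 1).toInt

    assert(suffixFor("credentials-3") == 3)  // hypothetical file name
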
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala (133 changes: 64 additions & 69 deletions)
@@ -20,7 +20,7 @@ package org.apache.spark.deploy
import java.io.{File, PrintStream}
import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowableException}
import java.net.URL
import java.nio.file.{Path => JavaPath}
import java.security.PrivilegedExceptionAction

import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
@@ -401,6 +400,10 @@
OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),

// Yarn client or cluster
OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, clOption = "--principal"),
OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, clOption = "--keytab"),
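// Hypothetical invocation exercising the two options above:
//   spark-submit --master yarn --deploy-mode cluster \
//     --principal user@EXAMPLE.COM --keytab /path/to/user.keytab ...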

// Other options
OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES,
sysProp = "spark.executor.cores"),
@@ -709,9 +712,7 @@ private[deploy] object SparkSubmitUtils {
* @param artifactId the artifactId of the coordinate
* @param version the version of the coordinate
*/
private[deploy] case class MavenCoordinate(groupId: String, artifactId: String, version: String) {
override def toString: String = s"$groupId:$artifactId:$version"
}
private[deploy] case class MavenCoordinate(groupId: String, artifactId: String, version: String)

/**
* Extracts maven coordinates from a comma-delimited string. Coordinates should be provided
@@ -734,10 +735,6 @@
}
}

/** Path of the local Maven cache. */
private[spark] def m2Path: JavaPath = new File(System.getProperty("user.home"),
".m2" + File.separator + "repository" + File.separator).toPath

/**
* Extracts maven coordinates from a comma-delimited string
* @param remoteRepos Comma-delimited string of remote repositories
@@ -751,7 +748,8 @@

val localM2 = new IBiblioResolver
localM2.setM2compatible(true)
localM2.setRoot(m2Path.toUri.toString)
val m2Path = ".m2" + File.separator + "repository" + File.separator
localM2.setRoot(new File(System.getProperty("user.home"), m2Path).toURI.toString)
localM2.setUsepoms(true)
localM2.setName("local-m2-cache")
cr.add(localM2)
@@ -876,72 +874,69 @@ private[deploy] object SparkSubmitUtils {
""
} else {
val sysOut = System.out
try {
// To prevent ivy from logging to system out
System.setOut(printStream)
val artifacts = extractMavenCoordinates(coordinates)
// Default configuration name for ivy
val ivyConfName = "default"
// set ivy settings for location of cache
val ivySettings: IvySettings = new IvySettings
// Directories for caching downloads through ivy and storing the jars when maven coordinates
// are supplied to spark-submit
val alternateIvyCache = ivyPath.getOrElse("")
val packagesDirectory: File =
if (alternateIvyCache.trim.isEmpty) {
new File(ivySettings.getDefaultIvyUserDir, "jars")
} else {
ivySettings.setDefaultIvyUserDir(new File(alternateIvyCache))
ivySettings.setDefaultCache(new File(alternateIvyCache, "cache"))
new File(alternateIvyCache, "jars")
}
printStream.println(
s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}")
printStream.println(s"The jars for the packages stored in: $packagesDirectory")
// create a pattern matcher
ivySettings.addMatcher(new GlobPatternMatcher)
// create the dependency resolvers
val repoResolver = createRepoResolvers(remoteRepos, ivySettings)
ivySettings.addResolver(repoResolver)
ivySettings.setDefaultResolver(repoResolver.getName)

val ivy = Ivy.newInstance(ivySettings)
// Set resolve options to download transitive dependencies as well
val resolveOptions = new ResolveOptions
resolveOptions.setTransitive(true)
val retrieveOptions = new RetrieveOptions
// Turn downloading and logging off for testing
if (isTest) {
resolveOptions.setDownload(false)
resolveOptions.setLog(LogOptions.LOG_QUIET)
retrieveOptions.setLog(LogOptions.LOG_QUIET)
// To prevent ivy from logging to system out
System.setOut(printStream)
val artifacts = extractMavenCoordinates(coordinates)
// Default configuration name for ivy
val ivyConfName = "default"
// set ivy settings for location of cache
val ivySettings: IvySettings = new IvySettings
// Directories for caching downloads through ivy and storing the jars when maven coordinates
// are supplied to spark-submit
val alternateIvyCache = ivyPath.getOrElse("")
val packagesDirectory: File =
if (alternateIvyCache.trim.isEmpty) {
new File(ivySettings.getDefaultIvyUserDir, "jars")
} else {
resolveOptions.setDownload(true)
ivySettings.setDefaultIvyUserDir(new File(alternateIvyCache))
ivySettings.setDefaultCache(new File(alternateIvyCache, "cache"))
new File(alternateIvyCache, "jars")
}
printStream.println(
s"Ivy Default Cache set to: ${ivySettings.getDefaultCache.getAbsolutePath}")
printStream.println(s"The jars for the packages stored in: $packagesDirectory")
// create a pattern matcher
ivySettings.addMatcher(new GlobPatternMatcher)
// create the dependency resolvers
val repoResolver = createRepoResolvers(remoteRepos, ivySettings)
ivySettings.addResolver(repoResolver)
ivySettings.setDefaultResolver(repoResolver.getName)

val ivy = Ivy.newInstance(ivySettings)
// Set resolve options to download transitive dependencies as well
val resolveOptions = new ResolveOptions
resolveOptions.setTransitive(true)
val retrieveOptions = new RetrieveOptions
// Turn downloading and logging off for testing
if (isTest) {
resolveOptions.setDownload(false)
resolveOptions.setLog(LogOptions.LOG_QUIET)
retrieveOptions.setLog(LogOptions.LOG_QUIET)
} else {
resolveOptions.setDownload(true)
}

// A Module descriptor must be specified. Entries are dummy strings
val md = getModuleDescriptor
md.setDefaultConf(ivyConfName)
// A Module descriptor must be specified. Entries are dummy strings
val md = getModuleDescriptor
md.setDefaultConf(ivyConfName)

// Add exclusion rules for Spark and Scala Library
addExclusionRules(ivySettings, ivyConfName, md)
// add all supplied maven artifacts as dependencies
addDependenciesToIvy(md, artifacts, ivyConfName)
// Add exclusion rules for Spark and Scala Library
addExclusionRules(ivySettings, ivyConfName, md)
// add all supplied maven artifacts as dependencies
addDependenciesToIvy(md, artifacts, ivyConfName)

// resolve dependencies
val rr: ResolveReport = ivy.resolve(md, resolveOptions)
if (rr.hasError) {
throw new RuntimeException(rr.getAllProblemMessages.toString)
}
// retrieve all resolved dependencies
ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
packagesDirectory.getAbsolutePath + File.separator +
"[organization]_[artifact]-[revision].[ext]",
retrieveOptions.setConfs(Array(ivyConfName)))
resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
} finally {
System.setOut(sysOut)
// resolve dependencies
val rr: ResolveReport = ivy.resolve(md, resolveOptions)
if (rr.hasError) {
throw new RuntimeException(rr.getAllProblemMessages.toString)
}
// retrieve all resolved dependencies
ivy.retrieve(rr.getModuleDescriptor.getModuleRevisionId,
packagesDirectory.getAbsolutePath + File.separator +
"[organization]_[artifact]-[revision].[ext]",
retrieveOptions.setConfs(Array(ivyConfName)))
System.setOut(sysOut)
resolveDependencyPaths(rr.getArtifacts.toArray, packagesDirectory)
}
}
}
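
The coordinate handling above expects comma-separated groupId:artifactId:version strings. A simplified stand-in for extractMavenCoordinates, shown only to illustrate the format (the real method is assumed to validate input and report malformed coordinates more helpfully):

    case class MavenCoordinate(groupId: String, artifactId: String, version: String)

    // Split "g:a:v,g:a:v" into coordinates; a malformed entry fails the
    // Array(...) pattern match here with a MatchError.
    def parseCoordinates(coordinates: String): Seq[MavenCoordinate] =
      coordinates.split(",").map { coord =>
        val Array(groupId, artifactId, version) = coord.split(":")
        MavenCoordinate(groupId, artifactId, version)
      }.toSeq

    // e.g. parseCoordinates("org.apache.spark:spark-core_2.10:1.3.1")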