Merge remote-tracking branch 'upstream/master' into UDAF
Conflicts:
	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala
	sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
yhuai committed Jul 20, 2015
2 parents 0a827b3 + 163e3f1 commit 380880f
Showing 182 changed files with 8,406 additions and 2,377 deletions.
10 changes: 5 additions & 5 deletions R/pkg/R/DataFrame.R
@@ -1314,7 +1314,7 @@ setMethod("except",
#' write.df(df, "myfile", "parquet", "overwrite")
#' }
setMethod("write.df",
- signature(df = "DataFrame", path = 'character'),
+ signature(df = "DataFrame", path = "character"),
function(df, path, source = NULL, mode = "append", ...){
if (is.null(source)) {
sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
@@ -1328,7 +1328,7 @@ setMethod("write.df",
jmode <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "saveMode", mode)
options <- varargsToEnv(...)
if (!is.null(path)) {
- options[['path']] <- path
+ options[["path"]] <- path
}
callJMethod(df@sdf, "save", source, jmode, options)
})
@@ -1337,7 +1337,7 @@ setMethod("write.df",
#' @aliases saveDF
#' @export
setMethod("saveDF",
- signature(df = "DataFrame", path = 'character'),
+ signature(df = "DataFrame", path = "character"),
function(df, path, source = NULL, mode = "append", ...){
write.df(df, path, source, mode, ...)
})
@@ -1375,8 +1375,8 @@ setMethod("saveDF",
#' saveAsTable(df, "myfile")
#' }
setMethod("saveAsTable",
- signature(df = "DataFrame", tableName = 'character', source = 'character',
-           mode = 'character'),
+ signature(df = "DataFrame", tableName = "character", source = "character",
+           mode = "character"),
function(df, tableName, source = NULL, mode="append", ...){
if (is.null(source)) {
sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
4 changes: 2 additions & 2 deletions R/pkg/R/SQLContext.R
@@ -457,7 +457,7 @@ dropTempTable <- function(sqlContext, tableName) {
read.df <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) {
options <- varargsToEnv(...)
if (!is.null(path)) {
- options[['path']] <- path
+ options[["path"]] <- path
}
if (is.null(source)) {
sqlContext <- get(".sparkRSQLsc", envir = .sparkREnv)
@@ -506,7 +506,7 @@ loadDF <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) {
createExternalTable <- function(sqlContext, tableName, path = NULL, source = NULL, ...) {
options <- varargsToEnv(...)
if (!is.null(path)) {
- options[['path']] <- path
+ options[["path"]] <- path
}
sdf <- callJMethod(sqlContext, "createExternalTable", tableName, source, options)
dataFrame(sdf)
4 changes: 2 additions & 2 deletions R/pkg/R/serialize.R
@@ -140,8 +140,8 @@ writeType <- function(con, class) {
jobj = "j",
environment = "e",
Date = "D",
- POSIXlt = 't',
- POSIXct = 't',
+ POSIXlt = "t",
+ POSIXct = "t",
stop(paste("Unsupported type for serialization", class)))
writeBin(charToRaw(type), con)
}
2 changes: 1 addition & 1 deletion R/pkg/R/sparkR.R
@@ -140,7 +140,7 @@ sparkR.init <- function(
if (!file.exists(path)) {
stop("JVM is not ready after 10 seconds")
}
- f <- file(path, open='rb')
+ f <- file(path, open="rb")
backendPort <- readInt(f)
monitorPort <- readInt(f)
close(f)
4 changes: 2 additions & 2 deletions R/pkg/inst/tests/test_sparkSQL.R
@@ -57,9 +57,9 @@ test_that("infer types", {
expect_equal(infer_type(as.Date("2015-03-11")), "date")
expect_equal(infer_type(as.POSIXlt("2015-03-11 12:13:04.043")), "timestamp")
expect_equal(infer_type(c(1L, 2L)),
- list(type = 'array', elementType = "integer", containsNull = TRUE))
+ list(type = "array", elementType = "integer", containsNull = TRUE))
expect_equal(infer_type(list(1L, 2L)),
- list(type = 'array', elementType = "integer", containsNull = TRUE))
+ list(type = "array", elementType = "integer", containsNull = TRUE))
testStruct <- infer_type(list(a = 1L, b = "2"))
expect_equal(class(testStruct), "structType")
checkStructField(testStruct$fields()[[1]], "a", "IntegerType", TRUE)
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -38,7 +38,7 @@ object TaskContext {
*/
def getPartitionId(): Int = {
val tc = taskContext.get()
- if (tc == null) {
+ if (tc eq null) {
0
} else {
tc.partitionId()
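
A note on the change above: in Scala, "tc == null" routes through the equals-based equality operator, while "eq" compares references directly, which is the conventional way to spell an explicit null check on an AnyRef. A minimal, self-contained sketch of the same pattern (illustrative only, not code from this commit; TaskContextLike is a made-up stand-in for TaskContext):

// Sketch: reference-equality null check, as in the TaskContext change above.
object NullCheckSketch {
  trait TaskContextLike { def partitionId(): Int }

  def partitionIdOrZero(tc: TaskContextLike): Int = {
    // "eq" tests reference identity, so this is an explicit null check that
    // never invokes equals on the receiver.
    if (tc eq null) 0 else tc.partitionId()
  }

  def main(args: Array[String]): Unit = {
    println(partitionIdOrZero(null))                                               // 0
    println(partitionIdOrZero(new TaskContextLike { def partitionId(): Int = 7 })) // 7
  }
}
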
29 changes: 18 additions & 11 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -25,6 +25,7 @@ import java.util.{Arrays, Comparator}
import scala.collection.JavaConversions._
import scala.concurrent.duration._
import scala.language.postfixOps
+ import scala.util.control.NonFatal

import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
@@ -248,19 +249,25 @@ class SparkHadoopUtil extends Logging {
dir: Path,
prefix: String,
exclusionSuffix: String): Array[FileStatus] = {
- val fileStatuses = remoteFs.listStatus(dir,
- new PathFilter {
- override def accept(path: Path): Boolean = {
- val name = path.getName
- name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
+ try {
+ val fileStatuses = remoteFs.listStatus(dir,
+ new PathFilter {
+ override def accept(path: Path): Boolean = {
+ val name = path.getName
+ name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
+ }
+ })
+ Arrays.sort(fileStatuses, new Comparator[FileStatus] {
+ override def compare(o1: FileStatus, o2: FileStatus): Int = {
+ Longs.compare(o1.getModificationTime, o2.getModificationTime)
}
})
- Arrays.sort(fileStatuses, new Comparator[FileStatus] {
- override def compare(o1: FileStatus, o2: FileStatus): Int = {
- Longs.compare(o1.getModificationTime, o2.getModificationTime)
- }
- })
- fileStatuses
+ fileStatuses
+ } catch {
+ case NonFatal(e) =>
+ logWarning("Error while attempting to list files from application staging dir", e)
+ Array.empty
+ }
}

/**
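
For context on the pattern introduced above: scala.util.control.NonFatal matches ordinary exceptions but deliberately does not match fatal ones such as OutOfMemoryError, InterruptedException, or ControlThrowable, so wrapping the listing in a NonFatal catch degrades to an empty result without swallowing truly fatal errors. A small standalone sketch of the same guard, not taken from this commit (listOrEmpty and doList are hypothetical names):

import scala.util.control.NonFatal

object NonFatalSketch {
  // Runs the supplied listing function; on a non-fatal failure, logs and
  // returns an empty array instead of propagating the exception.
  def listOrEmpty(doList: () => Array[String]): Array[String] = {
    try {
      doList()
    } catch {
      case NonFatal(e) =>
        Console.err.println(s"Listing failed, returning empty result: ${e.getMessage}")
        Array.empty
    }
  }

  def main(args: Array[String]): Unit = {
    println(listOrEmpty(() => Array("a", "b")).mkString(","))             // a,b
    println(listOrEmpty(() => throw new RuntimeException("boom")).length) // 0
  }
}
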
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -508,8 +508,14 @@ object SparkSubmit {
}

// Let YARN know it's a pyspark app, so it distributes needed libraries.
- if (clusterManager == YARN && args.isPython) {
- sysProps.put("spark.yarn.isPython", "true")
+ if (clusterManager == YARN) {
+ if (args.isPython) {
+ sysProps.put("spark.yarn.isPython", "true")
+ }
+ if (args.principal != null) {
+ require(args.keytab != null, "Keytab must be specified when principal is specified")
+ UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
+ }
}

// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
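
The new YARN branch above performs an up-front Kerberos login when a principal is supplied. A rough standalone sketch of that pattern using Hadoop's UserGroupInformation API (the Option-based arguments here are illustrative, not SparkSubmit's actual argument handling):

import org.apache.hadoop.security.UserGroupInformation

object KeytabLoginSketch {
  // If a principal is configured, a keytab must accompany it; the Hadoop
  // static call then authenticates the current process via Kerberos.
  def maybeLogin(principal: Option[String], keytab: Option[String]): Unit = {
    principal.foreach { p =>
      val kt = keytab.getOrElse(throw new IllegalArgumentException(
        "Keytab must be specified when principal is specified"))
      UserGroupInformation.loginUserFromKeytab(p, kt)
    }
  }
}
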
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -407,8 +407,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

/**
* Comparison function that defines the sort order for application attempts within the same
- * application. Order is: running attempts before complete attempts, running attempts sorted
- * by start time, completed attempts sorted by end time.
+ * application. Order is: attempts are sorted by descending start time.
+ * Most recent attempt state matches with current state of the app.
*
* Normally applications should have a single running attempt; but failure to call sc.stop()
* may cause multiple running attempts to show up.
@@ -418,11 +418,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private def compareAttemptInfo(
a1: FsApplicationAttemptInfo,
a2: FsApplicationAttemptInfo): Boolean = {
- if (a1.completed == a2.completed) {
- if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
- } else {
- !a1.completed
- }
+ a1.startTime >= a2.startTime
}

/**
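
The simplified comparator above is a sortWith-style predicate: it returns true when a1 should be placed before a2, so sorting with it orders attempts by descending start time, i.e. most recent attempt first. A small self-contained sketch of that behaviour (AttemptInfo is a made-up stand-in for FsApplicationAttemptInfo, not from this commit):

object AttemptOrderingSketch {
  case class AttemptInfo(attemptId: String, startTime: Long)

  // Same shape as the predicate above: "a1 comes before a2" iff it started later.
  private def compareAttemptInfo(a1: AttemptInfo, a2: AttemptInfo): Boolean =
    a1.startTime >= a2.startTime

  def main(args: Array[String]): Unit = {
    val attempts = Seq(
      AttemptInfo("attempt1", 1L),
      AttemptInfo("attempt3", 3L),
      AttemptInfo("attempt2", 2L))
    // sortWith treats the predicate as a "comes before" test.
    println(attempts.sortWith(compareAttemptInfo).map(_.attemptId))
    // -> List(attempt3, attempt2, attempt1)
  }
}
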
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -243,13 +243,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
appListAfterRename.size should be (1)
}

test("apps with multiple attempts") {
test("apps with multiple attempts with order") {
val provider = new FsHistoryProvider(createTestConf())

- val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = false)
+ val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true)
writeFile(attempt1, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
SparkListenerApplicationEnd(2L)
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1"))
)

updateAndCheck(provider) { list =>
@@ -259,7 +258,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc

val attempt2 = newLogFile("app1", Some("attempt2"), inProgress = true)
writeFile(attempt2, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2"))
SparkListenerApplicationStart("app1", Some("app1"), 2L, "test", Some("attempt2"))
)

updateAndCheck(provider) { list =>
@@ -268,30 +267,29 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
list.head.attempts.head.attemptId should be (Some("attempt2"))
}

- val completedAttempt2 = newLogFile("app1", Some("attempt2"), inProgress = false)
- attempt2.delete()
- writeFile(attempt2, true, None,
- SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")),
+ val attempt3 = newLogFile("app1", Some("attempt3"), inProgress = false)
+ writeFile(attempt3, true, None,
+ SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt3")),
SparkListenerApplicationEnd(4L)
)

updateAndCheck(provider) { list =>
list should not be (null)
list.size should be (1)
- list.head.attempts.size should be (2)
- list.head.attempts.head.attemptId should be (Some("attempt2"))
+ list.head.attempts.size should be (3)
+ list.head.attempts.head.attemptId should be (Some("attempt3"))
}

val app2Attempt1 = newLogFile("app2", Some("attempt1"), inProgress = false)
- writeFile(attempt2, true, None,
+ writeFile(attempt1, true, None,
SparkListenerApplicationStart("app2", Some("app2"), 5L, "test", Some("attempt1")),
SparkListenerApplicationEnd(6L)
)

updateAndCheck(provider) { list =>
list.size should be (2)
list.head.attempts.size should be (1)
- list.last.attempts.size should be (2)
+ list.last.attempts.size should be (3)
list.head.attempts.head.attemptId should be (Some("attempt1"))

list.foreach { case app =>
6 changes: 6 additions & 0 deletions data/mllib/sample_naive_bayes_data.txt
@@ -1,6 +1,12 @@
0,1 0 0
0,2 0 0
+ 0,3 0 0
+ 0,4 0 0
1,0 1 0
1,0 2 0
+ 1,0 3 0
+ 1,0 4 0
2,0 0 1
2,0 0 2
+ 2,0 0 3
+ 2,0 0 4
23 changes: 13 additions & 10 deletions dev/merge_spark_pr.py
@@ -130,7 +130,10 @@ def merge_pr(pr_num, target_ref, title, body, pr_repo_desc):
'--pretty=format:%an <%ae>']).split("\n")
distinct_authors = sorted(set(commit_authors),
key=lambda x: commit_authors.count(x), reverse=True)
- primary_author = distinct_authors[0]
+ primary_author = raw_input(
+     "Enter primary author in the format of \"name <email>\" [%s]: " %
+     distinct_authors[0])

commits = run_cmd(['git', 'log', 'HEAD..%s' % pr_branch_name,
'--pretty=format:%h [%an] %s']).split("\n\n")

@@ -281,7 +284,7 @@ def get_version_json(version_str):
resolve = filter(lambda a: a['name'] == "Resolve Issue", asf_jira.transitions(jira_id))[0]
resolution = filter(lambda r: r.raw['name'] == "Fixed", asf_jira.resolutions())[0]
asf_jira.transition_issue(
jira_id, resolve["id"], fixVersions = jira_fix_versions,
jira_id, resolve["id"], fixVersions = jira_fix_versions,
comment = comment, resolution = {'id': resolution.raw['id']})

print "Successfully resolved %s with fixVersions=%s!" % (jira_id, fix_versions)
@@ -300,7 +303,7 @@ def standardize_jira_ref(text):
"""
Standardize the [SPARK-XXXXX] [MODULE] prefix
Converts "[SPARK-XXX][mllib] Issue", "[MLLib] SPARK-XXX. Issue" or "SPARK XXX [MLLIB]: Issue" to "[SPARK-XXX] [MLLIB] Issue"
>>> standardize_jira_ref("[SPARK-5821] [SQL] ParquetRelation2 CTAS should check if delete is successful")
'[SPARK-5821] [SQL] ParquetRelation2 CTAS should check if delete is successful'
>>> standardize_jira_ref("[SPARK-4123][Project Infra][WIP]: Show new dependencies added in pull requests")
@@ -322,11 +325,11 @@ def standardize_jira_ref(text):
"""
jira_refs = []
components = []

# If the string is compliant, no need to process any further
if (re.search(r'^\[SPARK-[0-9]{3,6}\] (\[[A-Z0-9_\s,]+\] )+\S+', text)):
return text

# Extract JIRA ref(s):
pattern = re.compile(r'(SPARK[-\s]*[0-9]{3,6})+', re.IGNORECASE)
for ref in pattern.findall(text):
@@ -348,18 +351,18 @@ def standardize_jira_ref(text):

# Assemble full text (JIRA ref(s), module(s), remaining text)
clean_text = ' '.join(jira_refs).strip() + " " + ' '.join(components).strip() + " " + text.strip()

# Replace multiple spaces with a single space, e.g. if no jira refs and/or components were included
clean_text = re.sub(r'\s+', ' ', clean_text.strip())

return clean_text

def main():
global original_head

os.chdir(SPARK_HOME)
original_head = run_cmd("git rev-parse HEAD")[:8]

branches = get_json("%s/branches" % GITHUB_API_BASE)
branch_names = filter(lambda x: x.startswith("branch-"), [x['name'] for x in branches])
# Assumes branch names can be sorted lexicographically
@@ -448,5 +451,5 @@ def main():
(failure_count, test_count) = doctest.testmod()
if failure_count:
exit(-1)

main()
1 change: 1 addition & 0 deletions dev/sparktestsupport/modules.py
@@ -338,6 +338,7 @@ def contains_file(self, filename):
python_test_goals=[
"pyspark.ml.feature",
"pyspark.ml.classification",
"pyspark.ml.clustering",
"pyspark.ml.recommendation",
"pyspark.ml.regression",
"pyspark.ml.tuning",