diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R index 128431334ca52..1662d6bb3b1ac 100644 --- a/R/pkg/R/RDD.R +++ b/R/pkg/R/RDD.R @@ -91,8 +91,8 @@ setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) # NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD # prev_serializedMode is used during the delayed computation of JRDD in getJRDD } else { - pipelinedFunc <- function(split, iterator) { - func(split, prev@func(split, iterator)) + pipelinedFunc <- function(partIndex, part) { + func(partIndex, prev@func(partIndex, part)) } .Object@func <- cleanClosure(pipelinedFunc) .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline @@ -306,7 +306,7 @@ setMethod("numPartitions", signature(x = "RDD"), function(x) { jrdd <- getJRDD(x) - partitions <- callJMethod(jrdd, "splits") + partitions <- callJMethod(jrdd, "partitions") callJMethod(partitions, "size") }) @@ -452,8 +452,8 @@ setMethod("countByValue", setMethod("lapply", signature(X = "RDD", FUN = "function"), function(X, FUN) { - func <- function(split, iterator) { - lapply(iterator, FUN) + func <- function(partIndex, part) { + lapply(part, FUN) } lapplyPartitionsWithIndex(X, func) }) @@ -538,8 +538,8 @@ setMethod("mapPartitions", #'\dontrun{ #' sc <- sparkR.init() #' rdd <- parallelize(sc, 1:10, 5L) -#' prod <- lapplyPartitionsWithIndex(rdd, function(split, part) { -#' split * Reduce("+", part) }) +#' prod <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) { +#' partIndex * Reduce("+", part) }) #' collect(prod, flatten = FALSE) # 0, 7, 22, 45, 76 #'} #' @rdname lapplyPartitionsWithIndex @@ -813,7 +813,7 @@ setMethod("distinct", #' @examples #'\dontrun{ #' sc <- sparkR.init() -#' rdd <- parallelize(sc, 1:10) # ensure each num is in its own split +#' rdd <- parallelize(sc, 1:10) #' collect(sampleRDD(rdd, FALSE, 0.5, 1618L)) # ~5 distinct elements #' collect(sampleRDD(rdd, TRUE, 0.5, 9L)) # ~5 elements possibly with duplicates #'} @@ -825,14 +825,14 @@ setMethod("sampleRDD", function(x, withReplacement, fraction, seed) { # The sampler: takes a partition and returns its sampled version. - samplingFunc <- function(split, part) { + samplingFunc <- function(partIndex, part) { set.seed(seed) res <- vector("list", length(part)) len <- 0 # Discards some random values to ensure each partition has a # different random seed. - runif(split) + runif(partIndex) for (elem in part) { if (withReplacement) { @@ -967,7 +967,7 @@ setMethod("keyBy", setMethod("repartition", signature(x = "RDD", numPartitions = "numeric"), function(x, numPartitions) { - coalesce(x, numToInt(numPartitions), TRUE) + coalesce(x, numPartitions, TRUE) }) #' Return a new RDD that is reduced into numPartitions partitions. @@ -989,8 +989,8 @@ setMethod("coalesce", function(x, numPartitions, shuffle = FALSE) { numPartitions <- numToInt(numPartitions) if (shuffle || numPartitions > SparkR::numPartitions(x)) { - func <- function(s, part) { - set.seed(s) # split as seed + func <- function(partIndex, part) { + set.seed(partIndex) # partIndex as seed start <- as.integer(sample(numPartitions, 1) - 1) lapply(seq_along(part), function(i) { @@ -1035,7 +1035,7 @@ setMethod("saveAsObjectFile", #' Save this RDD as a text file, using string representations of elements. #' #' @param x The RDD to save -#' @param path The directory where the splits of the text file are saved +#' @param path The directory where the partitions of the text file are saved #' @examples #'\dontrun{ #' sc <- sparkR.init() @@ -1335,10 +1335,10 @@ setMethod("zipWithUniqueId", function(x) { n <- numPartitions(x) - partitionFunc <- function(split, part) { + partitionFunc <- function(partIndex, part) { mapply( function(item, index) { - list(item, (index - 1) * n + split) + list(item, (index - 1) * n + partIndex) }, part, seq_along(part), @@ -1382,11 +1382,11 @@ setMethod("zipWithIndex", startIndices <- Reduce("+", nums, accumulate = TRUE) } - partitionFunc <- function(split, part) { - if (split == 0) { + partitionFunc <- function(partIndex, part) { + if (partIndex == 0) { startIndex <- 0 } else { - startIndex <- startIndices[[split]] + startIndex <- startIndices[[partIndex]] } mapply( diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R index ebbb8fba1052d..b4845b6948997 100644 --- a/R/pkg/R/context.R +++ b/R/pkg/R/context.R @@ -17,12 +17,12 @@ # context.R: SparkContext driven functions -getMinSplits <- function(sc, minSplits) { - if (is.null(minSplits)) { +getMinPartitions <- function(sc, minPartitions) { + if (is.null(minPartitions)) { defaultParallelism <- callJMethod(sc, "defaultParallelism") - minSplits <- min(defaultParallelism, 2) + minPartitions <- min(defaultParallelism, 2) } - as.integer(minSplits) + as.integer(minPartitions) } #' Create an RDD from a text file. @@ -33,7 +33,7 @@ getMinSplits <- function(sc, minSplits) { #' #' @param sc SparkContext to use #' @param path Path of file to read. A vector of multiple paths is allowed. -#' @param minSplits Minimum number of splits to be created. If NULL, the default +#' @param minPartitions Minimum number of partitions to be created. If NULL, the default #' value is chosen based on available parallelism. #' @return RDD where each item is of type \code{character} #' @export @@ -42,13 +42,13 @@ getMinSplits <- function(sc, minSplits) { #' sc <- sparkR.init() #' lines <- textFile(sc, "myfile.txt") #'} -textFile <- function(sc, path, minSplits = NULL) { +textFile <- function(sc, path, minPartitions = NULL) { # Allow the user to have a more flexible definiton of the text file path path <- suppressWarnings(normalizePath(path)) #' Convert a string vector of paths to a string containing comma separated paths path <- paste(path, collapse = ",") - jrdd <- callJMethod(sc, "textFile", path, getMinSplits(sc, minSplits)) + jrdd <- callJMethod(sc, "textFile", path, getMinPartitions(sc, minPartitions)) # jrdd is of type JavaRDD[String] RDD(jrdd, "string") } @@ -60,7 +60,7 @@ textFile <- function(sc, path, minSplits = NULL) { #' #' @param sc SparkContext to use #' @param path Path of file to read. A vector of multiple paths is allowed. -#' @param minSplits Minimum number of splits to be created. If NULL, the default +#' @param minPartitions Minimum number of partitions to be created. If NULL, the default #' value is chosen based on available parallelism. #' @return RDD containing serialized R objects. #' @seealso saveAsObjectFile @@ -70,13 +70,13 @@ textFile <- function(sc, path, minSplits = NULL) { #' sc <- sparkR.init() #' rdd <- objectFile(sc, "myfile") #'} -objectFile <- function(sc, path, minSplits = NULL) { +objectFile <- function(sc, path, minPartitions = NULL) { # Allow the user to have a more flexible definiton of the text file path path <- suppressWarnings(normalizePath(path)) #' Convert a string vector of paths to a string containing comma separated paths path <- paste(path, collapse = ",") - jrdd <- callJMethod(sc, "objectFile", path, getMinSplits(sc, minSplits)) + jrdd <- callJMethod(sc, "objectFile", path, getMinPartitions(sc, minPartitions)) # Assume the RDD contains serialized R objects. RDD(jrdd, "byte") } diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 6c6233390134c..34dbe84051c50 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -60,7 +60,7 @@ setGeneric("countByValue", function(x) { standardGeneric("countByValue") }) #' @rdname distinct #' @export -setGeneric("distinct", function(x, numPartitions = 1L) { standardGeneric("distinct") }) +setGeneric("distinct", function(x, numPartitions = 1) { standardGeneric("distinct") }) #' @rdname filterRDD #' @export @@ -182,7 +182,7 @@ setGeneric("setName", function(x, name) { standardGeneric("setName") }) #' @rdname sortBy #' @export setGeneric("sortBy", - function(x, func, ascending = TRUE, numPartitions = 1L) { + function(x, func, ascending = TRUE, numPartitions = 1) { standardGeneric("sortBy") }) @@ -244,7 +244,7 @@ setGeneric("flatMapValues", function(X, FUN) { standardGeneric("flatMapValues") #' @rdname intersection #' @export -setGeneric("intersection", function(x, other, numPartitions = 1L) { +setGeneric("intersection", function(x, other, numPartitions = 1) { standardGeneric("intersection") }) #' @rdname keys @@ -346,21 +346,21 @@ setGeneric("rightOuterJoin", function(x, y, numPartitions) { standardGeneric("ri #' @rdname sortByKey #' @export setGeneric("sortByKey", - function(x, ascending = TRUE, numPartitions = 1L) { + function(x, ascending = TRUE, numPartitions = 1) { standardGeneric("sortByKey") }) #' @rdname subtract #' @export setGeneric("subtract", - function(x, other, numPartitions = 1L) { + function(x, other, numPartitions = 1) { standardGeneric("subtract") }) #' @rdname subtractByKey #' @export setGeneric("subtractByKey", - function(x, other, numPartitions = 1L) { + function(x, other, numPartitions = 1) { standardGeneric("subtractByKey") }) diff --git a/R/pkg/R/pairRDD.R b/R/pkg/R/pairRDD.R index 13efebc11c46e..9791e55791bae 100644 --- a/R/pkg/R/pairRDD.R +++ b/R/pkg/R/pairRDD.R @@ -190,7 +190,7 @@ setMethod("flatMapValues", #' @rdname partitionBy #' @aliases partitionBy,RDD,integer-method setMethod("partitionBy", - signature(x = "RDD", numPartitions = "integer"), + signature(x = "RDD", numPartitions = "numeric"), function(x, numPartitions, partitionFunc = hashCode) { #if (missing(partitionFunc)) { @@ -206,12 +206,12 @@ setMethod("partitionBy", get(name, .broadcastNames) }) jrdd <- getJRDD(x) - # We create a PairwiseRRDD that extends RDD[(Array[Byte], - # Array[Byte])], where the key is the hashed split, the value is + # We create a PairwiseRRDD that extends RDD[(Int, Array[Byte])], + # where the key is the target partition number, the value is # the content (key-val pairs). pairwiseRRDD <- newJObject("org.apache.spark.api.r.PairwiseRRDD", callJMethod(jrdd, "rdd"), - as.integer(numPartitions), + numToInt(numPartitions), serializedHashFuncBytes, getSerializedMode(x), packageNamesArr, @@ -221,7 +221,7 @@ setMethod("partitionBy", # Create a corresponding partitioner. rPartitioner <- newJObject("org.apache.spark.HashPartitioner", - as.integer(numPartitions)) + numToInt(numPartitions)) # Call partitionBy on the obtained PairwiseRDD. javaPairRDD <- callJMethod(pairwiseRRDD, "asJavaPairRDD") @@ -256,7 +256,7 @@ setMethod("partitionBy", #' @rdname groupByKey #' @aliases groupByKey,RDD,integer-method setMethod("groupByKey", - signature(x = "RDD", numPartitions = "integer"), + signature(x = "RDD", numPartitions = "numeric"), function(x, numPartitions) { shuffled <- partitionBy(x, numPartitions) groupVals <- function(part) { @@ -315,7 +315,7 @@ setMethod("groupByKey", #' @rdname reduceByKey #' @aliases reduceByKey,RDD,integer-method setMethod("reduceByKey", - signature(x = "RDD", combineFunc = "ANY", numPartitions = "integer"), + signature(x = "RDD", combineFunc = "ANY", numPartitions = "numeric"), function(x, combineFunc, numPartitions) { reduceVals <- function(part) { vals <- new.env() @@ -422,7 +422,7 @@ setMethod("reduceByKeyLocally", #' @aliases combineByKey,RDD,ANY,ANY,ANY,integer-method setMethod("combineByKey", signature(x = "RDD", createCombiner = "ANY", mergeValue = "ANY", - mergeCombiners = "ANY", numPartitions = "integer"), + mergeCombiners = "ANY", numPartitions = "numeric"), function(x, createCombiner, mergeValue, mergeCombiners, numPartitions) { combineLocally <- function(part) { combiners <- new.env() @@ -483,7 +483,7 @@ setMethod("combineByKey", #' @aliases aggregateByKey,RDD,ANY,ANY,ANY,integer-method setMethod("aggregateByKey", signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY", - combOp = "ANY", numPartitions = "integer"), + combOp = "ANY", numPartitions = "numeric"), function(x, zeroValue, seqOp, combOp, numPartitions) { createCombiner <- function(v) { do.call(seqOp, list(zeroValue, v)) @@ -514,7 +514,7 @@ setMethod("aggregateByKey", #' @aliases foldByKey,RDD,ANY,ANY,integer-method setMethod("foldByKey", signature(x = "RDD", zeroValue = "ANY", - func = "ANY", numPartitions = "integer"), + func = "ANY", numPartitions = "numeric"), function(x, zeroValue, func, numPartitions) { aggregateByKey(x, zeroValue, func, func, numPartitions) }) @@ -553,7 +553,7 @@ setMethod("join", joinTaggedList(v, list(FALSE, FALSE)) } - joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numToInt(numPartitions)), + joined <- flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin) }) @@ -582,7 +582,7 @@ setMethod("join", #' @rdname join-methods #' @aliases leftOuterJoin,RDD,RDD-method setMethod("leftOuterJoin", - signature(x = "RDD", y = "RDD", numPartitions = "integer"), + signature(x = "RDD", y = "RDD", numPartitions = "numeric"), function(x, y, numPartitions) { xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) }) yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) }) @@ -619,7 +619,7 @@ setMethod("leftOuterJoin", #' @rdname join-methods #' @aliases rightOuterJoin,RDD,RDD-method setMethod("rightOuterJoin", - signature(x = "RDD", y = "RDD", numPartitions = "integer"), + signature(x = "RDD", y = "RDD", numPartitions = "numeric"), function(x, y, numPartitions) { xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) }) yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) }) @@ -659,7 +659,7 @@ setMethod("rightOuterJoin", #' @rdname join-methods #' @aliases fullOuterJoin,RDD,RDD-method setMethod("fullOuterJoin", - signature(x = "RDD", y = "RDD", numPartitions = "integer"), + signature(x = "RDD", y = "RDD", numPartitions = "numeric"), function(x, y, numPartitions) { xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) }) yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) }) @@ -866,8 +866,8 @@ setMethod("sampleByKey", } # The sampler: takes a partition and returns its sampled version. - samplingFunc <- function(split, part) { - set.seed(bitwXor(seed, split)) + samplingFunc <- function(partIndex, part) { + set.seed(bitwXor(seed, partIndex)) res <- vector("list", length(part)) len <- 0 diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R index 23305d3c67074..0e7b7bd5a5b34 100644 --- a/R/pkg/R/utils.R +++ b/R/pkg/R/utils.R @@ -501,7 +501,7 @@ appendPartitionLengths <- function(x, other) { # A result RDD. mergePartitions <- function(rdd, zip) { serializerMode <- getSerializedMode(rdd) - partitionFunc <- function(split, part) { + partitionFunc <- function(partIndex, part) { len <- length(part) if (len > 0) { if (serializerMode == "byte") { diff --git a/R/pkg/inst/tests/test_rdd.R b/R/pkg/inst/tests/test_rdd.R index 3ba7d1716302a..d55af93e3e50a 100644 --- a/R/pkg/inst/tests/test_rdd.R +++ b/R/pkg/inst/tests/test_rdd.R @@ -105,8 +105,8 @@ test_that("several transformations on RDD (a benchmark on PipelinedRDD)", { rdd2 <- rdd for (i in 1:12) rdd2 <- lapplyPartitionsWithIndex( - rdd2, function(split, part) { - part <- as.list(unlist(part) * split + i) + rdd2, function(partIndex, part) { + part <- as.list(unlist(part) * partIndex + i) }) rdd2 <- lapply(rdd2, function(x) x + x) actual <- collect(rdd2) @@ -121,8 +121,8 @@ test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkp # PipelinedRDD rdd2 <- lapplyPartitionsWithIndex( rdd2, - function(split, part) { - part <- as.list(unlist(part) * split) + function(partIndex, part) { + part <- as.list(unlist(part) * partIndex) }) cache(rdd2) @@ -174,13 +174,13 @@ test_that("lapply with dependency", { }) test_that("lapplyPartitionsWithIndex on RDDs", { - func <- function(splitIndex, part) { list(splitIndex, Reduce("+", part)) } + func <- function(partIndex, part) { list(partIndex, Reduce("+", part)) } actual <- collect(lapplyPartitionsWithIndex(rdd, func), flatten = FALSE) expect_equal(actual, list(list(0, 15), list(1, 40))) pairsRDD <- parallelize(sc, list(list(1, 2), list(3, 4), list(4, 8)), 1L) partitionByParity <- function(key) { if (key %% 2 == 1) 0 else 1 } - mkTup <- function(splitIndex, part) { list(splitIndex, part) } + mkTup <- function(partIndex, part) { list(partIndex, part) } actual <- collect(lapplyPartitionsWithIndex( partitionBy(pairsRDD, 2L, partitionByParity), mkTup), diff --git a/assembly/pom.xml b/assembly/pom.xml index f1f8b0d3682e2..20593e710dedb 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -213,16 +213,6 @@ - - kinesis-asl - - - org.apache.httpcomponents - httpclient - ${commons.httpclient.version} - - - diff --git a/core/pom.xml b/core/pom.xml index e80829b7a7f3d..5e89d548cd47f 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -74,6 +74,10 @@ javax.servlet servlet-api + + org.codehaus.jackson + jackson-mapper-asl + @@ -275,7 +279,7 @@ org.tachyonproject tachyon-client - 0.5.0 + 0.6.4 org.apache.hadoop diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala index 951897cead996..583f1fdf0475b 100644 --- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala @@ -20,8 +20,8 @@ package org.apache.spark.storage import java.text.SimpleDateFormat import java.util.{Date, Random} -import tachyon.client.TachyonFS -import tachyon.client.TachyonFile +import tachyon.TachyonURI +import tachyon.client.{TachyonFile, TachyonFS} import org.apache.spark.Logging import org.apache.spark.executor.ExecutorExitCode @@ -40,7 +40,7 @@ private[spark] class TachyonBlockManager( val master: String) extends Logging { - val client = if (master != null && master != "") TachyonFS.get(master) else null + val client = if (master != null && master != "") TachyonFS.get(new TachyonURI(master)) else null if (client == null) { logError("Failed to connect to the Tachyon as the master address is not configured") @@ -60,11 +60,11 @@ private[spark] class TachyonBlockManager( addShutdownHook() def removeFile(file: TachyonFile): Boolean = { - client.delete(file.getPath(), false) + client.delete(new TachyonURI(file.getPath()), false) } def fileExists(file: TachyonFile): Boolean = { - client.exist(file.getPath()) + client.exist(new TachyonURI(file.getPath())) } def getFile(filename: String): TachyonFile = { @@ -81,7 +81,7 @@ private[spark] class TachyonBlockManager( if (old != null) { old } else { - val path = tachyonDirs(dirId) + "/" + "%02x".format(subDirId) + val path = new TachyonURI(s"${tachyonDirs(dirId)}/${"%02x".format(subDirId)}") client.mkdir(path) val newDir = client.getFile(path) subDirs(dirId)(subDirId) = newDir @@ -89,7 +89,7 @@ private[spark] class TachyonBlockManager( } } } - val filePath = subDir + "/" + filename + val filePath = new TachyonURI(s"$subDir/$filename") if(!client.exist(filePath)) { client.createFile(filePath) } @@ -113,7 +113,7 @@ private[spark] class TachyonBlockManager( tries += 1 try { tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536)) - val path = rootDir + "/" + "spark-tachyon-" + tachyonDirId + val path = new TachyonURI(s"$rootDir/spark-tachyon-$tachyonDirId") if (!client.exist(path)) { foundLocalDir = client.mkdir(path) tachyonDir = client.getFile(path) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 2feb7341b159b..667aa168e7ef3 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -42,6 +42,8 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.log4j.PropertyConfigurator import org.eclipse.jetty.util.MultiException import org.json4s._ + +import tachyon.TachyonURI import tachyon.client.{TachyonFS, TachyonFile} import org.apache.spark._ @@ -955,7 +957,7 @@ private[spark] object Utils extends Logging { * Delete a file or directory and its contents recursively. */ def deleteRecursively(dir: TachyonFile, client: TachyonFS) { - if (!client.delete(dir.getPath(), true)) { + if (!client.delete(new TachyonURI(dir.getPath()), true)) { throw new IOException("Failed to delete the tachyon dir: " + dir) } } diff --git a/examples/pom.xml b/examples/pom.xml index afd7c6d52f0dd..df1717403b673 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -390,11 +390,6 @@ spark-streaming-kinesis-asl_${scala.binary.version} ${project.version} - - org.apache.httpcomponents - httpclient - ${commons.httpclient.version} - diff --git a/launcher/pom.xml b/launcher/pom.xml index 182e5f60218db..ebfa7685eaa18 100644 --- a/launcher/pom.xml +++ b/launcher/pom.xml @@ -68,6 +68,12 @@ org.apache.hadoop hadoop-client test + + + org.codehaus.jackson + jackson-mapper-asl + + diff --git a/make-distribution.sh b/make-distribution.sh index 738a9c4d69601..cb65932b4abc0 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -32,7 +32,7 @@ SPARK_HOME="$(cd "`dirname "$0"`"; pwd)" DISTDIR="$SPARK_HOME/dist" SPARK_TACHYON=false -TACHYON_VERSION="0.5.0" +TACHYON_VERSION="0.6.4" TACHYON_TGZ="tachyon-${TACHYON_VERSION}-bin.tar.gz" TACHYON_URL="https://github.com/amplab/tachyon/releases/download/v${TACHYON_VERSION}/${TACHYON_TGZ}" diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala new file mode 100644 index 0000000000000..e6a62d998bb97 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature + +import org.apache.spark.annotation.AlphaComponent +import org.apache.spark.ml._ +import org.apache.spark.ml.param._ +import org.apache.spark.ml.param.shared._ +import org.apache.spark.ml.util.SchemaUtils +import org.apache.spark.mllib.feature +import org.apache.spark.mllib.linalg.{Vector, VectorUDT} +import org.apache.spark.sql._ +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.types.StructType + +/** + * Params for [[IDF]] and [[IDFModel]]. + */ +private[feature] trait IDFBase extends Params with HasInputCol with HasOutputCol { + + /** + * The minimum of documents in which a term should appear. + * @group param + */ + final val minDocFreq = new IntParam( + this, "minDocFreq", "minimum of documents in which a term should appear for filtering") + + setDefault(minDocFreq -> 0) + + /** @group getParam */ + def getMinDocFreq: Int = getOrDefault(minDocFreq) + + /** @group setParam */ + def setMinDocFreq(value: Int): this.type = set(minDocFreq, value) + + /** + * Validate and transform the input schema. + */ + protected def validateAndTransformSchema(schema: StructType, paramMap: ParamMap): StructType = { + val map = extractParamMap(paramMap) + SchemaUtils.checkColumnType(schema, map(inputCol), new VectorUDT) + SchemaUtils.appendColumn(schema, map(outputCol), new VectorUDT) + } +} + +/** + * :: AlphaComponent :: + * Compute the Inverse Document Frequency (IDF) given a collection of documents. + */ +@AlphaComponent +final class IDF extends Estimator[IDFModel] with IDFBase { + + /** @group setParam */ + def setInputCol(value: String): this.type = set(inputCol, value) + + /** @group setParam */ + def setOutputCol(value: String): this.type = set(outputCol, value) + + override def fit(dataset: DataFrame, paramMap: ParamMap): IDFModel = { + transformSchema(dataset.schema, paramMap, logging = true) + val map = extractParamMap(paramMap) + val input = dataset.select(map(inputCol)).map { case Row(v: Vector) => v } + val idf = new feature.IDF(map(minDocFreq)).fit(input) + val model = new IDFModel(this, map, idf) + Params.inheritValues(map, this, model) + model + } + + override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { + validateAndTransformSchema(schema, paramMap) + } +} + +/** + * :: AlphaComponent :: + * Model fitted by [[IDF]]. + */ +@AlphaComponent +class IDFModel private[ml] ( + override val parent: IDF, + override val fittingParamMap: ParamMap, + idfModel: feature.IDFModel) + extends Model[IDFModel] with IDFBase { + + /** @group setParam */ + def setInputCol(value: String): this.type = set(inputCol, value) + + /** @group setParam */ + def setOutputCol(value: String): this.type = set(outputCol, value) + + override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = { + transformSchema(dataset.schema, paramMap, logging = true) + val map = extractParamMap(paramMap) + val idf = udf { vec: Vector => idfModel.transform(vec) } + dataset.withColumn(map(outputCol), idf(col(map(inputCol)))) + } + + override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = { + validateAndTransformSchema(schema, paramMap) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PolynomialExpansion.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/PolynomialExpansion.scala new file mode 100644 index 0000000000000..d855f04799ae7 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PolynomialExpansion.scala @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature + +import scala.collection.mutable + +import org.apache.spark.annotation.AlphaComponent +import org.apache.spark.ml.UnaryTransformer +import org.apache.spark.ml.param.{IntParam, ParamMap} +import org.apache.spark.mllib.linalg._ +import org.apache.spark.sql.types.DataType + +/** + * :: AlphaComponent :: + * Perform feature expansion in a polynomial space. As said in wikipedia of Polynomial Expansion, + * which is available at [[http://en.wikipedia.org/wiki/Polynomial_expansion]], "In mathematics, an + * expansion of a product of sums expresses it as a sum of products by using the fact that + * multiplication distributes over addition". Take a 2-variable feature vector as an example: + * `(x, y)`, if we want to expand it with degree 2, then we get `(x, y, x * x, x * y, y * y)`. + */ +@AlphaComponent +class PolynomialExpansion extends UnaryTransformer[Vector, Vector, PolynomialExpansion] { + + /** + * The polynomial degree to expand, which should be larger than 1. + * @group param + */ + val degree = new IntParam(this, "degree", "the polynomial degree to expand") + setDefault(degree -> 2) + + /** @group getParam */ + def getDegree: Int = getOrDefault(degree) + + /** @group setParam */ + def setDegree(value: Int): this.type = set(degree, value) + + override protected def createTransformFunc(paramMap: ParamMap): Vector => Vector = { v => + val d = paramMap(degree) + PolynomialExpansion.expand(v, d) + } + + override protected def outputDataType: DataType = new VectorUDT() +} + +/** + * The expansion is done via recursion. Given n features and degree d, the size after expansion is + * (n + d choose d) (including 1 and first-order values). For example, let f([a, b, c], 3) be the + * function that expands [a, b, c] to their monomials of degree 3. We have the following recursion: + * + * {{{ + * f([a, b, c], 3) = f([a, b], 3) ++ f([a, b], 2) * c ++ f([a, b], 1) * c^2 ++ [c^3] + * }}} + * + * To handle sparsity, if c is zero, we can skip all monomials that contain it. We remember the + * current index and increment it properly for sparse input. + */ +object PolynomialExpansion { + + private def choose(n: Int, k: Int): Int = { + Range(n, n - k, -1).product / Range(k, 1, -1).product + } + + private def getPolySize(numFeatures: Int, degree: Int): Int = choose(numFeatures + degree, degree) + + private def expandDense( + values: Array[Double], + lastIdx: Int, + degree: Int, + multiplier: Double, + polyValues: Array[Double], + curPolyIdx: Int): Int = { + if (multiplier == 0.0) { + // do nothing + } else if (degree == 0 || lastIdx < 0) { + if (curPolyIdx >= 0) { // skip the very first 1 + polyValues(curPolyIdx) = multiplier + } + } else { + val v = values(lastIdx) + val lastIdx1 = lastIdx - 1 + var alpha = multiplier + var i = 0 + var curStart = curPolyIdx + while (i <= degree && alpha != 0.0) { + curStart = expandDense(values, lastIdx1, degree - i, alpha, polyValues, curStart) + i += 1 + alpha *= v + } + } + curPolyIdx + getPolySize(lastIdx + 1, degree) + } + + private def expandSparse( + indices: Array[Int], + values: Array[Double], + lastIdx: Int, + lastFeatureIdx: Int, + degree: Int, + multiplier: Double, + polyIndices: mutable.ArrayBuilder[Int], + polyValues: mutable.ArrayBuilder[Double], + curPolyIdx: Int): Int = { + if (multiplier == 0.0) { + // do nothing + } else if (degree == 0 || lastIdx < 0) { + if (curPolyIdx >= 0) { // skip the very first 1 + polyIndices += curPolyIdx + polyValues += multiplier + } + } else { + // Skip all zeros at the tail. + val v = values(lastIdx) + val lastIdx1 = lastIdx - 1 + val lastFeatureIdx1 = indices(lastIdx) - 1 + var alpha = multiplier + var curStart = curPolyIdx + var i = 0 + while (i <= degree && alpha != 0.0) { + curStart = expandSparse(indices, values, lastIdx1, lastFeatureIdx1, degree - i, alpha, + polyIndices, polyValues, curStart) + i += 1 + alpha *= v + } + } + curPolyIdx + getPolySize(lastFeatureIdx + 1, degree) + } + + private def expand(dv: DenseVector, degree: Int): DenseVector = { + val n = dv.size + val polySize = getPolySize(n, degree) + val polyValues = new Array[Double](polySize - 1) + expandDense(dv.values, n - 1, degree, 1.0, polyValues, -1) + new DenseVector(polyValues) + } + + private def expand(sv: SparseVector, degree: Int): SparseVector = { + val polySize = getPolySize(sv.size, degree) + val nnz = sv.values.length + val nnzPolySize = getPolySize(nnz, degree) + val polyIndices = mutable.ArrayBuilder.make[Int] + polyIndices.sizeHint(nnzPolySize - 1) + val polyValues = mutable.ArrayBuilder.make[Double] + polyValues.sizeHint(nnzPolySize - 1) + expandSparse( + sv.indices, sv.values, nnz - 1, sv.size - 1, degree, 1.0, polyIndices, polyValues, -1) + new SparseVector(polySize - 1, polyIndices.result(), polyValues.result()) + } + + def expand(v: Vector, degree: Int): Vector = { + v match { + case dv: DenseVector => expand(dv, degree) + case sv: SparseVector => expand(sv, degree) + case _ => throw new IllegalArgumentException + } + } +} diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala new file mode 100644 index 0000000000000..eaee3443c1f23 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/IDFSuite.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors} +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.mllib.util.TestingUtils._ +import org.apache.spark.sql.{Row, SQLContext} + +class IDFSuite extends FunSuite with MLlibTestSparkContext { + + @transient var sqlContext: SQLContext = _ + + override def beforeAll(): Unit = { + super.beforeAll() + sqlContext = new SQLContext(sc) + } + + def scaleDataWithIDF(dataSet: Array[Vector], model: Vector): Array[Vector] = { + dataSet.map { + case data: DenseVector => + val res = data.toArray.zip(model.toArray).map { case (x, y) => x * y } + Vectors.dense(res) + case data: SparseVector => + val res = data.indices.zip(data.values).map { case (id, value) => + (id, value * model(id)) + } + Vectors.sparse(data.size, res) + } + } + + test("compute IDF with default parameter") { + val numOfFeatures = 4 + val data = Array( + Vectors.sparse(numOfFeatures, Array(1, 3), Array(1.0, 2.0)), + Vectors.dense(0.0, 1.0, 2.0, 3.0), + Vectors.sparse(numOfFeatures, Array(1), Array(1.0)) + ) + val numOfData = data.size + val idf = Vectors.dense(Array(0, 3, 1, 2).map { x => + math.log((numOfData + 1.0) / (x + 1.0)) + }) + val expected = scaleDataWithIDF(data, idf) + + val df = sqlContext.createDataFrame(data.zip(expected)).toDF("features", "expected") + + val idfModel = new IDF() + .setInputCol("features") + .setOutputCol("idfValue") + .fit(df) + + idfModel.transform(df).select("idfValue", "expected").collect().foreach { + case Row(x: Vector, y: Vector) => + assert(x ~== y absTol 1e-5, "Transformed vector is different with expected vector.") + } + } + + test("compute IDF with setter") { + val numOfFeatures = 4 + val data = Array( + Vectors.sparse(numOfFeatures, Array(1, 3), Array(1.0, 2.0)), + Vectors.dense(0.0, 1.0, 2.0, 3.0), + Vectors.sparse(numOfFeatures, Array(1), Array(1.0)) + ) + val numOfData = data.size + val idf = Vectors.dense(Array(0, 3, 1, 2).map { x => + if (x > 0) math.log((numOfData + 1.0) / (x + 1.0)) else 0 + }) + val expected = scaleDataWithIDF(data, idf) + + val df = sqlContext.createDataFrame(data.zip(expected)).toDF("features", "expected") + + val idfModel = new IDF() + .setInputCol("features") + .setOutputCol("idfValue") + .setMinDocFreq(1) + .fit(df) + + idfModel.transform(df).select("idfValue", "expected").collect().foreach { + case Row(x: Vector, y: Vector) => + assert(x ~== y absTol 1e-5, "Transformed vector is different with expected vector.") + } + } +} diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/PolynomialExpansionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/PolynomialExpansionSuite.scala new file mode 100644 index 0000000000000..c1d64fba0aa8f --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/PolynomialExpansionSuite.scala @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.feature + +import org.scalatest.FunSuite + +import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors} +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.mllib.util.TestingUtils._ +import org.apache.spark.sql.{Row, SQLContext} +import org.scalatest.exceptions.TestFailedException + +class PolynomialExpansionSuite extends FunSuite with MLlibTestSparkContext { + + @transient var sqlContext: SQLContext = _ + + override def beforeAll(): Unit = { + super.beforeAll() + sqlContext = new SQLContext(sc) + } + + test("Polynomial expansion with default parameter") { + val data = Array( + Vectors.sparse(3, Seq((0, -2.0), (1, 2.3))), + Vectors.dense(-2.0, 2.3), + Vectors.dense(0.0, 0.0, 0.0), + Vectors.dense(0.6, -1.1, -3.0), + Vectors.sparse(3, Seq()) + ) + + val twoDegreeExpansion: Array[Vector] = Array( + Vectors.sparse(9, Array(0, 1, 2, 3, 4), Array(-2.0, 4.0, 2.3, -4.6, 5.29)), + Vectors.dense(-2.0, 4.0, 2.3, -4.6, 5.29), + Vectors.dense(new Array[Double](9)), + Vectors.dense(0.6, 0.36, -1.1, -0.66, 1.21, -3.0, -1.8, 3.3, 9.0), + Vectors.sparse(9, Array.empty, Array.empty)) + + val df = sqlContext.createDataFrame(data.zip(twoDegreeExpansion)).toDF("features", "expected") + + val polynomialExpansion = new PolynomialExpansion() + .setInputCol("features") + .setOutputCol("polyFeatures") + + polynomialExpansion.transform(df).select("polyFeatures", "expected").collect().foreach { + case Row(expanded: DenseVector, expected: DenseVector) => + assert(expanded ~== expected absTol 1e-1) + case Row(expanded: SparseVector, expected: SparseVector) => + assert(expanded ~== expected absTol 1e-1) + case _ => + throw new TestFailedException("Unmatched data types after polynomial expansion", 0) + } + } + + test("Polynomial expansion with setter") { + val data = Array( + Vectors.sparse(3, Seq((0, -2.0), (1, 2.3))), + Vectors.dense(-2.0, 2.3), + Vectors.dense(0.0, 0.0, 0.0), + Vectors.dense(0.6, -1.1, -3.0), + Vectors.sparse(3, Seq()) + ) + + val threeDegreeExpansion: Array[Vector] = Array( + Vectors.sparse(19, Array(0, 1, 2, 3, 4, 5, 6, 7, 8), + Array(-2.0, 4.0, -8.0, 2.3, -4.6, 9.2, 5.29, -10.58, 12.17)), + Vectors.dense(-2.0, 4.0, -8.0, 2.3, -4.6, 9.2, 5.29, -10.58, 12.17), + Vectors.dense(new Array[Double](19)), + Vectors.dense(0.6, 0.36, 0.216, -1.1, -0.66, -0.396, 1.21, 0.726, -1.331, -3.0, -1.8, + -1.08, 3.3, 1.98, -3.63, 9.0, 5.4, -9.9, -27.0), + Vectors.sparse(19, Array.empty, Array.empty)) + + val df = sqlContext.createDataFrame(data.zip(threeDegreeExpansion)).toDF("features", "expected") + + val polynomialExpansion = new PolynomialExpansion() + .setInputCol("features") + .setOutputCol("polyFeatures") + .setDegree(3) + + polynomialExpansion.transform(df).select("polyFeatures", "expected").collect().foreach { + case Row(expanded: DenseVector, expected: DenseVector) => + assert(expanded ~== expected absTol 1e-1) + case Row(expanded: SparseVector, expected: SparseVector) => + assert(expanded ~== expected absTol 1e-1) + case _ => + throw new TestFailedException("Unmatched data types after polynomial expansion", 0) + } + } +} + diff --git a/pom.xml b/pom.xml index bcc2f57f1af5d..4b0b0c85eff21 100644 --- a/pom.xml +++ b/pom.xml @@ -146,7 +146,7 @@ 0.7.1 1.8.3 1.1.0 - 4.2.6 + 4.3.2 3.4.1 ${project.build.directory}/spark-test-classpath.txt 2.10.4 @@ -420,6 +420,16 @@ jsr305 1.3.9 + + org.apache.httpcomponents + httpclient + ${commons.httpclient.version} + + + org.apache.httpcomponents + httpcore + ${commons.httpclient.version} + org.seleniumhq.selenium selenium-java diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml index 04440076a26a3..21dce8d8a565a 100644 --- a/sql/hive/pom.xml +++ b/sql/hive/pom.xml @@ -59,6 +59,11 @@ ${hive.group} hive-exec + + org.apache.httpcomponents + httpclient + ${commons.httpclient.version} + org.codehaus.jackson jackson-mapper-asl