Merge github.com:apache/spark into cleanup
Conflicts:
	core/src/main/scala/org/apache/spark/SparkContext.scala
andrewor14 committed Mar 27, 2014
2 parents f201a8d + 53953d0 commit c92e4d9
Showing 154 changed files with 2,707 additions and 533 deletions.
32 changes: 32 additions & 0 deletions .travis.yml
@@ -0,0 +1,32 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

language: scala
scala:
- "2.10.3"
jdk:
- oraclejdk7
env:
matrix:
- TEST="scalastyle assembly/assembly"
- TEST="catalyst/test sql/test streaming/test mllib/test graphx/test bagel/test"
- TEST=hive/test
cache:
directories:
- $HOME/.m2
- $HOME/.ivy2
- $HOME/.sbt
script:
- "sbt ++$TRAVIS_SCALA_VERSION $TEST"
1 change: 0 additions & 1 deletion bin/compute-classpath.sh
@@ -36,7 +36,6 @@ CLASSPATH="$SPARK_CLASSPATH:$FWDIR/conf"
# Hopefully we will find a way to avoid uber-jars entirely and deploy only the needed packages in
# the future.
if [ -f "$FWDIR"/sql/hive/target/scala-$SCALA_VERSION/spark-hive-assembly-*.jar ]; then
echo "Hive assembly found, including hive support. If this isn't desired run sbt hive/clean."

# Datanucleus jars do not work if only included in the uberjar as plugin.xml metadata is lost.
DATANUCLEUSJARS=$(JARS=("$FWDIR/lib_managed/jars"/datanucleus-*.jar); IFS=:; echo "${JARS[*]}")
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/Dependency.scala
@@ -44,8 +44,9 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
* Represents a dependency on the output of a shuffle stage.
* @param rdd the parent RDD
* @param partitioner partitioner used to partition the shuffle output
* @param serializer [[Serializer]] to use. If set to null, the default serializer, as specified
* by `spark.serializer` config option, will be used.
* @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If set to null,
* the default serializer, as specified by `spark.serializer` config option, will
* be used.
*/
class ShuffleDependency[K, V](
@transient rdd: RDD[_ <: Product2[K, V]],
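As a brief aside on the `spark.serializer` option mentioned in the Scaladoc above, here is a minimal, self-contained sketch of how the default shuffle serializer gets picked up when no explicit Serializer is passed (the local master, app name, and example data are illustrative only):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

// Illustrative configuration: with serializer = null on the dependency, shuffle
// data falls back to whatever class `spark.serializer` names (Kryo here).
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("serializer-config-sketch")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

// reduceByKey introduces a ShuffleDependency under the hood; its shuffle output
// is written with the configured serializer rather than Java serialization.
val counts = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3))).reduceByKey(_ + _).collect()
sc.stop()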
34 changes: 21 additions & 13 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHad
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.mesos.MesosNativeLibrary

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
@@ -232,7 +233,7 @@ class SparkContext(
postEnvironmentUpdate()

/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = {
val hadoopConfiguration: Configuration = {
val env = SparkEnv.get
val hadoopConf = SparkHadoopUtil.get.newConfiguration()
// Explicitly check for S3 environment variables
@@ -632,7 +633,7 @@ class SparkContext(
* standard mutable collections. So you can use this with mutable Map, Set, etc.
*/
def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable, T]
(initialValue: R) = {
(initialValue: R): Accumulable[R, T] = {
val param = new GrowableAccumulableParam[R,T]
new Accumulable(initialValue, param)
}
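For context, a minimal usage sketch of `accumulableCollection` (it assumes an existing `SparkContext` named `sc`; the error-code data is made up for illustration):

import scala.collection.mutable

// Accumulate distinct error codes observed on executors into a driver-side set.
val errorCodes = sc.accumulableCollection(mutable.HashSet[Int]())
sc.parallelize(Seq(500, 404, 500, 503)).foreach(code => errorCodes += code)
println(errorCodes.value)  // e.g. Set(500, 404, 503); value is readable only on the driver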
@@ -772,7 +773,7 @@ class SparkContext(
* filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
*/
def addJar(path: String) {
if (path == null) {
if (path == null) {
logWarning("null specified as parameter to addJar")
} else {
var key = ""
@@ -883,7 +884,8 @@ class SparkContext(
* has overridden the call site, this will return the user's version.
*/
private[spark] def getCallSite(): String = {
Option(getLocalProperty("externalCallSite")).getOrElse(Utils.formatCallSiteInfo())
val defaultCallSite = Utils.getCallSiteInfo
Option(getLocalProperty("externalCallSite")).getOrElse(defaultCallSite.toString)
}

/**
@@ -1131,7 +1133,7 @@ object SparkContext extends Logging {
implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)

implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
rdd: RDD[(K, V)]) =
rdd: RDD[(K, V)]) =
new SequenceFileRDDFunctions(rdd)

implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassTag, V: ClassTag](
@@ -1168,27 +1170,33 @@ object SparkContext extends Logging {
}

// Helper objects for converting common types to Writable
private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T) = {
private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T)
: WritableConverter[T] = {
val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
}

implicit def intWritableConverter() = simpleWritableConverter[Int, IntWritable](_.get)
implicit def intWritableConverter(): WritableConverter[Int] =
simpleWritableConverter[Int, IntWritable](_.get)

implicit def longWritableConverter() = simpleWritableConverter[Long, LongWritable](_.get)
implicit def longWritableConverter(): WritableConverter[Long] =
simpleWritableConverter[Long, LongWritable](_.get)

implicit def doubleWritableConverter() = simpleWritableConverter[Double, DoubleWritable](_.get)
implicit def doubleWritableConverter(): WritableConverter[Double] =
simpleWritableConverter[Double, DoubleWritable](_.get)

implicit def floatWritableConverter() = simpleWritableConverter[Float, FloatWritable](_.get)
implicit def floatWritableConverter(): WritableConverter[Float] =
simpleWritableConverter[Float, FloatWritable](_.get)

implicit def booleanWritableConverter() =
implicit def booleanWritableConverter(): WritableConverter[Boolean] =
simpleWritableConverter[Boolean, BooleanWritable](_.get)

implicit def bytesWritableConverter() = {
implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes)
}

implicit def stringWritableConverter() = simpleWritableConverter[String, Text](_.toString)
implicit def stringWritableConverter(): WritableConverter[String] =
simpleWritableConverter[String, Text](_.toString)

implicit def writableWritableConverter[T <: Writable]() =
new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
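For readers unfamiliar with these implicits, a short sketch of what they enable (it assumes an existing `SparkContext` named `sc` and a path that actually holds an IntWritable/Text SequenceFile; both are illustrative):

// The implicit WritableConverters above let sequenceFile be called with plain
// Scala types; the IntWritable/Text unwrapping happens behind the scenes.
val data = sc.sequenceFile[Int, String]("hdfs:///tmp/example-seqfile")
data.take(5).foreach { case (k, v) => println(s"$k -> $v") }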
19 changes: 13 additions & 6 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -391,19 +391,24 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Save this RDD as a text file, using string representations of elements.
*/
def saveAsTextFile(path: String) = rdd.saveAsTextFile(path)
def saveAsTextFile(path: String): Unit = {
rdd.saveAsTextFile(path)
}


/**
* Save this RDD as a compressed text file, using string representations of elements.
*/
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) =
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = {
rdd.saveAsTextFile(path, codec)
}
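A hedged usage sketch of the compressed overload (the `lines` RDD of strings and the output path are assumptions for illustration):

import org.apache.hadoop.io.compress.GzipCodec

// Writes gzip-compressed part files instead of plain text.
lines.saveAsTextFile("/tmp/out-gzipped", classOf[GzipCodec])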

/**
* Save this RDD as a SequenceFile of serialized objects.
*/
def saveAsObjectFile(path: String) = rdd.saveAsObjectFile(path)
def saveAsObjectFile(path: String): Unit = {
rdd.saveAsObjectFile(path)
}

/**
* Creates tuples of the elements in this RDD by applying `f`.
@@ -420,7 +425,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* executed on this RDD. It is strongly recommended that this RDD is persisted in
* memory, otherwise saving it on a file will require recomputation.
*/
def checkpoint() = rdd.checkpoint()
def checkpoint(): Unit = {
rdd.checkpoint()
}
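As a usage sketch of the checkpoint contract described above (the `JavaSparkContext` named `jsc` and the checkpoint directory are assumptions for illustration):

jsc.setCheckpointDir("/tmp/spark-checkpoints")
val rdd = jsc.parallelize(java.util.Arrays.asList(1, 2, 3)).cache()
rdd.checkpoint()  // only marks the RDD; nothing is written yet
rdd.count()       // the first action materializes both the cache and the checkpoint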

/**
* Return whether this RDD has been checkpointed or not
Expand Down Expand Up @@ -481,7 +488,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Returns the maximum element from this RDD as defined by the specified
* Comparator[T].
* @params comp the comparator that defines ordering
* @param comp the comparator that defines ordering
* @return the maximum of the RDD
* */
def max(comp: Comparator[T]): T = {
@@ -491,7 +498,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Returns the minimum element from this RDD as defined by the specified
* Comparator[T].
* @params comp the comparator that defines ordering
* @param comp the comparator that defines ordering
* @return the minimum of the RDD
* */
def min(comp: Comparator[T]): T = {
@@ -463,7 +463,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
sc.setCheckpointDir(dir)
}

def getCheckpointDir = JavaUtils.optionToOptional(sc.getCheckpointDir)
def getCheckpointDir: Optional[String] = JavaUtils.optionToOptional(sc.getCheckpointDir)

protected def checkpointFile[T](path: String): JavaRDD[T] = {
implicit val ctag: ClassTag[T] = fakeClassTag
@@ -112,5 +112,5 @@ private[spark] class ClientArguments(args: Array[String]) {
}

object ClientArguments {
def isValidJarUrl(s: String) = s.matches("(.+):(.+)jar")
def isValidJarUrl(s: String): Boolean = s.matches("(.+):(.+)jar")
}
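For illustration only, how the pattern above behaves (checked directly against the regex, since the surrounding class is package-private):

"hdfs://host:9000/jars/app.jar".matches("(.+):(.+)jar")  // true:  scheme plus .jar
"local:/opt/libs/app.jar".matches("(.+):(.+)jar")        // true
"app.jar".matches("(.+):(.+)jar")                        // false: no scheme separator
"file:/tmp/app.war".matches("(.+):(.+)jar")              // false: wrong extension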
@@ -32,7 +32,7 @@ import scala.collection.JavaConversions._
* Contains util methods to interact with Hadoop from Spark.
*/
class SparkHadoopUtil {
val conf = newConfiguration()
val conf: Configuration = newConfiguration()
UserGroupInformation.setConfiguration(conf)

def runAsUser(user: String)(func: () => Unit) {
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.master
import scala.collection.JavaConversions._

import akka.serialization.Serialization
import org.apache.curator.framework.CuratorFramework
import org.apache.zookeeper.CreateMode

import org.apache.spark.{Logging, SparkConf}
Expand All @@ -29,7 +30,7 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
with Logging
{
val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
val zk = SparkCuratorUtil.newClient(conf)
val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)

SparkCuratorUtil.mkdir(zk, WORKING_DIR)

@@ -38,7 +38,7 @@ class ConsoleSink(val property: Properties, val registry: MetricRegistry,
case None => CONSOLE_DEFAULT_PERIOD
}

val pollUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
val pollUnit: TimeUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
case Some(s) => TimeUnit.valueOf(s.toUpperCase())
case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT)
}
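The `Option(...)` wrapping used here (and in the other sinks below) deserves a quick standalone sketch; the property names are illustrative:

import java.util.Properties

// Properties.getProperty returns null for missing keys; Option(null) is None,
// so defaults fall out of an ordinary match or getOrElse.
val props = new Properties()
props.setProperty("unit", "SECONDS")
val unit   = Option(props.getProperty("unit")).getOrElse("MINUTES")   // "SECONDS"
val period = Option(props.getProperty("period")).getOrElse("10")      // "10", the default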
@@ -41,7 +41,7 @@ class CsvSink(val property: Properties, val registry: MetricRegistry,
case None => CSV_DEFAULT_PERIOD
}

val pollUnit = Option(property.getProperty(CSV_KEY_UNIT)) match {
val pollUnit: TimeUnit = Option(property.getProperty(CSV_KEY_UNIT)) match {
case Some(s) => TimeUnit.valueOf(s.toUpperCase())
case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT)
}
@@ -39,7 +39,7 @@ class GraphiteSink(val property: Properties, val registry: MetricRegistry,
val GRAPHITE_KEY_UNIT = "unit"
val GRAPHITE_KEY_PREFIX = "prefix"

def propertyToOption(prop: String) = Option(property.getProperty(prop))
def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop))

if (!propertyToOption(GRAPHITE_KEY_HOST).isDefined) {
throw new Exception("Graphite sink requires 'host' property.")
@@ -57,7 +57,7 @@ class GraphiteSink(val property: Properties, val registry: MetricRegistry,
case None => GRAPHITE_DEFAULT_PERIOD
}

val pollUnit = propertyToOption(GRAPHITE_KEY_UNIT) match {
val pollUnit: TimeUnit = propertyToOption(GRAPHITE_KEY_UNIT) match {
case Some(s) => TimeUnit.valueOf(s.toUpperCase())
case None => TimeUnit.valueOf(GRAPHITE_DEFAULT_UNIT)
}
@@ -103,7 +103,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
array
}

override val partitioner = Some(part)
override val partitioner: Some[Partitioner] = Some(part)

override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
val sparkConf = SparkEnv.get.conf
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -171,7 +171,7 @@ class HadoopRDD[K, V](
array
}

override def compute(theSplit: Partition, context: TaskContext) = {
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
val iter = new NextIterator[(K, V)] {

val split = theSplit.asInstanceOf[HadoopPartition]
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -116,7 +116,7 @@ class JdbcRDD[T: ClassTag](
}

object JdbcRDD {
def resultSetToObjectArray(rs: ResultSet) = {
def resultSetToObjectArray(rs: ResultSet): Array[Object] = {
Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1))
}
}
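A hedged end-to-end sketch of `resultSetToObjectArray` as the default row mapper (the JDBC URL, table, and bounds are assumptions; it also presumes an existing `SparkContext` named `sc` and the matching JDBC driver on the classpath):

import java.sql.DriverManager
import org.apache.spark.rdd.JdbcRDD

val rows = new JdbcRDD(
  sc,
  () => DriverManager.getConnection("jdbc:h2:mem:testdb"),
  "SELECT id, name FROM users WHERE id >= ? AND id <= ?",
  lowerBound = 1, upperBound = 1000, numPartitions = 4,
  mapRow = JdbcRDD.resultSetToObjectArray _)
println(rows.count())  // each row arrives as an Array[Object]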
@@ -80,7 +80,7 @@ class NewHadoopRDD[K, V](
result
}

override def compute(theSplit: Partition, context: TaskContext) = {
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
val iter = new Iterator[(K, V)] {
val split = theSplit.asInstanceOf[NewHadoopPartition]
logInfo("Input split: " + split.serializableHadoopSplit)