Commit

Merge branch 'master' of github.com:apache/spark into fix_hashcode
Davies Liu committed Jun 19, 2015
2 parents 89c2432 + 43c7ec6 commit a0626ed
Showing 157 changed files with 2,046 additions and 560 deletions.
1 change: 1 addition & 0 deletions LICENSE
@@ -950,3 +950,4 @@ The following components are provided under the MIT License. See project link fo
(MIT License) scopt (com.github.scopt:scopt_2.10:3.2.0 - https://github.com/scopt/scopt)
(The MIT License) Mockito (org.mockito:mockito-all:1.8.5 - http://www.mockito.org)
(MIT License) jquery (https://jquery.org/license/)
+ (MIT License) AnchorJS (https://github.com/bryanbraun/anchorjs)
@@ -41,7 +41,7 @@ class SerializableWritable[T <: Writable](@transient var t: T) extends Serializa
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
val ow = new ObjectWritable()
- ow.setConf(new Configuration())
+ ow.setConf(new Configuration(false))
ow.readFields(in)
t = ow.get().asInstanceOf[T]
}
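The switch to `new Configuration(false)` above most likely avoids re-loading Hadoop's default resources during deserialization, since `readFields` repopulates the entries anyway; a minimal sketch of the distinction, assuming standard Hadoop `Configuration` constructor behaviour (not part of this diff):

import org.apache.hadoop.conf.Configuration

// loadDefaults = true (the old code path): also pulls in core-default.xml / core-site.xml.
val withDefaults = new Configuration()
// loadDefaults = false (the new code path): starts empty; readFields(in) then fills in
// exactly the entries that were serialized on the driver.
val bare = new Configuration(false)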
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -974,7 +974,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
- val confBroadcast = broadcast(new SerializableWritable(hadoopConfiguration))
+ val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
this,
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.HadoopRDD
+ import org.apache.spark.util.SerializableJobConf

/**
* Internal helper class that saves an RDD using a Hadoop OutputFormat.
@@ -42,7 +43,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
with Serializable {

private val now = new Date()
- private val conf = new SerializableWritable(jobConf)
+ private val conf = new SerializableJobConf(jobConf)

private var jobID = 0
private var splitID = 0
@@ -19,8 +19,8 @@ package org.apache.spark.api.python

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
- import org.apache.spark.util.Utils
- import org.apache.spark.{Logging, SerializableWritable, SparkException}
+ import org.apache.spark.util.{SerializableConfiguration, Utils}
+ import org.apache.spark.{Logging, SparkException}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
import scala.util.{Failure, Success, Try}
@@ -61,7 +61,7 @@ private[python] object Converter extends Logging {
* Other objects are passed through without conversion.
*/
private[python] class WritableToJavaConverter(
- conf: Broadcast[SerializableWritable[Configuration]]) extends Converter[Any, Any] {
+ conf: Broadcast[SerializableConfiguration]) extends Converter[Any, Any] {

/**
* Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -36,7 +36,7 @@ import org.apache.spark.api.java.{JavaPairRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD
- import org.apache.spark.util.Utils
+ import org.apache.spark.util.{SerializableConfiguration, Utils}

import scala.util.control.NonFatal

@@ -445,7 +445,7 @@ private[spark] object PythonRDD extends Logging {
val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
- val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
+ val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration()))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -471,7 +471,7 @@ private[spark] object PythonRDD extends Logging {
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
- val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
+ val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(mergedConf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -497,7 +497,7 @@ private[spark] object PythonRDD extends Logging {
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
None, inputFormatClass, keyClass, valueClass, conf)
- val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
+ val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(conf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -540,7 +540,7 @@ private[spark] object PythonRDD extends Logging {
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
- val confBroadcasted = sc.sc.broadcast(new SerializableWritable(mergedConf))
+ val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(mergedConf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
@@ -566,7 +566,7 @@ private[spark] object PythonRDD extends Logging {
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
None, inputFormatClass, keyClass, valueClass, conf)
- val confBroadcasted = sc.sc.broadcast(new SerializableWritable(conf))
+ val confBroadcasted = sc.sc.broadcast(new SerializableConfiguration(conf))
val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
new WritableToJavaConverter(confBroadcasted))
JavaRDD.fromRDD(SerDeUtil.pairRDDToPython(converted, batchSize))
11 changes: 5 additions & 6 deletions core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -21,13 +21,12 @@ import java.io.IOException

import scala.reflect.ClassTag

- import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
- import org.apache.spark.util.Utils
+ import org.apache.spark.util.{SerializableConfiguration, Utils}

private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}

@@ -38,7 +37,7 @@ private[spark]
class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
extends RDD[T](sc, Nil) {

- val broadcastedConf = sc.broadcast(new SerializableWritable(sc.hadoopConfiguration))
+ val broadcastedConf = sc.broadcast(new SerializableConfiguration(sc.hadoopConfiguration))

@transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)

@@ -87,7 +86,7 @@ private[spark] object CheckpointRDD extends Logging {

def writeToFile[T: ClassTag](
path: String,
- broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+ broadcastedConf: Broadcast[SerializableConfiguration],
blockSize: Int = -1
)(ctx: TaskContext, iterator: Iterator[T]) {
val env = SparkEnv.get
@@ -135,7 +134,7 @@ private[spark] object CheckpointRDD extends Logging {

def readFromFile[T](
path: Path,
- broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+ broadcastedConf: Broadcast[SerializableConfiguration],
context: TaskContext
): Iterator[T] = {
val env = SparkEnv.get
@@ -164,7 +163,7 @@ private[spark] object CheckpointRDD extends Logging {
val path = new Path(hdfsPath, "temp")
val conf = SparkHadoopUtil.get.newConfiguration(new SparkConf())
val fs = path.getFileSystem(conf)
- val broadcastedConf = sc.broadcast(new SerializableWritable(conf))
+ val broadcastedConf = sc.broadcast(new SerializableConfiguration(conf))
sc.runJob(rdd, CheckpointRDD.writeToFile[Int](path.toString, broadcastedConf, 1024) _)
val cpRDD = new CheckpointRDD[Int](sc, path.toString)
assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -44,7 +44,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
- import org.apache.spark.util.{NextIterator, Utils}
+ import org.apache.spark.util.{SerializableConfiguration, NextIterator, Utils}
import org.apache.spark.scheduler.{HostTaskLocation, HDFSCacheTaskLocation}
import org.apache.spark.storage.StorageLevel

@@ -100,7 +100,7 @@ private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSp
@DeveloperApi
class HadoopRDD[K, V](
@transient sc: SparkContext,
- broadcastedConf: Broadcast[SerializableWritable[Configuration]],
+ broadcastedConf: Broadcast[SerializableConfiguration],
initLocalJobConfFuncOpt: Option[JobConf => Unit],
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
@@ -121,8 +121,8 @@ class HadoopRDD[K, V](
minPartitions: Int) = {
this(
sc,
- sc.broadcast(new SerializableWritable(conf))
-   .asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
+ sc.broadcast(new SerializableConfiguration(conf))
+   .asInstanceOf[Broadcast[SerializableConfiguration]],
None /* initLocalJobConfFuncOpt */,
inputFormatClass,
keyClass,
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -33,7 +33,7 @@ import org.apache.spark._
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
- import org.apache.spark.util.Utils
+ import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.storage.StorageLevel

@@ -74,7 +74,7 @@ class NewHadoopRDD[K, V](
with Logging {

// A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
- private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
+ private val confBroadcast = sc.broadcast(new SerializableConfiguration(conf))
// private val serializableConf = new SerializableWritable(conf)

private val jobTrackerId: String = {
@@ -44,7 +44,7 @@ import org.apache.spark.executor.{DataWriteMethod, OutputMetrics}
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer
- import org.apache.spark.util.Utils
+ import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.util.collection.CompactBuffer
import org.apache.spark.util.random.StratifiedSamplingUtils

@@ -1002,7 +1002,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val stageId = self.id
- val wrappedConf = new SerializableWritable(job.getConfiguration)
+ val wrappedConf = new SerializableConfiguration(job.getConfiguration)
val outfmt = job.getOutputFormatClass
val jobFormat = outfmt.newInstance

@@ -1065,7 +1065,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
// Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
val hadoopConf = conf
- val wrappedConf = new SerializableWritable(hadoopConf)
+ val wrappedConf = new SerializableConfiguration(hadoopConf)
val outputFormatInstance = hadoopConf.getOutputFormat
val keyClass = hadoopConf.getOutputKeyClass
val valueClass = hadoopConf.getOutputValueClass
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path

import org.apache.spark._
import org.apache.spark.scheduler.{ResultTask, ShuffleMapTask}
+ import org.apache.spark.util.SerializableConfiguration

/**
* Enumeration to manage state transitions of an RDD through checkpointing
@@ -91,7 +92,7 @@ private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])

// Save to file, and reload it as an RDD
val broadcastedConf = rdd.context.broadcast(
- new SerializableWritable(rdd.context.hadoopConfiguration))
+ new SerializableConfiguration(rdd.context.hadoopConfiguration))
val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
if (rdd.conf.getBoolean("spark.cleaner.referenceTracking.cleanCheckpoints", false)) {
rdd.context.cleaner.foreach { cleaner =>
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.util

import java.io.{ObjectInputStream, ObjectOutputStream}

import org.apache.hadoop.conf.Configuration

import org.apache.spark.util.Utils

private[spark]
class SerializableConfiguration(@transient var value: Configuration) extends Serializable {
private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
out.defaultWriteObject()
value.write(out)
}

private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
value = new Configuration(false)
value.readFields(in)
}
}
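For context, a minimal sketch of how this new wrapper is used elsewhere in this commit (see the SparkContext and CheckpointRDD hunks above); it assumes a live SparkContext `sc`, and since the class is `private[spark]` the pattern only applies inside Spark itself:

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.util.SerializableConfiguration

// Wrap the driver's Hadoop Configuration so it can be broadcast to executors;
// Java serialization delegates to Configuration.write / readFields.
def broadcastHadoopConf(sc: SparkContext): Broadcast[SerializableConfiguration] = {
  val wrapped = new SerializableConfiguration(sc.hadoopConfiguration)
  sc.broadcast(wrapped)
}
// Usage sketch: val confBroadcast = broadcastHadoopConf(sc)
// On an executor, the Configuration is recovered via confBroadcast.value.value.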
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util

import java.io.{ObjectInputStream, ObjectOutputStream}

import org.apache.hadoop.mapred.JobConf

import org.apache.spark.util.Utils

private[spark]
class SerializableJobConf(@transient var value: JobConf) extends Serializable {
private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
out.defaultWriteObject()
value.write(out)
}

private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
value = new JobConf(false)
value.readFields(in)
}
}
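A hedged round-trip sketch (illustration only, not part of the commit) of what the writeObject/readObject methods above preserve — the JobConf's key/value entries; the property key is made up, and because the class is `private[spark]` such code would live inside Spark:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.util.SerializableJobConf

// Serialize the wrapper and read it back; only the conf entries travel.
val original = new JobConf()
original.set("example.illustrative.key", "42")   // hypothetical property

val buffer = new ByteArrayOutputStream()
val out = new ObjectOutputStream(buffer)
out.writeObject(new SerializableJobConf(original))
out.close()

val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
val restored = in.readObject().asInstanceOf[SerializableJobConf]
assert(restored.value.get("example.illustrative.key") == "42")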
@@ -32,7 +32,7 @@ import org.apache.spark.annotation.DeveloperApi
* size, which is guaranteed to explore all spaces for each key (see
* http://en.wikipedia.org/wiki/Quadratic_probing).
*
- * The map can support up to `536870912 (2 ^ 29)` elements.
+ * The map can support up to `375809638 (0.7 * 2 ^ 29)` elements.
*
* TODO: Cache the hash values of each key? java.util.HashMap does that.
*/
@@ -199,11 +199,8 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64)

/** Increase table size by 1, rehashing if necessary */
private def incrementSize() {
- if (curSize == MAXIMUM_CAPACITY) {
-   throw new IllegalStateException(s"Can't put more that ${MAXIMUM_CAPACITY} elements")
- }
curSize += 1
- if (curSize > growThreshold && capacity < MAXIMUM_CAPACITY) {
+ if (curSize > growThreshold) {
growTable()
}
}
@@ -216,7 +213,8 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64)
/** Double the table's size and re-hash everything */
protected def growTable() {
// capacity < MAXIMUM_CAPACITY (2 ^ 29) so capacity * 2 won't overflow
- val newCapacity = (capacity * 2).min(MAXIMUM_CAPACITY)
+ val newCapacity = capacity * 2
+ require(newCapacity <= MAXIMUM_CAPACITY, s"Can't contain more than ${growThreshold} elements")
val newData = new Array[AnyRef](2 * newCapacity)
val newMask = newCapacity - 1
// Insert all our old values into the new array. Note that because our old keys are
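For reference, the new ceiling quoted in the class comment above follows from the map's growth threshold; a small sketch of the arithmetic, with constant names assumed to mirror AppendOnlyMap's internals:

// Assumed to mirror AppendOnlyMap: capacity tops out at 2^29 and growTable() is
// triggered once curSize exceeds LOAD_FACTOR * capacity.
val MAXIMUM_CAPACITY = 1 << 29          // 536870912
val LOAD_FACTOR = 0.7
val maxElements = (LOAD_FACTOR * MAXIMUM_CAPACITY).toInt
// maxElements == 375809638, matching the updated doc comment; growing past it
// would need newCapacity > MAXIMUM_CAPACITY, which the new require() rejects.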
1 change: 1 addition & 0 deletions docs/_layouts/global.html
@@ -138,6 +138,7 @@ <h1 class="title">{{ page.title }}</h1>

<script src="js/vendor/jquery-1.8.0.min.js"></script>
<script src="js/vendor/bootstrap.min.js"></script>
<script src="js/vendor/anchor.min.js"></script>
<script src="js/main.js"></script>

<!-- MathJax Section -->
5 changes: 5 additions & 0 deletions docs/css/main.css
@@ -146,3 +146,8 @@ ul.nav li.dropdown ul.dropdown-menu li.dropdown-submenu ul.dropdown-menu {
.MathJax .mi { color: inherit }
.MathJax .mf { color: inherit }
.MathJax .mh { color: inherit }

+ /**
+  * AnchorJS (anchor links when hovering over headers)
+  */
+ a.anchorjs-link:hover { text-decoration: none; }