Remove upper type bound in ShuffleWriter interface.
This bound wasn't necessary and was causing IntelliJ to display spurious
errors when editing UnsafeShuffleWriter.java.
JoshRosen committed May 7, 2015
1 parent cfe0ec4 commit e67f1ea
Showing 4 changed files with 6 additions and 6 deletions.
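
Why the bound was redundant, as a minimal standalone sketch (the Writer trait and demo names below are hypothetical, not code from this commit): scala.collection.Iterator is covariant in its element type, and Tuple2 extends Product2, so an Iterator of any Product2 subtype already conforms to Iterator[Product2[K, V]] without the "_ <:" existential bound.

// Hypothetical stand-in for ShuffleWriter, using the plain (unbounded) signature.
trait Writer[K, V] {
  def write(records: Iterator[Product2[K, V]]): Unit
}

object BoundDemo {
  def main(args: Array[String]): Unit = {
    val writer = new Writer[String, Int] {
      def write(records: Iterator[Product2[String, Int]]): Unit =
        records.foreach(r => println(s"${r._1} -> ${r._2}"))
    }
    // Tuple2[String, Int] <: Product2[String, Int], and Iterator[+A] is
    // covariant, so this call type-checks without an "_ <:" bound.
    val records: Iterator[(String, Int)] = Iterator("a" -> 1, "b" -> 2)
    writer.write(records)
  }
}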
UnsafeShuffleWriter.java
@@ -47,8 +47,7 @@
import org.apache.spark.unsafe.PlatformDependent;
import org.apache.spark.unsafe.memory.TaskMemoryManager;

-// IntelliJ gets confused and claims that this class should be abstract, but this actually compiles
-public class UnsafeShuffleWriter<K, V> implements ShuffleWriter<K, V> {
+public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {

private static final int SER_BUFFER_SIZE = 1024 * 1024; // TODO: tune this
private static final ClassTag<Object> OBJECT_CLASS_TAG = ClassTag$.MODULE$.Object();
@@ -102,6 +101,7 @@ public void write(Iterator<Product2<K, V>> records) {
write(JavaConversions.asScalaIterator(records));
}

+@Override
public void write(scala.collection.Iterator<Product2<K, V>> records) {
try {
final long[] partitionLengths = mergeSpills(insertRecordsIntoSorter(records));
ShuffleWriter.scala
@@ -22,9 +22,9 @@ import org.apache.spark.scheduler.MapStatus
/**
* Obtained inside a map task to write out records to the shuffle system.
*/
-private[spark] trait ShuffleWriter[K, V] {
+private[spark] abstract class ShuffleWriter[K, V] {
/** Write a sequence of records to this task's output */
-  def write(records: Iterator[_ <: Product2[K, V]]): Unit
+  def write(records: Iterator[Product2[K, V]]): Unit

/** Close this writer, passing along whether the map completed */
def stop(success: Boolean): Option[MapStatus]
HashShuffleWriter.scala
@@ -49,7 +49,7 @@ private[spark] class HashShuffleWriter[K, V](
writeMetrics)

/** Write a bunch of records to this task's output */
-  override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
+  override def write(records: Iterator[Product2[K, V]]): Unit = {
val iter = if (dep.aggregator.isDefined) {
if (dep.mapSideCombine) {
dep.aggregator.get.combineValuesByKey(records, context)
SortShuffleWriter.scala
@@ -48,7 +48,7 @@ private[spark] class SortShuffleWriter[K, V, C](
context.taskMetrics.shuffleWriteMetrics = Some(writeMetrics)

/** Write a bunch of records to this task's output */
-  override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
+  override def write(records: Iterator[Product2[K, V]]): Unit = {
if (dep.mapSideCombine) {
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
sorter = new ExternalSorter[K, V, C](
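
For illustration, a minimal writer against the reworked ShuffleWriter (a hypothetical sketch, not part of this commit; LoggingShuffleWriter is an invented name) now uses extends and the unbounded write signature, mirroring the HashShuffleWriter and SortShuffleWriter changes above:

// Placed in the same package so the private[spark] abstract class is visible.
package org.apache.spark.shuffle

import org.apache.spark.scheduler.MapStatus

// Hypothetical minimal writer showing the post-change contract:
// extend the abstract class and override write/stop directly.
class LoggingShuffleWriter[K, V] extends ShuffleWriter[K, V] {

  override def write(records: Iterator[Product2[K, V]]): Unit =
    records.foreach(r => println(s"wrote ${r._1} -> ${r._2}"))

  // Nothing to clean up in this sketch; a real writer would return a
  // MapStatus describing the written map output when success is true.
  override def stop(success: Boolean): Option[MapStatus] = None
}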
