diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
index 0f315b85bfca6..ea8aa346cd9f3 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala
@@ -38,7 +38,7 @@ private[spark] class HashShuffleReader[K, C](
   /** Read the combined key-values for this reduce task */
   override def read(): Iterator[Product2[K, C]] = {
     val blockStreams = BlockStoreShuffleFetcher.fetchBlockStreams(
-      handle.shuffleId, startPartition, context)
+      handle.shuffleId, startPartition, context)
 
     // Wrap the streams for compression based on configuration
     val wrappedStreams = blockStreams.map { case (blockId, inputStream) =>
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 3758a758943d4..771194225af2f 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -21,7 +21,7 @@ import java.io.InputStream
 import java.util.concurrent.LinkedBlockingQueue
 
 import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, HashSet}
 import scala.util.{Failure, Try}
 
 import org.apache.spark.network.buffer.ManagedBuffer
@@ -78,7 +78,7 @@ final class ShuffleBlockFetcherIterator(
   private[this] val localBlocks = new ArrayBuffer[BlockId]()
 
   /** Remote blocks to fetch, excluding zero-sized blocks. */
-  private[this] val remoteBlocks = new mutable.HashSet[BlockId]()
+  private[this] val remoteBlocks = new HashSet[BlockId]()
 
   /**
    * A queue to hold our results. This turns the asynchronous model provided by
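
For readers skimming the first hunk: the `read()` path fetches each shuffle block as an `InputStream` and wraps it per block for decompression before deserializing, so decompressed data is never materialized up front. Below is a minimal, self-contained sketch of that wrap-per-stream pattern; everything in it (`WrapStreamsSketch`, `wrapForCompression`, gzip as the codec, the fake block ids) is an illustrative stand-in, not Spark's actual `BlockManager`/`CompressionCodec` API.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object WrapStreamsSketch {
  type BlockId = String

  // Stand-in for the conf-driven decision of whether shuffle blocks are
  // compressed (spark.shuffle.compress in real Spark).
  val compressEnabled = true

  // Illustrative analogue of wrapping a raw fetched stream for compression:
  // pick a decompressing wrapper, or pass the stream through untouched.
  def wrapForCompression(blockId: BlockId, in: InputStream): InputStream =
    if (compressEnabled) new GZIPInputStream(in) else in

  // Helper to fabricate "fetched" compressed block payloads for the demo.
  def gzip(s: String): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val gz = new GZIPOutputStream(bos)
    gz.write(s.getBytes("UTF-8"))
    gz.close()
    bos.toByteArray
  }

  def main(args: Array[String]): Unit = {
    // Two fake remote blocks keyed by block id, held in an iterator so the
    // wrapping below stays lazy, mirroring the fetcher's streaming model.
    val blockStreams: Iterator[(BlockId, InputStream)] = Iterator(
      "shuffle_0_0_0" -> new ByteArrayInputStream(gzip("records for block 0")),
      "shuffle_0_1_0" -> new ByteArrayInputStream(gzip("records for block 1"))
    )

    // Same shape as the hunk above: wrap each stream as it is consumed.
    val wrappedStreams = blockStreams.map { case (blockId, inputStream) =>
      blockId -> wrapForCompression(blockId, inputStream)
    }

    wrappedStreams.foreach { case (id, in) =>
      println(s"$id -> ${scala.io.Source.fromInputStream(in, "UTF-8").mkString}")
    }
  }
}
```

Because both `blockStreams` and the mapped `wrappedStreams` are iterators, each block is decompressed only when the reduce side actually pulls it, which keeps memory bounded regardless of how many blocks are fetched.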