Skip to content

Commit

Permalink
Extend FilterInputStream and use the stream directly (#613)
Browse files Browse the repository at this point in the history
* Extend FilterInputStream and use the stream directly

Instead of trying to return a composite struct. This is because codegen will attempt to cast the returned stream as a java.io.InputStream

* Invalidate sbt caches

* Update to sbt-git 1.0.0

* Revert "Invalidate sbt caches"

This reverts commit 007741c.
  • Loading branch information
mccheah committed Oct 22, 2019
1 parent 303a6dc commit 4fa14a3
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.shuffle.api;

import java.io.FilterInputStream;
import org.apache.spark.storage.BlockId;

import java.io.InputStream;
Expand All @@ -26,20 +27,15 @@
* An object defining the shuffle block and length metadata associated with the block.
* @since 3.0.0
*/
public class ShuffleBlockInputStream {
public class ShuffleBlockInputStream extends FilterInputStream {
private final BlockId blockId;
private final InputStream inputStream;

public ShuffleBlockInputStream(BlockId blockId, InputStream inputStream) {
super(inputStream);
this.blockId = blockId;
this.inputStream = inputStream;
}

public BlockId getBlockId() {
return this.blockId;
}

public InputStream getInputStream() {
return this.inputStream;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,7 @@ private[spark] class BlockStoreShuffleReader[K, C](
}
}.asJava).iterator()

val retryingWrappedStreams = streamsIterator.asScala.map { shuffleBlock =>
val rawReaderStream = shuffleBlock.getInputStream
val retryingWrappedStreams = streamsIterator.asScala.map { rawReaderStream =>
if (shuffleExecutorComponents.shouldWrapPartitionReaderStream()) {
if (compressShuffle) {
compressionCodec.compressedInputStream(
Expand Down
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "0.8.5")
addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "1.0.0")

addSbtPlugin("com.etsy" % "sbt-checkstyle-plugin" % "3.1.1")

Expand Down

0 comments on commit 4fa14a3

Please sign in to comment.