Skip to content

Commit

Permalink
Changed default Compression Codec from .snappy to .lz4 and changed the spark fetcher

Browse files Browse the repository at this point in the history
  • Loading branch information
swasti committed Oct 9, 2017
1 parent fc9b866 commit 656ba41
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 12 deletions.
2 changes: 1 addition & 1 deletion app-conf/FetcherConf.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
-->
<fetcher>
<applicationtype>spark</applicationtype>
<classname>com.linkedin.drelephant.spark.fetchers.FSFetcher</classname>
<classname>com.linkedin.drelephant.spark.fetchers.SparkFetcher</classname>
</fetcher>

<!--
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ import scala.util.Try


class SparkMetricsAggregator(private val aggregatorConfigurationData: AggregatorConfigurationData)
extends HadoopMetricsAggregator {

extends HadoopMetricsAggregator {
import SparkMetricsAggregator._

private val logger: Logger = Logger.getLogger(classOf[SparkMetricsAggregator])
Expand Down
2 changes: 1 addition & 1 deletion app/com/linkedin/drelephant/util/SparkUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ trait SparkUtils {
}

private val IN_PROGRESS = ".inprogress"
private val DEFAULT_COMPRESSION_CODEC = "snappy"
private val DEFAULT_COMPRESSION_CODEC = "lz4"

private val compressionCodecClassNamesByShortName = Map(
"lz4" -> classOf[LZ4CompressionCodec].getName,
Expand Down
16 changes: 8 additions & 8 deletions test/com/linkedin/drelephant/util/SparkUtilsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.hadoop.fs.{FSDataInputStream, FileStatus, FileSystem, Path, Pa
import org.apache.hadoop.io.compress.CompressionInputStream
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.io.SnappyCompressionCodec
import org.apache.spark.io.{LZ4CompressionCodec, SnappyCompressionCodec}
import org.mockito.BDDMockito
import org.mockito.Matchers
import org.scalatest.{FunSpec, Matchers, OptionValues}
Expand All @@ -46,8 +46,8 @@ class SparkUtilsTest extends FunSpec with org.scalatest.Matchers with OptionValu
}

val (fs, path) = sparkUtils.fileSystemAndPathForEventLogDir(hadoopConfiguration,
sparkConf,
Some("webhdfs://nn1.grid.example.com:50070/logs/spark"))
sparkConf,
Some("webhdfs://nn1.grid.example.com:50070/logs/spark"))
fs.getUri.toString should be("webhdfs://nn1.grid.example.com:50070")
path should be(new Path("/logs/spark"))
}
Expand Down Expand Up @@ -180,7 +180,7 @@ class SparkUtilsTest extends FunSpec with org.scalatest.Matchers with OptionValu
val sparkUtils = SparkUtilsTest.newFakeSparkUtilsForEventLog(
new URI("webhdfs://nn1.grid.example.com:50070"),
new Path("/logs/spark"),
new Path("application_1_1.snappy"),
new Path("application_1_1.lz4"),
Array.empty[Byte]
)

Expand All @@ -189,8 +189,8 @@ class SparkUtilsTest extends FunSpec with org.scalatest.Matchers with OptionValu
val (path, codec) =
sparkUtils.pathAndCodecforEventLog(sparkConf: SparkConf, fs: FileSystem, basePath: Path, "application_1", Some("1"))

path should be(new Path("webhdfs://nn1.grid.example.com:50070/logs/spark/application_1_1.snappy"))
codec.value should be(a[SnappyCompressionCodec])
path should be(new Path("webhdfs://nn1.grid.example.com:50070/logs/spark/application_1_1.lz4"))
codec.value should be(a[LZ4CompressionCodec])
}
it("returns the path and codec for the event log, given the base path and appid. Extracts attempt and codec from path") {
val hadoopConfiguration = new Configuration(false)
Expand Down Expand Up @@ -300,8 +300,8 @@ object SparkUtilsTest extends MockitoSugar {
BDDMockito.given(fs.exists(expectedPath)).willReturn(true)
BDDMockito.given(fs.getFileStatus(expectedPath)).willReturn(expectedFileStatus)
BDDMockito.given(fs.listStatus(org.mockito.Matchers.refEq(new Path( new Path(fileSystemUri), basePath)),
org.mockito.Matchers.any(filter.getClass))).
willReturn(expectedStatusArray)
org.mockito.Matchers.any(filter.getClass))).
willReturn(expectedStatusArray)
BDDMockito.given(fs.open(expectedPath)).willReturn(
new FSDataInputStream(new FakeCompressionInputStream(new ByteArrayInputStream(bytes)))
)
Expand Down

0 comments on commit 656ba41

Please sign in to comment.