From 49b0c7ed42cf8c4ab8fd7f61f402e3fcdeba8836 Mon Sep 17 00:00:00 2001
From: skakker
Date: Wed, 10 Jan 2018 17:59:33 +0530
Subject: [PATCH] Peak Unified Memory Heuristic - (Depends on Custom SHS - Requires peakUnifiedMemory metric) (#281)

---
 app-conf/HeuristicConf.xml | 6 +
 .../fetchers/statusapiv1/statusapiv1.scala | 4 +-
 .../heuristics/UnifiedMemoryHeuristic.scala | 133 ++++++++++++++++++
 .../legacydata/LegacyDataConverters.scala | 6 +-
 .../helpUnifiedMemoryHeuristic.scala.html | 24 ++++
 .../spark/SparkMetricsAggregatorTest.scala | 3 +-
 .../heuristics/ExecutorGcHeuristicTest.scala | 3 +-
 .../ExecutorStorageSpillHeuristicTest.scala | 7 +-
 .../heuristics/ExecutorsHeuristicTest.scala | 3 +-
 .../JvmUsedMemoryHeuristicTest.scala | 7 +-
 .../heuristics/StagesHeuristicTest.scala | 2 +-
 .../UnifiedMemoryHeuristicTest.scala | 83 +++++++++++
 .../drelephant/util/SparkUtilsTest.scala | 4 +-
 13 files changed, 271 insertions(+), 14 deletions(-)
 create mode 100644 app/com/linkedin/drelephant/spark/heuristics/UnifiedMemoryHeuristic.scala
 create mode 100644 app/views/help/spark/helpUnifiedMemoryHeuristic.scala.html
 create mode 100644 test/com/linkedin/drelephant/spark/heuristics/UnifiedMemoryHeuristicTest.scala

diff --git a/app-conf/HeuristicConf.xml b/app-conf/HeuristicConf.xml
index b243fe5dc..17a9466ee 100644
--- a/app-conf/HeuristicConf.xml
+++ b/app-conf/HeuristicConf.xml
@@ -193,6 +193,12 @@
     <classname>com.linkedin.drelephant.spark.heuristics.StagesHeuristic</classname>
     <viewname>views.html.help.spark.helpStagesHeuristic</viewname>
   </heuristic>
+  <heuristic>
+    <applicationtype>spark</applicationtype>
+    <heuristicname>Peak Unified Memory</heuristicname>
+    <classname>com.linkedin.drelephant.spark.heuristics.UnifiedMemoryHeuristic</classname>
+    <viewname>views.html.help.spark.helpUnifiedMemoryHeuristic</viewname>
+  </heuristic>
   <heuristic>
     <applicationtype>spark</applicationtype>
     <heuristicname>JVM Used Memory</heuristicname>
diff --git a/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala b/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala
index 3f43a30b8..63d99eba4 100644
--- a/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala
+++ b/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala
@@ -90,6 +90,7 @@ trait ExecutorSummary{
   def totalMemoryBytesSpilled: Long
   def executorLogs: Map[String, String]
   def peakJvmUsedMemory: Map[String, Long]
+  def peakUnifiedMemory: Map[String, Long]
 }

 trait JobData{
@@ -298,7 +299,8 @@ class ExecutorSummaryImpl(
     var totalGCTime: Long,
     var totalMemoryBytesSpilled: Long,
     var executorLogs: Map[String, String],
-    var peakJvmUsedMemory: Map[String, Long]) extends ExecutorSummary
+    var peakJvmUsedMemory: Map[String, Long],
+    var peakUnifiedMemory: Map[String, Long]) extends ExecutorSummary

 class JobDataImpl(
   var jobId: Int,
diff --git a/app/com/linkedin/drelephant/spark/heuristics/UnifiedMemoryHeuristic.scala b/app/com/linkedin/drelephant/spark/heuristics/UnifiedMemoryHeuristic.scala
new file mode 100644
index 000000000..3b6f54cfb
--- /dev/null
+++ b/app/com/linkedin/drelephant/spark/heuristics/UnifiedMemoryHeuristic.scala
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2016 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */

+package com.linkedin.drelephant.spark.heuristics

+import com.linkedin.drelephant.analysis._
+import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData
+import com.linkedin.drelephant.spark.data.SparkApplicationData
+import com.linkedin.drelephant.spark.fetchers.statusapiv1.ExecutorSummary
+import com.linkedin.drelephant.util.MemoryFormatUtils

+import scala.collection.JavaConverters

+/**
+  * A heuristic based on the peak unified memory used by Spark executors.
+  *
+  * It compares each executor's peak unified memory (execution + storage) against the
+  * allocated unified region and flags applications whose allocation can be reduced.
+  * The mean and max peaks across executors are also reported, which makes skew visible.
+  */
+class UnifiedMemoryHeuristic(private val heuristicConfigurationData: HeuristicConfigurationData)
+  extends Heuristic[SparkApplicationData] {

+  import UnifiedMemoryHeuristic._
+  import JavaConverters._

+  override def getHeuristicConfData(): HeuristicConfigurationData = heuristicConfigurationData

+  lazy val peakUnifiedMemoryThresholdString: String = heuristicConfigurationData.getParamMap.get(PEAK_UNIFIED_MEMORY_THRESHOLD_KEY)

+  override def apply(data: SparkApplicationData): HeuristicResult = {
+    val evaluator = new Evaluator(this, data)

+    val resultDetails = Seq(
+      new HeuristicResultDetails("Unified Memory Space Allocated", MemoryFormatUtils.bytesToString(evaluator.maxMemory)),
+      new HeuristicResultDetails("Mean peak unified memory", MemoryFormatUtils.bytesToString(evaluator.meanUnifiedMemory)),
+      new HeuristicResultDetails("Max peak unified memory", MemoryFormatUtils.bytesToString(evaluator.maxUnifiedMemory)),
+      new HeuristicResultDetails("spark.executor.memory", MemoryFormatUtils.bytesToString(evaluator.sparkExecutorMemory)),
+      new HeuristicResultDetails("spark.memory.fraction", evaluator.sparkMemoryFraction.toString)
+    )

+    new HeuristicResult(
+      heuristicConfigurationData.getClassName,
+      heuristicConfigurationData.getHeuristicName,
+      evaluator.severity,
+      0,
+      resultDetails.asJava
+    )
+  }
+}

+object UnifiedMemoryHeuristic {

+  val EXECUTION_MEMORY = "executionMemory"
+  val STORAGE_MEMORY = "storageMemory"
+  val SPARK_EXECUTOR_MEMORY_KEY = "spark.executor.memory"
+  val SPARK_MEMORY_FRACTION_KEY = "spark.memory.fraction"
+  val PEAK_UNIFIED_MEMORY_THRESHOLD_KEY = "peak_unified_memory_threshold"

+  class Evaluator(unifiedMemoryHeuristic: UnifiedMemoryHeuristic, data: SparkApplicationData) {
+    lazy val appConfigurationProperties: Map[String, String] =
+      data.appConfigurationProperties

+    lazy val DEFAULT_PEAK_UNIFIED_MEMORY_THRESHOLD: SeverityThresholds = SeverityThresholds(low = 0.7 * maxMemory, moderate = 0.6 * maxMemory, severe = 0.4 * maxMemory, critical = 0.2 * maxMemory, ascending = false)

+    lazy val executorSummaries: Seq[ExecutorSummary] = data.executorSummaries
+    if (executorSummaries == null) {
+      throw new Exception("Executors Summary is null.")
+    }

+    val executorList: Seq[ExecutorSummary] = executorSummaries.filterNot(_.id.equals("driver"))
+    if (executorList.isEmpty) {
+      throw new Exception("No executor information available.")
+    }

+    //allocated memory for the unified region
+    val maxMemory: Long = executorList.head.maxMemory

+    val PEAK_UNIFIED_MEMORY_THRESHOLDS: SeverityThresholds = if (unifiedMemoryHeuristic.peakUnifiedMemoryThresholdString == null) {
+      DEFAULT_PEAK_UNIFIED_MEMORY_THRESHOLD
+    } else {
+      //configured thresholds are fractions of maxMemory; mkString (not toString) rebuilds the comma-separated list
+      SeverityThresholds.parse(
+        unifiedMemoryHeuristic.peakUnifiedMemoryThresholdString.split(",").map(_.toDouble * maxMemory).mkString(","),
+        ascending = false
+      ).getOrElse(DEFAULT_PEAK_UNIFIED_MEMORY_THRESHOLD)
+    }

+    //peak unified memory (execution + storage) used by one executor; 0L defaults keep the sum on Longs
+    private def totalPeakUnifiedMemory(executorSummary: ExecutorSummary): Long =
+      executorSummary.peakUnifiedMemory.getOrElse(EXECUTION_MEMORY, 0L) +
+        executorSummary.peakUnifiedMemory.getOrElse(STORAGE_MEMORY, 0L)

+    def getPeakUnifiedMemoryExecutorSeverity(executorSummary: ExecutorSummary): Severity =
+      PEAK_UNIFIED_MEMORY_THRESHOLDS.severityOf(totalPeakUnifiedMemory(executorSummary))

+    val sparkExecutorMemory: Long = appConfigurationProperties.get(SPARK_EXECUTOR_MEMORY_KEY).map(MemoryFormatUtils.stringToBytes).getOrElse(0L)

+    //the configuration map holds String values, so parse the fraction rather than casting it
+    val sparkMemoryFraction: Double = appConfigurationProperties.get(SPARK_MEMORY_FRACTION_KEY).map(_.toDouble).getOrElse(0.6D)

+    lazy val meanUnifiedMemory: Long = executorList.map(totalPeakUnifiedMemory).sum / executorList.size

+    lazy val maxUnifiedMemory: Long = executorList.map(totalPeakUnifiedMemory).max

+    //the application is as severe as its worst executor
+    lazy val severity: Severity = executorList.map(getPeakUnifiedMemoryExecutorSeverity)
+      .foldLeft(Severity.NONE)((worst, current) => Severity.max(worst, current))
+  }

+}
diff --git a/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala b/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala
index 88344da5d..76c317be3 100644
--- a/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala
+++ b/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala
@@ -24,7 +24,6 @@ import scala.util.Try
 import com.linkedin.drelephant.spark.fetchers.statusapiv1._
 import org.apache.spark.JobExecutionStatus
 import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageStatus
-
 /**
  * Converters for legacy SparkApplicationData to current SparkApplicationData.
  *
@@ -46,7 +45,7 @@ object LegacyDataConverters {
       override def shuffleWriteRecords: Long = 0
       override def inputBytes: Long = 0
       override def details: String = ""
-      override def tasks: Option[collection.Map[Long, TaskData]] = None
+      override def tasks = None
       override def attemptId: Int = 0
       override def stageId: Int = 0
       override def memoryBytesSpilled: Long = 0
@@ -206,7 +205,8 @@ object LegacyDataConverters {
       executorInfo.totalGCTime,
       executorInfo.totalMemoryBytesSpilled,
       executorLogs = Map.empty,
-      peakJvmUsedMemory = Map.empty
+      peakJvmUsedMemory = Map.empty,
+      peakUnifiedMemory = Map.empty
     )
   }
diff --git a/app/views/help/spark/helpUnifiedMemoryHeuristic.scala.html b/app/views/help/spark/helpUnifiedMemoryHeuristic.scala.html
new file mode 100644
index 000000000..9ea156004
--- /dev/null
+++ b/app/views/help/spark/helpUnifiedMemoryHeuristic.scala.html
@@ -0,0 +1,24 @@
+@*
+* Copyright 2016 LinkedIn Corp.
+*
+* Licensed under the Apache License, Version 2.0 (the "License"); you may not
+* use this file except in compliance with the License. You may obtain a copy of
+* the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+* License for the specific language governing permissions and limitations under
+* the License.
+*@
+<p>The Peak Unified Memory Heuristic identifies and flags jobs that have over-allocated the unified memory region.</p>
+<h3>Peak Unified Memory</h3>
+<p>If the job's peak unified memory consumption is much smaller than the allocated unified memory space, we recommend decreasing the allocated unified memory region for your job.</p>
+<h3>Action Items</h3>
+<p>The allocated unified memory region can be reduced in the following ways:</p>
+<p>1. If your job's executor memory is already low, reduce spark.memory.fraction, which shrinks the space allocated to the unified memory region.</p>
+<p>2. If your job's executor memory is high, we recommend reducing spark.executor.memory itself, which lowers the allocated unified memory space.</p>
+<p><b>Note:</b></p>
+<p>spark.memory.fraction is the fraction of the usable JVM heap (executor memory minus reserved memory) dedicated to the unified region (execution + storage); it partitions user memory from execution and storage memory.</p>
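For context on the sizing described in the help text above: Spark carves the unified region out of the executor heap after subtracting a reserved chunk and applying spark.memory.fraction. A minimal sketch of that arithmetic follows — illustrative only, not part of this patch; the 300 MB reserved constant and the formula follow Spark's UnifiedMemoryManager defaults, and the object and method names here are invented for the sketch.

object UnifiedMemorySizing {
  // Reserved heap slice; 300 MB matches Spark's UnifiedMemoryManager default.
  val ReservedMemoryBytes: Long = 300L * 1024 * 1024

  // Approximate size of the unified (execution + storage) region for one executor.
  def unifiedRegionBytes(executorMemoryBytes: Long, memoryFraction: Double = 0.6): Long =
    ((executorMemoryBytes - ReservedMemoryBytes) * memoryFraction).toLong

  def main(args: Array[String]): Unit = {
    val fourGiB = 4L * 1024 * 1024 * 1024
    // With the defaults: (4096 MB - 300 MB) * 0.6 = 2277.6 MB of unified memory.
    println(s"Unified region: ${unifiedRegionBytes(fourGiB) / (1024 * 1024)} MB")
  }
}

So when the heuristic reports a peak well below "Unified Memory Space Allocated", lowering either spark.memory.fraction or spark.executor.memory shrinks the region roughly linearly.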
diff --git a/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala b/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala index c979d5157..d7d6acc0a 100644 --- a/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala +++ b/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala @@ -199,6 +199,7 @@ object SparkMetricsAggregatorTest { totalGCTime = 0, totalMemoryBytesSpilled = 0, executorLogs = Map.empty, - peakJvmUsedMemory = Map.empty + peakJvmUsedMemory = Map.empty, + peakUnifiedMemory = Map.empty ) } diff --git a/test/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristicTest.scala index 765b08889..373cc56b0 100644 --- a/test/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristicTest.scala +++ b/test/com/linkedin/drelephant/spark/heuristics/ExecutorGcHeuristicTest.scala @@ -121,7 +121,8 @@ object ExecutorGcHeuristicTest { totalGCTime, totalMemoryBytesSpilled = 0, executorLogs = Map.empty, - peakJvmUsedMemory = Map.empty + peakJvmUsedMemory = Map.empty, + peakUnifiedMemory = Map.empty ) def newFakeSparkApplicationData( diff --git a/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala index 2a061e8d3..d11ca0dd1 100644 --- a/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala +++ b/test/com/linkedin/drelephant/spark/heuristics/ExecutorStorageSpillHeuristicTest.scala @@ -111,7 +111,9 @@ object ExecutorStorageSpillHeuristicTest { maxMemory= 0, totalGCTime = 0, totalMemoryBytesSpilled, - executorLogs = Map.empty + executorLogs = Map.empty, + peakJvmUsedMemory = Map.empty, + peakUnifiedMemory = Map.empty ) def newFakeSparkApplicationData( @@ -124,7 +126,8 @@ object ExecutorStorageSpillHeuristicTest { new ApplicationInfoImpl(appId, name = "app", Seq.empty), jobDatas = Seq.empty, stageDatas = Seq.empty, - executorSummaries = executorSummaries + executorSummaries = executorSummaries, + stagesWithFailedTasks = Seq.empty ) val logDerivedData = SparkLogDerivedData( diff --git a/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala index c83d9cbfb..8844b3aaf 100644 --- a/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala +++ b/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala @@ -252,7 +252,8 @@ object ExecutorsHeuristicTest { totalGCTime = 0, totalMemoryBytesSpilled = 0, executorLogs = Map.empty, - peakJvmUsedMemory = Map.empty + peakJvmUsedMemory = Map.empty, + peakUnifiedMemory = Map.empty ) def newFakeSparkApplicationData(executorSummaries: Seq[ExecutorSummaryImpl]): SparkApplicationData = { diff --git a/test/com/linkedin/drelephant/spark/heuristics/JvmUsedMemoryHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/JvmUsedMemoryHeuristicTest.scala index f138de427..8f80a7a39 100644 --- a/test/com/linkedin/drelephant/spark/heuristics/JvmUsedMemoryHeuristicTest.scala +++ b/test/com/linkedin/drelephant/spark/heuristics/JvmUsedMemoryHeuristicTest.scala @@ -86,8 +86,10 @@ object JvmUsedMemoryHeuristicTest { totalShuffleWrite = 0, maxMemory = 0, totalGCTime = 0, + totalMemoryBytesSpilled = 0, executorLogs = Map.empty, - peakJvmUsedMemory + peakJvmUsedMemory, + peakUnifiedMemory = Map.empty ) def newFakeSparkApplicationData( @@ -104,7 +106,8 @@ object 
JvmUsedMemoryHeuristicTest { new ApplicationInfoImpl(appId, name = "app", Seq.empty), jobDatas = Seq.empty, stageDatas = Seq.empty, - executorSummaries = executorSummaries + executorSummaries = executorSummaries, + stagesWithFailedTasks = Seq.empty ) SparkApplicationData(appId, restDerivedData, Some(logDerivedData)) diff --git a/test/com/linkedin/drelephant/spark/heuristics/StagesHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/StagesHeuristicTest.scala index 92cd21bce..723f69250 100644 --- a/test/com/linkedin/drelephant/spark/heuristics/StagesHeuristicTest.scala +++ b/test/com/linkedin/drelephant/spark/heuristics/StagesHeuristicTest.scala @@ -22,7 +22,7 @@ import scala.concurrent.duration.Duration import com.linkedin.drelephant.analysis.{ApplicationType, Severity} import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData} -import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, JobDataImpl, StageDataImpl} +import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, StageDataImpl} import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageStatus import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate import org.scalatest.{FunSpec, Matchers} diff --git a/test/com/linkedin/drelephant/spark/heuristics/UnifiedMemoryHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/UnifiedMemoryHeuristicTest.scala new file mode 100644 index 000000000..a1685838d --- /dev/null +++ b/test/com/linkedin/drelephant/spark/heuristics/UnifiedMemoryHeuristicTest.scala @@ -0,0 +1,83 @@ +package com.linkedin.drelephant.spark.heuristics + +import com.linkedin.drelephant.analysis.{ApplicationType, Severity} +import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData +import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkRestDerivedData} +import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, ExecutorSummaryImpl} +import org.scalatest.{FunSpec, Matchers} + +import scala.collection.JavaConverters + +class UnifiedMemoryHeuristicTest extends FunSpec with Matchers { + + import UnifiedMemoryHeuristicTest._ + + val heuristicConfigurationData = newFakeHeuristicConfigurationData() + + val memoryFractionHeuristic = new UnifiedMemoryHeuristic(heuristicConfigurationData) + + val executorData = Seq( + newDummyExecutorData("1", 400000, Map("executionMemory" -> 300000, "storageMemory" -> 94567)), + newDummyExecutorData("2", 400000, Map("executionMemory" -> 200000, "storageMemory" -> 34568)), + newDummyExecutorData("3", 400000, Map("executionMemory" -> 300000, "storageMemory" -> 34569)), + newDummyExecutorData("4", 400000, Map("executionMemory" -> 20000, "storageMemory" -> 3456)), + newDummyExecutorData("5", 400000, Map("executionMemory" -> 200000, "storageMemory" -> 34564)), + newDummyExecutorData("6", 400000, Map("executionMemory" -> 300000, "storageMemory" -> 94561)) + ) + describe(".apply") { + val data = newFakeSparkApplicationData(executorData) + val heuristicResult = memoryFractionHeuristic.apply(data) + val heuristicResultDetails = heuristicResult.getHeuristicResultDetails + + it("has severity") { + heuristicResult.getSeverity should be(Severity.CRITICAL) + } + } +} + +object UnifiedMemoryHeuristicTest { + + import JavaConverters._ + + def newFakeHeuristicConfigurationData(params: Map[String, String] = Map.empty): HeuristicConfigurationData = + 
new HeuristicConfigurationData("heuristic", "class", "view", new ApplicationType("type"), params.asJava) + + def newDummyExecutorData( + id: String, + maxMemory: Long, + peakUnifiedMemory: Map[String, Long] + ): ExecutorSummaryImpl = new ExecutorSummaryImpl( + id, + hostPort = "", + rddBlocks = 0, + memoryUsed = 0, + diskUsed = 0, + activeTasks = 0, + failedTasks = 0, + completedTasks = 0, + totalTasks = 0, + totalDuration = 0, + totalInputBytes = 0, + totalShuffleRead = 0, + totalShuffleWrite = 0, + maxMemory, + totalGCTime = 0, + totalMemoryBytesSpilled = 0, + executorLogs = Map.empty, + peakJvmUsedMemory = Map.empty, + peakUnifiedMemory + ) + + def newFakeSparkApplicationData(executorSummaries: Seq[ExecutorSummaryImpl]): SparkApplicationData = { + val appId = "application_1" + val restDerivedData = SparkRestDerivedData( + new ApplicationInfoImpl(appId, name = "app", Seq.empty), + jobDatas = Seq.empty, + stageDatas = Seq.empty, + executorSummaries = executorSummaries, + stagesWithFailedTasks = Seq.empty + ) + + SparkApplicationData(appId, restDerivedData, logDerivedData = None) + } +} diff --git a/test/com/linkedin/drelephant/util/SparkUtilsTest.scala b/test/com/linkedin/drelephant/util/SparkUtilsTest.scala index f8efab9d5..d2e5355d7 100644 --- a/test/com/linkedin/drelephant/util/SparkUtilsTest.scala +++ b/test/com/linkedin/drelephant/util/SparkUtilsTest.scala @@ -287,7 +287,7 @@ object SparkUtilsTest extends MockitoSugar { BDDMockito.given(fileStatus.getPath()).willReturn(expectedPath) fileStatus } - val expectedStatusArray = Array(expectedFileStatus) + val expectedStatusArray = Array(expectedFileStatus) val filter = new PathFilter() { override def accept(file: Path): Boolean = { @@ -298,7 +298,7 @@ object SparkUtilsTest extends MockitoSugar { BDDMockito.given(fs.getUri).willReturn(fileSystemUri) BDDMockito.given(fs.exists(expectedPath)).willReturn(true) BDDMockito.given(fs.getFileStatus(expectedPath)).willReturn(expectedFileStatus) - BDDMockito.given(fs.listStatus(org.mockito.Matchers.refEq(new Path( new Path(fileSystemUri), basePath)), + BDDMockito.given(fs.listStatus(org.mockito.Matchers.refEq(new Path(new Path(fileSystemUri), basePath)), org.mockito.Matchers.any(filter.getClass))). willReturn(expectedStatusArray) BDDMockito.given(fs.open(expectedPath)).willReturn(
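For reference, why the test fixture above evaluates to CRITICAL: the thresholds are descending, so a lower peak-to-allocated fraction means more over-allocation and a worse severity. A standalone sketch — illustrative only, not code from this patch; the cutoffs mirror DEFAULT_PEAK_UNIFIED_MEMORY_THRESHOLD, and the at-or-below comparison is assumed to match Severity.getSeverityDescending.

object DescendingThresholdDemo {
  // Descending thresholds as fractions of maxMemory: a value at or below a
  // cutoff escalates severity (0.2 critical, 0.4 severe, 0.6 moderate, 0.7 low).
  def severityName(peakUnified: Long, maxMemory: Long): String = {
    val fraction = peakUnified.toDouble / maxMemory
    if (fraction <= 0.2) "CRITICAL"
    else if (fraction <= 0.4) "SEVERE"
    else if (fraction <= 0.6) "MODERATE"
    else if (fraction <= 0.7) "LOW"
    else "NONE"
  }

  def main(args: Array[String]): Unit = {
    // Executor "4" peaks at 20000 + 3456 = 23456 of maxMemory 400000 (~5.9%),
    // and the application takes its worst executor's severity, hence CRITICAL.
    println(severityName(23456L, 400000L)) // prints CRITICAL
  }
}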