diff --git a/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala b/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala index aeb985dc0..ce81ceb05 100644 --- a/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala +++ b/app/com/linkedin/drelephant/spark/fetchers/SparkRestClient.scala @@ -33,6 +33,7 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData} import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfo, ExecutorSummary, JobData, StageData} +import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, ExecutorSummaryImpl, JobDataImpl, StageDataImpl} import com.linkedin.drelephant.util.SparkUtils import javax.ws.rs.client.{Client, ClientBuilder, WebTarget} import javax.ws.rs.core.MediaType @@ -125,9 +126,9 @@ class SparkRestClient(sparkConf: SparkConf) { (applicationInfo, attemptTarget) } - private def getApplicationInfo(appTarget: WebTarget): ApplicationInfo = { + private def getApplicationInfo(appTarget: WebTarget): ApplicationInfoImpl = { try { - get(appTarget, SparkRestObjectMapper.readValue[ApplicationInfo]) + get(appTarget, SparkRestObjectMapper.readValue[ApplicationInfoImpl]) } catch { case NonFatal(e) => { logger.error(s"error reading applicationInfo ${appTarget.getUri}", e) @@ -177,10 +178,10 @@ class SparkRestClient(sparkConf: SparkConf) { } } - private def getJobDatas(attemptTarget: WebTarget): Seq[JobData] = { + private def getJobDatas(attemptTarget: WebTarget): Seq[JobDataImpl] = { val target = attemptTarget.path("jobs") try { - get(target, SparkRestObjectMapper.readValue[Seq[JobData]]) + get(target, SparkRestObjectMapper.readValue[Seq[JobDataImpl]]) } catch { case NonFatal(e) => { logger.error(s"error reading jobData ${target.getUri}", e) @@ -189,10 +190,10 @@ class SparkRestClient(sparkConf: SparkConf) { } } - private def getStageDatas(attemptTarget: WebTarget): Seq[StageData] = { + private def getStageDatas(attemptTarget: WebTarget): Seq[StageDataImpl] = { val target = attemptTarget.path("stages") try { - get(target, SparkRestObjectMapper.readValue[Seq[StageData]]) + get(target, SparkRestObjectMapper.readValue[Seq[StageDataImpl]]) } catch { case NonFatal(e) => { logger.error(s"error reading stageData ${target.getUri}", e) @@ -201,10 +202,10 @@ class SparkRestClient(sparkConf: SparkConf) { } } - private def getExecutorSummaries(attemptTarget: WebTarget): Seq[ExecutorSummary] = { + private def getExecutorSummaries(attemptTarget: WebTarget): Seq[ExecutorSummaryImpl] = { val target = attemptTarget.path("executors") try { - get(target, SparkRestObjectMapper.readValue[Seq[ExecutorSummary]]) + get(target, SparkRestObjectMapper.readValue[Seq[ExecutorSummaryImpl]]) } catch { case NonFatal(e) => { logger.error(s"error reading executorSummary ${target.getUri}", e) diff --git a/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala b/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala index d586d4b0f..1b013c0f3 100644 --- a/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala +++ b/app/com/linkedin/drelephant/spark/fetchers/statusapiv1/statusapiv1.scala @@ -43,208 +43,418 @@ import scala.collection.Map import org.apache.spark.JobExecutionStatus import org.apache.spark.status.api.v1.StageStatus +import com.fasterxml.jackson.annotation.JsonSubTypes.Type +import com.fasterxml.jackson.annotation.{JsonSubTypes, JsonTypeInfo} -class ApplicationInfo( - val id: String, - val name: String, - val attempts: Seq[ApplicationAttemptInfo]) - -class ApplicationAttemptInfo( - val attemptId: Option[String], - val startTime: Date, - val endTime: Date, - val sparkUser: String, - val completed: Boolean = false) - -class ExecutorStageSummary( - val taskTime : Long, - val failedTasks : Int, - val succeededTasks : Int, - val inputBytes : Long, - val outputBytes : Long, - val shuffleRead : Long, - val shuffleWrite : Long, - val memoryBytesSpilled : Long, - val diskBytesSpilled : Long) - -class ExecutorSummary( - val id: String, - val hostPort: String, - val rddBlocks: Int, - val memoryUsed: Long, - val diskUsed: Long, - val activeTasks: Int, - val failedTasks: Int, - val completedTasks: Int, - val totalTasks: Int, - val totalDuration: Long, - val totalInputBytes: Long, - val totalShuffleRead: Long, - val totalShuffleWrite: Long, - val maxMemory: Long, - val executorLogs: Map[String, String]) - -class JobData( - val jobId: Int, - val name: String, - val description: Option[String], - val submissionTime: Option[Date], - val completionTime: Option[Date], - val stageIds: Seq[Int], - val jobGroup: Option[String], - val status: JobExecutionStatus, - val numTasks: Int, - val numActiveTasks: Int, - val numCompletedTasks: Int, - val numSkippedTasks: Int, - val numFailedTasks: Int, - val numActiveStages: Int, - val numCompletedStages: Int, - val numSkippedStages: Int, - val numFailedStages: Int) +trait ApplicationInfo { + def id: String + def name: String + def attempts: Seq[ApplicationAttemptInfo] +} + +trait ApplicationAttemptInfo{ + def attemptId: Option[String] + def startTime: Date + def endTime: Date + def sparkUser: String + def completed: Boolean +} + +trait ExecutorStageSummary{ + def taskTime : Long + def failedTasks : Int + def succeededTasks : Int + def inputBytes : Long + def outputBytes : Long + def shuffleRead : Long + def shuffleWrite : Long + def memoryBytesSpilled : Long + def diskBytesSpilled : Long +} + +trait ExecutorSummary{ + def id: String + def hostPort: String + def rddBlocks: Int + def memoryUsed: Long + def diskUsed: Long + def activeTasks: Int + def failedTasks: Int + def completedTasks: Int + def totalTasks: Int + def totalDuration: Long + def totalInputBytes: Long + def totalShuffleRead: Long + def totalShuffleWrite: Long + def maxMemory: Long + def executorLogs: Map[String, String]} + +trait JobData{ + def jobId: Int + def name: String + def description: Option[String] + def submissionTime: Option[Date] + def completionTime: Option[Date] + def stageIds: Seq[Int] + def jobGroup: Option[String] + def status: JobExecutionStatus + def numTasks: Int + def numActiveTasks: Int + def numCompletedTasks: Int + def numSkippedTasks: Int + def numFailedTasks: Int + def numActiveStages: Int + def numCompletedStages: Int + def numSkippedStages: Int + def numFailedStages: Int} + +// Q: should Tachyon size go in here as well? currently the UI only shows it on the overall storage +// page ... does anybody pay attention to it? +trait RDDStorageInfo{ + def id: Int + def name: String + def numPartitions: Int + def numCachedPartitions: Int + def storageLevel: String + def memoryUsed: Long + def diskUsed: Long + def dataDistribution: Option[Seq[RDDDataDistribution]] + def partitions: Option[Seq[RDDPartitionInfo]]} + +trait RDDDataDistribution{ + def address: String + def memoryUsed: Long + def memoryRemaining: Long + def diskUsed: Long} + +trait RDDPartitionInfo{ + def blockName: String + def storageLevel: String + def memoryUsed: Long + def diskUsed: Long + def executors: Seq[String]} + +trait StageData{ + def status: StageStatus + def stageId: Int + def attemptId: Int + def numActiveTasks: Int + def numCompleteTasks: Int + def numFailedTasks: Int + + def executorRunTime: Long + + def inputBytes: Long + def inputRecords: Long + def outputBytes: Long + def outputRecords: Long + def shuffleReadBytes: Long + def shuffleReadRecords: Long + def shuffleWriteBytes: Long + def shuffleWriteRecords: Long + def memoryBytesSpilled: Long + def diskBytesSpilled: Long + + def name: String + def details: String + def schedulingPool: String + + def accumulatorUpdates: Seq[AccumulableInfo] + def tasks: Option[Map[Long, TaskData]] + def executorSummary: Option[Map[String, ExecutorStageSummary]]} + +trait TaskData{ + def taskId: Long + def index: Int + def attempt: Int + def launchTime: Date + def executorId: String + def host: String + def taskLocality: String + def speculative: Boolean + def accumulatorUpdates: Seq[AccumulableInfo] + def errorMessage: Option[String] + def taskMetrics: Option[TaskMetrics]} + +trait TaskMetrics{ + def executorDeserializeTime: Long + def executorRunTime: Long + def resultSize: Long + def jvmGcTime: Long + def resultSerializationTime: Long + def memoryBytesSpilled: Long + def diskBytesSpilled: Long + def inputMetrics: Option[InputMetrics] + def outputMetrics: Option[OutputMetrics] + def shuffleReadMetrics: Option[ShuffleReadMetrics] + def shuffleWriteMetrics: Option[ShuffleWriteMetrics]} + +trait InputMetrics{ + def bytesRead: Long + def recordsRead: Long} + +trait OutputMetrics{ + def bytesWritten: Long + def recordsWritten: Long} + +trait ShuffleReadMetrics{ + def remoteBlocksFetched: Int + def localBlocksFetched: Int + def fetchWaitTime: Long + def remoteBytesRead: Long + def totalBlocksFetched: Int + def recordsRead: Long} + +trait ShuffleWriteMetrics{ + def bytesWritten: Long + def writeTime: Long + def recordsWritten: Long} + +trait TaskMetricDistributions{ + def quantiles: IndexedSeq[Double] + + def executorDeserializeTime: IndexedSeq[Double] + def executorRunTime: IndexedSeq[Double] + def resultSize: IndexedSeq[Double] + def jvmGcTime: IndexedSeq[Double] + def resultSerializationTime: IndexedSeq[Double] + def memoryBytesSpilled: IndexedSeq[Double] + def diskBytesSpilled: IndexedSeq[Double] + + def inputMetrics: Option[InputMetricDistributions] + def outputMetrics: Option[OutputMetricDistributions] + def shuffleReadMetrics: Option[ShuffleReadMetricDistributions] + def shuffleWriteMetrics: Option[ShuffleWriteMetricDistributions]} + +trait InputMetricDistributions{ + def bytesRead: IndexedSeq[Double] + def recordsRead: IndexedSeq[Double]} + +trait OutputMetricDistributions{ + def bytesWritten: IndexedSeq[Double] + def recordsWritten: IndexedSeq[Double]} + +trait ShuffleReadMetricDistributions{ + def readBytes: IndexedSeq[Double] + def readRecords: IndexedSeq[Double] + def remoteBlocksFetched: IndexedSeq[Double] + def localBlocksFetched: IndexedSeq[Double] + def fetchWaitTime: IndexedSeq[Double] + def remoteBytesRead: IndexedSeq[Double] + def totalBlocksFetched: IndexedSeq[Double]} + +trait ShuffleWriteMetricDistributions{ + def writeBytes: IndexedSeq[Double] + def writeRecords: IndexedSeq[Double] + def writeTime: IndexedSeq[Double]} + +trait AccumulableInfo{ + def id: Long + def name: String + def update: Option[String] + def value: String} + +class ApplicationInfoImpl( + var id: String, + var name: String, + var attempts: Seq[ApplicationAttemptInfoImpl]) extends ApplicationInfo + +class ApplicationAttemptInfoImpl( + var attemptId: Option[String], + var startTime: Date, + var endTime: Date, + var sparkUser: String, + var completed: Boolean = false) extends ApplicationAttemptInfo + +class ExecutorStageSummaryImpl( + var taskTime : Long, + var failedTasks : Int, + var succeededTasks : Int, + var inputBytes : Long, + var outputBytes : Long, + var shuffleRead : Long, + var shuffleWrite : Long, + var memoryBytesSpilled : Long, + var diskBytesSpilled : Long) extends ExecutorStageSummary + +class ExecutorSummaryImpl( + var id: String, + var hostPort: String, + var rddBlocks: Int, + var memoryUsed: Long, + var diskUsed: Long, + var activeTasks: Int, + var failedTasks: Int, + var completedTasks: Int, + var totalTasks: Int, + var totalDuration: Long, + var totalInputBytes: Long, + var totalShuffleRead: Long, + var totalShuffleWrite: Long, + var maxMemory: Long, + var executorLogs: Map[String, String]) extends ExecutorSummary + +class JobDataImpl( + var jobId: Int, + var name: String, + var description: Option[String], + var submissionTime: Option[Date], + var completionTime: Option[Date], + var stageIds: Seq[Int], + var jobGroup: Option[String], + var status: JobExecutionStatus, + var numTasks: Int, + var numActiveTasks: Int, + var numCompletedTasks: Int, + var numSkippedTasks: Int, + var numFailedTasks: Int, + var numActiveStages: Int, + var numCompletedStages: Int, + var numSkippedStages: Int, + var numFailedStages: Int) extends JobData // Q: should Tachyon size go in here as well? currently the UI only shows it on the overall storage // page ... does anybody pay attention to it? -class RDDStorageInfo( - val id: Int, - val name: String, - val numPartitions: Int, - val numCachedPartitions: Int, - val storageLevel: String, - val memoryUsed: Long, - val diskUsed: Long, - val dataDistribution: Option[Seq[RDDDataDistribution]], - val partitions: Option[Seq[RDDPartitionInfo]]) - -class RDDDataDistribution( - val address: String, - val memoryUsed: Long, - val memoryRemaining: Long, - val diskUsed: Long) - -class RDDPartitionInfo( - val blockName: String, - val storageLevel: String, - val memoryUsed: Long, - val diskUsed: Long, - val executors: Seq[String]) - -class StageData( - val status: StageStatus, - val stageId: Int, - val attemptId: Int, - val numActiveTasks: Int , - val numCompleteTasks: Int, - val numFailedTasks: Int, - - val executorRunTime: Long, - - val inputBytes: Long, - val inputRecords: Long, - val outputBytes: Long, - val outputRecords: Long, - val shuffleReadBytes: Long, - val shuffleReadRecords: Long, - val shuffleWriteBytes: Long, - val shuffleWriteRecords: Long, - val memoryBytesSpilled: Long, - val diskBytesSpilled: Long, - - val name: String, - val details: String, - val schedulingPool: String, - - val accumulatorUpdates: Seq[AccumulableInfo], - val tasks: Option[Map[Long, TaskData]], - val executorSummary: Option[Map[String, ExecutorStageSummary]]) - -class TaskData( - val taskId: Long, - val index: Int, - val attempt: Int, - val launchTime: Date, - val executorId: String, - val host: String, - val taskLocality: String, - val speculative: Boolean, - val accumulatorUpdates: Seq[AccumulableInfo], - val errorMessage: Option[String] = None, - val taskMetrics: Option[TaskMetrics] = None) - -class TaskMetrics( - val executorDeserializeTime: Long, - val executorRunTime: Long, - val resultSize: Long, - val jvmGcTime: Long, - val resultSerializationTime: Long, - val memoryBytesSpilled: Long, - val diskBytesSpilled: Long, - val inputMetrics: Option[InputMetrics], - val outputMetrics: Option[OutputMetrics], - val shuffleReadMetrics: Option[ShuffleReadMetrics], - val shuffleWriteMetrics: Option[ShuffleWriteMetrics]) - -class InputMetrics( - val bytesRead: Long, - val recordsRead: Long) - -class OutputMetrics( - val bytesWritten: Long, - val recordsWritten: Long) - -class ShuffleReadMetrics( - val remoteBlocksFetched: Int, - val localBlocksFetched: Int, - val fetchWaitTime: Long, - val remoteBytesRead: Long, - val totalBlocksFetched: Int, - val recordsRead: Long) - -class ShuffleWriteMetrics( - val bytesWritten: Long, - val writeTime: Long, - val recordsWritten: Long) - -class TaskMetricDistributions( - val quantiles: IndexedSeq[Double], - - val executorDeserializeTime: IndexedSeq[Double], - val executorRunTime: IndexedSeq[Double], - val resultSize: IndexedSeq[Double], - val jvmGcTime: IndexedSeq[Double], - val resultSerializationTime: IndexedSeq[Double], - val memoryBytesSpilled: IndexedSeq[Double], - val diskBytesSpilled: IndexedSeq[Double], - - val inputMetrics: Option[InputMetricDistributions], - val outputMetrics: Option[OutputMetricDistributions], - val shuffleReadMetrics: Option[ShuffleReadMetricDistributions], - val shuffleWriteMetrics: Option[ShuffleWriteMetricDistributions]) - -class InputMetricDistributions( - val bytesRead: IndexedSeq[Double], - val recordsRead: IndexedSeq[Double]) - -class OutputMetricDistributions( - val bytesWritten: IndexedSeq[Double], - val recordsWritten: IndexedSeq[Double]) - -class ShuffleReadMetricDistributions( - val readBytes: IndexedSeq[Double], - val readRecords: IndexedSeq[Double], - val remoteBlocksFetched: IndexedSeq[Double], - val localBlocksFetched: IndexedSeq[Double], - val fetchWaitTime: IndexedSeq[Double], - val remoteBytesRead: IndexedSeq[Double], - val totalBlocksFetched: IndexedSeq[Double]) - -class ShuffleWriteMetricDistributions( - val writeBytes: IndexedSeq[Double], - val writeRecords: IndexedSeq[Double], - val writeTime: IndexedSeq[Double]) - -class AccumulableInfo( - val id: Long, - val name: String, - val update: Option[String], - val value: String) +class RDDStorageInfoImpl( + var id: Int, + var name: String, + var numPartitions: Int, + var numCachedPartitions: Int, + var storageLevel: String, + var memoryUsed: Long, + var diskUsed: Long, + var dataDistribution: Option[Seq[RDDDataDistributionImpl]], + var partitions: Option[Seq[RDDPartitionInfoImpl]]) extends RDDStorageInfo + +class RDDDataDistributionImpl( + var address: String, + var memoryUsed: Long, + var memoryRemaining: Long, + var diskUsed: Long) extends RDDDataDistribution + +class RDDPartitionInfoImpl( + var blockName: String, + var storageLevel: String, + var memoryUsed: Long, + var diskUsed: Long, + var executors: Seq[String]) extends RDDPartitionInfo + +class StageDataImpl( + var status: StageStatus, + var stageId: Int, + var attemptId: Int, + var numActiveTasks: Int , + var numCompleteTasks: Int, + var numFailedTasks: Int, + + var executorRunTime: Long, + + var inputBytes: Long, + var inputRecords: Long, + var outputBytes: Long, + var outputRecords: Long, + var shuffleReadBytes: Long, + var shuffleReadRecords: Long, + var shuffleWriteBytes: Long, + var shuffleWriteRecords: Long, + var memoryBytesSpilled: Long, + var diskBytesSpilled: Long, + + var name: String, + var details: String, + var schedulingPool: String, + + var accumulatorUpdates: Seq[AccumulableInfoImpl], + var tasks: Option[Map[Long, TaskData]], + var executorSummary: Option[Map[String, ExecutorStageSummaryImpl]]) extends StageData + +class TaskDataImpl( + var taskId: Long, + var index: Int, + var attempt: Int, + var launchTime: Date, + var executorId: String, + var host: String, + var taskLocality: String, + var speculative: Boolean, + var accumulatorUpdates: Seq[AccumulableInfoImpl], + var errorMessage: Option[String] = None, + var taskMetrics: Option[TaskMetricsImpl] = None) extends TaskData + +class TaskMetricsImpl( + var executorDeserializeTime: Long, + var executorRunTime: Long, + var resultSize: Long, + var jvmGcTime: Long, + var resultSerializationTime: Long, + var memoryBytesSpilled: Long, + var diskBytesSpilled: Long, + var inputMetrics: Option[InputMetricsImpl], + var outputMetrics: Option[OutputMetricsImpl], + var shuffleReadMetrics: Option[ShuffleReadMetricsImpl], + var shuffleWriteMetrics: Option[ShuffleWriteMetricsImpl]) extends TaskMetrics + +class InputMetricsImpl( + var bytesRead: Long, + var recordsRead: Long) extends InputMetrics + +class OutputMetricsImpl( + var bytesWritten: Long, + var recordsWritten: Long) extends OutputMetrics + +class ShuffleReadMetricsImpl( + var remoteBlocksFetched: Int, + var localBlocksFetched: Int, + var fetchWaitTime: Long, + var remoteBytesRead: Long, + var totalBlocksFetched: Int, + var recordsRead: Long) extends ShuffleReadMetrics + +class ShuffleWriteMetricsImpl( + var bytesWritten: Long, + var writeTime: Long, + var recordsWritten: Long) extends ShuffleWriteMetrics + +class TaskMetricDistributionsImpl( + var quantiles: IndexedSeq[Double], + + var executorDeserializeTime: IndexedSeq[Double], + var executorRunTime: IndexedSeq[Double], + var resultSize: IndexedSeq[Double], + var jvmGcTime: IndexedSeq[Double], + var resultSerializationTime: IndexedSeq[Double], + var memoryBytesSpilled: IndexedSeq[Double], + var diskBytesSpilled: IndexedSeq[Double], + + var inputMetrics: Option[InputMetricDistributionsImpl], + var outputMetrics: Option[OutputMetricDistributionsImpl], + var shuffleReadMetrics: Option[ShuffleReadMetricDistributionsImpl], + var shuffleWriteMetrics: Option[ShuffleWriteMetricDistributionsImpl]) extends TaskMetricDistributions + +class InputMetricDistributionsImpl( + var bytesRead: IndexedSeq[Double], + var recordsRead: IndexedSeq[Double]) extends InputMetricDistributions + +class OutputMetricDistributionsImpl( + var bytesWritten: IndexedSeq[Double], + var recordsWritten: IndexedSeq[Double]) extends OutputMetricDistributions + +class ShuffleReadMetricDistributionsImpl( + var readBytes: IndexedSeq[Double], + var readRecords: IndexedSeq[Double], + var remoteBlocksFetched: IndexedSeq[Double], + var localBlocksFetched: IndexedSeq[Double], + var fetchWaitTime: IndexedSeq[Double], + var remoteBytesRead: IndexedSeq[Double], + var totalBlocksFetched: IndexedSeq[Double]) extends ShuffleReadMetricDistributions + +class ShuffleWriteMetricDistributionsImpl( + var writeBytes: IndexedSeq[Double], + var writeRecords: IndexedSeq[Double], + var writeTime: IndexedSeq[Double]) extends ShuffleWriteMetricDistributions + +class AccumulableInfoImpl( + var id: Long, + var name: String, + var update: Option[String], + var value: String) extends AccumulableInfo diff --git a/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala b/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala index 2276a00f7..0c7412fe0 100644 --- a/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala +++ b/app/com/linkedin/drelephant/spark/legacydata/LegacyDataConverters.scala @@ -48,13 +48,13 @@ object LegacyDataConverters { def extractAppConfigurationProperties(legacyData: SparkApplicationData): Map[String, String] = legacyData.getEnvironmentData.getSparkProperties.asScala.toMap - def extractApplicationInfo(legacyData: SparkApplicationData): ApplicationInfo = { + def extractApplicationInfo(legacyData: SparkApplicationData): ApplicationInfoImpl = { val generalData = legacyData.getGeneralData - new ApplicationInfo( + new ApplicationInfoImpl( generalData.getApplicationId, generalData.getApplicationName, Seq( - new ApplicationAttemptInfo( + new ApplicationAttemptInfoImpl( Some("1"), new Date(generalData.getStartTime), new Date(generalData.getEndTime), @@ -65,12 +65,12 @@ object LegacyDataConverters { ) } - def extractJobDatas(legacyData: SparkApplicationData): Seq[JobData] = { + def extractJobDatas(legacyData: SparkApplicationData): Seq[JobDataImpl] = { val jobProgressData = legacyData.getJobProgressData - def extractJobData(jobId: Int): JobData = { + def extractJobData(jobId: Int): JobDataImpl = { val jobInfo = jobProgressData.getJobInfo(jobId) - new JobData( + new JobDataImpl( jobInfo.jobId, jobInfo.jobId.toString, description = None, @@ -108,9 +108,9 @@ object LegacyDataConverters { def extractStageDatas(legacyData: SparkApplicationData): Seq[StageData] = { val jobProgressData = legacyData.getJobProgressData - def extractStageData(stageAttemptId: SparkJobProgressData.StageAttemptId): StageData = { + def extractStageData(stageAttemptId: SparkJobProgressData.StageAttemptId): StageDataImpl = { val stageInfo = jobProgressData.getStageInfo(stageAttemptId.stageId, stageAttemptId.attemptId) - new StageData( + new StageDataImpl( extractStageStatus(stageAttemptId), stageAttemptId.stageId, stageAttemptId.attemptId, @@ -153,12 +153,12 @@ object LegacyDataConverters { sortedStageAttemptIds.map { stageAttemptId => extractStageData(stageAttemptId) } } - def extractExecutorSummaries(legacyData: SparkApplicationData): Seq[ExecutorSummary] = { + def extractExecutorSummaries(legacyData: SparkApplicationData): Seq[ExecutorSummaryImpl] = { val executorData = legacyData.getExecutorData - def extractExecutorSummary(executorId: String): ExecutorSummary = { + def extractExecutorSummary(executorId: String): ExecutorSummaryImpl = { val executorInfo = executorData.getExecutorInfo(executorId) - new ExecutorSummary( + new ExecutorSummaryImpl( executorInfo.execId, executorInfo.hostPort, executorInfo.rddBlocks, diff --git a/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala b/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala index 2278d124e..3947fdf3f 100644 --- a/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala +++ b/test/com/linkedin/drelephant/spark/SparkMetricsAggregatorTest.scala @@ -23,7 +23,7 @@ import scala.collection.JavaConverters import com.linkedin.drelephant.analysis.ApplicationType import com.linkedin.drelephant.configurations.aggregator.AggregatorConfigurationData import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData} -import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationAttemptInfo, ApplicationInfo, ExecutorSummary} +import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationAttemptInfoImpl, ApplicationInfoImpl, ExecutorSummaryImpl} import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate import org.scalatest.{FunSpec, Matchers} @@ -43,7 +43,7 @@ class SparkMetricsAggregatorTest extends FunSpec with Matchers { val duration = 8000000L newFakeApplicationAttemptInfo(Some("1"), startTime = new Date(now - duration), endTime = new Date(now)) } - new ApplicationInfo(appId, name = "app", Seq(applicationAttemptInfo)) + new ApplicationInfoImpl(appId, name = "app", Seq(applicationAttemptInfo)) } val executorSummaries = Seq( @@ -113,7 +113,7 @@ class SparkMetricsAggregatorTest extends FunSpec with Matchers { val duration = -8000000L newFakeApplicationAttemptInfo(Some("1"), startTime = new Date(now - duration), endTime = new Date(now)) } - new ApplicationInfo(appId, name = "app", Seq(applicationAttemptInfo)) + new ApplicationInfoImpl(appId, name = "app", Seq(applicationAttemptInfo)) } val restDerivedData = SparkRestDerivedData( applicationInfo, @@ -168,7 +168,7 @@ object SparkMetricsAggregatorTest { attemptId: Option[String], startTime: Date, endTime: Date - ): ApplicationAttemptInfo = new ApplicationAttemptInfo( + ): ApplicationAttemptInfoImpl = new ApplicationAttemptInfoImpl( attemptId, startTime, endTime, @@ -179,7 +179,7 @@ object SparkMetricsAggregatorTest { def newFakeExecutorSummary( id: String, totalDuration: Long - ): ExecutorSummary = new ExecutorSummary( + ): ExecutorSummaryImpl = new ExecutorSummaryImpl( id, hostPort = "", rddBlocks = 0, diff --git a/test/com/linkedin/drelephant/spark/data/SparkApplicationDataTest.scala b/test/com/linkedin/drelephant/spark/data/SparkApplicationDataTest.scala index 5cd686bb5..e6ec6d51d 100644 --- a/test/com/linkedin/drelephant/spark/data/SparkApplicationDataTest.scala +++ b/test/com/linkedin/drelephant/spark/data/SparkApplicationDataTest.scala @@ -20,7 +20,7 @@ import java.util.Date import scala.collection.JavaConverters -import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationAttemptInfo, ApplicationInfo} +import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationAttemptInfoImpl, ApplicationInfoImpl} import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate import org.scalatest.{FunSpec, Matchers} @@ -39,7 +39,7 @@ class SparkApplicationDataTest extends FunSpec with Matchers { } val restDerivedData = SparkRestDerivedData( - new ApplicationInfo(appId, "app", Seq(applicationAttemptInfo)), + new ApplicationInfoImpl(appId, "app", Seq(applicationAttemptInfo)), jobDatas = Seq.empty, stageDatas = Seq.empty, executorSummaries = Seq.empty @@ -72,7 +72,7 @@ object SparkApplicationDataTest { attemptId: Option[String], startTime: Date, endTime: Date - ): ApplicationAttemptInfo = new ApplicationAttemptInfo( + ): ApplicationAttemptInfoImpl = new ApplicationAttemptInfoImpl( attemptId, startTime, endTime, diff --git a/test/com/linkedin/drelephant/spark/fetchers/SparkFetcherTest.scala b/test/com/linkedin/drelephant/spark/fetchers/SparkFetcherTest.scala index 6816a4288..beb4ad54c 100644 --- a/test/com/linkedin/drelephant/spark/fetchers/SparkFetcherTest.scala +++ b/test/com/linkedin/drelephant/spark/fetchers/SparkFetcherTest.scala @@ -25,7 +25,7 @@ import com.linkedin.drelephant.analysis.{AnalyticJob, ApplicationType} import com.linkedin.drelephant.configurations.fetcher.FetcherConfigurationData import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData} import com.linkedin.drelephant.spark.fetchers.SparkFetcher.EventLogSource -import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationAttemptInfo, ApplicationInfo} +import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationAttemptInfoImpl, ApplicationInfoImpl} import com.linkedin.drelephant.util.{SparkUtils, HadoopUtils} import org.apache.log4j.Logger import org.apache.spark.SparkConf @@ -49,7 +49,7 @@ class SparkFetcherTest extends FunSpec with Matchers with MockitoSugar { val duration = 8000000L val restDerivedData = SparkRestDerivedData( - new ApplicationInfo( + new ApplicationInfoImpl( appId, "app", Seq( @@ -284,7 +284,7 @@ object SparkFetcherTest { attemptId: Option[String], startTime: Date, endTime: Date - ): ApplicationAttemptInfo = new ApplicationAttemptInfo( + ): ApplicationAttemptInfoImpl = new ApplicationAttemptInfoImpl( attemptId, startTime, endTime, diff --git a/test/com/linkedin/drelephant/spark/fetchers/SparkRestClientTest.scala b/test/com/linkedin/drelephant/spark/fetchers/SparkRestClientTest.scala index c892403c6..729311b18 100644 --- a/test/com/linkedin/drelephant/spark/fetchers/SparkRestClientTest.scala +++ b/test/com/linkedin/drelephant/spark/fetchers/SparkRestClientTest.scala @@ -28,7 +28,7 @@ import scala.concurrent.ExecutionContext import scala.util.Try import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule -import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationAttemptInfo, ApplicationInfo, ExecutorSummary, JobData, StageData} +import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationAttemptInfoImpl, ApplicationInfoImpl, ExecutorSummaryImpl, JobDataImpl, StageDataImpl} import javax.ws.rs.{GET, Path, PathParam, Produces} import javax.ws.rs.core.{Application, MediaType, Response} import javax.ws.rs.ext.ContextResolver @@ -250,7 +250,7 @@ class SparkRestClientTest extends AsyncFunSpec with Matchers { "unrecognized" : "bar" }""" - val applicationAttemptInfo = objectMapper.readValue[ApplicationAttemptInfo](json) + val applicationAttemptInfo = objectMapper.readValue[ApplicationAttemptInfoImpl](json) applicationAttemptInfo.sparkUser should be("foo") } } @@ -315,11 +315,11 @@ object SparkRestClientTest { @Produces(Array(MediaType.APPLICATION_JSON)) class ApplicationResource { @GET - def getApplication(@PathParam("appId") appId: String): ApplicationInfo = { + def getApplication(@PathParam("appId") appId: String): ApplicationInfoImpl = { val t2 = System.currentTimeMillis val t1 = t2 - 1 val duration = 8000000L - new ApplicationInfo( + new ApplicationInfoImpl( APP_ID, APP_NAME, Seq( @@ -333,21 +333,21 @@ object SparkRestClientTest { @Produces(Array(MediaType.APPLICATION_JSON)) class JobsResource { @GET - def getJobs(@PathParam("appId") appId: String, @PathParam("attemptId") attemptId: String): Seq[JobData] = + def getJobs(@PathParam("appId") appId: String, @PathParam("attemptId") attemptId: String): Seq[JobDataImpl] = if (attemptId == "2") Seq.empty else throw new Exception() } @Produces(Array(MediaType.APPLICATION_JSON)) class StagesResource { @GET - def getStages(@PathParam("appId") appId: String, @PathParam("attemptId") attemptId: String): Seq[StageData] = + def getStages(@PathParam("appId") appId: String, @PathParam("attemptId") attemptId: String): Seq[StageDataImpl] = if (attemptId == "2") Seq.empty else throw new Exception() } @Produces(Array(MediaType.APPLICATION_JSON)) class ExecutorsResource { @GET - def getExecutors(@PathParam("appId") appId: String, @PathParam("attemptId") attemptId: String): Seq[ExecutorSummary] = + def getExecutors(@PathParam("appId") appId: String, @PathParam("attemptId") attemptId: String): Seq[ExecutorSummaryImpl] = if (attemptId == "2") Seq.empty else throw new Exception() } @@ -387,11 +387,11 @@ object SparkRestClientTest { @Produces(Array(MediaType.APPLICATION_JSON)) class ApplicationResource { @GET - def getApplication(@PathParam("appId") appId: String): ApplicationInfo = { + def getApplication(@PathParam("appId") appId: String): ApplicationInfoImpl = { val t2 = System.currentTimeMillis val t1 = t2 - 1 val duration = 8000000L - new ApplicationInfo( + new ApplicationInfoImpl( APP_ID, APP_NAME, Seq( @@ -405,21 +405,21 @@ object SparkRestClientTest { @Produces(Array(MediaType.APPLICATION_JSON)) class JobsResource { @GET - def getJobs(@PathParam("appId") appId: String): Seq[JobData] = + def getJobs(@PathParam("appId") appId: String): Seq[JobDataImpl] = Seq.empty } @Produces(Array(MediaType.APPLICATION_JSON)) class StagesResource { @GET - def getStages(@PathParam("appId") appId: String): Seq[StageData] = + def getStages(@PathParam("appId") appId: String): Seq[StageDataImpl] = Seq.empty } @Produces(Array(MediaType.APPLICATION_JSON)) class ExecutorsResource { @GET - def getExecutors(@PathParam("appId") appId: String): Seq[ExecutorSummary] = + def getExecutors(@PathParam("appId") appId: String): Seq[ExecutorSummaryImpl] = Seq.empty } @@ -436,7 +436,7 @@ object SparkRestClientTest { attemptId: Option[String], startTime: Date, endTime: Date - ): ApplicationAttemptInfo = new ApplicationAttemptInfo( + ): ApplicationAttemptInfoImpl = new ApplicationAttemptInfoImpl( attemptId, startTime, endTime, diff --git a/test/com/linkedin/drelephant/spark/heuristics/ConfigurationHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/ConfigurationHeuristicTest.scala index 8d189a89e..60c2e6dac 100644 --- a/test/com/linkedin/drelephant/spark/heuristics/ConfigurationHeuristicTest.scala +++ b/test/com/linkedin/drelephant/spark/heuristics/ConfigurationHeuristicTest.scala @@ -17,7 +17,7 @@ package com.linkedin.drelephant.spark.heuristics import com.linkedin.drelephant.spark.data.SparkRestDerivedData -import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationAttemptInfo, ApplicationInfo} +import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationAttemptInfoImpl, ApplicationInfoImpl} import scala.collection.JavaConverters import com.linkedin.drelephant.analysis.{ApplicationType, Severity} @@ -285,11 +285,11 @@ object ConfigurationHeuristicTest { val appId = "application_1" val startDate = new Date() val endDate = new Date(startDate.getTime() + 10000) - val applicationAttempt = new ApplicationAttemptInfo(Option("attempt1"),startDate, endDate, "sparkUser") + val applicationAttempt = new ApplicationAttemptInfoImpl(Option("attempt1"),startDate, endDate, "sparkUser") val applicationAttempts = Seq(applicationAttempt) val restDerivedData = SparkRestDerivedData( - new ApplicationInfo(appId, name = "app", applicationAttempts), + new ApplicationInfoImpl(appId, name = "app", applicationAttempts), jobDatas = Seq.empty, stageDatas = Seq.empty, executorSummaries = Seq.empty diff --git a/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala index 459cdeab2..dfdcf4a15 100644 --- a/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala +++ b/test/com/linkedin/drelephant/spark/heuristics/ExecutorsHeuristicTest.scala @@ -21,7 +21,7 @@ import scala.collection.JavaConverters import com.linkedin.drelephant.analysis.{ApplicationType, Severity, SeverityThresholds} import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData} -import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfo, ExecutorSummary} +import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, ExecutorSummaryImpl} import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate import org.scalatest.{FunSpec, Matchers} @@ -234,7 +234,7 @@ object ExecutorsHeuristicTest { totalShuffleRead: Long, totalShuffleWrite: Long, maxMemory: Long - ): ExecutorSummary = new ExecutorSummary( + ): ExecutorSummaryImpl = new ExecutorSummaryImpl( id, hostPort = "", rddBlocks = 0, @@ -252,11 +252,11 @@ object ExecutorsHeuristicTest { executorLogs = Map.empty ) - def newFakeSparkApplicationData(executorSummaries: Seq[ExecutorSummary]): SparkApplicationData = { + def newFakeSparkApplicationData(executorSummaries: Seq[ExecutorSummaryImpl]): SparkApplicationData = { val appId = "application_1" val restDerivedData = SparkRestDerivedData( - new ApplicationInfo(appId, name = "app", Seq.empty), + new ApplicationInfoImpl(appId, name = "app", Seq.empty), jobDatas = Seq.empty, stageDatas = Seq.empty, executorSummaries = executorSummaries diff --git a/test/com/linkedin/drelephant/spark/heuristics/JobsHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/JobsHeuristicTest.scala index 2a992576f..240f80d7d 100644 --- a/test/com/linkedin/drelephant/spark/heuristics/JobsHeuristicTest.scala +++ b/test/com/linkedin/drelephant/spark/heuristics/JobsHeuristicTest.scala @@ -21,7 +21,7 @@ import scala.collection.JavaConverters import com.linkedin.drelephant.analysis.{ApplicationType, Severity} import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData} -import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfo, JobData} +import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, JobDataImpl} import org.apache.spark.JobExecutionStatus import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate import org.scalatest.{FunSpec, Matchers} @@ -130,7 +130,7 @@ object JobsHeuristicTest { status: JobExecutionStatus, numCompleteTasks: Int, numFailedTasks: Int - ): JobData = new JobData( + ): JobDataImpl = new JobDataImpl( jobId, name, description = None, @@ -150,11 +150,11 @@ object JobsHeuristicTest { numFailedStages = 0 ) - def newFakeSparkApplicationData(jobDatas: Seq[JobData]): SparkApplicationData = { + def newFakeSparkApplicationData(jobDatas: Seq[JobDataImpl]): SparkApplicationData = { val appId = "application_1" val restDerivedData = SparkRestDerivedData( - new ApplicationInfo(appId, name = "app", Seq.empty), + new ApplicationInfoImpl(appId, name = "app", Seq.empty), jobDatas, stageDatas = Seq.empty, executorSummaries = Seq.empty diff --git a/test/com/linkedin/drelephant/spark/heuristics/StagesHeuristicTest.scala b/test/com/linkedin/drelephant/spark/heuristics/StagesHeuristicTest.scala index 0b980393b..ee56af37a 100644 --- a/test/com/linkedin/drelephant/spark/heuristics/StagesHeuristicTest.scala +++ b/test/com/linkedin/drelephant/spark/heuristics/StagesHeuristicTest.scala @@ -22,7 +22,7 @@ import scala.concurrent.duration.Duration import com.linkedin.drelephant.analysis.{ApplicationType, Severity} import com.linkedin.drelephant.configurations.heuristic.HeuristicConfigurationData import com.linkedin.drelephant.spark.data.{SparkApplicationData, SparkLogDerivedData, SparkRestDerivedData} -import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfo, JobData, StageData} +import com.linkedin.drelephant.spark.fetchers.statusapiv1.{ApplicationInfoImpl, JobDataImpl, StageDataImpl} import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate import org.apache.spark.status.api.v1.StageStatus import org.scalatest.{FunSpec, Matchers} @@ -143,7 +143,7 @@ object StagesHeuristicTest { numFailedTasks: Int, executorRunTime: Long, name: String - ): StageData = new StageData( + ): StageDataImpl = new StageDataImpl( status, stageId, attemptId = 0, @@ -170,13 +170,13 @@ object StagesHeuristicTest { ) def newFakeSparkApplicationData( - stageDatas: Seq[StageData], + stageDatas: Seq[StageDataImpl], appConfigurationProperties: Map[String, String] ): SparkApplicationData = { val appId = "application_1" val restDerivedData = SparkRestDerivedData( - new ApplicationInfo(appId, name = "app", Seq.empty), + new ApplicationInfoImpl(appId, name = "app", Seq.empty), jobDatas = Seq.empty, stageDatas = stageDatas, executorSummaries = Seq.empty diff --git a/test/com/linkedin/drelephant/util/InfoExtractorTest.java b/test/com/linkedin/drelephant/util/InfoExtractorTest.java index b55293030..b1c262dbf 100644 --- a/test/com/linkedin/drelephant/util/InfoExtractorTest.java +++ b/test/com/linkedin/drelephant/util/InfoExtractorTest.java @@ -26,8 +26,13 @@ import com.linkedin.drelephant.schedulers.Scheduler; import com.linkedin.drelephant.spark.data.SparkApplicationData; -import com.linkedin.drelephant.spark.fetchers.statusapiv1.ApplicationAttemptInfo; +import com.linkedin.drelephant.spark.fetchers.statusapiv1.ApplicationAttemptInfoImpl; +import com.linkedin.drelephant.spark.fetchers.statusapiv1.ApplicationInfoImpl; +import com.linkedin.drelephant.spark.fetchers.statusapiv1.ExecutorSummaryImpl; +import com.linkedin.drelephant.spark.fetchers.statusapiv1.JobDataImpl; +import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageDataImpl; import com.linkedin.drelephant.spark.fetchers.statusapiv1.ApplicationInfo; +import com.linkedin.drelephant.spark.fetchers.statusapiv1.ApplicationAttemptInfo; import com.linkedin.drelephant.spark.fetchers.statusapiv1.ExecutorSummary; import com.linkedin.drelephant.spark.fetchers.statusapiv1.JobData; import com.linkedin.drelephant.spark.fetchers.statusapiv1.StageData; @@ -272,7 +277,7 @@ public void testLoadInfoSpark() { HadoopApplicationData data = new SparkApplicationData("application_5678", properties, - new ApplicationInfo("", "", new Vector(0,1,0)), + new ApplicationInfoImpl("", "", new Vector(0,1,0)), new Vector(0,1,0), new Vector(0,1,0), new Vector(0,1,0)); @@ -293,7 +298,7 @@ public void testLoadInfoSparkNoConfig() { HadoopApplicationData data = new SparkApplicationData("application_5678", properties, - new ApplicationInfo("", "", new Vector(0,1,0)), + new ApplicationInfoImpl("", "", new Vector(0,1,0)), new Vector(0,1,0), new Vector(0,1,0), new Vector(0,1,0)); diff --git a/test/resources/spark_event_logs/event_log_1 b/test/resources/spark_event_logs/event_log_1 index d5a5bc1f1..9507a9c8c 100644 --- a/test/resources/spark_event_logs/event_log_1 +++ b/test/resources/spark_event_logs/event_log_1 @@ -1,7 +1,7 @@ {"Event":"SparkListenerLogStart","Spark Version":"1.5.1"} {"Event":"SparkListenerBlockManagerAdded","Block Manager ID":{"Executor ID":"driver","Host":"10.20.0.71","Port":58838},"Maximum Memory":1111794647,"Timestamp":1458126388757} {"Event":"SparkListenerEnvironmentUpdate","Spark Properties":{"spark.serializer":"org.apache.spark.serializer.KryoSerializer","spark.storage.memoryFraction":"0.3","spark.driver.memory":"2G","spark.executor.instances":"900","spark.executor.memory":"1g","spark.shuffle.memoryFraction":"0.5"},"JVM Information":{},"System Properties":{},"Classpath Entries":{}} -{"Event":"SparkListenerApplicationStart","App Name":"PythonPi","App ID":"application_1457600942802_0093","Timestamp":1458126354336,"User":"hdfs"} +{"Event":"SparkListenerApplicationStart","App Name":"PythonPi","App ID":"application_1457600942802_0093","App Attempt ID":"1","Timestamp":1458126354336,"User":"hdfs"} {"Event":"SparkListenerJobStart","Job ID":0,"Submission Time":1458126390170,"Stage Infos":[{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"reduce at pi.py:39","Number of Tasks":10,"RDD Info":[{"RDD ID":1,"Name":"PythonRDD","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use Memory":false,"Use ExternalBlockStore":false,"Deserialized":false,"Replication":1},"Number of Partitions":10,"Number of Cached Partitions":0,"Memory Size":0,"ExternalBlockStore Size":0,"Disk Size":0},{"RDD ID":0,"Name":"ParallelCollectionRDD","Scope":"{\"id\":\"0\",\"name\":\"parallelize\"}","Parent IDs":[],"Storage Level":{"Use Disk":false,"Use Memory":false,"Use ExternalBlockStore":false,"Deserialized":false,"Replication":1},"Number of Partitions":10,"Number of Cached Partitions":0,"Memory Size":0,"ExternalBlockStore Size":0,"Disk Size":0}],"Parent IDs":[],"Details":"","Accumulables":[]}],"Stage IDs":[0],"Properties":{"spark.rdd.scope.noOverride":"true","spark.rdd.scope":"{\"id\":\"1\",\"name\":\"collect\"}","callSite.short":"reduce at pi.py:39"}} {"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":0,"Stage Attempt ID":0,"Stage Name":"reduce at pi.py:39","Number of Tasks":10,"RDD Info":[{"RDD ID":1,"Name":"PythonRDD","Parent IDs":[0],"Storage Level":{"Use Disk":false,"Use Memory":false,"Use ExternalBlockStore":false,"Deserialized":false,"Replication":1},"Number of Partitions":10,"Number of Cached Partitions":0,"Memory Size":0,"ExternalBlockStore Size":0,"Disk Size":0},{"RDD ID":0,"Name":"ParallelCollectionRDD","Scope":"{\"id\":\"0\",\"name\":\"parallelize\"}","Parent IDs":[],"Storage Level":{"Use Disk":false,"Use Memory":false,"Use ExternalBlockStore":false,"Deserialized":false,"Replication":1},"Number of Partitions":10,"Number of Cached Partitions":0,"Memory Size":0,"ExternalBlockStore Size":0,"Disk Size":0}],"Parent IDs":[],"Details":"","Submission Time":1458126390256,"Accumulables":[]},"Properties":{"spark.rdd.scope.noOverride":"true","spark.rdd.scope":"{\"id\":\"1\",\"name\":\"collect\"}","callSite.short":"reduce at pi.py:39"}} {"Event":"SparkListenerExecutorAdded","Timestamp":1458126397624,"Executor ID":"2","Executor Info":{"Host":".hello.com","Total Cores":2,"Log Urls":{"stdout":"http://hello.com:8042/node/containerlogs/container_e38_1457600942802_0093_01_000003/hdfs/stdout?start=-4096","stderr":"http://hello.com:8042/node/containerlogs/container_e38_1457600942802_0093_01_000003/hdfs/stderr?start=-4096"}}}