diff --git a/build.sbt b/build.sbt
index 814fa1f..2dfac52 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,32 +1,32 @@
 name := "sparklens"
 organization := "com.qubole"
-scalaVersion := "2.11.8"
+scalaVersion := "2.12.15"
 
-crossScalaVersions := Seq("2.10.6", "2.11.8")
+crossScalaVersions := Seq("2.12.15")
 
 spName := "qubole/sparklens"
 
-sparkVersion := "2.0.0"
+sparkVersion := "3.2.1"
 
 spAppendScalaVersion := true
 
 libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided"
 
-libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.6.5" % "provided"
+libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "3.2.1" % "provided"
 
-libraryDependencies += "org.apache.httpcomponents" % "httpclient" % "4.5.6" % "provided"
+libraryDependencies += "org.apache.httpcomponents" % "httpclient" % "4.5.13" % "provided"
 
-libraryDependencies += "org.apache.httpcomponents" % "httpmime" % "4.5.6" % "provided"
+libraryDependencies += "org.apache.httpcomponents" % "httpmime" % "4.5.13" % "provided"
 
 test in assembly := {}
 
 testOptions in Test += Tests.Argument("-oF")
 
-scalacOptions ++= Seq("-target:jvm-1.7")
+scalacOptions ++= Seq("-target:jvm-1.8")
 
-javacOptions ++= Seq("-source", "1.7", "-target", "1.7")
+javacOptions ++= Seq("-source", "1.8", "-target", "1.8")
 
 publishMavenStyle := true
diff --git a/project/build.properties b/project/build.properties
index c091b86..8e682c5 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -1 +1 @@
-sbt.version=0.13.16
+sbt.version=0.13.18
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 0331b00..592ca13 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1,8 +1,8 @@
 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")
 
-resolvers += "Spark Package Main Repo" at "https://dl.bintray.com/spark-packages/maven"
+resolvers += "Spark Package Main Repo" at "https://repos.spark-packages.org"
 
-addSbtPlugin("org.spark-packages" % "sbt-spark-package" % "0.2.4")
+addSbtPlugin("org.spark-packages" % "sbt-spark-package" % "0.2.6")
diff --git a/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala b/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala
index 1a0c356..c8bf2c6 100644
--- a/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala
+++ b/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala
@@ -18,14 +18,13 @@ package com.qubole.sparklens
 
 import java.net.URI
-
 import com.qubole.sparklens.analyzer._
 import com.qubole.sparklens.common.{AggregateMetrics, AppContext, ApplicationInfo}
 import com.qubole.sparklens.helper.{EmailReportHelper, HDFSConfigHelper}
 import com.qubole.sparklens.timespan.{ExecutorTimeSpan, HostTimeSpan, JobTimeSpan, StageTimeSpan}
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.SparkConf
-import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart, SparkListenerExecutorAdded, SparkListenerExecutorRemoved, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerStageCompleted, SparkListenerStageSubmitted, SparkListenerTaskEnd}
 
 import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
@@ -257,7 +256,7 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener {
     if (stageCompleted.stageInfo.failureReason.isDefined) {
       //stage failed
       val si = stageCompleted.stageInfo
-      failedStages += s""" Stage ${si.stageId} attempt ${si.attemptId} in job ${stageIDToJobID(si.stageId)} failed.
+      failedStages += s""" Stage ${si.stageId} attempt ${si.attemptNumber} in job ${stageIDToJobID(si.stageId)} failed.
         Stage tasks: ${si.numTasks}
         """
       stageTimeSpan.finalUpdate()
diff --git a/src/main/scala/com/qubole/sparklens/QuboleNotebookListener.scala b/src/main/scala/com/qubole/sparklens/QuboleNotebookListener.scala
index b7a2fc4..4fe74a3 100644
--- a/src/main/scala/com/qubole/sparklens/QuboleNotebookListener.scala
+++ b/src/main/scala/com/qubole/sparklens/QuboleNotebookListener.scala
@@ -19,9 +19,9 @@ package com.qubole.sparklens
 import com.qubole.sparklens.analyzer.{AppAnalyzer, EfficiencyStatisticsAnalyzer, ExecutorWallclockAnalyzer, StageSkewAnalyzer}
 import com.qubole.sparklens.common.AppContext
 import com.qubole.sparklens.timespan.{ExecutorTimeSpan, HostTimeSpan}
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListenerTaskStart}
 import org.apache.spark.util.SizeEstimator
-import org.apache.spark.{SparkConf, SparkContext}
 
 import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
diff --git a/src/main/scala/com/qubole/sparklens/app/EventHistoryReporter.scala b/src/main/scala/com/qubole/sparklens/app/EventHistoryReporter.scala
index ee1388c..1bc0477 100644
--- a/src/main/scala/com/qubole/sparklens/app/EventHistoryReporter.scala
+++ b/src/main/scala/com/qubole/sparklens/app/EventHistoryReporter.scala
@@ -2,7 +2,6 @@ package com.qubole.sparklens.app
 
 import java.io.{BufferedInputStream, InputStream}
 import java.net.URI
-
 import com.ning.compress.lzf.LZFInputStream
 import com.qubole.sparklens.QuboleJobListener
 import com.qubole.sparklens.common.Json4sWrapper
diff --git a/src/main/scala/com/qubole/sparklens/app/EventHistoryToSparklensJson.scala b/src/main/scala/com/qubole/sparklens/app/EventHistoryToSparklensJson.scala
index 4fa52ba..cb43e8a 100644
--- a/src/main/scala/com/qubole/sparklens/app/EventHistoryToSparklensJson.scala
+++ b/src/main/scala/com/qubole/sparklens/app/EventHistoryToSparklensJson.scala
@@ -1,29 +1,32 @@
 package com.qubole.sparklens.app
 
-import java.io.File
+import com.qubole.sparklens.helper.HDFSConfigHelper
+import org.apache.hadoop.fs.{FileSystem, Path}
 
 object EventHistoryToSparklensJson {
 
   def main(args:Array[String]):Unit = {
-    val defaultDestination = new File("/tmp/sparklens/")
+    val defaultDestination = new Path("/tmp/sparklens/")
     val dirs = args.length match {
-      case 0 => (new File("."), defaultDestination)
-      case 1 => (new File(args(0)), defaultDestination)
-      case _ => (new File(args(0)), new File(args(1)))
+      case 0 => (new Path("."), defaultDestination)
+      case 1 => (new Path(args(0)), defaultDestination)
+      case _ => (new Path(args(0)), new Path(args(1)))
     }
     println("Converting Event History files to Sparklens Json files")
-    println(s"src: ${dirs._1.getAbsolutePath} destination: ${dirs._2.getAbsolutePath}")
+    println(s"src: ${dirs._1.toUri.getRawPath} destination: ${dirs._2.toUri.getRawPath}")
     convert(dirs._1, dirs._2)
   }
 
-  private def convert(srcLoc:File, destLoc:File): Unit = {
-    if (srcLoc.isFile) {
+  private def convert(srcLoc:Path, destLoc:Path): Unit = {
+    val dfs = FileSystem.get(srcLoc.toUri, HDFSConfigHelper.getHadoopConf(None))
+
+    if (dfs.getFileStatus(srcLoc).isFile) {
       try {
-        new EventHistoryReporter(srcLoc.getAbsolutePath, List(
+        new EventHistoryReporter(srcLoc.toUri.getRawPath, List(
           ("spark.sparklens.reporting.disabled", "true"),
           ("spark.sparklens.save.data", "true"),
-          ("spark.sparklens.data.dir", destLoc.getAbsolutePath)
+          ("spark.sparklens.data.dir", destLoc.toUri.getRawPath)
        ))
      } catch {
        case e: Exception => {
@@ -32,8 +35,8 @@ object EventHistoryToSparklensJson {
      }
    } else {
      //This is a directory. Process all files
-      srcLoc.listFiles().foreach( f => {
-        convert(f, destLoc)
+      dfs.listStatus(srcLoc).foreach( f => {
+        convert(f.getPath, destLoc)
      })
    }
  }
diff --git a/src/main/scala/com/qubole/sparklens/app/ReporterApp.scala b/src/main/scala/com/qubole/sparklens/app/ReporterApp.scala
index dd3c373..f1d7aa8 100644
--- a/src/main/scala/com/qubole/sparklens/app/ReporterApp.scala
+++ b/src/main/scala/com/qubole/sparklens/app/ReporterApp.scala
@@ -2,7 +2,6 @@ package com.qubole.sparklens.app
 
 import java.io.{BufferedInputStream, InputStream}
 import java.net.URI
-
 import com.ning.compress.lzf.LZFInputStream
 import com.qubole.sparklens.QuboleJobListener
 import com.qubole.sparklens.analyzer.AppAnalyzer
diff --git a/src/main/scala/com/qubole/sparklens/common/AggregateMetrics.scala b/src/main/scala/com/qubole/sparklens/common/AggregateMetrics.scala
index 99a2a79..942ec72 100644
--- a/src/main/scala/com/qubole/sparklens/common/AggregateMetrics.scala
+++ b/src/main/scala/com/qubole/sparklens/common/AggregateMetrics.scala
@@ -17,10 +17,10 @@
 
 package com.qubole.sparklens.common
 
-import java.util.Locale
-
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler.TaskInfo
+
+import java.util.Locale
 import org.json4s.DefaultFormats
 import org.json4s.JsonAST.JValue
diff --git a/src/main/scala/com/qubole/sparklens/helper/EmailReportHelper.scala b/src/main/scala/com/qubole/sparklens/helper/EmailReportHelper.scala
index a960d88..749659e 100644
--- a/src/main/scala/com/qubole/sparklens/helper/EmailReportHelper.scala
+++ b/src/main/scala/com/qubole/sparklens/helper/EmailReportHelper.scala
@@ -1,10 +1,10 @@
 package com.qubole.sparklens.helper
 
+import org.apache.spark.SparkConf
+
 import java.io.FileWriter
 import java.nio.file.{Files, Paths}
 
-import org.apache.spark.SparkConf
-
 object EmailReportHelper {
 
   def getTempFileLocation(): String = {
diff --git a/src/main/scala/com/qubole/sparklens/helper/HDFSConfigHelper.scala b/src/main/scala/com/qubole/sparklens/helper/HDFSConfigHelper.scala
index ea93ba8..28b4c24 100644
--- a/src/main/scala/com/qubole/sparklens/helper/HDFSConfigHelper.scala
+++ b/src/main/scala/com/qubole/sparklens/helper/HDFSConfigHelper.scala
@@ -19,16 +19,16 @@ package com.qubole.sparklens.helper
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkConf
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sparklens.SparklensSparkHadoopUtil
 
 object HDFSConfigHelper {
 
   def getHadoopConf(sparkConfOptional:Option[SparkConf]): Configuration = {
     if (sparkConfOptional.isDefined) {
-      SparkHadoopUtil.get.newConfiguration(sparkConfOptional.get)
+      SparklensSparkHadoopUtil.newConfiguration(sparkConfOptional.get)
     }else {
       val sparkConf = new SparkConf()
-      SparkHadoopUtil.get.newConfiguration(sparkConf)
+      SparklensSparkHadoopUtil.newConfiguration(sparkConf)
     }
   }
 }
diff --git a/src/main/scala/org/apache/spark/sparklens/SparklensSparkHadoopUtil.scala b/src/main/scala/org/apache/spark/sparklens/SparklensSparkHadoopUtil.scala
new file mode 100644
index 0000000..d44bd3d
--- /dev/null
+++ b/src/main/scala/org/apache/spark/sparklens/SparklensSparkHadoopUtil.scala
@@ -0,0 +1,7 @@
+package org.apache.spark.sparklens
+
+import org.apache.spark.deploy.SparkHadoopUtil
+
+object SparklensSparkHadoopUtil extends SparkHadoopUtil {
+
+}
diff --git a/version.sbt b/version.sbt
index 8ed576b..0512f16 100644
--- a/version.sbt
+++ b/version.sbt
@@ -1 +1 @@
-version in ThisBuild := "0.3.2"
+version in ThisBuild := "0.6.0"
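
A minimal usage sketch (not part of the patch; the bucket and directory names below are hypothetical): with the java.io.File walk replaced by Hadoop Path/FileSystem, EventHistoryToSparklensJson can be pointed at any filesystem URI Hadoop can resolve, not just local paths.

    // Illustrative invocation only; adjust the URIs to your environment.
    import com.qubole.sparklens.app.EventHistoryToSparklensJson

    object ConvertEventLogs {
      def main(args: Array[String]): Unit = {
        // arg 0: source event logs (e.g. on S3 or HDFS)
        // arg 1: destination directory for the generated Sparklens JSON files
        EventHistoryToSparklensJson.main(
          Array("s3a://my-bucket/spark-event-logs/", "hdfs:///tmp/sparklens/"))
      }
    }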