Hwanghw/spark3 #88 (Open)

wants to merge 5 commits into base: master
16 changes: 8 additions & 8 deletions build.sbt
@@ -1,32 +1,32 @@
 name := "sparklens"
 organization := "com.qubole"
 
-scalaVersion := "2.11.8"
+scalaVersion := "2.12.15"
 
-crossScalaVersions := Seq("2.10.6", "2.11.8")
+crossScalaVersions := Seq("2.12.15")
 
 spName := "qubole/sparklens"
 
-sparkVersion := "2.0.0"
+sparkVersion := "3.2.1"
 
 spAppendScalaVersion := true
 
 
 libraryDependencies += "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided"
 
-libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.6.5" % "provided"
+libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "3.2.1" % "provided"
 
-libraryDependencies += "org.apache.httpcomponents" % "httpclient" % "4.5.6" % "provided"
+libraryDependencies += "org.apache.httpcomponents" % "httpclient" % "4.5.13" % "provided"
 
-libraryDependencies += "org.apache.httpcomponents" % "httpmime" % "4.5.6" % "provided"
+libraryDependencies += "org.apache.httpcomponents" % "httpmime" % "4.5.13" % "provided"
 
 test in assembly := {}
 
 testOptions in Test += Tests.Argument("-oF")
 
-scalacOptions ++= Seq("-target:jvm-1.7")
+scalacOptions ++= Seq("-target:jvm-1.8")
 
-javacOptions ++= Seq("-source", "1.7", "-target", "1.7")
+javacOptions ++= Seq("-source", "1.8", "-target", "1.8")
 
 publishMavenStyle := true
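Context for the build changes: Spark 3.x dropped support for Scala 2.10/2.11 and requires Java 8 or newer, so the Scala version, cross-build list, and JVM source/target levels all have to move together. Assuming a standard sbt setup, the assembly jar for the new toolchain would typically be built with:

    sbt ++2.12.15 clean assembly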
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
-sbt.version=0.13.16
+sbt.version=0.13.18
4 changes: 2 additions & 2 deletions project/plugins.sbt
@@ -1,8 +1,8 @@
 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.12.0")
 
-resolvers += "Spark Package Main Repo" at "https://dl.bintray.com/spark-packages/maven"
+resolvers += "Spark Package Main Repo" at "https://repos.spark-packages.org"
 
-addSbtPlugin("org.spark-packages" % "sbt-spark-package" % "0.2.4")
+addSbtPlugin("org.spark-packages" % "sbt-spark-package" % "0.2.6")
 
 
 
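The resolver change is required rather than cosmetic: Bintray, which hosted dl.bintray.com/spark-packages/maven, was shut down in 2021, and Spark Packages artifacts are now served from repos.spark-packages.org.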
5 changes: 2 additions & 3 deletions src/main/scala/com/qubole/sparklens/QuboleJobListener.scala
@@ -18,14 +18,13 @@
 package com.qubole.sparklens
 
 import java.net.URI
-
 import com.qubole.sparklens.analyzer._
 import com.qubole.sparklens.common.{AggregateMetrics, AppContext, ApplicationInfo}
 import com.qubole.sparklens.helper.{EmailReportHelper, HDFSConfigHelper}
 import com.qubole.sparklens.timespan.{ExecutorTimeSpan, HostTimeSpan, JobTimeSpan, StageTimeSpan}
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.SparkConf
-import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart, SparkListenerExecutorAdded, SparkListenerExecutorRemoved, SparkListenerJobEnd, SparkListenerJobStart, SparkListenerStageCompleted, SparkListenerStageSubmitted, SparkListenerTaskEnd}
 
 import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
@@ -257,7 +256,7 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener {
     if (stageCompleted.stageInfo.failureReason.isDefined) {
       //stage failed
       val si = stageCompleted.stageInfo
-      failedStages += s""" Stage ${si.stageId} attempt ${si.attemptId} in job ${stageIDToJobID(si.stageId)} failed.
+      failedStages += s""" Stage ${si.stageId} attempt ${si.attemptNumber} in job ${stageIDToJobID(si.stageId)} failed.
       Stage tasks: ${si.numTasks}
       """
       stageTimeSpan.finalUpdate()
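The attemptId change is forced by a Spark API cleanup: StageInfo.attemptId was deprecated in favor of attemptNumber in Spark 2.3 and is gone in Spark 3.x. A minimal standalone listener sketch against the Spark 3 API (illustrative only, not code from this PR):

    import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

    class StageAttemptLogger extends SparkListener {
      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
        val si = stageCompleted.stageInfo
        // attemptNumber replaces the removed attemptId accessor in Spark 3
        println(s"stage ${si.stageId} attempt ${si.attemptNumber}: ${si.numTasks} tasks")
      }
    }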
@@ -19,9 +19,9 @@ package com.qubole.sparklens
 import com.qubole.sparklens.analyzer.{AppAnalyzer, EfficiencyStatisticsAnalyzer, ExecutorWallclockAnalyzer, StageSkewAnalyzer}
 import com.qubole.sparklens.common.AppContext
 import com.qubole.sparklens.timespan.{ExecutorTimeSpan, HostTimeSpan}
-import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListenerTaskStart}
 import org.apache.spark.util.SizeEstimator
+import org.apache.spark.{SparkConf, SparkContext}
 
 import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
@@ -2,7 +2,6 @@ package com.qubole.sparklens.app
 
 import java.io.{BufferedInputStream, InputStream}
 import java.net.URI
-
 import com.ning.compress.lzf.LZFInputStream
 import com.qubole.sparklens.QuboleJobListener
 import com.qubole.sparklens.common.Json4sWrapper
@@ -1,29 +1,32 @@
 package com.qubole.sparklens.app
 
-import java.io.File
+import com.qubole.sparklens.helper.HDFSConfigHelper
+import org.apache.hadoop.fs.{FileSystem, Path}
 
 object EventHistoryToSparklensJson {
 
   def main(args:Array[String]):Unit = {
-    val defaultDestination = new File("/tmp/sparklens/")
+    val defaultDestination = new Path("/tmp/sparklens/")
 
     val dirs = args.length match {
-      case 0 => (new File("."), defaultDestination)
-      case 1 => (new File(args(0)), defaultDestination)
-      case _ => (new File(args(0)), new File(args(1)))
+      case 0 => (new Path("."), defaultDestination)
+      case 1 => (new Path(args(0)), defaultDestination)
+      case _ => (new Path(args(0)), new Path(args(1)))
     }
     println("Converting Event History files to Sparklens Json files")
-    println(s"src: ${dirs._1.getAbsolutePath} destination: ${dirs._2.getAbsolutePath}")
+    println(s"src: ${dirs._1.toUri.getRawPath} destination: ${dirs._2.toUri.getRawPath}")
     convert(dirs._1, dirs._2)
   }
 
-  private def convert(srcLoc:File, destLoc:File): Unit = {
-    if (srcLoc.isFile) {
+  private def convert(srcLoc:Path, destLoc:Path): Unit = {
+    val dfs = FileSystem.get(srcLoc.toUri, HDFSConfigHelper.getHadoopConf(None))
+
+    if (dfs.getFileStatus(srcLoc).isFile) {
       try {
-        new EventHistoryReporter(srcLoc.getAbsolutePath, List(
+        new EventHistoryReporter(srcLoc.toUri.getRawPath, List(
           ("spark.sparklens.reporting.disabled", "true"),
           ("spark.sparklens.save.data", "true"),
-          ("spark.sparklens.data.dir", destLoc.getAbsolutePath)
+          ("spark.sparklens.data.dir", destLoc.toUri.getRawPath)
         ))
       } catch {
         case e: Exception => {
@@ -32,8 +35,8 @@ object EventHistoryToSparklensJson {
       }
     } else {
       //This is a directory. Process all files
-      srcLoc.listFiles().foreach( f => {
-        convert(f, destLoc)
+      dfs.listStatus(srcLoc).foreach( f => {
+        convert(f.getPath, destLoc)
       })
     }
   }
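Moving from java.io.File to org.apache.hadoop.fs.Path means the converter is no longer limited to the local filesystem: the FileSystem is resolved from the source URI, so event logs can be read from HDFS, S3, or local paths alike, and directories are walked with listStatus instead of listFiles. A self-contained sketch of the same pattern (the path below is hypothetical):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val src = new Path("hdfs:///spark-history/application_1234_0001") // hypothetical location
    val fs = FileSystem.get(src.toUri, new Configuration())

    if (fs.getFileStatus(src).isFile) {
      println(s"single event log: ${src.toUri.getRawPath}")
    } else {
      // directory: recurse over children, as convert() does above
      fs.listStatus(src).foreach(status => println(status.getPath))
    }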
1 change: 0 additions & 1 deletion src/main/scala/com/qubole/sparklens/app/ReporterApp.scala
@@ -2,7 +2,6 @@ package com.qubole.sparklens.app
 
 import java.io.{BufferedInputStream, InputStream}
 import java.net.URI
-
 import com.ning.compress.lzf.LZFInputStream
 import com.qubole.sparklens.QuboleJobListener
 import com.qubole.sparklens.analyzer.AppAnalyzer
@@ -17,10 +17,10 @@
 
 package com.qubole.sparklens.common
 
-import java.util.Locale
-
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler.TaskInfo
+
+import java.util.Locale
 import org.json4s.DefaultFormats
 import org.json4s.JsonAST.JValue
 
@@ -1,10 +1,10 @@
 package com.qubole.sparklens.helper
 
-import org.apache.spark.SparkConf
-
 import java.io.FileWriter
 import java.nio.file.{Files, Paths}
 
+import org.apache.spark.SparkConf
+
 object EmailReportHelper {
 
   def getTempFileLocation(): String = {
@@ -19,16 +19,16 @@ package com.qubole.sparklens.helper
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkConf
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.sparklens.SparklensSparkHadoopUtil
 
 object HDFSConfigHelper {
 
   def getHadoopConf(sparkConfOptional:Option[SparkConf]): Configuration = {
     if (sparkConfOptional.isDefined) {
-      SparkHadoopUtil.get.newConfiguration(sparkConfOptional.get)
+      SparklensSparkHadoopUtil.newConfiguration(sparkConfOptional.get)
     }else {
       val sparkConf = new SparkConf()
-      SparkHadoopUtil.get.newConfiguration(sparkConf)
+      SparklensSparkHadoopUtil.newConfiguration(sparkConf)
     }
   }
 }
@@ -0,0 +1,7 @@
+package org.apache.spark.sparklens
+
+import org.apache.spark.deploy.SparkHadoopUtil
+
+object SparklensSparkHadoopUtil extends SparkHadoopUtil {
+
+}
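This new shim is the key to the HDFSConfigHelper change above: SparkHadoopUtil is private[spark] in Spark 3.x, so it can only be referenced and extended from code that itself lives under an org.apache.spark package. The otherwise empty object re-exposes the inherited API to Sparklens code, e.g.:

    import org.apache.spark.SparkConf
    import org.apache.spark.sparklens.SparklensSparkHadoopUtil

    // newConfiguration is inherited from SparkHadoopUtil; it returns a Hadoop
    // Configuration seeded with the spark.hadoop.* entries of the SparkConf
    val hadoopConf = SparklensSparkHadoopUtil.newConfiguration(new SparkConf())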
2 changes: 1 addition & 1 deletion version.sbt
@@ -1 +1 @@
-version in ThisBuild := "0.3.2"
+version in ThisBuild := "0.6.0"