sparklens-22: Scalability aware Autoscaling with Sparklens #23

Open · wants to merge 4 commits into base: master
21 changes: 21 additions & 0 deletions src/main/scala/com/qubole/sparklens/QuboleJobListener.scala
@@ -20,12 +20,14 @@ package com.qubole.sparklens
import java.net.URI

import com.qubole.sparklens.analyzer._
import com.qubole.sparklens.autoscaling.AutoscalingPolicy
import com.qubole.sparklens.common.{AggregateMetrics, AppContext, ApplicationInfo}
import com.qubole.sparklens.timespan.{ExecutorTimeSpan, HostTimeSpan, JobTimeSpan, StageTimeSpan}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkConf
import org.apache.spark.scheduler._
import org.slf4j.LoggerFactory

import scala.collection.mutable
import scala.collection.mutable.ListBuffer
@@ -47,6 +49,9 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener {
protected val stageIDToJobID = new mutable.HashMap[Int, Long]
protected val failedStages = new ListBuffer[String]
protected val appMetrics = new AggregateMetrics()
private var autoscalingPolicy: Option[AutoscalingPolicy] = None
private val log = LoggerFactory.getLogger(classOf[QuboleJobListener])


private def hostCount():Int = hostMap.size

@@ -141,10 +146,22 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener {

override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
//println(s"Application ${applicationStart.appId} started at ${applicationStart.time}")
autoscalingPolicy = getAutoScalingPolicy()
appInfo.applicationID = applicationStart.appId.getOrElse("NA")
appInfo.startTime = applicationStart.time
}

def getAutoScalingPolicy(): Option[AutoscalingPolicy] = {
AutoscalingPolicy.init(sparkConf) match {
case Some(autoscalingSparklensClient) =>
log.info(s"Autoscaling client for sparklens exists = ${autoscalingSparklensClient}," +
s" will generate sparklens autoscaling policy")
Some(new AutoscalingPolicy(autoscalingSparklensClient, sparkConf))
case None =>
None
}
}

override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
stageMap.map(x => x._2).foreach( x => x.tempTaskTimes.clear())
//println(s"Application ${appInfo.applicationID} ended at ${applicationEnd.time}")
@@ -170,6 +187,7 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener {
}
}
override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
autoscalingPolicy.foreach(_.onExecutorAdded(executorAdded))
val executorTimeSpan = executorMap.get(executorAdded.executorId)
if (!executorTimeSpan.isDefined) {
val timeSpan = new ExecutorTimeSpan(executorAdded.executorId,
@@ -187,12 +205,14 @@ }
}

override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
autoscalingPolicy.foreach(_.onExecutorRemoved(executorRemoved))
val executorTimeSpan = executorMap(executorRemoved.executorId)
executorTimeSpan.setEndTime(executorRemoved.time)
//We don't get any event for host. Will not try to check when the hosts go out of service
}

override def onJobStart(jobStart: SparkListenerJobStart) {
autoscalingPolicy.foreach(_.scale(jobStart))
val jobTimeSpan = new JobTimeSpan(jobStart.jobId)
jobTimeSpan.setStartTime(jobStart.time)
jobMap(jobStart.jobId) = jobTimeSpan
@@ -202,6 +222,7 @@ }
}

override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
autoscalingPolicy.foreach(_.scale(jobEnd))
val jobTimeSpan = jobMap(jobEnd.jobId)
jobTimeSpan.setEndTime(jobEnd.time)
//if we miss cleaning up tasks at end of stage, clean them after end of job
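Note on the wiring above: the policy is held as an `Option`, so every listener hook is a no-op when no autoscaling client is available. The `AutoscalingPolicy` class itself is added elsewhere in this PR and is not part of this diff; the following is only a minimal sketch of the surface these calls rely on, with placeholder bodies, the client type simplified to `AnyRef`, and an assumed 2-minute release timeout:

```scala
package com.qubole.sparklens.autoscaling

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListenerExecutorAdded,
  SparkListenerExecutorRemoved, SparkListenerJobEnd, SparkListenerJobStart}

// Sketch only: the real class ships with this PR but is outside this diff.
class AutoscalingPolicy(client: AnyRef, sparkConf: SparkConf) {
  // Track live executors so scale() can compare actual against desired capacity.
  private var liveExecutors = Set.empty[String]

  def onExecutorAdded(added: SparkListenerExecutorAdded): Unit = synchronized {
    liveExecutors += added.executorId
  }

  def onExecutorRemoved(removed: SparkListenerExecutorRemoved): Unit = synchronized {
    liveExecutors -= removed.executorId
  }

  // Called at job boundaries to request executors up front and release idle ones.
  def scale(jobStart: SparkListenerJobStart): Unit = ()
  def scale(jobEnd: SparkListenerJobEnd): Unit = ()
}

object AutoscalingPolicy {
  // Referenced by AutoscaleAnalyzer later in this diff; 2 minutes is an assumed value.
  val releaseTimeout: Long = 2 * 60 * 1000

  // Would return a client only when an autoscaling backend is configured; lookup elided.
  def init(sparkConf: SparkConf): Option[AnyRef] = None
}
```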
2 changes: 2 additions & 0 deletions src/main/scala/com/qubole/sparklens/analyzer/AppAnalyzer.scala
@@ -20,6 +20,7 @@ import java.util.Date
import java.util.concurrent.TimeUnit

import com.qubole.sparklens.common.AppContext
import org.apache.spark.SparkConf

import scala.collection.mutable.ListBuffer

@@ -84,6 +85,7 @@ object AppAnalyzer
list += new EfficiencyStatisticsAnalyzer
list += new ExecutorWallclockAnalyzer
list += new StageSkewAnalyzer
list += new AutoscaleAnalyzer


list.foreach( x => {
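Registering `AutoscaleAnalyzer` in this list is all that is needed for it to run with the standard report: `AppAnalyzer` iterates the list and drives each entry through the same `analyze` contract. A hypothetical extra analyzer, assuming the trait declares the three-argument `analyze` signature that `AutoscaleAnalyzer` overrides below, would look like:

```scala
import com.qubole.sparklens.common.AppContext

// Hypothetical example, not part of the PR; assumes the AppAnalyzer trait
// declares analyze(ac: AppContext, startTime: Long, endTime: Long): String.
class WallclockSummaryAnalyzer extends AppAnalyzer {
  override def analyze(ac: AppContext, startTime: Long, endTime: Long): String =
    s"Application wallclock: ${endTime - startTime} ms"
}
```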
78 changes: 78 additions & 0 deletions src/main/scala/com/qubole/sparklens/analyzer/AutoscaleAnalyzer.scala
@@ -0,0 +1,78 @@
package com.qubole.sparklens.analyzer
import com.qubole
import com.qubole.sparklens.autoscaling.AutoscalingPolicy
import com.qubole.sparklens.chart.{Graph, Point}
import com.qubole.sparklens.common.AppContext
import org.apache.spark.SparkConf

class AutoscaleAnalyzer(conf: SparkConf = new SparkConf) extends AppAnalyzer {
val random = scala.util.Random

override def analyze(ac: AppContext, startTime: Long, endTime: Long): String = {
println("================== AutoScale Analyze ==================")
val dimensions = qubole.sparklens.autoscaleGraphDimensions(conf)
val coresPerExecutor = ac.executorMap.values.map(x => x.cores).sum / ac.executorMap.size
println(s"cores per executor = ${coresPerExecutor}")
val originalGraph = createGraphs(dimensions, ac, coresPerExecutor)

""
}

private def createGraphs(dimensions: List[Int], ac: AppContext, coresPerExecutor: Int): Graph = {

val graph = new Graph(dimensions.head, dimensions.last)
createActualExecutorGraph(ac, graph, 'o')
createIdealPerJob(ac, graph, '*', coresPerExecutor)

val realDuration = ac.appInfo.endTime - ac.appInfo.startTime
println(s"\n\nTotal app duration = ${pd(realDuration)}")

println(s"Maximum concurrent executors = ${graph.getMaxY()}")
println(s"coresPerExecutor = ${coresPerExecutor}")
println(s"\n\nIndex:\noooooo --> Actual number of executors")
println("****** --> Ideal number of executors which would give same timelines")

graph.plot('o', '*')
graph
}

private def createActualExecutorGraph(appContext: AppContext, graph: Graph, graphIndex: Char)
: Unit = {
val sorted = AppContext.getSortedMap(appContext.executorMap, appContext)
graph.addPoint(Point(appContext.appInfo.startTime, 0, graphIndex)) // start point
var count: Int = 0
sorted.foreach(x => {
count += x._2.asInstanceOf[Int]
graph.addPoint(Point(x._1, count, graphIndex))
})
graph.addPoint(Point(appContext.appInfo.endTime, 0, graphIndex))
}

private def createIdealPerJob(ac: AppContext, graph: Graph, graphIndex: Char,
coresPerExecutor: Int): Unit = {
val maxConcurrentExecutors = AppContext.getMaxConcurrent(ac.executorMap, ac)

graph.addPoint(Point(ac.appInfo.startTime, 0, graphIndex))
var lastJobEndTime = ac.appInfo.startTime

ac.jobMap.values.toSeq.sortWith(_.startTime < _.startTime).foreach(jobTimeSpan => {
val optimalExecutors = jobTimeSpan.optimumNumExecutorsForJob(coresPerExecutor,
maxConcurrentExecutors.asInstanceOf[Int])

// driver-only time before any job has run
if (lastJobEndTime == ac.appInfo.startTime) graph.addPoint(Point(jobTimeSpan.startTime, 0,
graphIndex))

// if the idle gap since the previous job exceeds the release timeout, drop the ideal count to zero
if (jobTimeSpan.startTime - lastJobEndTime > AutoscalingPolicy.releaseTimeout) {
graph.addPoint(Point(lastJobEndTime, 0, graphIndex))
}

graph.addPoint(Point(jobTimeSpan.startTime, optimalExecutors, graphIndex))
graph.addPoint(Point(jobTimeSpan.endTime, optimalExecutors, graphIndex))
lastJobEndTime = jobTimeSpan.endTime
})
graph.addPoint(Point(lastJobEndTime, 0, graphIndex))
graph.addPoint(Point(ac.appInfo.endTime, 0, graphIndex))
}
}
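The analyzer overlays two series on a single ASCII chart: the actual executor count (`o`) and the per-job ideal (`*`), with the ideal line dropping to zero wherever the idle gap between jobs exceeds `AutoscalingPolicy.releaseTimeout`. The `Graph`/`Point` API it uses comes from the `com.qubole.sparklens.chart` package added elsewhere in this PR; the standalone usage below is inferred purely from the calls above, and the plot dimensions and timestamps are made up:

```scala
import com.qubole.sparklens.chart.{Graph, Point}

object GraphDemo extends App {
  // Inferred usage; constructor and method shapes are taken from the calls
  // in AutoscaleAnalyzer above, not from the chart package itself.
  val graph = new Graph(103, 25)          // (width, height) of the ASCII plot
  graph.addPoint(Point(0L, 0, 'o'))       // Point(timestamp, executorCount, series)
  graph.addPoint(Point(60000L, 8, 'o'))   // 8 executors one minute in
  graph.addPoint(Point(120000L, 0, 'o'))  // back to zero two minutes in
  println(graph.getMaxY())                // highest Y value plotted
  graph.plot('o', '*')                    // render both series to stdout
}
```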
10 changes: 7 additions & 3 deletions src/main/scala/com/qubole/sparklens/app/ReporterApp.scala
@@ -71,15 +71,19 @@ object ReporterApp extends App
}

private def reportFromSparklensDump(file: String): Unit = {
val json = readSparklenDump(file)
startAnalysersFromString(json)

}

def readSparklenDump(file: String): String = {
val fs = FileSystem.get(new URI(file), new Configuration())

val path = new Path(file)
val byteArray = new Array[Byte](fs.getFileStatus(path).getLen.toInt)
fs.open(path).readFully(byteArray)

(byteArray.map(_.toChar)).mkString
}

def reportFromEventHistory(file: String): Unit = {
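With the read path factored out, the raw dump can now be loaded without immediately running the analyzers, which previously happened as a side effect inside the same method. Hypothetical usage, with a made-up path:

```scala
import com.qubole.sparklens.app.ReporterApp

object DumpReadDemo extends App {
  // Hypothetical call, not part of the PR; the file path is illustrative.
  val json: String = ReporterApp.readSparklenDump(
    "hdfs:///tmp/application_1234_0001.sparklens.json")
  println(json.length) // raw sparklens JSON; analyzers not yet invoked
}
```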