Merge pull request alteryx#88 from rxin/clean

Made the following traits/interfaces/classes non-public:
SparkHadoopWriter
SparkHadoopMapRedUtil
SparkHadoopMapReduceUtil
SparkHadoopUtil
PythonAccumulatorParam
BlockManagerSlaveActor
pwendell committed Oct 21, 2013
2 parents 35886f3 + b4d8478 commit aa61bfd
Showing 7 changed files with 113 additions and 101 deletions.
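For readers unfamiliar with the annotation being added throughout this diff: Scala's package-qualified modifiers such as `private[spark]` or `private[apache]` restrict a definition to the named enclosing package and everything nested under it, which is how this commit hides the listed classes from user code without relocating them. Below is a minimal sketch of the mechanism, using hypothetical package and class names rather than anything from this repository.

// Sketch of package-qualified visibility; org.example.spark is a made-up package.
package org.example.spark.util {
  // Accessible anywhere under org.example.spark, invisible outside it.
  private[spark] class InternalHelper {
    def stamp(msg: String): String = "[internal] " + msg
  }
}

package org.example.spark.scheduler {
  object SchedulerSide {
    // Compiles: this object lives under org.example.spark.
    def use(): String = new org.example.spark.util.InternalHelper().stamp("ok")
  }
}

// package org.example.app {
//   object UserSide {
//     // Would NOT compile: InternalHelper is private to org.example.spark.
//     val h = new org.example.spark.util.InternalHelper()
//   }
// }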
@@ -17,20 +17,29 @@

package org.apache.hadoop.mapred

private[apache]
trait SparkHadoopMapRedUtil {
def newJobContext(conf: JobConf, jobId: JobID): JobContext = {
val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl", "org.apache.hadoop.mapred.JobContext");
val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[org.apache.hadoop.mapreduce.JobID])
val klass = firstAvailableClass("org.apache.hadoop.mapred.JobContextImpl",
"org.apache.hadoop.mapred.JobContext")
val ctor = klass.getDeclaredConstructor(classOf[JobConf],
classOf[org.apache.hadoop.mapreduce.JobID])
ctor.newInstance(conf, jobId).asInstanceOf[JobContext]
}

def newTaskAttemptContext(conf: JobConf, attemptId: TaskAttemptID): TaskAttemptContext = {
val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl", "org.apache.hadoop.mapred.TaskAttemptContext")
val klass = firstAvailableClass("org.apache.hadoop.mapred.TaskAttemptContextImpl",
"org.apache.hadoop.mapred.TaskAttemptContext")
val ctor = klass.getDeclaredConstructor(classOf[JobConf], classOf[TaskAttemptID])
ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
}

def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = {
def newTaskAttemptID(
jtIdentifier: String,
jobId: Int,
isMap: Boolean,
taskId: Int,
attemptId: Int) = {
new TaskAttemptID(jtIdentifier, jobId, isMap, taskId, attemptId)
}

@@ -17,9 +17,10 @@

package org.apache.hadoop.mapreduce

import org.apache.hadoop.conf.Configuration
import java.lang.{Integer => JInteger, Boolean => JBoolean}
import org.apache.hadoop.conf.Configuration

private[apache]
trait SparkHadoopMapReduceUtil {
def newJobContext(conf: Configuration, jobId: JobID): JobContext = {
val klass = firstAvailableClass(
@@ -37,23 +38,31 @@ trait SparkHadoopMapReduceUtil {
ctor.newInstance(conf, attemptId).asInstanceOf[TaskAttemptContext]
}

def newTaskAttemptID(jtIdentifier: String, jobId: Int, isMap: Boolean, taskId: Int, attemptId: Int) = {
val klass = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptID");
def newTaskAttemptID(
jtIdentifier: String,
jobId: Int,
isMap: Boolean,
taskId: Int,
attemptId: Int) = {
val klass = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptID")
try {
// first, attempt to use the old-style constructor that takes a boolean isMap (not available in YARN)
// First, attempt to use the old-style constructor that takes a boolean isMap
// (not available in YARN)
val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], classOf[Boolean],
classOf[Int], classOf[Int])
ctor.newInstance(jtIdentifier, new JInteger(jobId), new JBoolean(isMap), new JInteger(taskId), new
JInteger(attemptId)).asInstanceOf[TaskAttemptID]
classOf[Int], classOf[Int])
ctor.newInstance(jtIdentifier, new JInteger(jobId), new JBoolean(isMap), new JInteger(taskId),
new JInteger(attemptId)).asInstanceOf[TaskAttemptID]
} catch {
case exc: NoSuchMethodException => {
// failed, look for the new ctor that takes a TaskType (not available in 1.x)
val taskTypeClass = Class.forName("org.apache.hadoop.mapreduce.TaskType").asInstanceOf[Class[Enum[_]]]
val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke(taskTypeClass, if(isMap) "MAP" else "REDUCE")
// If that failed, look for the new constructor that takes a TaskType (not available in 1.x)
val taskTypeClass = Class.forName("org.apache.hadoop.mapreduce.TaskType")
.asInstanceOf[Class[Enum[_]]]
val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke(
taskTypeClass, if(isMap) "MAP" else "REDUCE")
val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], taskTypeClass,
classOf[Int], classOf[Int])
ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId), new
JInteger(attemptId)).asInstanceOf[TaskAttemptID]
ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId),
new JInteger(attemptId)).asInstanceOf[TaskAttemptID]
}
}
}
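The `newTaskAttemptID` change above preserves a pattern worth spelling out: reflectively look up the constructor that only old Hadoop releases provide, and on `NoSuchMethodException` fall back to the signature that newer releases use. Below is a self-contained sketch of that fallback pattern; `LegacyAttempt` and `ModernAttempt` are hypothetical stand-ins, not Hadoop classes.

// Two made-up classes that expose different constructors, mimicking an API that
// changed shape between library versions.
class LegacyAttempt(val id: String, val isMap: Boolean)   // old-style (String, Boolean) ctor
class ModernAttempt(val id: String, val kind: String)     // new-style (String, String) ctor

object ReflectiveFactory {
  // Try the old (String, Boolean) constructor first; if the loaded class only offers
  // the newer (String, String) constructor, NoSuchMethodException routes us to it.
  def build(className: String, id: String, isMap: Boolean): AnyRef = {
    val klass = Class.forName(className)
    try {
      val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Boolean])
      ctor.newInstance(id, java.lang.Boolean.valueOf(isMap)).asInstanceOf[AnyRef]
    } catch {
      case _: NoSuchMethodException =>
        val ctor = klass.getDeclaredConstructor(classOf[String], classOf[String])
        ctor.newInstance(id, if (isMap) "MAP" else "REDUCE").asInstanceOf[AnyRef]
    }
  }
}

// build("LegacyAttempt", "attempt_001", isMap = true) uses the boolean constructor;
// build("ModernAttempt", "attempt_001", isMap = true) falls through to the String one.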
16 changes: 8 additions & 8 deletions core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -17,14 +17,14 @@

package org.apache.hadoop.mapred

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

import java.io.IOException
import java.text.SimpleDateFormat
import java.text.NumberFormat
import java.io.IOException
import java.util.Date

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

import org.apache.spark.Logging
import org.apache.spark.SerializableWritable

@@ -36,6 +36,7 @@ import org.apache.spark.SerializableWritable
* Saves the RDD using a JobConf, which should contain an output key class, an output value class,
* a filename to write to, etc, exactly like in a Hadoop MapReduce job.
*/
private[apache]
class SparkHadoopWriter(@transient jobConf: JobConf)
extends Logging
with SparkHadoopMapRedUtil
@@ -86,13 +87,11 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
}

getOutputCommitter().setupTask(getTaskContext())
writer = getOutputFormat().getRecordWriter(
fs, conf.value, outputName, Reporter.NULL)
writer = getOutputFormat().getRecordWriter(fs, conf.value, outputName, Reporter.NULL)
}

def write(key: AnyRef, value: AnyRef) {
if (writer!=null) {
//println (">>> Writing ("+key.toString+": " + key.getClass.toString + ", " + value.toString + ": " + value.getClass.toString + ")")
if (writer != null) {
writer.write(key, value)
} else {
throw new IOException("Writer is null, open() has not been called")
@@ -182,6 +181,7 @@ }
}
}

private[apache]
object SparkHadoopWriter {
def createJobID(time: Date, id: Int): JobID = {
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
@@ -308,7 +308,7 @@ private class BytesToString extends org.apache.spark.api.java.function.Function[
* Internal class that acts as an `AccumulatorParam` for Python accumulators. Inside, it
* collects a list of pickled strings that we pass to Python through a socket.
*/
class PythonAccumulatorParam(@transient serverHost: String, serverPort: Int)
private class PythonAccumulatorParam(@transient serverHost: String, serverPort: Int)
extends AccumulatorParam[JList[Array[Byte]]] {

Utils.checkHost(serverHost, "Expected hostname")
19 changes: 12 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,26 +17,31 @@

package org.apache.spark.deploy

import com.google.common.collect.MapMaker

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

import com.google.common.collect.MapMaker


/**
* Contains util methods to interact with Hadoop from spark.
* Contains util methods to interact with Hadoop from Spark.
*/
private[spark]
class SparkHadoopUtil {
// A general, soft-reference map for metadata needed during HadoopRDD split computation
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()

// Return an appropriate (subclass) of Configuration. Creating config can initializes some hadoop
// subsystems
/**
* Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
* subsystems.
*/
def newConfiguration(): Configuration = new Configuration()

// Add any user credentials to the job conf which are necessary for running on a secure Hadoop
// cluster
/**
* Add any user credentials to the job conf which are necessary for running on a secure Hadoop
* cluster.
*/
def addCredentials(conf: JobConf) {}

def isYarnMode(): Boolean = { false }
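Since `SparkHadoopUtil` is now `private[spark]`, only code inside the `org.apache.spark` package tree can reach it. A rough usage sketch of the base-class behaviour shown in this hunk follows; the enclosing package `org.apache.spark.sketch` and the object name are invented for illustration, and the no-op `addCredentials` would only do real work in a security-aware subclass.

package org.apache.spark.sketch   // hypothetical; must sit under org.apache.spark to see the class

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.deploy.SparkHadoopUtil

object HadoopConfSketch {
  def buildJobConf(): JobConf = {
    val hadoopUtil = new SparkHadoopUtil()
    // Plain Hadoop Configuration in the non-YARN base class shown above.
    val hadoopConf: Configuration = hadoopUtil.newConfiguration()
    val jobConf = new JobConf(hadoopConf)
    // No-op here; a secure-cluster subclass could attach credentials instead.
    hadoopUtil.addCredentials(jobConf)
    jobConf
  }
}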
126 changes: 57 additions & 69 deletions core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -24,56 +24,54 @@ import java.text.SimpleDateFormat
import java.util.{Date, Properties}
import java.util.concurrent.LinkedBlockingQueue

import scala.collection.mutable.{Map, HashMap, ListBuffer}
import scala.io.Source
import scala.collection.mutable.{HashMap, ListBuffer}

import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.executor.TaskMetrics

// Used to record runtime information for each job, including RDD graph
// tasks' start/stop shuffle information and information from outside

/**
* A logger class to record runtime information for jobs in Spark. This class outputs one log file
* per Spark job with information such as RDD graph, tasks start/stop, shuffle information.
*
* @param logDirName The base directory for the log files.
*/
class JobLogger(val logDirName: String) extends SparkListener with Logging {
private val logDir =
if (System.getenv("SPARK_LOG_DIR") != null)
System.getenv("SPARK_LOG_DIR")
else
"/tmp/spark"

private val logDir = Option(System.getenv("SPARK_LOG_DIR")).getOrElse("/tmp/spark")

private val jobIDToPrintWriter = new HashMap[Int, PrintWriter]
private val stageIDToJobID = new HashMap[Int, Int]
private val jobIDToStages = new HashMap[Int, ListBuffer[Stage]]
private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents]

createLogDir()
def this() = this(String.valueOf(System.currentTimeMillis()))

def getLogDir = logDir
def getJobIDtoPrintWriter = jobIDToPrintWriter
def getStageIDToJobID = stageIDToJobID
def getJobIDToStages = jobIDToStages
def getEventQueue = eventQueue


// The following 5 functions are used only in testing.
private[scheduler] def getLogDir = logDir
private[scheduler] def getJobIDtoPrintWriter = jobIDToPrintWriter
private[scheduler] def getStageIDToJobID = stageIDToJobID
private[scheduler] def getJobIDToStages = jobIDToStages
private[scheduler] def getEventQueue = eventQueue

// Create a folder for log files, the folder's name is the creation time of the jobLogger
protected def createLogDir() {
val dir = new File(logDir + "/" + logDirName + "/")
if (dir.exists()) {
return
}
if (dir.mkdirs() == false) {
logError("create log directory error:" + logDir + "/" + logDirName + "/")
if (!dir.exists() && !dir.mkdirs()) {
logError("Error creating log directory: " + logDir + "/" + logDirName + "/")
}
}

// Create a log file for one job, the file name is the jobID
protected def createLogWriter(jobID: Int) {
try{
try {
val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobID)
jobIDToPrintWriter += (jobID -> fileWriter)
} catch {
case e: FileNotFoundException => e.printStackTrace()
}
} catch {
case e: FileNotFoundException => e.printStackTrace()
}
}

// Close log file, and clean the stage relationship in stageIDToJobID
@@ -118,10 +116,9 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = {
var rddList = new ListBuffer[RDD[_]]
rddList += rdd
rdd.dependencies.foreach{ dep => dep match {
case shufDep: ShuffleDependency[_,_] =>
case _ => rddList ++= getRddsInStage(dep.rdd)
}
rdd.dependencies.foreach {
case shufDep: ShuffleDependency[_, _] =>
case dep: Dependency[_] => rddList ++= getRddsInStage(dep.rdd)
}
rddList
}
@@ -161,29 +158,27 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
val rddInfo = "RDD_ID=" + rdd.id + "(" + getRddName(rdd) + "," + rdd.generator + ")"
jobLogInfo(jobID, indentString(indent) + rddInfo, false)
rdd.dependencies.foreach{ dep => dep match {
case shufDep: ShuffleDependency[_,_] =>
val depInfo = "SHUFFLE_ID=" + shufDep.shuffleId
jobLogInfo(jobID, indentString(indent + 1) + depInfo, false)
case _ => recordRddInStageGraph(jobID, dep.rdd, indent + 1)
}
rdd.dependencies.foreach {
case shufDep: ShuffleDependency[_, _] =>
val depInfo = "SHUFFLE_ID=" + shufDep.shuffleId
jobLogInfo(jobID, indentString(indent + 1) + depInfo, false)
case dep: Dependency[_] => recordRddInStageGraph(jobID, dep.rdd, indent + 1)
}
}

protected def recordStageDepGraph(jobID: Int, stage: Stage, indent: Int = 0) {
var stageInfo: String = ""
if (stage.isShuffleMap) {
stageInfo = "STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" +
stage.shuffleDep.get.shuffleId
}else{
stageInfo = "STAGE_ID=" + stage.id + " RESULT_STAGE"
val stageInfo = if (stage.isShuffleMap) {
"STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + stage.shuffleDep.get.shuffleId
} else {
"STAGE_ID=" + stage.id + " RESULT_STAGE"
}
if (stage.jobId == jobID) {
jobLogInfo(jobID, indentString(indent) + stageInfo, false)
recordRddInStageGraph(jobID, stage.rdd, indent)
stage.parents.foreach(recordStageDepGraph(jobID, _, indent + 2))
} else
} else {
jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.jobId, false)
}
}

// Record task metrics into job log files
Expand All @@ -193,39 +188,32 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging {
" START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime +
" EXECUTOR_ID=" + taskInfo.executorId + " HOST=" + taskMetrics.hostname
val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime
val readMetrics =
taskMetrics.shuffleReadMetrics match {
case Some(metrics) =>
" SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime +
" BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
" BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
" BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
" REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime +
" REMOTE_FETCH_TIME=" + metrics.remoteFetchTime +
" REMOTE_BYTES_READ=" + metrics.remoteBytesRead
case None => ""
}
val writeMetrics =
taskMetrics.shuffleWriteMetrics match {
case Some(metrics) =>
" SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
case None => ""
}
val readMetrics = taskMetrics.shuffleReadMetrics match {
case Some(metrics) =>
" SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime +
" BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
" BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
" BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
" REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime +
" REMOTE_FETCH_TIME=" + metrics.remoteFetchTime +
" REMOTE_BYTES_READ=" + metrics.remoteBytesRead
case None => ""
}
val writeMetrics = taskMetrics.shuffleWriteMetrics match {
case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
case None => ""
}
stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics)
}

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
stageLogInfo(
stageSubmitted.stage.id,
"STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
stageSubmitted.stage.id, stageSubmitted.taskSize))
stageLogInfo(stageSubmitted.stage.id, "STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
stageSubmitted.stage.id, stageSubmitted.taskSize))
}

override def onStageCompleted(stageCompleted: StageCompleted) {
stageLogInfo(
stageCompleted.stageInfo.stage.id,
stageLogInfo(stageCompleted.stageInfo.stage.id,
"STAGE_ID=%d STATUS=COMPLETED".format(stageCompleted.stageInfo.stage.id))

}

override def onTaskStart(taskStart: SparkListenerTaskStart) { }
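Two of the JobLogger cleanups above are general Scala idioms rather than Spark-specific changes: the null-checking if/else on `System.getenv` becomes `Option(...).getOrElse(...)`, and `foreach { dep => dep match { ... } }` collapses into a bare case list. A small standalone sketch of both, with invented names (`EXAMPLE_LOG_DIR`, `Dep`, and friends):

import scala.collection.mutable.ListBuffer

// Hypothetical stand-ins for the dependency types matched on above.
sealed trait Dep
case class ShuffleDep(shuffleId: Int) extends Dep
case class NarrowDep(parent: String) extends Dep

object JobLoggerIdioms {
  // Option(...) absorbs the null that System.getenv returns when the variable is unset.
  def logDir(): String =
    Option(System.getenv("EXAMPLE_LOG_DIR")).getOrElse("/tmp/example-logs")

  // foreach accepts a pattern-matching anonymous function, so the wrapper
  // `dep => dep match { ... }` is unnecessary.
  def describe(deps: Seq[Dep]): Seq[String] = {
    val out = ListBuffer[String]()
    deps.foreach {
      case ShuffleDep(id)    => out += ("SHUFFLE_ID=" + id)
      case NarrowDep(parent) => out += ("PARENT=" + parent)
    }
    out.toList
  }
}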
@@ -26,6 +26,7 @@ import org.apache.spark.storage.BlockManagerMessages._
* An actor to take commands from the master to execute options. For example,
* this is used to remove blocks from the slave's BlockManager.
*/
private[storage]
class BlockManagerSlaveActor(blockManager: BlockManager) extends Actor {
override def receive = {
