
Commit

Merge branch 'streaming-web-ui' of github.com:tdas/spark into ui-refactor
andrewor14 committed Apr 3, 2014
2 parents 9a48fa1 + 61358e3 commit 0d61ee8
Showing 17 changed files with 706 additions and 95 deletions.
4 changes: 0 additions & 4 deletions core/pom.xml
@@ -82,10 +82,6 @@
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
5 changes: 0 additions & 5 deletions pom.xml
@@ -214,11 +214,6 @@
<artifactId>guava</artifactId>
<version>14.0.1</version>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>1.3.9</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
1 change: 0 additions & 1 deletion project/SparkBuild.scala
@@ -296,7 +296,6 @@ object SparkBuild extends Build {
name := "spark-core",
libraryDependencies ++= Seq(
"com.google.guava" % "guava" % "14.0.1",
"com.google.code.findbugs" % "jsr305" % "1.3.9",
"log4j" % "log4j" % "1.2.17",
"org.slf4j" % "slf4j-api" % slf4jVersion,
"org.slf4j" % "slf4j-log4j12" % slf4jVersion,
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -40,6 +40,7 @@ import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receivers._
import org.apache.spark.streaming.scheduler._
import org.apache.hadoop.conf.Configuration
import org.apache.spark.streaming.ui.StreamingUI

/**
* Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -158,6 +159,9 @@ class StreamingContext private[streaming] (

private[streaming] val waiter = new ContextWaiter

private[streaming] val ui = new StreamingUI(this)
ui.bind()

/**
* Return the associated Spark context
*/
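The two lines added to StreamingContext above only construct a StreamingUI and call bind(); the class itself is defined elsewhere in this commit and is not among the hunks shown. A minimal sketch of the surface those two lines assume (everything beyond the constructor argument and bind() is an assumption):

package org.apache.spark.streaming.ui

import org.apache.spark.streaming.StreamingContext

// Sketch only: the diff shows `new StreamingUI(this)` followed by `ui.bind()`.
// The real class presumably starts a web server and attaches the streaming pages;
// that implementation is not part of the hunks shown here.
private[spark] class StreamingUI(ssc: StreamingContext) {
  /** Bind the UI to an HTTP server (server startup omitted in this sketch). */
  def bind(): Unit = { }
}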
streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
@@ -351,15 +351,6 @@ abstract class DStream[T: ClassTag] (
dependencies.foreach(_.clearMetadata(time))
}

/* Adds metadata to the Stream while it is running.
* This method should be overwritten by sublcasses of InputDStream.
*/
private[streaming] def addMetadata(metadata: Any) {
if (metadata != null) {
logInfo("Dropping Metadata: " + metadata.toString)
}
}

/**
* Refresh the list of checkpointed RDDs that will be saved along with checkpoint of
* this stream. This is an internal method that should not be called directly. This is
streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala
@@ -17,23 +17,24 @@

package org.apache.spark.streaming.dstream

import java.util.concurrent.ArrayBlockingQueue
import java.nio.ByteBuffer
import java.util.concurrent.ArrayBlockingQueue

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.reflect.ClassTag

import akka.actor.{Props, Actor}
import akka.actor.{Actor, Props}
import akka.pattern.ask

import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
import org.apache.spark.streaming._
import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.rdd.{RDD, BlockRDD}
import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, RegisterReceiver}
import org.apache.spark.streaming._
import org.apache.spark.streaming.scheduler.{ReceivedBlockInfo, AddBlocks, DeregisterReceiver, RegisterReceiver}
import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
import org.apache.spark.util.Utils

/**
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -48,8 +49,10 @@ import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, Regi
abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
extends InputDStream[T](ssc_) {

// This is an unique identifier that is used to match the network receiver with the
// corresponding network input stream.
/** Keeps information about all received blocks. */
private val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]]

/** This is a unique identifier for the network input stream. */
val id = ssc.getNewNetworkStreamId()

/**
@@ -64,23 +67,45 @@ abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingConte

def stop() {}

/** Asks the NetworkInputTracker for received data blocks and generates RDDs with them. */
override def compute(validTime: Time): Option[RDD[T]] = {
// If this is called for any time before the start time of the context,
// then this returns an empty RDD. This may happen when recovering from a
// master failure
if (validTime >= graph.startTime) {
val blockIds = ssc.scheduler.networkInputTracker.getBlockIds(id, validTime)
val blockInfo = ssc.scheduler.networkInputTracker.getReceivedBlockInfo(id)
receivedBlockInfo(validTime) = blockInfo
val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
Some(new BlockRDD[T](ssc.sc, blockIds))
} else {
Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
}
}

/** Get information on received blocks. */
private[streaming] def getReceivedBlockInfo(time: Time) = {
receivedBlockInfo(time)
}

/**
* Clear metadata that are older than `rememberDuration` of this DStream.
* This is an internal method that should not be called directly. This
* implementation overrides the default implementation to clear received
* block information.
*/
private[streaming] override def clearMetadata(time: Time) {
super.clearMetadata(time)
val oldReceivedBlocks = receivedBlockInfo.filter(_._1 <= (time - rememberDuration))
receivedBlockInfo --= oldReceivedBlocks.keys
logDebug("Cleared " + oldReceivedBlocks.size + " RDDs that were older than " +
(time - rememberDuration) + ": " + oldReceivedBlocks.keys.mkString(", "))
}
}


private[streaming] sealed trait NetworkReceiverMessage
private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage
private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any)
private[streaming] case class ReportBlock(blockId: StreamBlockId, numRecords: Long, metadata: Any)
extends NetworkReceiverMessage
private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage

@@ -155,21 +180,30 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
actor ! ReportError(e.toString)
}


/**
* Pushes a block (as an ArrayBuffer filled with data) into the block manager.
*/
def pushBlock(blockId: BlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
def pushBlock(
blockId: StreamBlockId,
arrayBuffer: ArrayBuffer[T],
metadata: Any,
level: StorageLevel
) {
env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
actor ! ReportBlock(blockId, metadata)
actor ! ReportBlock(blockId, arrayBuffer.size, metadata)
}

/**
* Pushes a block (as bytes) into the block manager.
*/
def pushBlock(blockId: BlockId, bytes: ByteBuffer, metadata: Any, level: StorageLevel) {
def pushBlock(
blockId: StreamBlockId,
bytes: ByteBuffer,
metadata: Any,
level: StorageLevel
) {
env.blockManager.putBytes(blockId, bytes, level)
actor ! ReportBlock(blockId, metadata)
actor ! ReportBlock(blockId, -1 , metadata)
}

/** A helper actor that communicates with the NetworkInputTracker */
@@ -182,13 +216,15 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
val timeout = 5.seconds

override def preStart() {
val future = tracker.ask(RegisterReceiver(streamId, self))(timeout)
val msg = RegisterReceiver(
streamId, NetworkReceiver.this.getClass.getSimpleName, Utils.localHostName(), self)
val future = tracker.ask(msg)(timeout)
Await.result(future, timeout)
}

override def receive() = {
case ReportBlock(blockId, metadata) =>
tracker ! AddBlocks(streamId, Array(blockId), metadata)
case ReportBlock(blockId, numRecords, metadata) =>
tracker ! AddBlocks(ReceivedBlockInfo(streamId, blockId, numRecords, metadata))
case ReportError(msg) =>
tracker ! DeregisterReceiver(streamId, msg)
case StopReceiver(msg) =>
@@ -210,7 +246,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
class BlockGenerator(storageLevel: StorageLevel)
extends Serializable with Logging {

case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null)
case class Block(id: StreamBlockId, buffer: ArrayBuffer[T], metadata: Any = null)

val clock = new SystemClock()
val blockInterval = env.conf.getLong("spark.streaming.blockInterval", 200)
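The receiver code above reports blocks as ReceivedBlockInfo(streamId, blockId, numRecords, metadata), and NetworkInputDStream keeps a HashMap[Time, Array[ReceivedBlockInfo]], but the case class definition is not among the hunks shown. A sketch of the shape those call sites imply, with field types inferred from ReportBlock and from the Map[Int, Array[ReceivedBlockInfo]] keys (treat it as an assumption, not the committed definition):

import org.apache.spark.storage.StreamBlockId

// Assumed shape of the per-block report carried from receivers to the tracker,
// the scheduler and, ultimately, the streaming UI.
private[streaming] case class ReceivedBlockInfo(
    streamId: Int,           // id of the NetworkInputDStream that received the block
    blockId: StreamBlockId,  // BlockManager id under which the block was stored
    numRecords: Long,        // -1 when the record count is unknown (bytes-based push)
    metadata: Any            // receiver-specific extra information
  )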
streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala
@@ -29,6 +29,7 @@ import org.apache.spark.streaming.Time
*/
case class BatchInfo(
batchTime: Time,
receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]],
submissionTime: Long,
processingStartTime: Option[Long],
processingEndTime: Option[Long]
streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -147,7 +147,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
timesToReschedule.mkString(", "))
timesToReschedule.foreach(time =>
jobScheduler.runJobs(time, graph.generateJobs(time))
jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
)

// Restart the timer
@@ -159,7 +159,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
private def generateJobs(time: Time) {
SparkEnv.set(ssc.env)
Try(graph.generateJobs(time)) match {
case Success(jobs) => jobScheduler.runJobs(time, jobs)
case Success(jobs) =>
val receivedBlockInfo = graph.getNetworkInputStreams.map { stream =>
val streamId = stream.id
val receivedBlockInfo = stream.getReceivedBlockInfo(time)
(streamId, receivedBlockInfo)
}.toMap
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventActor ! DoCheckpoint(time)
streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -82,14 +82,13 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}
}

def runJobs(time: Time, jobs: Seq[Job]) {
if (jobs.isEmpty) {
logInfo("No jobs added for time " + time)
def submitJobSet(jobSet: JobSet) {
if (jobSet.jobs.isEmpty) {
logInfo("No jobs added for time " + jobSet.time)
} else {
val jobSet = new JobSet(time, jobs)
jobSets.put(time, jobSet)
jobSets.put(jobSet.time, jobSet)
jobSet.jobs.foreach(job => executor.execute(new JobHandler(job)))
logInfo("Added jobs for time " + time)
logInfo("Added jobs for time " + jobSet.time)
}
}

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -24,7 +24,11 @@ import org.apache.spark.streaming.Time
* belong to the same batch.
*/
private[streaming]
case class JobSet(time: Time, jobs: Seq[Job]) {
case class JobSet(
time: Time,
jobs: Seq[Job],
receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]] = Map.empty
) {

private val incompleteJobs = new HashSet[Job]()
private val submissionTime = System.currentTimeMillis() // when this jobset was submitted
@@ -60,6 +64,7 @@ case class JobSet(time: Time, jobs: Seq[Job]) {
def toBatchInfo: BatchInfo = {
new BatchInfo(
time,
receivedBlockInfo,
submissionTime,
if (processingStartTime >= 0 ) Some(processingStartTime) else None,
if (processingEndTime >= 0 ) Some(processingEndTime) else None
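With receivedBlockInfo now carried through JobSet into BatchInfo, per-batch record counts become available to batch listeners and to the new streaming UI. A hedged usage sketch, where the helper name totalRecords is purely illustrative and only BatchInfo.receivedBlockInfo and ReceivedBlockInfo.numRecords come from this diff:

import org.apache.spark.streaming.scheduler.BatchInfo

// Illustrative only: sum the records reported by all receivers for one batch,
// skipping blocks whose record count was reported as unknown (-1).
def totalRecords(batch: BatchInfo): Long = {
  batch.receivedBlockInfo.values.flatten.map(_.numRecords).filter(_ >= 0).sum
}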