Commit 93f1c69

Added network receiver information to the Streaming UI.

tdas committed Mar 31, 2014
1 parent 56cc7fb commit 93f1c69

Showing 11 changed files with 301 additions and 108 deletions.
@@ -351,15 +351,6 @@ abstract class DStream[T: ClassTag] (
dependencies.foreach(_.clearMetadata(time))
}

/* Adds metadata to the Stream while it is running.
* This method should be overridden by subclasses of InputDStream.
*/
private[streaming] def addMetadata(metadata: Any) {
if (metadata != null) {
logInfo("Dropping Metadata: " + metadata.toString)
}
}

/**
* Refresh the list of checkpointed RDDs that will be saved along with checkpoint of
* this stream. This is an internal method that should not be called directly. This is
@@ -17,23 +17,23 @@

package org.apache.spark.streaming.dstream

import java.util.concurrent.ArrayBlockingQueue
import java.nio.ByteBuffer
import java.util.concurrent.ArrayBlockingQueue

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.reflect.ClassTag

import akka.actor.{Props, Actor}
import akka.actor.{Actor, Props}
import akka.pattern.ask

import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
import org.apache.spark.streaming._
import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.rdd.{RDD, BlockRDD}
import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, RegisterReceiver}
import org.apache.spark.streaming._
import org.apache.spark.streaming.scheduler.{ReceivedBlockInfo, AddBlocks, DeregisterReceiver, RegisterReceiver}
import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}

/**
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -48,8 +48,10 @@ import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, Regi
abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
extends InputDStream[T](ssc_) {

// This is an unique identifier that is used to match the network receiver with the
// corresponding network input stream.
/** Keeps information about all received blocks */
private val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]]

/** This is a unique identifier for the network input stream. */
val id = ssc.getNewNetworkStreamId()

/**
@@ -64,23 +66,45 @@ abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingConte

def stop() {}

/** Asks the NetworkInputTracker for received data blocks and generates RDDs from them. */
override def compute(validTime: Time): Option[RDD[T]] = {
// If this is called for any time before the start time of the context,
// then this returns an empty RDD. This may happen when recovering from a
// master failure
if (validTime >= graph.startTime) {
val blockIds = ssc.scheduler.networkInputTracker.getBlockIds(id, validTime)
val blockInfo = ssc.scheduler.networkInputTracker.getReceivedBlockInfo(id)
receivedBlockInfo(validTime) = blockInfo
val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
Some(new BlockRDD[T](ssc.sc, blockIds))
} else {
Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
}
}

/** Get information on received blocks. */
private[streaming] def getReceivedBlockInfo(time: Time) = {
receivedBlockInfo(time)
}

/**
* Clear metadata that are older than `rememberDuration` of this DStream.
* This is an internal method that should not be called directly. This
* implementation overrides the default implementation to clear received
* block information.
*/
private[streaming] override def clearMetadata(time: Time) {
super.clearMetadata(time)
val oldReceivedBlocks = receivedBlockInfo.filter(_._1 <= (time - rememberDuration))
receivedBlockInfo --= oldReceivedBlocks.keys
logDebug("Cleared " + oldReceivedBlocks.size + " RDDs that were older than " +
(time - rememberDuration) + ": " + oldReceivedBlocks.keys.mkString(", "))
}
}
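
To make the pruning rule above concrete, here is a hedged illustration (not part of this commit) of the time - rememberDuration threshold, using invented batch times:

import org.apache.spark.streaming.{Seconds, Time}

// Illustrative only: with rememberDuration = 60 s and clearMetadata called at
// batch time 120 s, entries with batch time <= 60 s are dropped.
val rememberDuration = Seconds(60)
val time = Time(120000)                          // 120 s, in milliseconds
val threshold = time - rememberDuration          // Time(60000)
Seq(Time(50000), Time(60000), Time(70000)).filter(_ <= threshold)
// -> List(Time(50000), Time(60000)) are removed; only the 70 s entry stays in receivedBlockInfo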


private[streaming] sealed trait NetworkReceiverMessage
private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage
private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any)
private[streaming] case class ReportBlock(blockId: StreamBlockId, numRecords: Long, metadata: Any)
extends NetworkReceiverMessage
private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage

@@ -156,21 +180,20 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
actor ! ReportError(e.toString)
}


/**
* Pushes a block (as an ArrayBuffer filled with data) into the block manager.
*/
def pushBlock(blockId: BlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
def pushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) {
env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
actor ! ReportBlock(blockId, metadata)
actor ! ReportBlock(blockId, arrayBuffer.size, metadata)
}

/**
* Pushes a block (as bytes) into the block manager.
*/
def pushBlock(blockId: BlockId, bytes: ByteBuffer, metadata: Any, level: StorageLevel) {
def pushBlock(blockId: StreamBlockId, bytes: ByteBuffer, metadata: Any, level: StorageLevel) {
env.blockManager.putBytes(blockId, bytes, level)
actor ! ReportBlock(blockId, metadata)
actor ! ReportBlock(blockId, -1, metadata)
}
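
A hedged usage sketch (not in the diff) of the ArrayBuffer variant above. It assumes the surrounding code lives in a hypothetical NetworkReceiver[String] subclass where streamId is already set; the block contents are invented:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.storage.{StorageLevel, StreamBlockId}

// Inside a hypothetical NetworkReceiver[String] subclass:
val blockId = StreamBlockId(streamId, System.currentTimeMillis())
val buffer = new ArrayBuffer[String] ++= Seq("a", "b", "c")
pushBlock(blockId, buffer, null, StorageLevel.MEMORY_ONLY_SER)
// The helper actor then sends ReportBlock(blockId, 3, null), so the tracker learns that
// this block carries 3 records; the byte-based variant reports -1 (count unknown).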

/** A helper actor that communicates with the NetworkInputTracker */
@@ -188,8 +211,8 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
}

override def receive() = {
case ReportBlock(blockId, metadata) =>
tracker ! AddBlocks(streamId, Array(blockId), metadata)
case ReportBlock(blockId, numRecords, metadata) =>
tracker ! AddBlocks(ReceivedBlockInfo(streamId, blockId, numRecords, metadata))
case ReportError(msg) =>
tracker ! DeregisterReceiver(streamId, msg)
case StopReceiver(msg) =>
@@ -211,7 +234,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
class BlockGenerator(storageLevel: StorageLevel)
extends Serializable with Logging {

case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null)
case class Block(id: StreamBlockId, buffer: ArrayBuffer[T], metadata: Any = null)

val clock = new SystemClock()
val blockInterval = env.conf.getLong("spark.streaming.blockInterval", 200)
@@ -29,6 +29,7 @@ import org.apache.spark.streaming.Time
*/
case class BatchInfo(
batchTime: Time,
receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]],
submissionTime: Long,
processingStartTime: Option[Long],
processingEndTime: Option[Long]
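
The new field maps each network input stream id to the blocks that fed this batch. A hedged sketch (not in the commit) of how a consumer such as a UI page or listener might read it, given some batchInfo in scope; ReceivedBlockInfo is the case class added later in this commit, and numRecords = -1 (unknown) is counted as 0 here:

// blocksPerReceiver: Map[Int, Int], e.g. Map(0 -> 4, 1 -> 0)
val blocksPerReceiver = batchInfo.receivedBlockInfo.mapValues(_.size)
// recordsPerReceiver: Map[Int, Long]
val recordsPerReceiver = batchInfo.receivedBlockInfo.mapValues(
  _.map(info => math.max(info.numRecords, 0L)).sum)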
@@ -147,7 +147,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " +
timesToReschedule.mkString(", "))
timesToReschedule.foreach(time =>
jobScheduler.runJobs(time, graph.generateJobs(time))
jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time)))
)

// Restart the timer
@@ -159,7 +159,13 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
private def generateJobs(time: Time) {
SparkEnv.set(ssc.env)
Try(graph.generateJobs(time)) match {
case Success(jobs) => jobScheduler.runJobs(time, jobs)
case Success(jobs) =>
val receivedBlockInfo = graph.getNetworkInputStreams.map { stream =>
val streamId = stream.id
val receivedBlockInfo = stream.getReceivedBlockInfo(time)
(streamId, receivedBlockInfo)
}.toMap
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventActor ! DoCheckpoint(time)
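
For clarity, a hedged illustration (values invented) of the map built above for a context with two network streams, ids 0 and 1:

import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo

val exampleReceivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]] = Map(
  0 -> Array(ReceivedBlockInfo(0, StreamBlockId(0, 1001L), 500L, null),
             ReceivedBlockInfo(0, StreamBlockId(0, 1002L), 480L, null)),
  1 -> Array.empty[ReceivedBlockInfo]       // stream 1 reported no blocks for this batch
)
// The map travels inside the JobSet and later surfaces as BatchInfo.receivedBlockInfo.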
@@ -82,14 +82,13 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}
}

def runJobs(time: Time, jobs: Seq[Job]) {
if (jobs.isEmpty) {
logInfo("No jobs added for time " + time)
def submitJobSet(jobSet: JobSet) {
if (jobSet.jobs.isEmpty) {
logInfo("No jobs added for time " + jobSet.time)
} else {
val jobSet = new JobSet(time, jobs)
jobSets.put(time, jobSet)
jobSets.put(jobSet.time, jobSet)
jobSet.jobs.foreach(job => executor.execute(new JobHandler(job)))
logInfo("Added jobs for time " + time)
logInfo("Added jobs for time " + jobSet.time)
}
}

@@ -24,7 +24,11 @@ import org.apache.spark.streaming.Time
* belong to the same batch.
*/
private[streaming]
case class JobSet(time: Time, jobs: Seq[Job]) {
case class JobSet(
time: Time,
jobs: Seq[Job],
receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]] = Map.empty
) {

private val incompleteJobs = new HashSet[Job]()
private val submissionTime = System.currentTimeMillis() // when this jobset was submitted
@@ -60,6 +64,7 @@ case class JobSet(time: Time, jobs: Seq[Job]) {
def toBatchInfo: BatchInfo = {
new BatchInfo(
time,
receivedBlockInfo,
submissionTime,
if (processingStartTime >= 0) Some(processingStartTime) else None,
if (processingEndTime >= 0) Some(processingEndTime) else None
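
A hedged note on the new default: call sites (and tests) that build a JobSet without receiver information keep compiling, and the empty map simply flows through toBatchInfo. A sketch, assuming some jobs: Seq[Job] in scope:

val withoutReceivers = JobSet(Time(5000), jobs)   // receivedBlockInfo defaults to Map.empty
val withReceivers    = JobSet(Time(5000), jobs, Map(0 -> Array.empty[ReceivedBlockInfo]))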
@@ -17,26 +17,33 @@

package org.apache.spark.streaming.scheduler

import org.apache.spark.streaming.dstream.{NetworkInputDStream, NetworkReceiver}
import org.apache.spark.streaming.dstream.{StopReceiver, ReportBlock, ReportError}
import org.apache.spark.{SparkException, Logging, SparkEnv}
import org.apache.spark.SparkContext._

import scala.collection.mutable.HashMap
import scala.collection.mutable.Queue
import scala.concurrent.duration._
import scala.collection.mutable.{HashMap, SynchronizedQueue, SynchronizedMap}

import akka.actor._
import akka.pattern.ask
import akka.dispatch._
import org.apache.spark.storage.BlockId
import org.apache.spark.streaming.{Time, StreamingContext}

import org.apache.spark.{Logging, SparkEnv, SparkException}
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.{NetworkReceiver, StopReceiver}
import org.apache.spark.util.AkkaUtils

/** Information about a block received by a network receiver */
case class ReceivedBlockInfo(
streamId: Int,
blockId: StreamBlockId,
numRecords: Long,
metadata: Any
)

/**
* Messages used by the NetworkReceiver and the NetworkInputTracker to communicate
* with each other.
*/
private[streaming] sealed trait NetworkInputTrackerMessage
private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef)
extends NetworkInputTrackerMessage
private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any)
private[streaming] case class AddBlocks(receivedBlockInfo: ReceivedBlockInfo)
extends NetworkInputTrackerMessage
private[streaming] case class DeregisterReceiver(streamId: Int, msg: String)
extends NetworkInputTrackerMessage
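
A hedged sketch (not in the diff) of what a single receiver-to-tracker report looks like with these types; the ids, timestamp, and record count are invented, and trackerActor stands for the ActorRef obtained when the receiver registers:

val info = ReceivedBlockInfo(
  streamId = 0,
  blockId = StreamBlockId(0, 1396291200000L),
  numRecords = 500L,
  metadata = null)
trackerActor ! AddBlocks(info)   // handled by the tracker actor shown below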
@@ -53,9 +60,10 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
val networkInputStreamMap = Map(networkInputStreams.map(x => (x.id, x)): _*)
val receiverExecutor = new ReceiverExecutor()
val receiverInfo = new HashMap[Int, ActorRef]
val receivedBlockIds = new HashMap[Int, Queue[BlockId]]
val receivedBlockInfo = new HashMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
with SynchronizedMap[Int, SynchronizedQueue[ReceivedBlockInfo]]
val timeout = AkkaUtils.askTimeout(ssc.conf)

val listenerBus = ssc.scheduler.listenerBus

// actor is created when generator starts.
// This not being null means the tracker has been started and not stopped
@@ -87,15 +95,14 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
}

/** Return all the blocks received from a receiver. */
def getBlockIds(receiverId: Int, time: Time): Array[BlockId] = synchronized {
val queue = receivedBlockIds.synchronized {
receivedBlockIds.getOrElse(receiverId, new Queue[BlockId]())
}
val result = queue.synchronized {
queue.dequeueAll(x => true)
}
logInfo("Stream " + receiverId + " received " + result.size + " blocks")
result.toArray
def getReceivedBlockInfo(streamId: Int): Array[ReceivedBlockInfo] = {
val receivedBlockInfo = getReceivedBlockInfoQueue(streamId).dequeueAll(x => true)
logInfo("Stream " + streamId + " received " + receivedBlockInfo.size + " blocks")
receivedBlockInfo.toArray
}

private def getReceivedBlockInfoQueue(streamId: Int) = {
receivedBlockInfo.getOrElseUpdate(streamId, new SynchronizedQueue[ReceivedBlockInfo])
}
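
A hedged sketch of the drain semantics relied on above: dequeueAll with an always-true predicate returns everything queued since the previous call and leaves the queue empty, so each reported block ends up in exactly one batch. Illustrated with plain integers:

import scala.collection.mutable.SynchronizedQueue

val q = new SynchronizedQueue[Int]
q ++= Seq(1, 2, 3)
val drained = q.dequeueAll(_ => true)   // Seq(1, 2, 3)
q.isEmpty                               // true; a later drain only sees newer entries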

/** Actor to receive messages from the receivers. */
@@ -110,17 +117,8 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
+ sender.path.address)
sender ! true
}
case AddBlocks(streamId, blockIds, metadata) => {
val tmp = receivedBlockIds.synchronized {
if (!receivedBlockIds.contains(streamId)) {
receivedBlockIds += ((streamId, new Queue[BlockId]))
}
receivedBlockIds(streamId)
}
tmp.synchronized {
tmp ++= blockIds
}
networkInputStreamMap(streamId).addMetadata(metadata)
case AddBlocks(receivedBlockInfo) => {
getReceivedBlockInfoQueue(receivedBlockInfo.streamId) += receivedBlockInfo
}
case DeregisterReceiver(streamId, msg) => {
receiverInfo -= streamId
@@ -23,6 +23,7 @@ import org.apache.spark.util.Distribution
/** Base trait for events related to StreamingListener */
sealed trait StreamingListenerEvent

case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent

@@ -34,14 +35,14 @@ private[scheduler] case object StreamingListenerShutdown extends StreamingListen
* computation.
*/
trait StreamingListener {
/**
* Called when processing of a batch has completed
*/

/** Called when a batch of jobs has been submitted for processing. */
def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }

/** Called when processing of a batch of jobs has completed. */
def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }

/**
* Called when processing of a batch has started
*/
/** Called when processing of a batch of jobs has started. */
def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }
}
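
A hedged example (not part of this commit) of a listener that consumes the per-batch receiver information now exposed through BatchInfo.receivedBlockInfo; the class name and printed format are illustrative, and blocks pushed as raw bytes report numRecords = -1, counted as 0 here:

class ReceiverStatsListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val info = batchCompleted.batchInfo
    val numBlocks  = info.receivedBlockInfo.values.map(_.size).sum
    val numRecords = info.receivedBlockInfo.values.flatten
      .map(blockInfo => math.max(blockInfo.numRecords, 0L)).sum
    println("Batch " + info.batchTime + ": " + numBlocks + " blocks, " + numRecords + " records")
  }
}

// Registered on the StreamingContext as usual:
// ssc.addStreamingListener(new ReceiverStatsListener)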
