Skip to content

Commit

Permalink
Fix incorrect lastErrorTime
Browse files Browse the repository at this point in the history
  • Loading branch information
zsxwing committed May 4, 2015
1 parent 3be4b7a commit d5d86f6
Showing 1 changed file with 6 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,16 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
private def deregisterReceiver(streamId: Int, message: String, error: String) {
val newReceiverInfo = receiverInfo.get(streamId) match {
case Some(oldInfo) =>
val lastErrorTime =
if (error == null || error == "") -1 else ssc.scheduler.clock.getTimeMillis()
oldInfo.copy(endpoint = null, active = false, lastErrorMessage = message,
lastError = error, lastErrorTime = ssc.scheduler.clock.getTimeMillis())
lastError = error, lastErrorTime = lastErrorTime)
case None =>
logWarning("No prior receiver info")
val lastErrorTime =
if (error == null || error == "") -1 else ssc.scheduler.clock.getTimeMillis()
ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message,
lastError = error, lastErrorTime = ssc.scheduler.clock.getTimeMillis())
lastError = error, lastErrorTime = lastErrorTime)
}
receiverInfo -= streamId
listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo))
Expand Down

0 comments on commit d5d86f6

Please sign in to comment.