Skip to content

Commit

Permalink
Properly fire JobQueued to all observers instead of only informing Jo…
Browse files Browse the repository at this point in the history
…bStats about that event.

Remove remaining usage of updateJobState outside of JobStats by firing JobExpired event upon removal of task for a disabled job, then handle JobExpired event in JobStats to do the actual state update.
Make many state accessors private in JobState
  • Loading branch information
azakkerman committed Apr 29, 2015
1 parent ed47f3e commit 63f3f81
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ case class JobFailed(job: Either[String, BaseJob], taskStatus: TaskStatus, attem
case class JobDisabled(job: BaseJob, cause: String) extends JobEvent
case class JobRetriesExhausted(job: BaseJob, taskStatus: TaskStatus, attempt: Int) extends JobEvent
case class JobRemoved(job: BaseJob) extends JobEvent
// For now, Chronos does not expire tasks once they are queued
//case class JobExpired(job: BaseJob, taskId: String, attempt: Int) extends JobEvent
// This event is fired when job is disabled (e.g. due to recurrence going to 0) and its queued tasks are purged
case class JobExpired(job: BaseJob, taskId: String) extends JobEvent

object JobsObserver {
type Observer = PartialFunction[JobEvent, Unit]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class TaskManager @Inject()(val listeningExecutor: ListeningScheduledExecutorSer
val persistenceStore: PersistenceStore,
val jobGraph: JobGraph,
val mesosDriver: MesosDriverFactory,
val jobStats: JobStats,
val jobsObserver: JobsObserver.Observer,
val registry: MetricRegistry) {

val log = Logger.getLogger(getClass.getName)
Expand All @@ -44,7 +44,8 @@ class TaskManager @Inject()(val listeningExecutor: ListeningScheduledExecutorSer

val taskCache = CacheBuilder.newBuilder().maximumSize(5000L).build[String, TaskState]()

val taskMapping: concurrent.Map[String, mutable.ListBuffer[(String, Future[_])]] = new ConcurrentHashMap().asScala
val taskMapping: concurrent.Map[String, mutable.ListBuffer[(String, Future[_])]] =
new ConcurrentHashMap[String, mutable.ListBuffer[(String, Future[_])]]().asScala

val queueGauge = registry.register(
MetricRegistry.name(classOf[TaskManager], "queueSize"),
Expand Down Expand Up @@ -90,7 +91,7 @@ class TaskManager @Inject()(val listeningExecutor: ListeningScheduledExecutorSer
removeTask(taskId)
None
} else if (jobOption.get.disabled) {
jobStats.updateJobState(jobOption.get.name, CurrentState.idle)
jobsObserver.apply(JobExpired(job, taskId))
None
} else {
Some(taskId, job)
Expand Down Expand Up @@ -127,14 +128,13 @@ class TaskManager @Inject()(val listeningExecutor: ListeningScheduledExecutorSer
* Cancels all tasks that are delay scheduled with the underlying executor.
*/
def flush() {
taskMapping.clone().values.map({
f =>
f.foreach({
(f) =>
log.info("Cancelling task '%s'".format(f._1))
f._2.cancel(true)
})
})
taskMapping.clone().values.foreach (
_.foreach {
case (taskId, futureTask) =>
log.info("Cancelling task '%s'".format(taskId))
futureTask.cancel(true)
}
)
taskMapping.clear()
queues.foreach(_.clear())
}
Expand All @@ -156,9 +156,9 @@ class TaskManager @Inject()(val listeningExecutor: ListeningScheduledExecutorSer
if (jobOption.isEmpty) {
log.warning("Job '%s' no longer registered.".format(jobName))
} else {
val (_, start, attempt, _) = TaskUtils.parseTaskId(taskId)
val (_, _, attempt, _) = TaskUtils.parseTaskId(taskId)
val job = jobOption.get
jobStats.jobQueued(job, taskId, attempt)
jobsObserver.apply(JobQueued(job, taskId, attempt))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class JobStats @Inject()(clusterBuilder: Option[Cluster.Builder], config: Cassan
}
}

def removeJobState(job: BaseJob) = jobStates.remove(job.name)
private def removeJobState(job: BaseJob) = jobStates.remove(job.name)

/**
* Queries Cassandra table for past and current job statistics by jobName
Expand Down Expand Up @@ -314,18 +314,19 @@ class JobStats @Inject()(clusterBuilder: Option[Cluster.Builder], config: Cassan
}

def asObserver: JobsObserver.Observer = {
case JobExpired(job, _) => updateJobState(job.name, CurrentState.idle)
case JobRemoved(job) => removeJobState(job)
case JobQueued(job, taskId, attempt) => jobQueued(job, taskId, attempt)
case JobStarted(job, taskStatus, attempt) => jobStarted(job, taskStatus, attempt)
case JobFinished(job, taskStatus, attempt) => jobFinished(job, taskStatus, attempt)
case JobFailed(job, taskStatus, attempt) => jobFailed(job, taskStatus, attempt)
}

def jobQueued(job: BaseJob, taskId: String, attempt: Int) {
private def jobQueued(job: BaseJob, taskId: String, attempt: Int) {
updateJobState(job.name, CurrentState.queued)
}

def jobStarted(job: BaseJob, taskStatus: TaskStatus, attempt: Int) {
private def jobStarted(job: BaseJob, taskStatus: TaskStatus, attempt: Int) {
updateJobState(job.name, CurrentState.running)

var jobSchedule:Option[String] = None
Expand Down Expand Up @@ -425,7 +426,7 @@ class JobStats @Inject()(clusterBuilder: Option[Cluster.Builder], config: Cassan
_session = None
}

def jobFinished(job: BaseJob, taskStatus: TaskStatus, attempt: Int) {
private def jobFinished(job: BaseJob, taskStatus: TaskStatus, attempt: Int) {
updateJobState(job.name, CurrentState.idle)

var jobSchedule:Option[String] = None
Expand All @@ -450,7 +451,7 @@ class JobStats @Inject()(clusterBuilder: Option[Cluster.Builder], config: Cassan
isFailure=None)
}

def jobFailed(jobNameOrJob: Either[String, BaseJob], taskStatus: TaskStatus, attempt: Int): Unit = {
private def jobFailed(jobNameOrJob: Either[String, BaseJob], taskStatus: TaskStatus, attempt: Int): Unit = {
val jobName = jobNameOrJob.fold(name => name, _.name)
val jobSchedule = jobNameOrJob.fold(_ => None, {
case job: ScheduleBasedJob => Some(job.schedule)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class TaskManagerSpec extends SpecificationWithJUnit with Mockito {
"TaskManager" should {
"Calculate the correct time delay between scheduling and dispatching the job" in {
val taskManager = new TaskManager(mock[ListeningScheduledExecutorService], mock[PersistenceStore],
mock[JobGraph], null, mock[JobStats], mock[MetricRegistry])
mock[JobGraph], null, MockJobUtils.mockFullObserver, mock[MetricRegistry])
val millis = taskManager.getMillisUntilExecution(new DateTime(DateTimeZone.UTC).plus(Hours.ONE))
val expectedSeconds = scala.math.round(Period.hours(1).toStandardDuration.getMillis / 1000d)
//Due to startup time / JVM overhead, millis wouldn't be totally accurate.
Expand Down

0 comments on commit 63f3f81

Please sign in to comment.