Skip to content

Commit

Permalink
Merge pull request mesos#437 from azakkerman/cleanup
Browse files Browse the repository at this point in the history
Cleanup of compiler warnings and complete JobStats refactoring
  • Loading branch information
elingg committed May 7, 2015
2 parents 8bbc588 + 129e8f6 commit a527e07
Show file tree
Hide file tree
Showing 16 changed files with 182 additions and 239 deletions.
Expand Up @@ -12,7 +12,7 @@ class JobNotificationObserver @Inject()(val notificationClients: List[ActorRef]
private[this] val log = Logger.getLogger(getClass.getName)
val clusterPrefix = clusterName.map(name => s"[$name]").getOrElse("")

def asObserver: JobsObserver.Observer = {
def asObserver: JobsObserver.Observer = JobsObserver.withName({
case JobRemoved(job) => sendNotification(job, "%s [Chronos] Your job '%s' was deleted!".format(clusterPrefix, job.name), None)
case JobDisabled(job, cause) => sendNotification(
job,
Expand All @@ -24,7 +24,7 @@ class JobNotificationObserver @Inject()(val notificationClients: List[ActorRef]
.format(DateTime.now(DateTimeZone.UTC), job.retries, taskStatus.getTaskId.getValue)
sendNotification(job, "%s [Chronos] job '%s' failed!".format(clusterPrefix, job.name),
Some(TaskUtils.appendSchedulerMessage(msg, taskStatus)))
}
}, getClass.getSimpleName)

def sendNotification(job: BaseJob, subject: String, message: Option[String] = None) {
for (client <- notificationClients) {
Expand Down
Expand Up @@ -9,9 +9,6 @@ import org.apache.mesos.chronos.scheduler.config.{CassandraConfiguration, Schedu
import org.apache.mesos.chronos.scheduler.graph.JobGraph
import org.apache.mesos.chronos.scheduler.jobs._

import com.codahale.metrics.Histogram
import com.datastax.driver.core.Row
import com.datastax.driver.core.ColumnDefinitions
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.module.SimpleModule

Expand Down Expand Up @@ -106,13 +103,15 @@ class JobManagementResource @Inject()(val jobScheduler: JobScheduler,
jobScheduler.deregisterJob(job, persist = true)
Response.noContent().build
} catch {
case ex: IllegalArgumentException =>
case ex: IllegalArgumentException => {
log.log(Level.INFO, "Bad Request", ex)
Response.status(Response.Status.BAD_REQUEST).entity(ex.getMessage)
.build()
case ex: Exception =>
}
case ex: Exception => {
log.log(Level.WARNING, "Exception while serving request", ex)
Response.serverError().build
}
}
}

Expand All @@ -121,7 +120,7 @@ class JobManagementResource @Inject()(val jobScheduler: JobScheduler,
def getStat(@PathParam("jobName") jobName: String): Response = {
try {
val jobOpt = jobGraph.lookupVertex(jobName)
require(!jobOpt.isEmpty, "Job '%s' not found".format(jobName))
require(jobOpt.nonEmpty, "Job '%s' not found".format(jobName))

val histoStats = jobMetrics.getJobHistogramStats(jobName)
val jobStatsList: List[TaskStat] = jobStats.getMostRecentTaskStatsByJob(jobOpt.get, cassandraConfig.jobHistoryLimit())
Expand Down Expand Up @@ -173,32 +172,27 @@ class JobManagementResource @Inject()(val jobScheduler: JobScheduler,
@PathParam("taskId") taskId: String,
taskStat: TaskStat) : Response = {
try {
var jobOpt = jobGraph.lookupVertex(jobName)
require(!jobOpt.isEmpty, "Job '%s' not found".format(jobName))
val jobOpt = jobGraph.lookupVertex(jobName)
require(jobOpt.nonEmpty, "Job '%s' not found".format(jobName))
require(TaskUtils.isValidVersion(taskId), "Invalid task id format %s".format(taskId))
require(jobOpt.get.dataProcessingJobType, "Job '%s' is not enabled to track data".format(jobName))

taskStat.numAdditionalElementsProcessed match {
case Some(num: Int) => {
taskStat.numAdditionalElementsProcessed.foreach {
num =>
//NOTE: 0 is a valid value
require(num >= 0,
"numAdditionalElementsProcessed (%d) is not positive".format(num))

jobStats.updateTaskProgress(jobOpt.get, taskId, num)
}
case None =>
}
Response.noContent().build
} catch {
case ex: IllegalArgumentException => {
case ex: IllegalArgumentException =>
log.log(Level.INFO, "Bad Request", ex)
Response.status(Response.Status.BAD_REQUEST).entity(ex.getMessage)
.build()
}
case ex: Exception => {
Response.status(Response.Status.BAD_REQUEST).entity(ex.getMessage).build
case ex: Exception =>
log.log(Level.WARNING, "Exception while serving request", ex)
Response.serverError().build
}
}
}

Expand Down
Expand Up @@ -5,8 +5,8 @@ import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.JsonSerializer
import com.fasterxml.jackson.databind.SerializerProvider

import org.joda.time.{DateTime, Duration}
import org.joda.time.format.{DateTimeFormat, DateTimeFormatter, PeriodFormat, PeriodFormatter, PeriodFormatterBuilder}
import org.joda.time.DateTime
import org.joda.time.format.{DateTimeFormat, PeriodFormatterBuilder}

class JobStatWrapperSerializer extends JsonSerializer[JobStatWrapper] {
def serialize(jobStat: JobStatWrapper, json: JsonGenerator, provider: SerializerProvider) {
Expand All @@ -30,54 +30,40 @@ class JobStatWrapperSerializer extends JsonSerializer[JobStatWrapper] {
json.writeFieldName("slaveId")
json.writeString(taskStat.taskSlaveId)

var fmt = DateTimeFormat.forPattern("MM/dd/yy HH:mm:ss")
val fmt = DateTimeFormat.forPattern("MM/dd/yy HH:mm:ss")
def writeTime(timeOpt: Option[DateTime]): Unit =
timeOpt.fold(json.writeString("N/A"))(dT => json.writeString(fmt.print(dT)))

//always show start time
json.writeFieldName("startTime")
taskStat.taskStartTs match {
case Some(dT: DateTime) => {
json.writeString(fmt.print(dT))
}
case None => {
json.writeString("N/A")
}
}
writeTime(taskStat.taskStartTs)
//show either end time or currently running
json.writeFieldName("endTime")
taskStat.taskEndTs match {
case Some(dT: DateTime) => {
json.writeString(fmt.print(dT))
}
case None => {
json.writeString("N/A")
}
}
writeTime(taskStat.taskEndTs)

taskStat.taskDuration match {
case Some(dur: Duration) => {
taskStat.taskDuration.foreach {
dur =>
val pFmt = new PeriodFormatterBuilder()
.appendDays().appendSuffix("d")
.appendHours().appendSuffix("h")
.appendMinutes().appendSuffix("m")
.printZeroIfSupported()
.appendSeconds().appendSuffix("s")
.toFormatter()
.toFormatter

json.writeFieldName("duration")
json.writeString(pFmt.print(dur.toPeriod()))
}
case None =>
json.writeString(pFmt.print(dur.toPeriod))

}

json.writeFieldName("status")
json.writeString(taskStat.taskStatus.toString())
json.writeString(taskStat.taskStatus.toString)

//only write elements processed, ignore numAdditionalElementsProcessed
taskStat.numElementsProcessed match {
case Some(num: Long) => {
taskStat.numElementsProcessed.foreach {
num =>
json.writeFieldName("numElementsProcessed")
json.writeNumber(num)
}
case None =>
}

json.writeEndObject()
Expand Down
Expand Up @@ -78,9 +78,9 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
}

/**
* Update
* @param oldJob
* @param newJob
* Update job definition
* @param oldJob job definition
* @param newJob new job definition
*/
def updateJob(oldJob: BaseJob, newJob: BaseJob) {
//TODO(FL): Ensure we're using job-ids rather than relying on jobs names for identification.
Expand All @@ -94,12 +94,12 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
.filter(_.nonEmpty).map(_.get)
if (newStreams.nonEmpty) {
log.info("updating ScheduleBasedJob:" + scheduleBasedJob.toString)
val tmpStreams = streams.filter(_.head()._2 != scheduleBasedJob.name)
val tmpStreams = streams.filter(_.head._2 != scheduleBasedJob.name)
streams = iteration(DateTime.now(DateTimeZone.UTC), newStreams ++ tmpStreams)
}
} else {
log.info("updating ScheduleBasedJob:" + scheduleBasedJob.toString)
val tmpStreams = streams.filter(_.head()._2 != scheduleBasedJob.name)
val tmpStreams = streams.filter(_.head._2 != scheduleBasedJob.name)
streams = iteration(DateTime.now(DateTimeZone.UTC), tmpStreams)
}
}
Expand Down Expand Up @@ -163,7 +163,7 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
val parents = jobGraph.parentJobs(job)
log.info("Job parent: [ %s ], name: %s, command: %s".format(job.parents.mkString(","), job.name, job.command))
jobGraph.addVertex(job)
parents.map(x => jobGraph.addDependency(x.name, job.name))
parents.foreach(x => jobGraph.addDependency(x.name, job.name))
if (persist) {
log.info("Persisting job:" + job.name)
persistenceStore.persistJob(job)
Expand Down Expand Up @@ -318,7 +318,7 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
val dependents = jobGraph.getExecutableChildren(jobName)
if (dependents.nonEmpty) {
log.fine("%s has dependents: %s .".format(jobName, dependents.mkString(",")))
dependents.map({
dependents.foreach {
//TODO(FL): Ensure that the job for the given x exists. Lock.
x =>
val dependentJob = jobGraph.getJobForName(x).get
Expand All @@ -332,7 +332,7 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,

log.fine("Enqueued depedent job." + x)
}
})
}
} else {
log.fine("%s does not have any ready dependents.".format(jobName))
}
Expand Down Expand Up @@ -443,9 +443,9 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
/**
* Iterates through the stream for the given DateTime and a list of schedules, removing old schedules and acting on
* the available schedules.
* @param dateTime
* @param schedules
* @return
* @param dateTime for which to process schedules
* @param schedules schedules to be processed
* @return list of updated schedules
*/
def iteration(dateTime: DateTime, schedules: List[ScheduleStream]): List[ScheduleStream] = {
log.info("Checking schedules with time horizon:%s".format(scheduleHorizon.toString))
Expand All @@ -469,13 +469,13 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
* Given a stream and a DateTime(@see org.joda.DateTime), this method returns a 2-tuple with a ScheduleTask and
* a clipped schedule stream in case that the ScheduleTask was not none. Returns no task and the input stream,
* if nothing needs scheduling within the time horizon.
* @param now
* @param stream
* @param now time to start iteration with
* @param stream schedule stream
* @return
*/
@tailrec
final def next(now: DateTime, stream: ScheduleStream): (Option[ScheduledTask], Option[ScheduleStream]) = {
val (schedule, jobName, scheduleTimeZone) = stream.head()
val (schedule, jobName, scheduleTimeZone) = stream.head

log.info("Calling next for stream: %s, jobname: %s".format(stream.schedule, jobName))
assert(schedule != null && !schedule.equals(""), "No valid schedule found: " + schedule)
Expand Down Expand Up @@ -507,21 +507,24 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
(None, None)
} else {
val job = jobOption.get
if (nextDate.isAfter(now.minus(job.epsilon)) && nextDate.isBefore(now.plus(scheduleHorizon))) {
val scheduleWindowBegin = now.minus(job.epsilon)
val scheduleWindowEnd = now.plus(scheduleHorizon)
if (nextDate.isAfter(scheduleWindowBegin) && nextDate.isBefore(scheduleWindowEnd)) {
log.info("Task ready for scheduling: %s".format(nextDate))
//TODO(FL): Rethink passing the dispatch queue all the way down to the ScheduledTask.
val task = new ScheduledTask(TaskUtils.getTaskId(job, nextDate), nextDate, job, taskManager)
return (Some(task), stream.tail())
return (Some(task), stream.tail)
}
//The nextDate has passed already beyond epsilon.
//TODO(FL): Think about the semantics here and see if it always makes sense to skip ahead of missed schedules.
// Next instance is too far in the future
// Needs to be scheduled at a later time, after schedule horizon.
if (!nextDate.isBefore(now)) {
jobsObserver.apply(JobSkipped(job, nextDate))
return (None, Some(stream))
}
//Needs to be scheduled at a later time, after schedule horizon.
// Next instance is too far in the past (beyond epsilon)
//TODO(FL): Think about the semantics here and see if it always makes sense to skip ahead of missed schedules.
log.fine("No need to work on schedule: '%s' yet".format(nextDate))
val tail = stream.tail()
jobsObserver.apply(JobSkipped(job, nextDate))
val tail = stream.tail
if (tail.isEmpty) {
//TODO(FL): Verify that this can go.
persistenceStore.removeJob(job)
Expand Down Expand Up @@ -595,10 +598,9 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
log.info("Loading jobs")
JobUtils.loadJobs(this, persistenceStore)
} catch {
case e: Exception => {
case e: Exception =>
log.log(Level.SEVERE, "Loading tasks or jobs failed. Exiting.", e)
System.exit(1)
}
}
}

Expand Down Expand Up @@ -679,13 +681,13 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,

private def removeOldSchedules(scheduleStreams: List[Option[ScheduleStream]]): List[ScheduleStream] = {
log.fine("Filtering out empty streams")
scheduleStreams.filter(s => s.isDefined && s.get.tail().isDefined).map(_.get)
scheduleStreams.filter(s => s.isDefined && s.get.tail.isDefined).map(_.get)
}

/**
* Adds a List of ScheduleStream and runs a iteration at the current time.
* @param now
* @param newStreams
* @param now time from which to evaluate schedule
* @param newStreams new schedules to be evaluated
*/
private def addSchedule(now: DateTime, newStreams: List[ScheduleStream]) {
log.info("Adding schedule for time:" + now.toString(DateTimeFormat.fullTime()))
Expand Down
Expand Up @@ -15,8 +15,8 @@ case class JobFailed(job: Either[String, BaseJob], taskStatus: TaskStatus, attem
case class JobDisabled(job: BaseJob, cause: String) extends JobEvent
case class JobRetriesExhausted(job: BaseJob, taskStatus: TaskStatus, attempt: Int) extends JobEvent
case class JobRemoved(job: BaseJob) extends JobEvent
// For now, Chronos does not expire tasks once they are queued
//case class JobExpired(job: BaseJob, taskId: String, attempt: Int) extends JobEvent
// This event is fired when job is disabled (e.g. due to recurrence going to 0) and its queued tasks are purged
case class JobExpired(job: BaseJob, taskId: String) extends JobEvent

object JobsObserver {
type Observer = PartialFunction[JobEvent, Unit]
Expand All @@ -28,4 +28,10 @@ object JobsObserver {
Some(Unit)
})
}

def withName(observer: Observer, name: String): Observer = new Observer {
override def isDefinedAt(event: JobEvent) = observer.isDefinedAt(event)
override def apply(event: JobEvent): Unit = observer.apply(event)
override def toString(): String = name
}
}
Expand Up @@ -8,15 +8,13 @@ package org.apache.mesos.chronos.scheduler.jobs
*/
class ScheduleStream(val schedule: String, val jobName: String, val scheduleTimeZone: String = "") {

def head(): (String, String, String) = {
(schedule, jobName, scheduleTimeZone)
}
def head: (String, String, String) = (schedule, jobName, scheduleTimeZone)

/**
* Returns a clipped schedule.
* @return
*/
def tail(): Option[ScheduleStream] = {
def tail: Option[ScheduleStream] =
//TODO(FL) Represent the schedule as a data structure instead of a string.
Iso8601Expressions.parse(schedule, scheduleTimeZone) match {
case Some((rec, start, per)) =>
Expand All @@ -31,5 +29,4 @@ class ScheduleStream(val schedule: String, val jobName: String, val scheduleTime
case None =>
None
}
}
}

0 comments on commit a527e07

Please sign in to comment.