Merge pull request #436 from azakkerman/master
Introduce JobsObserver interface and place both JobStats and error notification behind that interface
brndnmtthws committed May 3, 2015
2 parents e946598 + 6dd59cc commit 3ad15f0
Showing 17 changed files with 216 additions and 184 deletions.
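In short: JobScheduler no longer calls JobStats and the notification actors directly. It now emits job lifecycle events to a single JobsObserver.Observer, and both job statistics and owner notification are plugged in as observers of those events. The following is a minimal sketch of what the observer type and its events might look like, reconstructed from how they are used in the hunks below; JobsObserver itself is among the 17 changed files but is not expanded on this page, so treat the exact names and signatures as assumptions.

package org.apache.mesos.chronos.scheduler.jobs

import org.apache.mesos.Protos.TaskStatus
import org.joda.time.DateTime

// Job lifecycle events referenced by JobScheduler and JobNotificationObserver in this commit.
sealed trait JobEvent
case class JobStarted(job: BaseJob, taskStatus: TaskStatus, attempt: Int) extends JobEvent
case class JobFinished(job: BaseJob, taskStatus: TaskStatus, attempt: Int) extends JobEvent
// Left(jobName) covers failures for tasks whose job is no longer in the job graph.
case class JobFailed(job: Either[String, BaseJob], taskStatus: TaskStatus, attempt: Int) extends JobEvent
case class JobRetriesExhausted(job: BaseJob, taskStatus: TaskStatus, attempts: Int) extends JobEvent
case class JobDisabled(job: BaseJob, cause: String) extends JobEvent
case class JobRemoved(job: BaseJob) extends JobEvent
case class JobSkipped(job: BaseJob, date: DateTime) extends JobEvent

object JobsObserver {
  // An observer is a partial function over events, so each observer only
  // declares the events it actually handles.
  type Observer = PartialFunction[JobEvent, Unit]

  // Delivers an event to every observer whose partial function is defined for
  // it; observers that do not handle the event are skipped.
  def composite(observers: List[Observer]): Observer = {
    case event if observers.exists(_.isDefinedAt(event)) =>
      observers.filter(_.isDefinedAt(event)).foreach(_.apply(event))
  }
}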
JobNotificationObserver.scala (new file)
@@ -0,0 +1,41 @@
package org.apache.mesos.chronos.notification

import java.util.logging.Logger

import akka.actor.ActorRef
import com.google.inject.Inject
import org.apache.mesos.chronos.scheduler.jobs._
import org.joda.time.{DateTimeZone, DateTime}

class JobNotificationObserver @Inject()(val notificationClients: List[ActorRef] = List(),
val clusterName: Option[String] = None) {
private[this] val log = Logger.getLogger(getClass.getName)
val clusterPrefix = clusterName.map(name => s"[$name]").getOrElse("")

def asObserver: JobsObserver.Observer = {
case JobRemoved(job) => sendNotification(job, "%s [Chronos] Your job '%s' was deleted!".format(clusterPrefix, job.name), None)
case JobDisabled(job, cause) => sendNotification(
job,
"%s [Chronos] job '%s' disabled".format(clusterPrefix, job.name),
Some(cause))

case JobRetriesExhausted(job, taskStatus, attempts) =>
val msg = "\n'%s'. Retries attempted: %d.\nTask id: %s\n"
.format(DateTime.now(DateTimeZone.UTC), job.retries, taskStatus.getTaskId.getValue)
sendNotification(job, "%s [Chronos] job '%s' failed!".format(clusterPrefix, job.name),
Some(TaskUtils.appendSchedulerMessage(msg, taskStatus)))
}

def sendNotification(job: BaseJob, subject: String, message: Option[String] = None) {
for (client <- notificationClients) {
val subowners = job.owner.split("\\s*,\\s*")
for (subowner <- subowners) {
log.info("Sending mail notification to:%s for job %s using client: %s".format(subowner, job.name, client))
client !(job, subowner, subject, message)
}
}

log.info(subject)
}

}
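As a usage sketch (hypothetical values: mailClient, job and status are assumed to already exist), asObserver exposes the notifier as a partial function, so only the three events it defines are handled and everything else falls through to the other observers in the composite wired up in MainModule further down:

// Hypothetical wiring; mailClient is an ActorRef, job a BaseJob, status a TaskStatus.
val notifier = new JobNotificationObserver(notificationClients = List(mailClient),
                                           clusterName = Some("prod"))
val observer: JobsObserver.Observer = notifier.asObserver

observer(JobRemoved(job))                          // sends the "was deleted!" notification
observer.isDefinedAt(JobStarted(job, status, 1))   // false: start events are left to JobStats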
GraphManagementResource.scala
@@ -8,10 +8,10 @@ import javax.ws.rs.{Consumes, GET, Path, Produces, WebApplicationException}

import org.apache.mesos.chronos.scheduler.config.SchedulerConfiguration
import org.apache.mesos.chronos.scheduler.graph.JobGraph
import org.apache.mesos.chronos.scheduler.jobs._
import org.apache.mesos.chronos.scheduler.jobs.graph.Exporter
import com.codahale.metrics.annotation.Timed
import com.google.inject.Inject
import org.apache.mesos.chronos.scheduler.jobs.stats.JobStats

/**
* The REST API for managing jobs.
@@ -21,7 +21,7 @@ import com.google.inject.Inject
@Path(PathConstants.graphBasePath)
@Consumes(Array(MediaType.APPLICATION_JSON))
class GraphManagementResource @Inject()(
val jobScheduler: JobScheduler,
val jobStats: JobStats,
val jobGraph: JobGraph,
val configuration: SchedulerConfiguration) {

@@ -48,7 +48,7 @@ class GraphManagementResource @Inject()(
def jsonGraph(): Response = {
try {
val buffer = new StringWriter
Exporter.export(buffer, jobGraph, jobScheduler)
Exporter.export(buffer, jobGraph, jobStats)
Response.ok(buffer.toString).build
} catch {
case ex: Exception =>
JobManagementResource.scala
@@ -17,6 +17,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule

import com.codahale.metrics.annotation.Timed
import com.google.inject.Inject
import org.apache.mesos.chronos.scheduler.jobs.stats.JobStats
import org.joda.time.{DateTime, DateTimeZone}

import scala.collection.mutable.ListBuffer
@@ -33,6 +34,7 @@ class JobManagementResource @Inject()(val jobScheduler: JobScheduler,
val jobGraph: JobGraph,
val configuration: SchedulerConfiguration,
val cassandraConfig: CassandraConfiguration,
val jobStats: JobStats,
val jobMetrics: JobMetrics) {

private[this] val log = Logger.getLogger(getClass.getName)
@@ -100,8 +102,7 @@ class JobManagementResource @Inject()(val jobScheduler: JobScheduler,
}
}
}
jobScheduler.sendNotification(job, "[Chronos] - Your job '%s' was deleted!".format(jobName))

// No need to send notifications here, the jobScheduler.deregisterJob will do it
jobScheduler.deregisterJob(job, persist = true)
Response.noContent().build
} catch {
@@ -123,7 +124,7 @@ class JobManagementResource @Inject()(val jobScheduler: JobScheduler,
require(!jobOpt.isEmpty, "Job '%s' not found".format(jobName))

val histoStats = jobMetrics.getJobHistogramStats(jobName)
val jobStatsList: List[TaskStat] = jobScheduler.jobStats.getMostRecentTaskStatsByJob(jobOpt.get, cassandraConfig.jobHistoryLimit())
val jobStatsList: List[TaskStat] = jobStats.getMostRecentTaskStatsByJob(jobOpt.get, cassandraConfig.jobHistoryLimit())
val jobStatsWrapper = new JobStatWrapper(jobStatsList, histoStats)

val wrapperStr = objectMapper.writeValueAsString(jobStatsWrapper)
@@ -183,7 +184,7 @@ class JobManagementResource @Inject()(val jobScheduler: JobScheduler,
require(num >= 0,
"numAdditionalElementsProcessed (%d) is not positive".format(num))

jobScheduler.jobStats.updateTaskProgress(jobOpt.get, taskId, num)
jobStats.updateTaskProgress(jobOpt.get, taskId, num)
}
case None =>
}
JobStatsModule.scala
@@ -1,10 +1,10 @@
package org.apache.mesos.chronos.scheduler.config

import org.apache.mesos.chronos.scheduler.jobs.JobStats
import com.datastax.driver.core.Cluster
import com.datastax.driver.core.ProtocolOptions.Compression
import com.datastax.driver.core.policies.{DowngradingConsistencyRetryPolicy, LatencyAwarePolicy, RoundRobinPolicy}
import com.google.inject.{AbstractModule, Provides, Scopes, Singleton}
import org.apache.mesos.chronos.scheduler.jobs.stats.JobStats

class JobStatsModule(config: CassandraConfiguration) extends AbstractModule {
def configure() {
MainModule.scala
@@ -6,9 +6,10 @@ import java.util.logging.{Level, Logger}

import akka.actor.{ActorRef, ActorSystem, Props}
import akka.util.Timeout
import org.apache.mesos.chronos.notification.{MailClient, RavenClient, SlackClient}
import org.apache.mesos.chronos.notification.{JobNotificationObserver, MailClient, RavenClient, SlackClient}
import org.apache.mesos.chronos.scheduler.graph.JobGraph
import org.apache.mesos.chronos.scheduler.jobs.{JobMetrics, JobScheduler, JobStats, TaskManager}
import org.apache.mesos.chronos.scheduler.jobs.stats.JobStats
import org.apache.mesos.chronos.scheduler.jobs.{JobsObserver, JobMetrics, JobScheduler, TaskManager}
import org.apache.mesos.chronos.scheduler.mesos.{MesosDriverFactory, MesosJobFramework, MesosTaskBuilder}
import org.apache.mesos.chronos.scheduler.state.PersistenceStore
import com.google.common.util.concurrent.{ListeningScheduledExecutorService, MoreExecutors, ThreadFactoryBuilder}
@@ -90,9 +91,8 @@ class MainModule(val config: SchedulerConfiguration with HttpConf)
mesosSchedulerDriver: MesosDriverFactory,
curator: CuratorFramework,
leaderLatch: LeaderLatch,
notificationClients: List[ActorRef],
metrics: JobMetrics,
stats: JobStats): JobScheduler = {
jobsObserver: JobsObserver.Observer,
metrics: JobMetrics): JobScheduler = {
new JobScheduler(
scheduleHorizon = Seconds.seconds(config.scheduleHorizonSeconds()).toPeriod,
taskManager = taskManager,
@@ -102,12 +102,17 @@
curator = curator,
leaderLatch = leaderLatch,
leaderPath = config.zooKeeperCandidatePath,
notificationClients = notificationClients,
jobsObserver = jobsObserver,
failureRetryDelay = config.failureRetryDelayMs(),
disableAfterFailures = config.disableAfterFailures(),
jobMetrics = metrics,
jobStats = stats,
clusterName = config.clusterName.get)
jobMetrics = metrics)
}

@Singleton
@Provides
def provideJobsObservers(jobStats: JobStats, notificationClients: List[ActorRef]): JobsObserver.Observer = {
val notifier = new JobNotificationObserver(notificationClients, config.clusterName.get)
JobsObserver.composite(List(notifier.asObserver, jobStats.asObserver))
}

@Singleton
@@ -136,8 +141,7 @@ class MainModule(val config: SchedulerConfiguration with HttpConf)
for {
webhookUrl <- config.slackWebhookUrl.get if !config.slackWebhookUrl.isEmpty
} yield {
create(classOf[SlackClient], webhookUrl)
}
create(classOf[SlackClient], webhookUrl)}
).flatten
}

JobScheduler.scala
@@ -4,7 +4,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import java.util.concurrent.{TimeUnit, Executors, Future}
import java.util.logging.{Level, Logger}

import akka.actor.{ActorSystem, ActorRef}
import akka.actor.ActorSystem
import org.apache.mesos.chronos.scheduler.graph.JobGraph
import org.apache.mesos.chronos.scheduler.mesos.MesosDriverFactory
import org.apache.mesos.chronos.scheduler.state.PersistenceStore
@@ -34,12 +34,10 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
val curator: CuratorFramework = null,
val leaderLatch: LeaderLatch = null,
val leaderPath: String = null,
val notificationClients: List[ActorRef] = List(),
val jobsObserver: JobsObserver.Observer,
val failureRetryDelay: Long = 60000,
val disableAfterFailures: Long = 0,
val jobMetrics: JobMetrics,
val jobStats: JobStats,
val clusterName: Option[String] = None)
val jobMetrics: JobMetrics)
//Allows us to let Chaos manage the lifecycle of this class.
extends AbstractIdleService {

@@ -210,7 +208,7 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,

taskManager.cancelTasks(job)
taskManager.removeTasks(job)
jobStats.removeJobState(job)
jobsObserver.apply(JobRemoved(job))

if (persist) {
log.info("Removing job from underlying state abstraction:" + job.name)
@@ -233,7 +231,7 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
} else {
val job = jobOption.get
val (_, _, attempt, _) = TaskUtils.parseTaskId(taskId)
jobStats.jobStarted(job, taskStatus, attempt)
jobsObserver.apply(JobStarted(job, taskStatus, attempt))

job match {
case j: DependencyBasedJob =>
@@ -246,7 +244,6 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
/**
* Takes care of follow-up actions for a finished task, i.e. update the job schedule in the persistence store or
* launch tasks for dependent jobs
* @param taskId
*/
def handleFinishedTask(taskStatus: TaskStatus, taskDate: Option[DateTime] = None) {
// `taskDate` is purely for unit testing
@@ -265,7 +262,7 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
jobMetrics.updateJobStat(jobName, timeMs = DateTime.now(DateTimeZone.UTC).getMillis - start)
jobMetrics.updateJobStatus(jobName, success = true)
val job = jobOption.get
jobStats.jobFinished(job, taskStatus, attempt)
jobsObserver.apply(JobFinished(job, taskStatus, attempt))

val newJob = job match {
case job: ScheduleBasedJob =>
@@ -298,13 +295,9 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
log.info("Disabling job that reached a zero-recurrence count!")

val disabledJob: ScheduleBasedJob = scheduleBasedJob.copy(disabled = true)
val clusterPrefix = getClusterPrefix(clusterName)
sendNotification(
job,
"%s [Chronos] job '%s' disabled".format(clusterPrefix, job.name),
Some( """Job '%s' has exhausted all of its recurrences and has been disabled.
|Please consider either removing your job, or updating its schedule and re-enabling it.
""".stripMargin.format(job.name)))
jobsObserver.apply(JobDisabled(job, """Job '%s' has exhausted all of its recurrences and has been disabled.
|Please consider either removing your job, or updating its schedule and re-enabling it.
""".stripMargin.format(job.name)))
replaceJob(scheduleBasedJob, disabledJob)
}
case None =>
@@ -314,23 +307,6 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
}
}

def getClusterPrefix(clusterName: Option[String]) = clusterName match {
case Some(name) => s"[$name] "
case None => ""
}

def sendNotification(job: BaseJob, subject: String, message: Option[String] = None) {
for (client <- notificationClients) {
val subowners = job.owner.split("\\s*,\\s*")
for (subowner <- subowners) {
log.info("Sending mail notification to:%s for job %s using client: %s".format(subowner, job.name, client))
client !(job, subowner, subject, message)
}
}

log.info(subject)
}

def replaceJob(oldJob: BaseJob, newJob: BaseJob) {
lock.synchronized {
jobGraph.replaceVertex(oldJob, newJob)
@@ -364,11 +340,6 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,

def handleFailedTask(taskStatus: TaskStatus) {
val taskId = taskStatus.getTaskId.getValue
val message = if (taskStatus.hasMessage && taskStatus.getMessage.nonEmpty) {
Some(taskStatus.getMessage)
} else {
None
}
if (!TaskUtils.isValidVersion(taskId)) {
log.warning("Found old or invalid task, ignoring!")
} else {
@@ -377,7 +348,7 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
val jobOption = jobGraph.lookupVertex(jobName)
jobOption match {
case Some(job) =>
jobStats.jobFailed(job, taskStatus, attempt)
jobsObserver.apply(JobFailed(Right(job), taskStatus, attempt))

val hasAttemptsLeft: Boolean = attempt < job.retries
val hadRecentSuccess: Boolean = try {
@@ -429,37 +400,17 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
updateJob(job, newJob)
if (job.softError) processDependencies(jobName, Option(lastErrorTime))

val clusterPrefix = getClusterPrefix(clusterName)

// Handle failure by either disabling the job and notifying the owner,
// or just notifying the owner.
if (disableJob) {
log.warning("Job failed beyond retries! Job will now be disabled after "
+ newJob.errorsSinceLastSuccess + " failures (disableAfterFailures=" + disableAfterFailures + ").")
message match {
case Some(msg) =>
sendNotification(job, "%s [Chronos] JOB DISABLED: '%s'".format(clusterPrefix, job.name),
Some("\nFailed at '%s', %d failures since last success\nTask id: %s\nThe scheduler provided this message:\n\n%s"
.format(DateTime.now(DateTimeZone.UTC), newJob.errorsSinceLastSuccess,
taskId, msg)))
case None =>
sendNotification(job, "%s [Chronos] JOB DISABLED: '%s'".format(clusterPrefix, job.name),
Some("\nFailed at '%s', %d failures since last success\nTask id: %s\n"
.format(DateTime.now(DateTimeZone.UTC), newJob.errorsSinceLastSuccess, taskId)))
}
val msg = "\nFailed at '%s', %d failures since last success\nTask id: %s\n"
.format(DateTime.now(DateTimeZone.UTC), newJob.errorsSinceLastSuccess, taskId)
jobsObserver.apply(JobDisabled(job, TaskUtils.appendSchedulerMessage(msg, taskStatus)))
} else {
log.warning("Job failed beyond retries!")
message match {
case Some(msg) =>
sendNotification(job, "%s [Chronos] job '%s' failed!".format(clusterPrefix, job.name),
Some("\n'%s'. Retries attempted: %d.\nTask id: %s\nThe scheduler provided this message:\n\n%s"
.format(DateTime.now(DateTimeZone.UTC), job.retries,
taskId, msg)))
case None =>
sendNotification(job, "%s [Chronos] job '%s' failed!".format(clusterPrefix, job.name),
Some("\n'%s'. Retries attempted: %d.\nTask id: %s\n"
.format(DateTime.now(DateTimeZone.UTC), job.retries, taskId)))
}
jobsObserver.apply(JobRetriesExhausted(job, taskStatus, attempt))
}
jobMetrics.updateJobStatus(jobName, success = false)
}
@@ -486,16 +437,7 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
val (jobName, start, attempt, _) = TaskUtils.parseTaskId(taskId)
val jobOption = jobGraph.lookupVertex(jobName)

var job :BaseJob = null
if (!jobOption.isEmpty) {
job = jobOption.get
jobStats.jobFailed(job=job, attempt=attempt,
taskStatus=taskStatus)
} else {
//for now just fake schedule based job
jobStats.jobFailed(jobName=jobName, taskStatus=taskStatus,
attempt=attempt)
}
jobsObserver.apply(JobFailed(jobOption.toRight(jobName), taskStatus, attempt))
}

/**
@@ -574,6 +516,7 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
//The nextDate has passed already beyond epsilon.
//TODO(FL): Think about the semantics here and see if it always makes sense to skip ahead of missed schedules.
if (!nextDate.isBefore(now)) {
jobsObserver.apply(JobSkipped(job, nextDate))
return (None, Some(stream))
}
//Needs to be scheduled at a later time, after schedule horizon.
