
Introduce JobsObserver interface and place both JobStats and error notification behind that interface #436

Merged: 8 commits, May 3, 2015
@@ -0,0 +1,41 @@
package org.apache.mesos.chronos.notification

import java.util.logging.Logger

import akka.actor.ActorRef
import com.google.inject.Inject
import org.apache.mesos.chronos.scheduler.jobs._
import org.joda.time.{DateTimeZone, DateTime}

class JobNotificationObserver @Inject()(val notificationClients: List[ActorRef] = List(),
val clusterName: Option[String] = None) {
private[this] val log = Logger.getLogger(getClass.getName)
val clusterPrefix = clusterName.map(name => s"[$name]").getOrElse("")
Member: Let's evaluate clusterPrefix inside JobRetriesExhausted (only where it is used).

Contributor Author: It is also used in the JobRemoved case.

Member: ok, makes sense.


def asObserver: JobsObserver.Observer = {
case JobRemoved(job) => sendNotification(job, "%s [Chronos] Your job '%s' was deleted!".format(clusterPrefix, job.name), None)
case JobDisabled(job, cause) => sendNotification(
job,
"%s [Chronos] job '%s' disabled".format(clusterPrefix, job.name),
Some(cause))

case JobRetriesExhausted(job, taskStatus, attempts) =>
val msg = "\n'%s'. Retries attempted: %d.\nTask id: %s\n"
.format(DateTime.now(DateTimeZone.UTC), job.retries, taskStatus.getTaskId.getValue)
sendNotification(job, "%s [Chronos] job '%s' failed!".format(clusterPrefix, job.name),
Some(TaskUtils.appendSchedulerMessage(msg, taskStatus)))
}

def sendNotification(job: BaseJob, subject: String, message: Option[String] = None) {
for (client <- notificationClients) {
val subowners = job.owner.split("\\s*,\\s*")
for (subowner <- subowners) {
log.info("Sending mail notification to:%s for job %s using client: %s".format(subowner, job.name, client))
client !(job, subowner, subject, message)
}
}

log.info(subject)
}

}
@@ -8,10 +8,10 @@ import javax.ws.rs.{Consumes, GET, Path, Produces, WebApplicationException}

import org.apache.mesos.chronos.scheduler.config.SchedulerConfiguration
import org.apache.mesos.chronos.scheduler.graph.JobGraph
import org.apache.mesos.chronos.scheduler.jobs._
import org.apache.mesos.chronos.scheduler.jobs.graph.Exporter
import com.codahale.metrics.annotation.Timed
import com.google.inject.Inject
import org.apache.mesos.chronos.scheduler.jobs.stats.JobStats

/**
* The REST API for managing jobs.
@@ -21,7 +21,7 @@ import com.google.inject.Inject
@Path(PathConstants.graphBasePath)
@Consumes(Array(MediaType.APPLICATION_JSON))
class GraphManagementResource @Inject()(
val jobScheduler: JobScheduler,
val jobStats: JobStats,
val jobGraph: JobGraph,
val configuration: SchedulerConfiguration) {

@@ -48,7 +48,7 @@ class GraphManagementResource @Inject()(
def jsonGraph(): Response = {
try {
val buffer = new StringWriter
Exporter.export(buffer, jobGraph, jobScheduler)
Exporter.export(buffer, jobGraph, jobStats)
Response.ok(buffer.toString).build
} catch {
case ex: Exception =>
@@ -17,6 +17,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule

import com.codahale.metrics.annotation.Timed
import com.google.inject.Inject
import org.apache.mesos.chronos.scheduler.jobs.stats.JobStats
import org.joda.time.{DateTime, DateTimeZone}

import scala.collection.mutable.ListBuffer
@@ -33,6 +34,7 @@ class JobManagementResource @Inject()(val jobScheduler: JobScheduler,
val jobGraph: JobGraph,
val configuration: SchedulerConfiguration,
val cassandraConfig: CassandraConfiguration,
val jobStats: JobStats,
val jobMetrics: JobMetrics) {

private[this] val log = Logger.getLogger(getClass.getName)
@@ -100,8 +102,7 @@ class JobManagementResource @Inject()(val jobScheduler: JobScheduler,
}
}
}
Member: This deleted job notification will no longer be sent. No notification is sent in deregisterJob. We should still send the notification here.

Contributor Author: deregisterJob calls jobsObserver.onEvent(JobRemoved(job)), I just need to handle that in NotifyingJobObserver.

Member: Sounds good, could you add that so that this notification is still sent?

Member: Ah, I see it was just added. Thanks!
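Note (illustrative sketch, not part of this PR): the flow discussed in this thread can be summarized with a small self-contained toy; the names below are simplified stand-ins, not the actual Chronos types, and the real event and handler appear elsewhere in this diff.

// Toy sketch of the dispatch described above: deregistering a job fires a "removed" event,
// and a notification observer implemented as a partial function picks it up.
object ObserverFlowSketch extends App {
  sealed trait Event
  case class Removed(jobName: String) extends Event

  type Observer = PartialFunction[Event, Unit]

  val notifier: Observer = {
    case Removed(name) => println(s"[Chronos] Your job '$name' was deleted!")
  }

  def deregisterJob(jobName: String, observer: Observer): Unit = {
    // ...remove the job from graph/state here, then notify whoever is listening:
    if (observer.isDefinedAt(Removed(jobName))) observer(Removed(jobName))
  }

  deregisterJob("sample-job", notifier) // prints the deletion notice
}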

jobScheduler.sendNotification(job, "[Chronos] - Your job '%s' was deleted!".format(jobName))

// No need to send notifications here, the jobScheduler.deregisterJob will do it
jobScheduler.deregisterJob(job, persist = true)
Response.noContent().build
} catch {
@@ -123,7 +124,7 @@ class JobManagementResource @Inject()(val jobScheduler: JobScheduler,
require(!jobOpt.isEmpty, "Job '%s' not found".format(jobName))

val histoStats = jobMetrics.getJobHistogramStats(jobName)
val jobStatsList: List[TaskStat] = jobScheduler.jobStats.getMostRecentTaskStatsByJob(jobOpt.get, cassandraConfig.jobHistoryLimit())
val jobStatsList: List[TaskStat] = jobStats.getMostRecentTaskStatsByJob(jobOpt.get, cassandraConfig.jobHistoryLimit())
val jobStatsWrapper = new JobStatWrapper(jobStatsList, histoStats)

val wrapperStr = objectMapper.writeValueAsString(jobStatsWrapper)
@@ -183,7 +184,7 @@ class JobManagementResource @Inject()(val jobScheduler: JobScheduler,
require(num >= 0,
"numAdditionalElementsProcessed (%d) is not positive".format(num))

jobScheduler.jobStats.updateTaskProgress(jobOpt.get, taskId, num)
jobStats.updateTaskProgress(jobOpt.get, taskId, num)
}
case None =>
}
@@ -1,10 +1,10 @@
package org.apache.mesos.chronos.scheduler.config

import org.apache.mesos.chronos.scheduler.jobs.JobStats
import com.datastax.driver.core.Cluster
import com.datastax.driver.core.ProtocolOptions.Compression
import com.datastax.driver.core.policies.{DowngradingConsistencyRetryPolicy, LatencyAwarePolicy, RoundRobinPolicy}
import com.google.inject.{AbstractModule, Provides, Scopes, Singleton}
import org.apache.mesos.chronos.scheduler.jobs.stats.JobStats

class JobStatsModule(config: CassandraConfiguration) extends AbstractModule {
def configure() {
@@ -6,9 +6,10 @@ import java.util.logging.{Level, Logger}

import akka.actor.{ActorRef, ActorSystem, Props}
import akka.util.Timeout
import org.apache.mesos.chronos.notification.{MailClient, RavenClient, SlackClient}
import org.apache.mesos.chronos.notification.{JobNotificationObserver, MailClient, RavenClient, SlackClient}
import org.apache.mesos.chronos.scheduler.graph.JobGraph
import org.apache.mesos.chronos.scheduler.jobs.{JobMetrics, JobScheduler, JobStats, TaskManager}
import org.apache.mesos.chronos.scheduler.jobs.stats.JobStats
import org.apache.mesos.chronos.scheduler.jobs.{JobsObserver, JobMetrics, JobScheduler, TaskManager}
import org.apache.mesos.chronos.scheduler.mesos.{MesosDriverFactory, MesosJobFramework, MesosTaskBuilder}
import org.apache.mesos.chronos.scheduler.state.PersistenceStore
import com.google.common.util.concurrent.{ListeningScheduledExecutorService, MoreExecutors, ThreadFactoryBuilder}
@@ -90,9 +91,8 @@ class MainModule(val config: SchedulerConfiguration with HttpConf)
mesosSchedulerDriver: MesosDriverFactory,
curator: CuratorFramework,
leaderLatch: LeaderLatch,
notificationClients: List[ActorRef],
metrics: JobMetrics,
stats: JobStats): JobScheduler = {
jobsObserver: JobsObserver.Observer,
metrics: JobMetrics): JobScheduler = {
new JobScheduler(
scheduleHorizon = Seconds.seconds(config.scheduleHorizonSeconds()).toPeriod,
taskManager = taskManager,
@@ -102,12 +102,17 @@ class MainModule(val config: SchedulerConfiguration with HttpConf)
curator = curator,
leaderLatch = leaderLatch,
leaderPath = config.zooKeeperCandidatePath,
notificationClients = notificationClients,
jobsObserver = jobsObserver,
failureRetryDelay = config.failureRetryDelayMs(),
disableAfterFailures = config.disableAfterFailures(),
jobMetrics = metrics,
jobStats = stats,
clusterName = config.clusterName.get)
jobMetrics = metrics)
}

Member: Where is this method (provideJobsObservers) called?

Contributor Author: It is a Guice @Provides-annotated factory method, if I understand Guice correctly. So whenever a JobsObserver instance is needed, a singleton instance is provided. https://github.com/google/guice/wiki/ProvidesMethods
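Note (illustrative sketch, not part of this PR): a minimal standalone example of the @Provides mechanism described above; ExampleModule, provideGreeting, and GuiceSketch are hypothetical names.

import com.google.inject.{AbstractModule, Guice, Provides, Singleton}

class ExampleModule extends AbstractModule {
  def configure() {}

  @Provides
  @Singleton
  def provideGreeting(): String = "hello" // invoked by Guice whenever a String binding is needed; cached via @Singleton
}

object GuiceSketch extends App {
  // Nothing calls provideGreeting directly; the injector invokes it on demand.
  val greeting = Guice.createInjector(new ExampleModule).getInstance(classOf[String])
  println(greeting) // "hello"
}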

Member: Makes sense, thanks.

@Singleton
@Provides
def provideJobsObservers(jobStats: JobStats, notificationClients: List[ActorRef]): JobsObserver.Observer = {
val notifier = new JobNotificationObserver(notificationClients, config.clusterName.get)
JobsObserver.composite(List(notifier.asObserver, jobStats.asObserver))
}
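Note (sketch, assumptions flagged): JobsObserver.composite and the event trait are not shown in this diff. The sketch below is one plausible shape, assuming Observer is a PartialFunction over a common event type, which is what the partial-function literal in asObserver suggests; "JobEvent" is an assumed name.

// Sketch only, not the actual Chronos implementation.
object JobsObserverSketch {
  sealed trait JobEvent // assumed common parent of JobRemoved, JobDisabled, JobFinished, ...

  type Observer = PartialFunction[JobEvent, Unit]

  def composite(observers: List[Observer]): Observer = {
    // Forward every event to each observer that handles it; observers simply ignore the rest.
    case event => observers.foreach(_.applyOrElse(event, (_: JobEvent) => ()))
  }
}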

@Singleton
@@ -136,8 +141,7 @@ class MainModule(val config: SchedulerConfiguration with HttpConf)
for {
webhookUrl <- config.slackWebhookUrl.get if !config.slackWebhookUrl.isEmpty
} yield {
create(classOf[SlackClient], webhookUrl)
}
create(classOf[SlackClient], webhookUrl)}
).flatten
}

@@ -4,7 +4,7 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import java.util.concurrent.{TimeUnit, Executors, Future}
import java.util.logging.{Level, Logger}

import akka.actor.{ActorSystem, ActorRef}
import akka.actor.ActorSystem
import org.apache.mesos.chronos.scheduler.graph.JobGraph
import org.apache.mesos.chronos.scheduler.mesos.MesosDriverFactory
import org.apache.mesos.chronos.scheduler.state.PersistenceStore
@@ -34,12 +34,10 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
val curator: CuratorFramework = null,
val leaderLatch: LeaderLatch = null,
val leaderPath: String = null,
val notificationClients: List[ActorRef] = List(),
val jobsObserver: JobsObserver.Observer,
val failureRetryDelay: Long = 60000,
val disableAfterFailures: Long = 0,
val jobMetrics: JobMetrics,
val jobStats: JobStats,
val clusterName: Option[String] = None)
val jobMetrics: JobMetrics)
//Allows us to let Chaos manage the lifecycle of this class.
extends AbstractIdleService {

@@ -210,7 +208,7 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,

taskManager.cancelTasks(job)
taskManager.removeTasks(job)
jobStats.removeJobState(job)
jobsObserver.apply(JobRemoved(job))

if (persist) {
log.info("Removing job from underlying state abstraction:" + job.name)
@@ -233,7 +231,7 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
} else {
val job = jobOption.get
val (_, _, attempt, _) = TaskUtils.parseTaskId(taskId)
jobStats.jobStarted(job, taskStatus, attempt)
jobsObserver.apply(JobStarted(job, taskStatus, attempt))

job match {
case j: DependencyBasedJob =>
@@ -246,7 +244,6 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
/**
* Takes care of follow-up actions for a finished task, i.e. update the job schedule in the persistence store or
* launch tasks for dependent jobs
* @param taskId
*/
def handleFinishedTask(taskStatus: TaskStatus, taskDate: Option[DateTime] = None) {
// `taskDate` is purely for unit testing
@@ -265,7 +262,7 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
jobMetrics.updateJobStat(jobName, timeMs = DateTime.now(DateTimeZone.UTC).getMillis - start)
jobMetrics.updateJobStatus(jobName, success = true)
val job = jobOption.get
jobStats.jobFinished(job, taskStatus, attempt)
jobsObserver.apply(JobFinished(job, taskStatus, attempt))

val newJob = job match {
case job: ScheduleBasedJob =>
@@ -298,13 +295,9 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
log.info("Disabling job that reached a zero-recurrence count!")

val disabledJob: ScheduleBasedJob = scheduleBasedJob.copy(disabled = true)
val clusterPrefix = getClusterPrefix(clusterName)
sendNotification(
job,
"%s [Chronos] job '%s' disabled".format(clusterPrefix, job.name),
Some( """Job '%s' has exhausted all of its recurrences and has been disabled.
|Please consider either removing your job, or updating its schedule and re-enabling it.
""".stripMargin.format(job.name)))
jobsObserver.apply(JobDisabled(job, """Job '%s' has exhausted all of its recurrences and has been disabled.
|Please consider either removing your job, or updating its schedule and re-enabling it.
""".stripMargin.format(job.name)))
replaceJob(scheduleBasedJob, disabledJob)
}
case None =>
@@ -314,23 +307,6 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
}
}

def getClusterPrefix(clusterName: Option[String]) = clusterName match {
case Some(name) => s"[$name] "
case None => ""
}

def sendNotification(job: BaseJob, subject: String, message: Option[String] = None) {
for (client <- notificationClients) {
val subowners = job.owner.split("\\s*,\\s*")
for (subowner <- subowners) {
log.info("Sending mail notification to:%s for job %s using client: %s".format(subowner, job.name, client))
client !(job, subowner, subject, message)
}
}

log.info(subject)
}

def replaceJob(oldJob: BaseJob, newJob: BaseJob) {
lock.synchronized {
jobGraph.replaceVertex(oldJob, newJob)
@@ -364,11 +340,6 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,

def handleFailedTask(taskStatus: TaskStatus) {
val taskId = taskStatus.getTaskId.getValue
val message = if (taskStatus.hasMessage && taskStatus.getMessage.nonEmpty) {
Some(taskStatus.getMessage)
} else {
None
}
if (!TaskUtils.isValidVersion(taskId)) {
log.warning("Found old or invalid task, ignoring!")
} else {
Expand All @@ -377,7 +348,7 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
val jobOption = jobGraph.lookupVertex(jobName)
jobOption match {
case Some(job) =>
jobStats.jobFailed(job, taskStatus, attempt)
jobsObserver.apply(JobFailed(Right(job), taskStatus, attempt))

val hasAttemptsLeft: Boolean = attempt < job.retries
val hadRecentSuccess: Boolean = try {
@@ -429,37 +400,17 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
updateJob(job, newJob)
if (job.softError) processDependencies(jobName, Option(lastErrorTime))

val clusterPrefix = getClusterPrefix(clusterName)

// Handle failure by either disabling the job and notifying the owner,
// or just notifying the owner.
if (disableJob) {
log.warning("Job failed beyond retries! Job will now be disabled after "
+ newJob.errorsSinceLastSuccess + " failures (disableAfterFailures=" + disableAfterFailures + ").")
message match {
case Some(msg) =>
sendNotification(job, "%s [Chronos] JOB DISABLED: '%s'".format(clusterPrefix, job.name),
Some("\nFailed at '%s', %d failures since last success\nTask id: %s\nThe scheduler provided this message:\n\n%s"
.format(DateTime.now(DateTimeZone.UTC), newJob.errorsSinceLastSuccess,
taskId, msg)))
case None =>
sendNotification(job, "%s [Chronos] JOB DISABLED: '%s'".format(clusterPrefix, job.name),
Some("\nFailed at '%s', %d failures since last success\nTask id: %s\n"
.format(DateTime.now(DateTimeZone.UTC), newJob.errorsSinceLastSuccess, taskId)))
}
val msg = "\nFailed at '%s', %d failures since last success\nTask id: %s\n"
.format(DateTime.now(DateTimeZone.UTC), newJob.errorsSinceLastSuccess, taskId)
jobsObserver.apply(JobDisabled(job, TaskUtils.appendSchedulerMessage(msg, taskStatus)))
} else {
log.warning("Job failed beyond retries!")
message match {
case Some(msg) =>
sendNotification(job, "%s [Chronos] job '%s' failed!".format(clusterPrefix, job.name),
Some("\n'%s'. Retries attempted: %d.\nTask id: %s\nThe scheduler provided this message:\n\n%s"
.format(DateTime.now(DateTimeZone.UTC), job.retries,
taskId, msg)))
case None =>
sendNotification(job, "%s [Chronos] job '%s' failed!".format(clusterPrefix, job.name),
Some("\n'%s'. Retries attempted: %d.\nTask id: %s\n"
.format(DateTime.now(DateTimeZone.UTC), job.retries, taskId)))
}
jobsObserver.apply(JobRetriesExhausted(job, taskStatus, attempt))
}
jobMetrics.updateJobStatus(jobName, success = false)
Member: We are ignoring taskStatus.getMessage here it seems. We should not remove that logging.

Contributor Author: Both here and above it is taken care of in JobNotificationObserver; see the calls to TaskUtils.appendSchedulerMessage(msg, taskStatus) and the code for the appendSchedulerMessage() utility method.

Member: Ah, thanks for clarifying. That utility method does take care of it.
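Note (reconstruction, assumptions flagged): TaskUtils.appendSchedulerMessage itself is not shown in this diff. Judging by the message text in the code this PR removes, it presumably does something close to the sketch below; treat it as a reconstruction, not the real method.

import org.apache.mesos.Protos.TaskStatus

object TaskUtilsSketch {
  // Reconstructed sketch: append the scheduler-provided message, if any, to the notification body.
  def appendSchedulerMessage(msg: String, taskStatus: TaskStatus): String =
    if (taskStatus.hasMessage && taskStatus.getMessage.nonEmpty)
      msg + "The scheduler provided this message:\n\n%s".format(taskStatus.getMessage)
    else
      msg
}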

}
@@ -486,16 +437,7 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
val (jobName, start, attempt, _) = TaskUtils.parseTaskId(taskId)
val jobOption = jobGraph.lookupVertex(jobName)

var job :BaseJob = null
if (!jobOption.isEmpty) {
job = jobOption.get
jobStats.jobFailed(job=job, attempt=attempt,
taskStatus=taskStatus)
} else {
//for now just fake schedule based job
jobStats.jobFailed(jobName=jobName, taskStatus=taskStatus,
attempt=attempt)
}
jobsObserver.apply(JobFailed(jobOption.toRight(jobName), taskStatus, attempt))
}

/**
@@ -574,6 +516,7 @@ class JobScheduler @Inject()(val scheduleHorizon: Period,
//The nextDate has passed already beyond epsilon.
//TODO(FL): Think about the semantics here and see if it always makes sense to skip ahead of missed schedules.
if (!nextDate.isBefore(now)) {
jobsObserver.apply(JobSkipped(job, nextDate))
return (None, Some(stream))
}
//Needs to be scheduled at a later time, after schedule horizon.