Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Mesos fetcher cache (and other options). #637

Merged
merged 3 commits into from
Feb 29, 2016
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions docs/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ curl -L -H 'Content-Type: application/json' -X POST -d '{json hash}' chronos-nod
},
"cpus": "0.5",
"mem": "512",
"uris": [],
"fetch": [],
"command": "while sleep 10; do date =u %T; done"
}
```
Expand Down Expand Up @@ -335,7 +335,8 @@ When specifying the `command` field in your job hash, use `url-runner.bash` (mak
| `mem` | Amount of Mesos Memory (in MB) for this job. | `128` or `--mesos_task_mem` |
| `disk` | Amount of Mesos disk (in MB) for this job. | `256` or `--mesos_task_disk` |
| `disabled` | If set to `true`, this job will not be run. | `false` |
| `uris` | An array of URIs which Mesos will download when the task is started. | - |
| `uris` | An array of URIs which Mesos will download when the task is started (deprecated). | - |
| `fetch` | An array of fetch configurations, one for each file that Mesos Fetcher will download when the task is started).| - |
| `schedule` | [ISO 8601][] repeating schedule for this job. If specified, `parents` must not be specified. | - |
| `scheduleTimeZone` | The time zone for the given schedule, specified in the [tz database](https://en.wikipedia.org/wiki/Tz_database) format. | - |
| `parents` | An array of parent jobs for a dependent job. If specified, `schedule` must not be specified. | - |
Expand Down Expand Up @@ -370,7 +371,14 @@ When specifying the `command` field in your job hash, use `url-runner.bash` (mak
"disk": 10240,
"mem": 1024,
"disabled": false,
"uris": [],
"fetch": [
{
"uri": "https://url-to-file",
"cache": false,
"extract": false,
"executable": false
}
],
"schedule": "R/2014-03-08T20:00:00.000Z/PT2H",
"environmentVariables": [
{
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
<joda-convert.version>1.7</joda-convert.version>
<joda-time.version>2.3</joda-time.version>
<lz4.version>1.2.0</lz4.version>
<mesos-utils.version>0.22.0-1</mesos-utils.version>
<mesos-utils.version>0.23.0</mesos-utils.version>
<metrics.version>3.1.0</metrics.version>
<raven.version>4.1.2</raven.version>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ class HttpClient(val endpointUrl: String,
generator.writeStringField("mem", job.mem.toString())
generator.writeStringField("retries", job.retries.toString())
generator.writeStringField("successCount", job.successCount.toString())
generator.writeStringField("uris", job.uris.mkString(","))
val uris = job.fetch.map { _.uri } ++ job.uris
generator.writeStringField("uris", uris.mkString(","))


generator.writeEndObject()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ class RavenClient(val dsn: String) extends NotificationClient {

def sendNotification(job: BaseJob, to: String, subject: String, message: Option[String]) {
val ravenMessage = subject + "\n\n" + message.getOrElse("")
val uris = job.fetch.map { _.uri } ++ job.uris
val event = new EventBuilder()
.setMessage(ravenMessage)
.setLevel(Event.Level.ERROR)
Expand All @@ -36,7 +37,7 @@ class RavenClient(val dsn: String) extends NotificationClient {
.addExtra("mem", job.mem)
.addExtra("retries", job.retries)
.addExtra("successCount", job.successCount)
.addExtra("uris", job.uris.mkString(","))
.addExtra("uris", uris.mkString(","))
.build()

raven.sendEvent(event)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ class Iso8601JobResource @Inject()(
"the job's name is invalid. Allowed names: '%s'".format(JobUtils.jobNamePattern.toString()))
if (!Iso8601Expressions.canParse(newJob.schedule, newJob.scheduleTimeZone))
return Response.status(Response.Status.BAD_REQUEST).build()
if(! JobUtils.isValidURIDefinition(newJob)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In terms of style across the repo there is a space following each 'if' statement but in your commit every single 'if' lacks the space between the opening parentheses. Consider adding the space to keep consistent style and removing the space between the negation (!) and statement.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can provide either new commit or amend this commit.

log.warning(s"Tried to add both uri (deprecated) and fetch parameters on ${newJob.name}")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's Scala string interpolation. I don't think Chronos can be built < Scala 2.10.

return Response.status(Response.Status.BAD_REQUEST).build()
}

//TODO(FL): Create a wrapper class that handles adding & removing jobs!
jobScheduler.registerJob(List(newJob), persist = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class JobManagementResource @Inject()(val jobScheduler: JobScheduler,
disabled = childJob.disabled,
softError = childJob.softError,
uris = childJob.uris,
fetch = childJob.fetch,
highPriority = childJob.highPriority
)
jobScheduler.updateJob(childJob, newChild)
Expand Down Expand Up @@ -201,13 +202,19 @@ class JobManagementResource @Inject()(val jobScheduler: JobScheduler,
@Timed
def list(): Response = {
try {
val jobs = ListBuffer[BaseJob]()
import scala.collection.JavaConversions._
jobGraph.dag.vertexSet().map({
job =>
jobs += jobGraph.getJobForName(job).get
})
Response.ok(jobs.toList).build
val jobs = jobGraph.dag.vertexSet()
.map { jobGraph.getJobForName }
.flatten
.map { // copies fetch in uris or uris in fetch (only one can be set) __only__ in REST get, for compatibility
case j : ScheduleBasedJob =>
if(j.fetch.isEmpty) j.copy(fetch = j.uris.map { Fetch(_) })
else j.copy(uris = j.fetch.map { _.uri })
case j : DependencyBasedJob =>
if(j.fetch.isEmpty) j.copy(fetch = j.uris.map { Fetch(_) })
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is duplicated on matching each type. Consider moving it to a method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cannot be moved to a method, due to the fact that there is a case class duplicate between ScheduleBasedJob and DependencyBasedJob. An alternative would be to add one case class standard copy() in the trait, just for this one usage; I don't like too much the idea.

Best course of action would be to refactor to remove duplicates in the model (out of scope of this PR).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Due to the lack of standard copy command this is okay.

else j.copy(uris = j.fetch.map { _.uri })
}
Response.ok(jobs).build
} catch {
case ex: Exception =>
log.log(Level.WARNING, "Exception while serving request", ex)
Expand Down
12 changes: 12 additions & 0 deletions src/main/scala/org/apache/mesos/chronos/scheduler/jobs/Fetch.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package org.apache.mesos.chronos.scheduler.jobs

import com.fasterxml.jackson.annotation.JsonProperty

/**
* Created by Sylvain Veyrié on 17/02/2016.
*/
case class Fetch(
@JsonProperty uri: String,
@JsonProperty executable: Boolean = false,
@JsonProperty cache: Boolean = false,
@JsonProperty extract: Boolean = false)
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ object JobUtils {
}
}

def isValidURIDefinition(baseJob: BaseJob) = baseJob.uris.isEmpty || baseJob.fetch.isEmpty // when you leave the deprecated one, then it should be empty

//TODO(FL): Think about moving this back into the JobScheduler, though it might be a bit crowded.
def loadJobs(scheduler: JobScheduler, store: PersistenceStore) {
//TODO(FL): Create functions that map strings to jobs
Expand Down Expand Up @@ -183,6 +185,7 @@ object JobUtils {
errorsSinceLastSuccess = job.errorsSinceLastSuccess,
softError = job.softError,
uris = job.uris,
fetch = job.fetch,
highPriority = job.highPriority,
runAsUser = job.runAsUser,
container = job.container,
Expand Down Expand Up @@ -214,6 +217,7 @@ object JobUtils {
errorsSinceLastSuccess = job.errorsSinceLastSuccess,
softError = job.softError,
uris = job.uris,
fetch = job.fetch,
highPriority = job.highPriority,
runAsUser = job.runAsUser,
container = job.container,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@ trait BaseJob {

def errorsSinceLastSuccess: Long = 0L

@Deprecated
def uris: Seq[String] = List()

def fetch: Seq[Fetch] = List()

def highPriority: Boolean = false

def runAsUser: String = ""
Expand Down Expand Up @@ -104,7 +107,8 @@ case class ScheduleBasedJob(
@JsonProperty override val mem: Double = 0,
@JsonProperty override val disabled: Boolean = false,
@JsonProperty override val errorsSinceLastSuccess: Long = 0L,
@JsonProperty override val uris: Seq[String] = List(),
@Deprecated @JsonProperty override val uris: Seq[String] = List(),
@JsonProperty override val fetch: Seq[Fetch] = List(),
@JsonProperty override val highPriority: Boolean = false,
@JsonProperty override val runAsUser: String = "",
@JsonProperty override val container: DockerContainer = null,
Expand Down Expand Up @@ -140,7 +144,8 @@ case class DependencyBasedJob(
@JsonProperty override val mem: Double = 0,
@JsonProperty override val disabled: Boolean = false,
@JsonProperty override val errorsSinceLastSuccess: Long = 0L,
@JsonProperty override val uris: Seq[String] = List(),
@Deprecated @JsonProperty override val uris: Seq[String] = List(),
@JsonProperty override val fetch: Seq[Fetch] = List(),
@JsonProperty override val highPriority: Boolean = false,
@JsonProperty override val runAsUser: String = "",
@JsonProperty override val container: DockerContainer = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import java.util.logging.Logger
import javax.inject.Inject

import org.apache.mesos.chronos.scheduler.config.SchedulerConfiguration
import org.apache.mesos.chronos.scheduler.jobs.BaseJob
import org.apache.mesos.chronos.scheduler.jobs.{Fetch, BaseJob}
import com.google.common.base.Charsets
import com.google.protobuf.ByteString
import org.apache.mesos.Protos.ContainerInfo.DockerInfo
Expand Down Expand Up @@ -83,14 +83,18 @@ class MesosTaskBuilder @Inject()(val conf: SchedulerConfiguration) {
)
}

val uriProtos = job.uris.map { uri =>
val fetch = job.fetch ++ job.uris.map { Fetch(_) }
val uriCommand = fetch.map { f =>
CommandInfo.URI.newBuilder()
.setValue(uri)
.setValue(f.uri)
.setExtract(f.extract)
.setExecutable(f.executable)
.setCache(f.cache)
.build()
}

if (job.executor.nonEmpty) {
appendExecutorData(taskInfo, job, environment, uriProtos)
appendExecutorData(taskInfo, job, environment, uriCommand)
} else {
val command = CommandInfo.newBuilder()
if (job.command.startsWith("http") || job.command.startsWith("ftp")) {
Expand All @@ -106,7 +110,7 @@ class MesosTaskBuilder @Inject()(val conf: SchedulerConfiguration) {
.setShell(job.shell)
.setEnvironment(environment)
.addAllArguments(job.arguments.asJava)
.addAllUris(uriProtos.asJava)
.addAllUris(uriCommand.asJava)
}
if (job.runAsUser.nonEmpty) {
command.setUser(job.runAsUser)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,19 @@ class JobDeserializer extends JsonDeserializer[BaseJob] {
}
}

var fetch = scala.collection.mutable.ListBuffer[Fetch]()
if(node.has("fetch")) {
node.get("fetch").elements().map {
case node: ObjectNode => {
val uri = Option(node.get("uri")).map { _.asText() }.getOrElse("")
val executable = Option(node.get("executable")).map { _.asBoolean() }.getOrElse(false)
val cache = Option(node.get("cache")).map { _.asBoolean() }.getOrElse(false)
val extract = Option(node.get("extract")).map { _.asBoolean() }.getOrElse(false)
Fetch(uri, executable, cache, extract)
}
}.foreach(fetch.add)
}

var arguments = scala.collection.mutable.ListBuffer[String]()
if (node.has("arguments")) {
for (argument <- node.path("arguments")) {
Expand Down Expand Up @@ -206,7 +219,7 @@ class JobDeserializer extends JsonDeserializer[BaseJob] {
executor = executor, executorFlags = executorFlags, retries = retries, owner = owner,
ownerName = ownerName, description = description, lastError = lastError, lastSuccess = lastSuccess,
async = async, cpus = cpus, disk = disk, mem = mem, disabled = disabled,
errorsSinceLastSuccess = errorsSinceLastSuccess, uris = uris, highPriority = highPriority,
errorsSinceLastSuccess = errorsSinceLastSuccess, fetch = fetch, uris = uris, highPriority = highPriority,
runAsUser = runAsUser, container = container, environmentVariables = environmentVariables, shell = shell,
arguments = arguments, softError = softError, dataProcessingJobType = dataProcessingJobType,
constraints = constraints)
Expand All @@ -217,7 +230,7 @@ class JobDeserializer extends JsonDeserializer[BaseJob] {
executorFlags = executorFlags, retries = retries, owner = owner, ownerName = ownerName,
description = description, lastError = lastError, lastSuccess = lastSuccess, async = async,
cpus = cpus, disk = disk, mem = mem, disabled = disabled,
errorsSinceLastSuccess = errorsSinceLastSuccess, uris = uris, highPriority = highPriority,
errorsSinceLastSuccess = errorsSinceLastSuccess, fetch = fetch, uris = uris, highPriority = highPriority,
runAsUser = runAsUser, container = container, scheduleTimeZone = scheduleTimeZone,
environmentVariables = environmentVariables, shell = shell, arguments = arguments, softError = softError,
dataProcessingJobType = dataProcessingJobType, constraints = constraints)
Expand All @@ -227,7 +240,7 @@ class JobDeserializer extends JsonDeserializer[BaseJob] {
errorCount = errorCount, executor = executor, executorFlags = executorFlags, retries = retries, owner = owner,
ownerName = ownerName, description = description, lastError = lastError, lastSuccess = lastSuccess,
async = async, cpus = cpus, disk = disk, mem = mem, disabled = disabled,
errorsSinceLastSuccess = errorsSinceLastSuccess, uris = uris, highPriority = highPriority,
errorsSinceLastSuccess = errorsSinceLastSuccess, fetch = fetch, uris = uris, highPriority = highPriority,
runAsUser = runAsUser, container = container, environmentVariables = environmentVariables, shell = shell,
arguments = arguments, softError = softError, dataProcessingJobType = dataProcessingJobType,
constraints = constraints)
Expand Down
16 changes: 16 additions & 0 deletions src/main/scala/org/apache/mesos/chronos/utils/JobSerializer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,22 @@ class JobSerializer extends JsonSerializer[BaseJob] {
json.writeFieldName("errorsSinceLastSuccess")
json.writeNumber(baseJob.errorsSinceLastSuccess)

json.writeFieldName("fetch")
json.writeStartArray()
baseJob.fetch.foreach { f =>
json.writeStartObject()
json.writeFieldName("uri")
json.writeString(f.uri)
json.writeFieldName("executable")
json.writeBoolean(f.executable)
json.writeFieldName("cache")
json.writeBoolean(f.cache)
json.writeFieldName("extract")
json.writeBoolean(f.extract)
json.writeEndObject()
}
json.writeEndArray()

json.writeFieldName("uris")
json.writeStartArray()
baseJob.uris.foreach(json.writeString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ class SerDeTest extends SpecificationWithJUnit {
LikeConstraint("rack", "rack-[1-3]")
)

val fetch = Seq(Fetch("https://mesos.github.io/chronos/", true, false, true))

val a = new DependencyBasedJob(Set("B", "C", "D", "E"), "A", "noop", Minutes.minutes(5).toPeriod, 10L,
20L, "fooexec", "fooflags", 7, "foo@bar.com", "Foo", "Test dependency-based job", "TODAY",
"YESTERDAY", true, container = container, environmentVariables = environmentVariables,
shell = false, arguments = arguments, softError = true, constraints = constraints)
shell = false, arguments = arguments, softError = true, constraints = constraints, fetch = fetch)

val aStr = objectMapper.writeValueAsString(a)
val aCopy = objectMapper.readValue(aStr, classOf[DependencyBasedJob])
Expand Down Expand Up @@ -85,10 +87,12 @@ class SerDeTest extends SpecificationWithJUnit {
LikeConstraint("rack", "rack-[1-3]")
)

val fetch = Seq(Fetch("https://mesos.github.io/chronos/", true, false, true))

val a = new ScheduleBasedJob("FOO/BAR/BAM", "A", "noop", Minutes.minutes(5).toPeriod, 10L, 20L,
"fooexec", "fooflags", 7, "foo@bar.com", "Foo", "Test schedule-based job", "TODAY",
"YESTERDAY", true, container = container, environmentVariables = environmentVariables,
shell = true, arguments = arguments, softError = true, constraints = constraints)
shell = true, arguments = arguments, softError = true, constraints = constraints, fetch = fetch)

val aStr = objectMapper.writeValueAsString(a)
val aCopy = objectMapper.readValue(aStr, classOf[ScheduleBasedJob])
Expand Down