Skip to content

Commit

Permalink
Added a job's run time and attempt as environment variables exposed t…
Browse files Browse the repository at this point in the history
…o the job
  • Loading branch information
vineethvarghese committed Apr 12, 2016
1 parent b9ddbaa commit bd00b61
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import java.util.logging.Logger
import javax.inject.Inject

import org.apache.mesos.chronos.scheduler.config.SchedulerConfiguration
import org.apache.mesos.chronos.scheduler.jobs.{Fetch, BaseJob}
import org.apache.mesos.chronos.scheduler.jobs.{TaskUtils, EnvironmentVariable, Fetch, BaseJob}

import com.google.common.base.Charsets
import com.google.protobuf.ByteString
import org.apache.mesos.Protos.ContainerInfo.DockerInfo
Expand Down Expand Up @@ -50,38 +51,42 @@ class MesosTaskBuilder @Inject()(val conf: SchedulerConfiguration) {
builder.build()
}

def envs(taskIdStr: String, job: BaseJob, offer: Offer): Environment.Builder = {
val (_, start, attempt, _) = TaskUtils.parseTaskId(taskIdStr)
val baseEnv = Map(
"mesos_task_id" -> taskIdStr,
"CHRONOS_JOB_OWNER" -> job.owner,
"CHRONOS_JOB_NAME" -> job.name,
"HOST" -> offer.getHostname,
"CHRONOS_RESOURCE_MEM" -> job.mem.toString,
"CHRONOS_RESOURCE_CPU" -> job.cpus.toString,
"CHRONOS_RESOURCE_DISK" -> job.disk.toString,
"CHRONOS_JOB_RUN_TIME" -> start.toString,
"CHRONOS_JOB_RUN_ATTEMPT" -> attempt.toString
)

// If the job defines custom environment variables, add them to the builder
// Don't add them if they already exist to prevent overwriting the defaults
val finalEnv =
if (job.environmentVariables != null && job.environmentVariables.nonEmpty) {
job.environmentVariables.foldLeft(baseEnv)((envs, env) =>
if (envs.contains(env.name)) envs else envs + (env.name -> env.value)
)
} else {
baseEnv
}

finalEnv.foldLeft(Environment.newBuilder())((builder, env) =>
builder.addVariables(Variable.newBuilder().setName(env._1).setValue(env._2)))
}

def getMesosTaskInfoBuilder(taskIdStr: String, job: BaseJob, offer: Offer): TaskInfo.Builder = {
//TODO(FL): Allow adding more fine grained resource controls.
val taskId = TaskID.newBuilder().setValue(taskIdStr).build()
val taskInfo = TaskInfo.newBuilder()
.setName(taskNameTemplate.format(job.name))
.setTaskId(taskId)
val environment = Environment.newBuilder()
.addVariables(Variable.newBuilder()
.setName("mesos_task_id").setValue(taskIdStr))
.addVariables(Variable.newBuilder()
.setName("CHRONOS_JOB_OWNER").setValue(job.owner))
.addVariables(Variable.newBuilder()
.setName("CHRONOS_JOB_NAME").setValue(job.name))
.addVariables(Variable.newBuilder()
.setName("HOST").setValue(offer.getHostname))
.addVariables(Variable.newBuilder()
.setName("CHRONOS_RESOURCE_MEM").setValue(job.mem.toString))
.addVariables(Variable.newBuilder()
.setName("CHRONOS_RESOURCE_CPU").setValue(job.cpus.toString))
.addVariables(Variable.newBuilder()
.setName("CHRONOS_RESOURCE_DISK").setValue(job.disk.toString))

// If the job defines custom environment variables, add them to the builder
// Don't add them if they already exist to prevent overwriting the defaults
val builtinEnvNames = environment.getVariablesList.asScala.map(_.getName).toSet
if (job.environmentVariables != null && job.environmentVariables.nonEmpty) {
job.environmentVariables.foreach(env =>
if (!builtinEnvNames.contains(env.name)) {
environment.addVariables(Variable.newBuilder().setName(env.name).setValue(env.value))
}
)
}
val environment = envs(taskIdStr, job, offer)

val fetch = job.fetch ++ job.uris.map { Fetch(_) }
val uriCommand = fetch.map { f =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package org.apache.mesos.chronos.scheduler.mesos

import scala.collection.JavaConversions._
import org.apache.mesos.Protos._
import org.apache.mesos.chronos.scheduler.config.SchedulerConfiguration
import org.apache.mesos.chronos.scheduler.jobs.Parameter
import org.apache.mesos.chronos.scheduler.jobs.Volume
import org.apache.mesos.chronos.scheduler.jobs._
import org.apache.mesos.chronos.scheduler.jobs.constraints.{LikeConstraint, EqualsConstraint}
import org.joda.time.Minutes
import org.specs2.mock.Mockito
import org.specs2.mutable.SpecificationWithJUnit


class MesosTaskBuilderSpec extends SpecificationWithJUnit with Mockito {

val taskId = "ct:1454467003926:0:test2Execution:run"

val (_, start, attempt, _) = TaskUtils.parseTaskId(taskId)

val offer = Offer.newBuilder().mergeFrom(Offer.getDefaultInstance)
.setHostname("localport")
.setId(OfferID.newBuilder().setValue("123").build())
.setFrameworkId(FrameworkID.newBuilder().setValue("123").build())
.setSlaveId(SlaveID.newBuilder().setValue("123").build())
.build()

val job = {
val volumes = Seq(
Volume(Option("/host/dir"), "container/dir", Option(VolumeMode.RW)),
Volume(None, "container/dir", None)
)

var parameters = scala.collection.mutable.ListBuffer[Parameter]()

val container = DockerContainer("dockerImage", volumes, parameters, NetworkMode.HOST, true)

val constraints = Seq(
EqualsConstraint("rack", "rack-1"),
LikeConstraint("rack", "rack-[1-3]")
)

new ScheduleBasedJob("FOO/BAR/BAM", "AJob", "noop", Minutes.minutes(5).toPeriod, 10L, 20L,
"fooexec", "fooflags", "none", 7, "foo@bar.com", "Foo", "Test schedule-based job", "TODAY",
"YESTERDAY", true, cpus = 2, disk = 3, mem = 5, container = container, environmentVariables = Seq(),
shell = true, arguments = Seq(), softError = true, constraints = constraints)
}

val defaultEnv = Map(
"mesos_task_id" -> taskId,
"CHRONOS_JOB_OWNER" -> job.owner,
"CHRONOS_JOB_NAME" -> job.name,
"HOST" -> offer.getHostname,
"CHRONOS_RESOURCE_MEM" -> job.mem.toString,
"CHRONOS_RESOURCE_CPU" -> job.cpus.toString,
"CHRONOS_RESOURCE_DISK" -> job.disk.toString,
"CHRONOS_JOB_RUN_TIME" -> start.toString,
"CHRONOS_JOB_RUN_ATTEMPT" -> attempt.toString
)

def toMap(envs: Environment): Map[String, String] =
envs.getVariablesList.foldLeft(Map[String, String]())((m, v) => m + (v.getName -> v.getValue))

"MesosTaskBuilder" should {
"Setup all the default environment variables" in {
val target = new MesosTaskBuilder(mock[SchedulerConfiguration])

defaultEnv must_== toMap(target.envs(taskId, job, offer).build())
}
}

"MesosTaskBuilder" should {
"Setup all the default environment variables and job environment variables" in {
val target = new MesosTaskBuilder(mock[SchedulerConfiguration])

val testJob = job.copy(environmentVariables = Seq(
EnvironmentVariable("FOO", "BAR"),
EnvironmentVariable("TOM", "JERRY")
))

val finalEnv = defaultEnv ++ Map("FOO" -> "BAR", "TOM" -> "JERRY")

finalEnv must_== toMap(target.envs(taskId, testJob, offer).build())
}
}

"MesosTaskBuilder" should {
"Should not allow job environment variables to overwrite any default environment variables" in {
val target = new MesosTaskBuilder(mock[SchedulerConfiguration])

val testJob = job.copy(environmentVariables = Seq(
EnvironmentVariable("CHRONOS_RESOURCE_MEM", "10000"),
EnvironmentVariable("CHRONOS_RESOURCE_DISK", "40000")
))

defaultEnv must_== toMap(target.envs(taskId, testJob, offer).build())
}
}
}

0 comments on commit bd00b61

Please sign in to comment.