Skip to content

Commit

Permalink
task arguments (#717)
Browse files Browse the repository at this point in the history
* using arguments if provided otherwise using the job arguments

* fixing tests

* adding additional tests for argument override

* command injection fixed for this feature
  • Loading branch information
kensipe authored and gkleiman committed Apr 26, 2017
1 parent f7b074d commit f770c28
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ class JobManagementResource @Inject()(val jobScheduler: JobScheduler,
require(jobGraph.lookupVertex(jobName).isDefined, "Job '%s' not found".format(jobName))
val job = jobGraph.getJobForName(jobName).get
log.info("Manually triggering job:" + jobName)
jobScheduler.taskManager.enqueue(TaskUtils.getTaskId(job, DateTime.now(DateTimeZone.UTC), 0), job.highPriority)
jobScheduler.taskManager.enqueue(TaskUtils.getTaskId(job, DateTime.now(DateTimeZone.UTC), 0, Option(arguments).filter(_.trim.nonEmpty))
, job.highPriority)
Response.noContent().build
} catch {
case ex: IllegalArgumentException =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package org.apache.mesos.chronos.scheduler.jobs

import java.util.logging.Logger

import org.apache.mesos.chronos.scheduler.state.PersistenceStore
import org.apache.mesos.Protos.{TaskID, TaskState, TaskStatus}
import org.apache.mesos.chronos.scheduler.state.PersistenceStore
import org.joda.time.{DateTime, DateTimeZone}

import scala.collection.mutable
Expand All @@ -22,6 +22,8 @@ object TaskUtils {
val taskIdTemplate = "ct:%d:%d:%s:%s"
val argumentsPattern = """(.*)?""".r
val taskIdPattern = """ct:(\d+):(\d+):%s:?%s""".format(JobUtils.jobNamePattern, argumentsPattern).r
val commandInjectionFilter = ";".toSet

private[this] val log = Logger.getLogger(getClass.getName)

def getTaskStatus(job: BaseJob, due: DateTime, attempt: Int = 0): TaskStatus = {
Expand Down Expand Up @@ -99,8 +101,9 @@ object TaskUtils {
})
}

def getTaskId(job: BaseJob, due: DateTime, attempt: Int = 0): String = {
taskIdTemplate.format(due.getMillis, attempt, job.name, job.arguments.mkString(" "))
def getTaskId(job: BaseJob, due: DateTime, attempt: Int = 0, arguments: Option[String] = None): String = {
val args: String = arguments.getOrElse(job.arguments.mkString(" ")).filterNot(commandInjectionFilter)
taskIdTemplate.format(due.getMillis, attempt, job.name, args)
}

def getDueTimes(tasks: Map[String, Array[Byte]]): Map[String, (BaseJob, Long, Int)] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ class JobSchedulerIntegrationTest extends SpecificationWithJUnit with Mockito {
jobMarkedSuccess.successCount must_== 1
jobMarkedSuccess.errorsSinceLastSuccess must_== 0
val lastSuccess = DateTime.parse(jobMarkedSuccess.lastSuccess)
there was one(mockTaskManager).enqueue(TaskUtils.getTaskId(dependentJob, lastSuccess, 0),
there was one(mockTaskManager).enqueue(TaskUtils.getTaskId(dependentJob, lastSuccess, 0, None),
highPriority = false)
scheduler.handleStartedTask(TaskUtils.getTaskStatus(dependentJob, lastSuccess, 0))
scheduler.handleFinishedTask(TaskUtils.getTaskStatus(dependentJob, lastSuccess, 0))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,22 @@ class TaskUtilsSpec extends SpecificationWithJUnit with Mockito {
"Get taskId" in {
val schedule = "R/2012-01-01T00:00:01.000Z/P1M"
val arguments = "-a 1 -b 2"
val cmdArgs = "-c 1 -d 2"
val job1 = new ScheduleBasedJob(schedule, "sample-name", "sample-command", arguments = List(arguments))
val job2 = new ScheduleBasedJob(schedule, "sample-name", "sample-command")
val job3 = new ScheduleBasedJob(schedule, "sample-name", "sample-command", arguments = List(arguments))
val ts = 1420843781398L
val due = new DateTime(ts)

val taskIdOne = TaskUtils.getTaskId(job1, due, 0)
val taskIdTwo = TaskUtils.getTaskId(job2, due, 0)
val taskIdThree = TaskUtils.getTaskId(job3, due, 0, Option(cmdArgs))
val taskIdFour = TaskUtils.getTaskId(job2, due, 0, Option(cmdArgs))

taskIdOne must_== "ct:1420843781398:0:sample-name:" + arguments
taskIdTwo must_== "ct:1420843781398:0:sample-name:"
taskIdThree must_== "ct:1420843781398:0:sample-name:" + cmdArgs // test override
taskIdFour must_== "ct:1420843781398:0:sample-name:" + cmdArgs // test adding args
}

"Get job arguments for taskId" in {
Expand All @@ -30,6 +36,19 @@ class TaskUtilsSpec extends SpecificationWithJUnit with Mockito {
jobArguments must_== arguments
}

"Disable command injection" in {
val schedule = "R/2012-01-01T00:00:01.000Z/P1M"
val cmdArgs = "-c 1 ; ./evil.sh"
val expectedArgs = "-c 1 ./evil.sh"
val job1 = new ScheduleBasedJob(schedule, "sample-name", "sample-command")
val ts = 1420843781398L
val due = new DateTime(ts)

val taskIdOne = TaskUtils.getTaskId(job1, due, 0, Option(cmdArgs))

taskIdOne must_== "ct:1420843781398:0:sample-name:" + expectedArgs
}

"Parse taskId" in {
val arguments = "-a 1 -b 2"
val arguments2 = "-a 1:2 --B test"
Expand Down

0 comments on commit f770c28

Please sign in to comment.