Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

on cluster environment , the job scheduled should respect the remote exec policies #5089

Merged
merged 3 commits into from Jul 30, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -87,4 +87,13 @@ public interface JobScheduleManager {
* @return list dead cluster members
*/
List<String> getDeadMembers(String uuid);

/**
* Schedule a job to run later
*
* @param data dataRundeckproClusterGrailsPlugin
*
* @return true if successful
*/
boolean scheduleRemoteJob(Map data);
gschueler marked this conversation as resolved.
Show resolved Hide resolved
}
Expand Up @@ -53,6 +53,11 @@ class JobSchedulerService implements JobScheduleManager {
List<String> getDeadMembers(String uuid) {
return rundeckJobScheduleManager.getDeadMembers(uuid);
}

@Override
boolean scheduleRemoteJob(Map data) {
return rundeckJobScheduleManager.scheduleRemoteJob(data)
}
}

/**
Expand All @@ -66,7 +71,6 @@ class QuartzJobScheduleManager implements JobScheduleManager {
@Autowired
def FrameworkService frameworkService


@Override
void deleteJobSchedule(final String name, final String group) {
quartzScheduler.deleteJob(new JobKey(name, group))
Expand Down Expand Up @@ -127,4 +131,10 @@ class QuartzJobScheduleManager implements JobScheduleManager {
List<String> getDeadMembers(String uuid) {
return null;
}

@Override
boolean scheduleRemoteJob(Map data) {
false
}

}
Expand Up @@ -628,17 +628,17 @@ class ScheduledExecutionService implements ApplicationContextAware, Initializing
}

def rescheduleJob(ScheduledExecution scheduledExecution) {
rescheduleJob(scheduledExecution, false, null, null)
rescheduleJob(scheduledExecution, false, null, null, false)
}

def rescheduleJob(ScheduledExecution scheduledExecution, wasScheduled, oldJobName, oldJobGroup) {
def rescheduleJob(ScheduledExecution scheduledExecution, wasScheduled, oldJobName, oldJobGroup, boolean forceLocal) {
if (scheduledExecution.shouldScheduleExecution() && shouldScheduleInThisProject(scheduledExecution.project)) {
//verify cluster member is schedule owner

def nextdate = null
def nextExecNode = null
try {
(nextdate, nextExecNode) = scheduleJob(scheduledExecution, oldJobName, oldJobGroup);
(nextdate, nextExecNode) = scheduleJob(scheduledExecution, oldJobName, oldJobGroup, forceLocal);
} catch (SchedulerException e) {
log.error("Unable to schedule job: ${scheduledExecution.extid}: ${e.message}")
}
Expand Down Expand Up @@ -685,7 +685,7 @@ class ScheduledExecutionService implements ApplicationContextAware, Initializing
try {
def nexttime = null
def nextExecNode = null
(nexttime, nextExecNode) = scheduleJob(se, null, null)
(nexttime, nextExecNode) = scheduleJob(se, null, null, true)
succeededJobs << [job: se, nextscheduled: nexttime]
log.info("rescheduled job in project ${se.project}: ${se.extid}")
} catch (Exception e) {
Expand Down Expand Up @@ -1068,7 +1068,7 @@ class ScheduledExecutionService implements ApplicationContextAware, Initializing
AuthConstants.ACTION_RUN,se.project)
}

def scheduleJob(ScheduledExecution se, String oldJobName, String oldGroupName) {
def scheduleJob(ScheduledExecution se, String oldJobName, String oldGroupName, boolean forceLocal=false) {
def jobid = "${se.generateFullName()} [${se.extid}]"
def jobDesc = "Attempt to schedule job $jobid in project $se.project"
if (!executionService.executionsAreActive) {
Expand All @@ -1088,6 +1088,17 @@ class ScheduledExecutionService implements ApplicationContextAware, Initializing
return [null, null];
}

def data=["project": se.project,
"jobId":se.uuid]

if(!forceLocal){
boolean remoteAssign = jobSchedulerService.scheduleRemoteJob(data)

if(remoteAssign){
return [null, null]
}
}

def jobDetail = createJobDetail(se)
def trigger = createTrigger(se)
jobDetail.getJobDataMap().put("bySchedule", true)
Expand Down Expand Up @@ -2105,7 +2116,7 @@ class ScheduledExecutionService implements ApplicationContextAware, Initializing
}

if (scheduledExecution.save(flush: true)) {
rescheduleJob(scheduledExecution, oldSched, oldJobName, oldJobGroup)
rescheduleJob(scheduledExecution, oldSched, oldJobName, oldJobGroup, true)
return [success: true, scheduledExecution: scheduledExecution]
} else {
scheduledExecution.discard()
Expand Down Expand Up @@ -2265,6 +2276,7 @@ class ScheduledExecutionService implements ApplicationContextAware, Initializing
scheduledExecution.nextExecution = new Date(ScheduledExecutionService.TWO_HUNDRED_YEARS)
}

boolean shouldreSchedule = false
if(frameworkService.isClusterModeEnabled()){

if (originalCron != scheduledExecution.generateCrontabExression() ||
Expand All @@ -2283,11 +2295,15 @@ class ScheduledExecutionService implements ApplicationContextAware, Initializing
)
if (modify) {
scheduledExecution.serverNodeUUID = frameworkService.serverUUID
//schedule meesage want sent , it should be ran locally
shouldreSchedule = true
}
}
if (!scheduledExecution.serverNodeUUID) {
scheduledExecution.serverNodeUUID = frameworkService.serverUUID
}
}else{
shouldreSchedule = true
}
def boolean renamed = oldjobname != scheduledExecution.generateJobScheduledName() || oldjobgroup != scheduledExecution.generateJobGroupName()
if (renamed) {
Expand Down Expand Up @@ -2605,7 +2621,7 @@ class ScheduledExecutionService implements ApplicationContextAware, Initializing
}
}
if (!failed && scheduledExecution.save(true)) {
if (scheduledExecution.shouldScheduleExecution() && shouldScheduleInThisProject(scheduledExecution.project)) {
if (scheduledExecution.shouldScheduleExecution() && shouldScheduleInThisProject(scheduledExecution.project) && shouldreSchedule) {
def nextdate = null
def nextExecNode = null
try {
Expand Down
Expand Up @@ -180,10 +180,11 @@ class ScheduledExecutionServiceSpec extends Specification {
getServerUUID() >> 'uuid'
isClusterModeEnabled() >> clusterEnabled
}
service.rundeckJobScheduleManager=Mock(JobScheduleManager){
service.jobSchedulerService=Mock(JobSchedulerService){
determineExecNode(*_)>>{args->
return serverNodeUUID
}
scheduleRemoteJob(_)>>false
}
def job = new ScheduledExecution(
createJobParams(
Expand Down Expand Up @@ -2685,7 +2686,7 @@ class ScheduledExecutionServiceSpec extends Specification {
def scheduleDate = new Date()

when:
def result = service.scheduleJob(job, null, null)
def result = service.scheduleJob(job, null, null, true)

then:
1 * service.executionServiceBean.getExecutionsAreActive() >> executionsAreActive
Expand Down Expand Up @@ -3280,4 +3281,72 @@ class ScheduledExecutionServiceSpec extends Specification {
true | null | 'qa'
null | null |' user'
}


@Unroll
def "cluster, should not scheduleJob because the remote policy"() {
given:
service.executionServiceBean = Mock(ExecutionService)
service.quartzScheduler = Mock(Scheduler) {
getListenerManager() >> Mock(ListenerManager)
}
def projectMock = Mock(IRundeckProject) {
getProjectProperties() >> [:]
}

def serverNodeUUID = "uuid"
def clusterEnabled = true

service.frameworkService = Mock(FrameworkService) {
getRundeckBase() >> ''
getFrameworkProject(_) >> projectMock
getServerUUID() >> serverNodeUUID
isClusterModeEnabled() >> clusterEnabled
}
service.jobSchedulerService=Mock(JobSchedulerService){
determineExecNode(*_)>>{args->
return serverNodeUUID
}
scheduleRemoteJob(_)>>true
}
def job = new ScheduledExecution(
createJobParams(
scheduled: true,
scheduleEnabled: true,
executionEnabled: true,
userRoleList: 'a,b'
)
).save()

when:
def result = service.scheduleJob(job, null, null)

then:
0 * service.quartzScheduler.scheduleJob(_, _)
result == [null, null]
}

@Unroll
def "do update job on cluster must not call quartz scheduleJob"(){
given:
def serverUUID = '802d38a5-0cd1-44b3-91ff-824d495f8105'
def uuid = setupDoUpdate(true,serverUUID)

def jobOwnerUuid = '5e0e96a0-042a-426a-80a4-488f7f6a4f13'
def se = new ScheduledExecution(createJobParams([serverNodeUUID:jobOwnerUuid, scheduled: false])).save()
service.jobSchedulerService = Mock(JobSchedulerService)

def inparams = [jobName: 'newName', scheduled: true]
when:
def results = service._doupdate([id: se.id.toString()] + inparams, mockAuth())


then:
results.success
results.scheduledExecution.serverNodeUUID == jobOwnerUuid
1 * service.jobSchedulerService.updateScheduleOwner(_, _, _) >> false
0 * service.quartzScheduler.scheduleJob(_,_,_)


}
}