Skip to content
Permalink
Browse files

Assign host to queued jobs and app parameter "jobs_use_local_host"

  • Loading branch information...
tvannini committed Aug 22, 2019
1 parent 62e4ae1 commit 4c21553814a0242275423d47a44f5a7b49ab1244
Showing with 126 additions and 82 deletions.
  1. +96 −76 lib/jxapp.inc
  2. +16 −3 lib/jxfnx.inc
  3. +13 −2 lib/prgs/o2sys_iniconf.prf
  4. +1 −1 lib/prgs/o2sys_iniconf.prg
@@ -78,6 +78,7 @@ class o2_app {
public $rpc_server = false; /* If app is used as an RPC Server */
public $services = array(); /* List of defined services */
public $service_redirect = false; /* If set all jobs are assigned to this service */
public $jobs_local_host = false; /* If set all jobs are assigned to local host */
/* _____ Session ______________________________________________________________ */
public $validazione = 0; /* Validation level (0=none,1=full,2=mixed))*/
public $session_name = ''; /* Session name (PHPSESSID) */
@@ -1352,6 +1353,9 @@ class o2_app {
if (isset($conf['host'])) {
$this->host = strtoupper($conf['host']);
}
if (isset($conf['jobs_use_local_host'])) {
$this->jobs_local_host = ($conf['jobs_use_local_host'] ? true : false);
}
if (isset($conf['service_redirect'])) {
$this->service_redirect = strtoupper($conf['service_redirect']);
}
@@ -4084,6 +4088,7 @@ class o2_run_job {
* @param string $at_date Posticipate execution to date
* @param string $at_time Posticipate execution to time
* @param integer $sched_id Scheduler task ID which requested execution
* @param string $host Host to override assigned host for execution
* @return o2_run_job
*/
static function queue($job,
@@ -4092,7 +4097,8 @@ class o2_run_job {
$service = false,
$at_date = false,
$at_time = false,
$sched_id = 0) {
$sched_id = 0,
$host = false) {
$app = $_SESSION['o2_app'];
$run_job = new o2_run_job();
@@ -4146,7 +4152,8 @@ class o2_run_job {
$f['run_service']->nome_fisico,
$f['run_at_date']->nome_fisico,
$f['run_at_time']->nome_fisico,
$f['run_sched_id']->nome_fisico);
$f['run_sched_id']->nome_fisico,
$f['run_host']->nome_fisico);
$values = array("'".$run_job->id."'",
($run_job->job ? $run_job->job : '0'),
"'".$run_job->prg."'",
@@ -4160,7 +4167,9 @@ class o2_run_job {
"'".$run_job->service."'",
"'".$at_date."'",
"'".$at_time."'",
($run_job->sched_id ? $run_job->sched_id : '0'));
($run_job->sched_id ? $run_job->sched_id : '0'),
"'".($host ? $host :
($app->jobs_local_host ? $app->host : ''))."'");
$trans = $app->isolated_trans;
$app->isolated_trans = true;
$res = o2_gateway::insertrec($server->type,
@@ -4254,6 +4263,88 @@ class o2_run_job {
}
/**
* Run queued processes assigned to local host.
*
* Processes can be assigned to host in 2 ways:
* 1. process is assigned to a service and host is registered for that service;
* 2. process is assigned directly to host.
*
* Returns number of executed processes.
*
* @return integer
*/
static function run_queued() {
$app = $_SESSION['o2_app'];
$rnt = $app->runtime;
$res = 0;
// ________________________________________________ Update local host services ___
foreach (o2_service::get_host_services($app->host) as $srv => $srv_nice) {
o2_service::update_host_service($srv);
}
// _________________________ Get services for local host and queued executions ___
$host_srvs = o2_service::get_all_host_services();
$processes = self::get_list('', true);
$running = array();
$maxs = array();
$now = time();
// ___________________________________________________ Count running instances ___
foreach ($processes as $run_job) {
if ($run_job->is_running()) {
$running[$run_job->job] = $running[$run_job->job] + 1;
if (!isset($maxs[$run_job->job])) {
$job = o2_job::get_by_id($run_job->job);
$maxs[$run_job->job] = $job->max_instances;
}
}
}
// ________________________________________________ Loop on pending executions ___
foreach ($processes as $run_job) {
if ($run_job->status == 'Q') {
// ____________________________________ Process is assigned by service ___
$by_service = !$run_job->host && isset($host_srvs[$run_job->service]);
// _______________________________________ Process is assigned by host ___
$by_host = $run_job->host && ($run_job->host == $app->host);
if (($by_service || $by_host) &&
$run_job->exe_time <= $now &&
($maxs[$run_job->job] == 0 ||
$maxs[$run_job->job] > $running[$run_job->job])) {
$res++;
// ____________________ Set run-job status from Queued to Starting ___
$run_job->update_status('S');
// ______________________________ Get developer password if needed ___
$dev = false;
if ($run_job->developer) {
$dev = true;
$devkey = $rnt->developers[$run_job->developer];
}
$cmd = '"'.$rnt->php_engine.'" '.
$app->dir_htdocs.$app->nome.'.php jxrnt="'.$rnt->rnt_script.
'" user='.$run_job->user.
' password='.$app->get_user_password($run_job->user).
($dev ? ' dev='.$run_job->developer. ' key='.$devkey : '').' '.
self::$run_word.'='.$run_job->id;
$rnt->batch_exec($cmd);
$running[$run_job->job] = $running[$run_job->job] + 1;
if (!isset($maxs[$run_job->job])) {
$job = o2_job::get_by_id($run_job->job);
$maxs[$run_job->job] = $job->max_instances;
}
}
}
}
if ($res) {
$tab = $app->get_table(self::$run_table);
o2dbs_commit($tab->db->server->nome);
}
// __________________________________________________________ Update PIDs list ___
o2_service::update_host_pids();
return $res;
}
/**
* Kills instances marked as "killed" and running on this host
*
@@ -4285,7 +4376,7 @@ class o2_run_job {
$ret = 0;
if (count($run_recs)) {
foreach ($run_recs as $run_rec) {
$run_job = o2_run_job::get_by_id($run_rec['RUN_ID']);
$run_job = self::get_by_id($run_rec['RUN_ID']);
$run_job->kill();
$ret++;
}
@@ -5698,77 +5789,6 @@ class o2_service {
}
/**
* Run queued processes for services registered on local host
*
* @return boolean
*/
static function run() {
$app = $_SESSION['o2_app'];
$rnt = $app->runtime;
$res = 0;
// ________________________________________________ Update local host services ___
foreach (self::get_host_services($app->host) as $srv => $srv_nice) {
self::update_host_service($srv);
}
// _________________________ Get services for local host and queued executions ___
$host_srvs = self::get_all_host_services();
$processes = o2_run_job::get_list('', true);
$running = array();
$maxs = array();
$now = time();
// ___________________________________________________ Count running instances ___
foreach ($processes as $run_job) {
if ($run_job->is_running()) {
$running[$run_job->job] = $running[$run_job->job] + 1;
if (!isset($maxs[$run_job->job])) {
$job = o2_job::get_by_id($run_job->job);
$maxs[$run_job->job] = $job->max_instances;
}
}
}
// ______________________________________ Loop on pending execution by service ___
foreach ($processes as $run_job) {
if ($run_job->status == 'Q' &&
isset($host_srvs[$run_job->service]) &&
$run_job->exe_time <= $now &&
($maxs[$run_job->job] == 0 ||
$maxs[$run_job->job] > $running[$run_job->job])) {
$res++;
// ________________________ Set run-job status from Queued to Starting ___
$run_job->update_status('S');
// __________________________________ Get developer password if needed ___
$dev = false;
if ($run_job->developer) {
$dev = true;
$devkey = $rnt->developers[$run_job->developer];
}
$cmd = '"'.$rnt->php_engine.'" '.
$app->dir_htdocs.$app->nome.'.php jxrnt="'.$rnt->rnt_script.
'" user='.$run_job->user.
' password='.$app->get_user_password($run_job->user).
($dev ? ' dev='.$run_job->developer. ' key='.$devkey : '').' '.
o2_run_job::$run_word.'='.$run_job->id;
$rnt->batch_exec($cmd);
$running[$run_job->job] = $running[$run_job->job] + 1;
if (!isset($maxs[$run_job->job])) {
$job = o2_job::get_by_id($run_job->job);
$maxs[$run_job->job] = $job->max_instances;
}
}
}
if ($res) {
$tab = $app->get_table(o2_run_job::$run_table);
o2dbs_commit($tab->db->server->nome);
}
// __________________________________________________________ Update PIDs list ___
self::update_host_pids();
return $res;
}
}
@@ -6404,7 +6424,7 @@ class o2_scheduler {
}
print 'Tasks executed: '.$ret."\n";
// _____________________________ Check pending processes (queued run-jobs) ___
$ps = o2_service::run();
$ps = o2_run_job::run_queued();
print 'Queued jobs run: '.$ps."\n";
// ____________________ Check killed processes (run-jobs marked as killed) ___
$ks = o2_run_job::kill_by_host();
@@ -1510,22 +1510,35 @@ function o2job_enable($job_name, $on = true) {
* logic (scheduler) will take care of executing it later.
* A list of parameters (by name) can be provided: list will override parameters passed to
* program called by job.
* If a host name is provided then execution is assigned directly to that host, regardless
* of services logics.
* NOTE: If no host name is provided execution can be assigned to local host by
* application settings: see "jobs_use_local_host" parameter in application
* settings.
* If job exists then new job execution ID is returned, else FALSE.
*
* @package jobs
* @param string $job_name Job name to look for
* @param array $params List of overriding parameters passed to program
* @param string $host Host name to override assigned host/service
* @return integer
* @see o2job_create()
* @see o2job_run()
* @see o2job_get_by_name()
* @see o2job_run()
* @see o2job_is_running()
*/
function o2job_queue($job_name, $params = array()) {
function o2job_queue($job_name, $params = array(), $host = false) {
if ($job = o2_job::get_by_name($job_name)) {
$aspid = $_SESSION['o2_app']->vars['_area']->valore;
$run_job = o2_run_job::queue($job, $params, $aspid);
$run_job = o2_run_job::queue($job,
$params,
$aspid,
false,
false,
false,
0,
$host);
return $run_job->id;
}
else {
@@ -946,7 +946,8 @@ $list = array(
"services" => array("rntval" => null,
"appval" => null,
"desc" => " *** Services ***\n\n".
"List of �-separated names of services enabled for application deferred executions.",
"List of names of services enabled for application jobs executions.\n".
"Names can be separated with \",\", \";\" or with a white space.",
"default" => "_internal"),
"service_redirect" => array("rntval" => null,
"appval" => null,
@@ -955,7 +956,16 @@ $list = array(
"When this parameter is set all jobs executions (run and queue) will be assigned ".
"to this service, regardless all other logics.\n".
"To use this parameter set it to the name of a registered service.",
"default" => ""),
"default" => ""),
"jobs_use_local_host" => array("rntval" => null,
"appval" => null,
"desc" => " *** Use local host for jobs executions ***\n\n".
"When set to ON jobs queued from this machine use local host for their execution.\n".
"This parameter overrides services logics: local host is used regardless of the service ".
"assigned to the job and of the application parameter \"service_redirect\".\n".
"Parameter \"host\" in function o2job_queue(), when passed, is still used, however.\n",
"default" => "off",
"values" => array("off", "on")),
"rpc_server" => array("rntval" => null,
"appval" => null,
"desc" => " *** Remote Procedure Call (RPC) Server ***\n\n".
@@ -1310,6 +1320,7 @@ function o2sys_iniconf_exp_34() {
"reqprg",
"services",
"service_redirect",
"jobs_use_local_host",
"rpc_server",
"data_chunk",
"tables_create",
@@ -1,5 +1,5 @@
<?php
//2.4
//2.5
//o2def::module("");
//o2def::folder("");
/*

0 comments on commit 4c21553

Please sign in to comment.
You can’t perform that action at this time.