Skip to content

Commit

Permalink
Fix issue #628.
Browse files Browse the repository at this point in the history
New method `ShellcmdLrms.count_running_tasks()` to count
currently-running tasks on the resource and
`ShellcmdLrms.has_running_tasks()` to test if a resource is busy or
can be let go.
  • Loading branch information
Sergio Maffioletti authored and riccardomurri committed Aug 14, 2017
1 parent 02e3afe commit 0c4ba9b
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 8 deletions.
4 changes: 2 additions & 2 deletions gc3libs/backends/ec2.py
Expand Up @@ -965,7 +965,7 @@ def free(self, app):
# if no more applications are currently running, turn the instance off
# check with the associated resource
resource.get_resource_status()
if len(resource.job_infos) == 0:
if not resource.has_running_tasks():
# turn VM off
vm = self._get_vm(app.execution._lrms_vm_id)
gc3libs.log.info("VM instance %s at %s is no longer needed."
Expand All @@ -987,7 +987,7 @@ def close(self):
# Update status of VMs and remote resources
self.get_resource_status()
for vm_id, resource in self.subresources.items():
if resource.updated and not resource.job_infos:
if resource.updated and resource.has_running_tasks():
vm = self._get_vm(vm_id)
gc3libs.log.warning(
"VM instance %s at %s is no longer needed. "
Expand Down
4 changes: 2 additions & 2 deletions gc3libs/backends/openstack.py
Expand Up @@ -1079,7 +1079,7 @@ def free(self, app):
# if no more applications are currently running, turn the instance off
# check with the associated resource
subresource.get_resource_status()
if len(subresource.job_infos) == 0:
if not subresource.has_running_tasks():
# turn VM off
vm = self._get_vm(app.execution._lrms_vm_id)

Expand All @@ -1101,7 +1101,7 @@ def close(self):
# Update status of VMs and remote resources
self.get_resource_status()
for vm_id, subresource in self.subresources.items():
if subresource.updated and not subresource.job_infos:
if subresource.updated and subresource.has_running_tasks():
vm = self._get_vm(vm_id)
gc3libs.log.warning(
"VM instance %s at %s is no longer needed. "
Expand Down
32 changes: 30 additions & 2 deletions gc3libs/backends/shellcmd.py
Expand Up @@ -844,6 +844,24 @@ def _locate_gnu_time(self):
# the same resources.
#

def count_running_tasks(self):
"""
Returns number of currently running tasks.
.. note::
1. The count of running tasks includes also tasks that may
have been started by another GC3Pie process so this count
can be positive when the resource has just been opened.
2. The count is updated every time the resource is updated,
so the returned number can be stale if the
`ShellcmdLrms.get_resource_status()` has not been called
for a while.
"""
return sum(1 for info in self._job_infos.values() if not info['terminated'])


def _compute_used_cores(self, job_infos):
"""
Accepts a dictionary of job informations and returns the
Expand Down Expand Up @@ -1134,8 +1152,7 @@ def _update_resource_usage_info(self):
used_memory = self._compute_used_memory(self._job_infos)
self.available_memory = self.total_memory - used_memory
self.free_slots = self.max_cores - self._compute_used_cores(self._job_infos)
self.user_run = sum(1 for info in self._job_infos.values()
if not info['terminated'])
self.user_run = self.count_running_tasks()
log.debug("Recovered resource information from files in %s:"
" total nr. of cores: %s, requested by jobs: %s;"
" available memory: %s, requested by jobs: %s.",
Expand Down Expand Up @@ -1206,6 +1223,17 @@ def _get_remote_and_local_path_pair(self, app, remote_relpath,
return [(remote_path, local_path)]


def has_running_tasks():
"""
Return ``True`` if tasks are running on the resource.
See `ShellcmdLrms.count_running_tasks`:meth: for caveats
about the count of "running jobs" upon which this boolean
check is based.
"""
return self.user_run > 0


@same_docstring_as(LRMS.peek)
def peek(self, app, remote_filename, local_file, offset=0, size=None):
# `remote_filename` must be relative to the execution directory
Expand Down
4 changes: 2 additions & 2 deletions gc3utils/commands.py
Expand Up @@ -1800,7 +1800,7 @@ def _print_vms(vms, res, header=True):
ips = []
if vm.id in res.subresources:
if res.subresources[vm.id].updated:
remote_jobs = str(len(res.subresources[vm.id].job_infos))
remote_jobs = str(res.subresources[vm.id].running_jobs())
ncores = str(res.subresources[vm.id].max_cores)

if res.type.startswith('ec2'):
Expand Down Expand Up @@ -1892,7 +1892,7 @@ def cleanup_vms(self):
vms = res._vmpool.get_all_vms()
if vms:
for vm in vms:
remote_jobs = len(res.subresources[vm.id].job_infos)
remote_jobs = res.subresources[vm.id].running_jobs()
if remote_jobs == 0:
if self.params.dry_run:
print("No job running on VM `%s` of resource `%s`;"
Expand Down

0 comments on commit 0c4ba9b

Please sign in to comment.