Skip to content

Commit

Permalink
Merge pull request #11 from pypr/misc-fixes
Browse files Browse the repository at this point in the history
Fix the total_cores implementation.
  • Loading branch information
prabhuramachandran committed Oct 5, 2018
2 parents 4df4ef3 + f01a340 commit d3bc2dc
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 5 deletions.
10 changes: 9 additions & 1 deletion automan/automation.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,10 @@ def __init__(self, command, output_dir, job_info=None):
)
self._job = None

def __str__(self):
return ('%s with output directory: %s ' %
(self.__class__.__name__, os.path.basename(self.output_dir)))

# #### Public protocol ###########################################

def complete(self):
Expand Down Expand Up @@ -405,7 +409,8 @@ def get_name(self):
"""Return the name of this problem, this name is used as a
directory for the simulation and the outputs.
"""
raise NotImplementedError()
# Return a sane default instead of forcing the user to do this.
return self.__class__.__name__

def get_commands(self):
"""Return a sequence of (name, command_string, job_info_dict).
Expand Down Expand Up @@ -699,6 +704,9 @@ def __init__(self, problem, match=''):
if len(match) == 0 or fnmatch(name, match)
]

def __str__(self):
return 'Problem named %s' % self.problem.get_name()

def output(self):
return self.problem.get_outputs()

Expand Down
23 changes: 21 additions & 2 deletions automan/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ def free_cores():
return round(ncore, 0)


def total_cores():
return psutil.cpu_count(logical=False)


############################################
# This class is meant to be used by execnet alone.
class _RemoteManager(object): # pragma: no cover
Expand Down Expand Up @@ -213,6 +217,8 @@ def serve(channel): # pragma: no cover
msg, data = channel.receive()
if msg == 'free_cores':
channel.send(free_cores())
elif msg == 'total_cores':
channel.send(total_cores())
else:
channel.send(getattr(manager, msg)(*data))
############################################
Expand All @@ -222,7 +228,7 @@ class Worker(object):
def __init__(self):
self.jobs = dict()
self.running_jobs = set()
self._total_cores = psutil.cpu_count(logical=False)
self._total_cores = None

def _check_running_jobs(self):
for i in self.running_jobs.copy():
Expand All @@ -231,6 +237,11 @@ def _check_running_jobs(self):
def free_cores(self):
return free_cores()

def total_cores(self):
if self._total_cores is None:
self._total_cores = total_cores()
return self._total_cores

def can_run(self, n_core):
"""Returns True if the worker can run a job with the required cores.
"""
Expand All @@ -244,7 +255,7 @@ def can_run(self, n_core):
n_cores_used = sum(
[jobs[i].n_core for i in self.running_jobs]
)
if (self._total_cores - n_cores_used) >= n_core:
if (self.total_cores() - n_cores_used) >= n_core:
result = True
return result

Expand Down Expand Up @@ -281,6 +292,9 @@ def __init__(self, worker, job_id, job):
def free_cores(self):
return self.worker.free_cores()

def total_cores(self):
return self.worker.total_cores()

def run(self):
print("JobProxy cannot be run")

Expand Down Expand Up @@ -380,6 +394,11 @@ def _call_remote(self, method, *data):
def free_cores(self):
return self._call_remote('free_cores', None)

def total_cores(self):
if self._total_cores is None:
self._total_cores = self._call_remote('total_cores', None)
return self._total_cores

def run(self, job):
print("Running %s" % job.pretty_command())
job_id = self._call_remote('run', job.to_dict())
Expand Down
11 changes: 9 additions & 2 deletions automan/tests/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ def test_free_cores(self):
self.assertTrue(n >= 0)
self.assertTrue(n <= multiprocessing.cpu_count())

def test_total_cores(self):
n = jobs.total_cores()
self.assertTrue(n >= 0)
self.assertTrue(n <= multiprocessing.cpu_count())

def test_status_when_job_is_incorrect(self):
j = jobs.Job(
[sys.executable, '--junk'],
Expand Down Expand Up @@ -433,10 +438,12 @@ def test_scheduler_only_creates_required_workers(self, mock_remote_worker):
self.assertEqual(proxy.worker.host, 'host1')
self.assertEqual(proxy1.worker.host, 'host2')

@mock.patch('automan.jobs.total_cores', return_value=2.0)
@mock.patch('automan.jobs.free_cores', return_value=2.0)
def test_scheduler_should_not_overload_worker(self, mock_free_cores):
def test_scheduler_should_not_overload_worker(self, m_total_cores,
m_free_cores):
# Given
n_core = jobs.free_cores()
n_core = jobs.total_cores()
config = [dict(host='localhost')]
s = jobs.Scheduler(worker_config=config)

Expand Down

0 comments on commit d3bc2dc

Please sign in to comment.