Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions reframe/core/schedulers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,13 @@ def guess_num_tasks(self):

return self.sched_flex_alloc_tasks

available_nodes = self.get_partition_nodes()
getlogger().debug('flex_alloc_tasks: total available nodes in current '
'virtual partition: %s' % len(available_nodes))
available_nodes = self.get_all_nodes()
getlogger().debug('flex_alloc_tasks: total available nodes %s ' %
len(available_nodes))

# Try to guess the number of tasks now
available_nodes = self.filter_nodes(available_nodes, self.options)
available_nodes = self.filter_nodes(available_nodes,
self.sched_access + self.options)

if self.sched_flex_alloc_tasks == 'idle':
available_nodes = {n for n in available_nodes
Expand All @@ -284,8 +285,8 @@ def guess_num_tasks(self):
return num_tasks

@abc.abstractmethod
def get_partition_nodes(self):
# Get all nodes of the current virtual partition
def get_all_nodes(self):
# Gets all the available nodes
pass

@abc.abstractmethod
Expand Down
2 changes: 1 addition & 1 deletion reframe/core/schedulers/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def submit(self):
def emit_preamble(self):
return []

def get_partition_nodes(self):
def get_all_nodes(self):
raise NotImplementedError(
'local scheduler does not support listing of available nodes')

Expand Down
2 changes: 1 addition & 1 deletion reframe/core/schedulers/pbs.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def emit_preamble(self):
preamble.append('cd %s' % self.workdir)
return preamble

def get_partition_nodes(self):
def get_all_nodes(self):
raise NotImplementedError('pbs backend does not support node listing')

def filter_nodes(self, nodes, options):
Expand Down
17 changes: 8 additions & 9 deletions reframe/core/schedulers/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def submit(self):

self._jobid = int(jobid_match.group('jobid'))

def _get_all_nodes(self):
def get_all_nodes(self):
try:
completed = _run_strict('scontrol -a show -o nodes')
except SpawnedProcessError as e:
Expand All @@ -170,10 +170,6 @@ def _get_default_partition(self):

return None

def get_partition_nodes(self):
nodes = self._get_all_nodes()
return self.filter_nodes(nodes, self.sched_access)

def filter_nodes(self, nodes, options):
option_parser = ArgumentParser()
option_parser.add_argument('--reservation')
Expand All @@ -199,6 +195,8 @@ def filter_nodes(self, nodes, options):
else:
default_partition = self._get_default_partition()
partitions = {default_partition} if default_partition else set()
getlogger().debug('flex_alloc_tasks: default partition: %s' %
default_partition)

nodes = {n for n in nodes if n.partitions >= partitions}
getlogger().debug(
Expand All @@ -217,7 +215,7 @@ def filter_nodes(self, nodes, options):
nodes &= self._get_nodes_by_name(nodelist)
getlogger().debug(
'flex_alloc_tasks: filtering nodes by nodelist: %s '
'availablenodes now: %s' % (nodelist, len(nodes)))
'available nodes now: %s' % (nodelist, len(nodes)))

if exclude_nodes:
exclude_nodes = exclude_nodes.strip()
Expand Down Expand Up @@ -431,10 +429,11 @@ def __init__(self, node_descr):
raise JobError('could not extract NodeName from node description')

self._partitions = self._extract_attribute(
'Partitions', node_descr, sep=',')
'Partitions', node_descr, sep=',') or set()
self._active_features = self._extract_attribute(
'ActiveFeatures', node_descr, sep=',')
self._states = self._extract_attribute('State', node_descr, sep='+')
'ActiveFeatures', node_descr, sep=',') or set()
self._states = self._extract_attribute(
'State', node_descr, sep='+') or set()

def __eq__(self, other):
if not isinstance(other, type(self)):
Expand Down
2 changes: 1 addition & 1 deletion unittests/test_launchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def cancel(self):
def finished(self):
pass

def get_partition_nodes(self):
def get_all_nodes(self):
pass

def filter_nodes(self, nodes):
Expand Down
37 changes: 32 additions & 5 deletions unittests/test_schedulers.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,9 +393,9 @@ def test_cancel(self):
def test_guess_num_tasks(self):
self.testjob._num_tasks = 0
self.testjob._sched_flex_alloc_tasks = 'all'
# monkey patch `get_partition_nodes()` to simulate extraction of
# monkey patch `get_all_nodes()` to simulate extraction of
# slurm nodes through the use of `scontrol show`
self.testjob.get_partition_nodes = lambda: set()
self.testjob.get_all_nodes = lambda: set()
# monkey patch `_get_default_partition()` to simulate extraction
# of the default partition through the use of `scontrol show`
self.testjob._get_default_partition = lambda: 'pdef'
Expand Down Expand Up @@ -566,6 +566,23 @@ def create_dummy_nodes(obj):
'AllocTRES= CapWatts=n/a CurrentWatts=100 '
'LowestJoules=100000000 ConsumedJoules=0 '
'ExtSensorsJoules=n/s ExtSensorsWatts=0 '
'ExtSensorsTemp=n/s Reason=Foo/ ',

'NodeName=nid00005 Arch=x86_64 CoresPerSocket=12 '
'CPUAlloc=0 CPUErr=0 CPUTot=24 CPULoad=0.00 '
'AvailableFeatures=f5 ActiveFeatures=f5 '
'Gres=gpu_mem:16280,gpu:1 NodeAddr=nid00003'
'NodeHostName=nid00003 Version=10.00 OS=Linux '
'RealMemory=32220 AllocMem=0 FreeMem=10000 '
'Sockets=1 Boards=1 State=ALLOCATED '
'ThreadsPerCore=2 TmpDisk=0 Weight=1 Owner=N/A '
'MCS_label=N/A Partitions=p1,p3 '
'BootTime=01 Jan 2018 '
'SlurmdStartTime=01 Jan 2018 '
'CfgTRES=cpu=24,mem=32220M '
'AllocTRES= CapWatts=n/a CurrentWatts=100 '
'LowestJoules=100000000 ConsumedJoules=0 '
'ExtSensorsJoules=n/s ExtSensorsWatts=0 '
'ExtSensorsTemp=n/s Reason=Foo/ '
'failed [reframe_user@01 Jan 2018]']

Expand All @@ -589,9 +606,9 @@ def setUp(self):
stdout=os.path.join(self.workdir, 'testjob.out'),
stderr=os.path.join(self.workdir, 'testjob.err')
)
# monkey patch `_get_all_nodes` to simulate extraction of
# monkey patch `get_all_nodes` to simulate extraction of
# slurm nodes through the use of `scontrol show`
self.testjob._get_all_nodes = self.create_dummy_nodes
self.testjob.get_all_nodes = self.create_dummy_nodes
# monkey patch `_get_default_partition` to simulate extraction
# of the default partition
self.testjob._get_default_partition = lambda: 'pdef'
Expand Down Expand Up @@ -632,6 +649,16 @@ def test_sched_access_constraint_partition(self):
self.prepare_job()
self.assertEqual(self.testjob.num_tasks, 4)

def test_sched_access_partition(self):
self.testjob._sched_access = ['--partition=p1']
self.prepare_job()
self.assertEqual(self.testjob.num_tasks, 16)

def test_default_partition_all(self):
self.testjob._sched_flex_alloc_tasks = 'all'
self.prepare_job()
self.assertEqual(self.testjob.num_tasks, 16)

def test_constraint_idle(self):
self.testjob._sched_flex_alloc_tasks = 'idle'
self.testjob.options = ['--constraint=f1']
Expand Down Expand Up @@ -876,7 +903,7 @@ def test_attributes(self):
self.assertEqual(self.allocated_node.partitions, {'p1', 'p2'})
self.assertEqual(self.allocated_node.active_features, {'f1', 'f2'})
self.assertEqual(self.no_partition_node.name, 'nid00004')
self.assertEqual(self.no_partition_node.partitions, None)
self.assertEqual(self.no_partition_node.partitions, set())
self.assertEqual(self.no_partition_node.active_features, {'f1', 'f2'})

def test_str(self):
Expand Down