From c8eb7878e51cc9626f0841d3c5f3c25e494ab2d6 Mon Sep 17 00:00:00 2001
From: Javier Otero
Date: Tue, 2 Mar 2021 17:00:18 +0100
Subject: [PATCH 1/6] Extend affinity checks

---
 cscs-checks/prgenv/affinity_check.py | 37 ++++++++++++++++++++++++++++
 reframe/core/pipeline.py             |  1 +
 2 files changed, 38 insertions(+)

diff --git a/cscs-checks/prgenv/affinity_check.py b/cscs-checks/prgenv/affinity_check.py
index ccd66a4db9..a14329240a 100644
--- a/cscs-checks/prgenv/affinity_check.py
+++ b/cscs-checks/prgenv/affinity_check.py
@@ -433,6 +433,43 @@ def num_omp_threads(self):
         return int(self.num_cpus/self.num_sockets)
 
 
+@rfm.simple_test
+class OneTaskPerSocket(AffinityTestBase):
+    '''One task per socket. No OMP threads.'''
+
+    num_tasks_per_socket = 1
+    num_cpus_per_task = 1
+    use_multithreading = False
+
+    def __init__(self):
+        super().__init__()
+        self.descr = 'One task per socket - wo. OMP.'
+
+    @rfm.run_before('run')
+    def set_num_tasks(self):
+        self.num_tasks = self.num_sockets
+
+    @rfm.run_before('sanity')
+    def consume_cpu_set(self):
+        if len(self.aff_cpus) != self.num_sockets:
+            raise SanityError('num_tasks does not match num_sockets')
+
+        for affinity_set in self.aff_cpus:
+            # Get the CPU siblings by socket
+            cpu_siblings = self.get_sibling_cpus(
+                affinity_set[0], by='socket'
+            )
+
+            # All CPUs in the affinity set must belong to the same socket.
+            if not all(x in cpu_siblings for x in affinity_set):
+                raise SanityError('incorrect affinity set')
+
+            # Remove the socket's CPUs from the CPU set.
+            self.cpu_set -= cpu_siblings
+
+
 @rfm.simple_test
 class ConsecutiveNumaFilling(AffinityTestBase):
     '''Fill the NUMA nodes with the tasks in consecutive order.
diff --git a/reframe/core/pipeline.py b/reframe/core/pipeline.py
index 41d052dc5c..71d89f6190 100644
--- a/reframe/core/pipeline.py
+++ b/reframe/core/pipeline.py
@@ -1303,6 +1303,7 @@ def run(self):
         self.job.num_tasks = self.num_tasks
         self.job.num_tasks_per_node = self.num_tasks_per_node
         self.job.num_tasks_per_core = self.num_tasks_per_core
+        self.job.num_tasks_per_socket = self.num_tasks_per_socket
         self.job.num_cpus_per_task = self.num_cpus_per_task
         self.job.use_smt = self.use_multithreading
         self.job.time_limit = self.time_limit

From bda8eb6e063f8e076b917ff09378ce5ee9d40117 Mon Sep 17 00:00:00 2001
From: Javier Otero
Date: Tue, 2 Mar 2021 17:00:18 +0100
Subject: [PATCH 2/6] Extend affinity checks

---
 cscs-checks/prgenv/affinity_check.py | 201 +++++++++++++++------------
 reframe/core/pipeline.py             |   1 +
 2 files changed, 115 insertions(+), 87 deletions(-)

diff --git a/cscs-checks/prgenv/affinity_check.py b/cscs-checks/prgenv/affinity_check.py
index ccd66a4db9..78faf84b74 100644
--- a/cscs-checks/prgenv/affinity_check.py
+++ b/cscs-checks/prgenv/affinity_check.py
@@ -41,35 +41,42 @@ class to figure out the processor's topology. The content of this reference
     # }
     system = variable(dict, value={})
 
+    valid_systems = ['daint:gpu', 'daint:mc',
+                     'dom:gpu', 'dom:mc', 'eiger:mc',
+                     'ault:amdv100'
+                     ]
+    valid_prog_environs = [
+        'PrgEnv-gnu', 'PrgEnv-cray', 'PrgEnv-intel', 'PrgEnv-pgi'
+    ]
+    build_system = 'Make'
+
+    # The GitHub URL cannot be specified as `self.sourcedir` as that
+    # would prevent the src folder from being copied to stage, which is
+    # necessary since these tests need files from it.
+    sourcesdir = os.path.join('src/affinity_ref')
+    prebuild_cmds = ['git clone https://github.com/vkarak/affinity']
+
+    # Dict with the partition's topology - output of "lscpu -e"
+    topology = variable(dict, value={
+        'dom:gpu': 'topo_dom_gpu.json',
+        'dom:mc': 'topo_dom_mc.json',
+        'daint:gpu': 'topo_dom_gpu.json',
+        'daint:mc': 'topo_dom_mc.json',
+        'eiger:mc': 'topo_eiger_mc.json',
+        'ault:amdv100': 'topo_ault_amdv100.json',
+    })
+
+    maintainers = ['RS', 'SK']
+    tags = {'production', 'scs', 'maintenance', 'craype'}
+
     def __init__(self):
-        self.valid_systems = ['daint:gpu', 'daint:mc',
-                              'dom:gpu', 'dom:mc', 'eiger:mc',
-                              'ault:amdv100']
-        self.valid_prog_environs = [
-            'PrgEnv-gnu', 'PrgEnv-cray', 'PrgEnv-intel', 'PrgEnv-pgi'
-        ]
-        self.build_system = 'Make'
+        # FIXME: These two right now cannot be set in the class body.
+        self.executable = './affinity/affinity'
         self.build_system.options = ['-C affinity', 'MPI=1']
 
-        # The github URL can not be specifid as `self.sourcedir` as that
-        # would prevent the src folder from being copied to stage which is
-        # necessary since these tests need files from it.
-        self.sourcesdir = os.path.join('src/affinity_ref')
-        self.prebuild_cmds = ['git clone https://github.com/vkarak/affinity']
-        self.executable = './affinity/affinity'
+    @rfm.run_before('sanity')
+    def set_sanity(self):
         self.sanity_patterns = self.assert_consumed_cpu_set()
-        self.maintainers = ['RS', 'SK']
-        self.tags = {'production', 'scs', 'maintenance', 'craype'}
-
-        # Dict with the partition's topology - output of "lscpu -e"
-        self.topology = {
-            'dom:gpu': 'topo_dom_gpu.json',
-            'dom:mc': 'topo_dom_mc.json',
-            'daint:gpu': 'topo_dom_gpu.json',
-            'daint:mc': 'topo_dom_mc.json',
-            'eiger:mc': 'topo_eiger_mc.json',
-            'ault:amdv100': 'topo_ault_amdv100.json',
-        }
 
     @rfm.run_before('compile')
     def set_topo_file(self):
@@ -219,10 +226,7 @@ class AffinityOpenMPBase(AffinityTestBase):
 
     omp_bind = variable(str)
     omp_proc_bind = variable(str, value='spread')
-
-    def __init__(self):
-        super().__init__()
-        self.num_tasks = 1
+    num_tasks = 1
 
     @property
     def ncpus_per_task(self):
@@ -251,10 +255,7 @@ class OneThreadPerLogicalCoreOpenMP(AffinityOpenMPBase):
     '''Pin each OMP thread to a different logical core.'''
 
     omp_bind = 'threads'
-
-    def __init__(self):
-        super().__init__()
-        self.descr = 'Pin one OMP thread per CPU.'
+    descr = 'Pin one OMP thread per CPU.'
 
     @property
     def num_omp_threads(self):
@@ -280,10 +281,7 @@ class OneThreadPerPhysicalCoreOpenMP(AffinityOpenMPBase):
     '''Pin each OMP thread to a different physical core.'''
 
     omp_bind = 'cores'
-
-    def __init__(self):
-        super().__init__()
-        self.descr = 'Pin one OMP thread per core.'
+    descr = 'Pin one OMP thread per core.'
 
     @property
     def num_omp_threads(self):
@@ -311,10 +309,8 @@ def consume_cpu_set(self):
 class OneThreadPerPhysicalCoreOpenMPnomt(OneThreadPerPhysicalCoreOpenMP):
     '''Only one cpu per core booked without multithread.'''
 
-    def __init__(self):
-        super().__init__()
-        self.descr = 'Pin one OMP thread per core wo. multithreading.'
-        self.use_multithreading = False
+    use_multithreading = False
+    descr = 'Pin one OMP thread per core wo. multithreading.'
 
     @property
     def ncpus_per_task(self):
@@ -332,10 +328,7 @@ class OneThreadPerSocketOpenMP(AffinityOpenMPBase):
     '''Pin each OMP thread to a different socket.'''
 
     omp_bind = 'sockets'
-
-    def __init__(self):
-        super().__init__()
-        self.descr = 'Pin one OMP thread per socket.'
+    descr = 'Pin one OMP thread per socket.'
 
     @property
     def num_omp_threads(self):
@@ -366,11 +359,8 @@ class OneTaskPerSocketOpenMPnomt(AffinityOpenMPBase):
 
     omp_bind = 'sockets'
     omp_proc_bind = 'close'
-
-    def __init__(self):
-        super().__init__()
-        self.descr = 'One task per socket - wo. multithreading.'
-        self.use_multithreading = False
+    descr = 'One task per socket - wo. multithreading.'
+    use_multithreading = False
 
     @property
     def num_omp_threads(self):
@@ -423,10 +413,8 @@ class OneTaskPerSocketOpenMP(OneTaskPerSocketOpenMPnomt):
     and the number of OMP threads.
     '''
 
-    def __init__(self):
-        super().__init__()
-        self.descr = 'One task per socket - w. multithreading.'
-        self.use_multithreading = True
+    descr = 'One task per socket - w. multithreading.'
+    use_multithreading = True
 
     @property
     def num_omp_threads(self):
@@ -434,18 +422,15 @@ def num_omp_threads(self):
 
 
 @rfm.simple_test
-class ConsecutiveNumaFilling(AffinityTestBase):
-    '''Fill the NUMA nodes with the tasks in consecutive order.
+class ConsecutiveSocketFilling(AffinityTestBase):
+    '''Fill the sockets with the tasks in consecutive order.
 
     This test uses as many tasks as physical cores available in a node.
     Multithreading is disabled.
     '''
 
     cpu_bind = 'rank'
-
-    def __init__(self):
-        super().__init__()
-        self.use_multithreading = False
+    use_multithreading = False
 
     @rfm.run_before('run')
     def set_tasks(self):
@@ -456,10 +441,10 @@ def set_tasks(self):
     def consume_cpu_set(self):
         '''Check that all physical cores have been used in the right order.'''
         task_count = 0
-        for numa_number in range(self.num_numa_nodes):
-            # Keep track of the CPUs present in this NUMA node
+        for socket_number in range(self.num_sockets):
+            # Keep track of the CPUs present in this socket
             cpus_present = set()
-            for task_number in range(int(self.num_tasks/self.num_numa_nodes)):
+            for task_number in range(int(self.num_tasks/self.num_sockets)):
                 # Get the list of CPUs with affinity
                 affinity_set = self.aff_cpus[task_count]
 
@@ -477,14 +462,14 @@ def consume_cpu_set(self):
 
                 task_count += 1
 
-            # Ensure all CPUs belong to the same NUMA node
-            cpuset_by_numa = self.get_sibling_cpus(
-                next(iter(cpus_present)), by='node'
+            # Ensure all CPUs belong to the same socket
+            cpuset_by_socket = self.get_sibling_cpus(
+                next(iter(cpus_present)), by='socket'
             )
-            if (not all(cpu in cpuset_by_numa for cpu in cpus_present) and
-                len(cpuset_by_numa)==len(cpus_present)):
+            if (not all(cpu in cpuset_by_socket for cpu in cpus_present) and
+                len(cpuset_by_socket)==len(cpus_present)):
                 raise SanityError(
-                    f'numa node {numa_number} not filled in order'
+                    f'socket {socket_number} not filled in order'
                 )
 
             else:
@@ -493,54 +478,96 @@
 
 @rfm.simple_test
-class AlternateNumaFilling(AffinityTestBase):
-    '''Numa nodes are filled in a round-robin fashion.
+class AlternateSocketFilling(AffinityTestBase):
+    '''Sockets are filled in a round-robin fashion.
 
     This test uses as many tasks as physical cores available in a node.
     Multithreading is disabled.
     '''
 
-    def __init__(self):
-        super().__init__()
-        self.use_multithreading = False
+    use_multithreading = False
 
     @rfm.run_before('run')
     def set_tasks(self):
         self.num_tasks = int(self.num_cpus/self.num_cpus_per_core)
         self.num_cpus_per_task = 1
-        self.num_tasks_per_numa = int(self.num_tasks/self.num_numa_nodes)
+        self.num_tasks_per_socket = int(self.num_tasks/self.num_sockets)
 
     @rfm.run_before('sanity')
     def consume_cpu_set(self):
-        '''Check that consecutive tasks are round-robin pinned to numa nodes.'''
+        '''Check that consecutive tasks are round-robin pinned to sockets.'''
 
-        # Get a set per numa node to keep track of the CPUs
-        numa_nodes = [set() for s in range(self.num_numa_nodes)]
+        # Get a set per socket to keep track of the CPUs
+        sockets = [set() for s in range(self.num_sockets)]
 
         task_count = 0
-        for task in range(self.num_tasks_per_numa):
-            for s in range(self.num_numa_nodes):
+        for task in range(self.num_tasks_per_socket):
+            for s in range(self.num_sockets):
                 # Get the list of CPUs with affinity
                 affinity_set = self.aff_cpus[task_count]
 
                 # Only 1 CPU per affinity set is allowed
                 if ((len(affinity_set) > 1) or
-                    (any(cpu in numa_nodes[s] for cpu in affinity_set)) or
-                    (any(cpu not in self.numa_nodes[s] for cpu in affinity_set))):
+                    (any(cpu in sockets[s] for cpu in affinity_set)) or
+                    (any(cpu not in self.sockets[s] for cpu in affinity_set))):
                     raise SanityError(
                         f'incorrect affinity set for task {task_count}'
                     )
 
                 else:
-                    numa_nodes[s].update(
+                    sockets[s].update(
                         self.get_sibling_cpus(affinity_set[0], by='core')
                     )
 
                 task_count += 1
 
-            # Check that all numa nodes have the same CPU count
-            if not all(len(s) == (task+1)*2 for s in numa_nodes):
+            # Check that all sockets have the same CPU count
+            if not all(len(s) == (task+1)*2 for s in sockets):
                 self.cpu_set.add(-1)
 
-        # Decrement the NUMA nodes from the CPU set
-        for s in numa_nodes:
+        # Remove each socket's CPUs from the CPU set
+        for s in sockets:
             self.cpu_set -= s
+
+
+@rfm.simple_test
+class OneTaskPerNumaNode(AffinityTestBase):
+    '''Place a task on each NUMA node.
+
+    The trick here is to "pad" the tasks with --cpus-per-task.
+    The same could be done to target any cache level instead.
+    Multithreading is disabled.
+ ''' + + valid_systems = ['eiger:mc'] + use_multithreading = False + num_cpus_per_task = required + + @rfm.run_before('compile') + def build_settings(self): + self.build_system.options += ['OPENMP=0'] + + @rfm.run_before('run') + def set_tasks(self): + self.num_tasks = self.num_numa_nodes + if self.current_partition.fullname == 'eiger:mc': + self.num_cpus_per_task = 16 + + @rfm.run_before('sanity') + def consume_cpu_set(self): + '''Check that each task lives in a different NUMA node.''' + + if len(self.aff_cpus) != self.num_numa_nodes: + raise SanityError( + 'number of tasks does not match the number of numa nodes' + ) + + for numa_node, aff_set in enumerate(self.aff_cpus): + cpuset_by_numa = self.get_sibling_cpus(aff_set[0], by='node') + if (len(aff_set) != self.num_cpus_per_task or + any(cpu not in cpuset_by_numa for cpu in aff_set)): + raise SanityError( + f'incorrect affinity set for numa node {numa_node}' + ) + else: + # Decrement the current NUMA node from the available CPU set + self.cpu_set -= cpuset_by_numa diff --git a/reframe/core/pipeline.py b/reframe/core/pipeline.py index 41d052dc5c..71d89f6190 100644 --- a/reframe/core/pipeline.py +++ b/reframe/core/pipeline.py @@ -1303,6 +1303,7 @@ def run(self): self.job.num_tasks = self.num_tasks self.job.num_tasks_per_node = self.num_tasks_per_node self.job.num_tasks_per_core = self.num_tasks_per_core + self.job.num_tasks_per_socket = self.num_tasks_per_socket self.job.num_cpus_per_task = self.num_cpus_per_task self.job.use_smt = self.use_multithreading self.job.time_limit = self.time_limit From 256f8d6f6198d5e8d270c05f9a4afe6488a338ff Mon Sep 17 00:00:00 2001 From: Javier Otero Date: Thu, 4 Mar 2021 19:01:10 +0100 Subject: [PATCH 3/6] Fix PEP8 complaints --- cscs-checks/prgenv/affinity_check.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cscs-checks/prgenv/affinity_check.py b/cscs-checks/prgenv/affinity_check.py index 78faf84b74..b30975e7ce 100644 --- a/cscs-checks/prgenv/affinity_check.py +++ b/cscs-checks/prgenv/affinity_check.py @@ -41,9 +41,9 @@ class to figure out the processor's topology. The content of this reference # } system = variable(dict, value={}) - valid_systems = ['daint:gpu', 'daint:mc', - 'dom:gpu', 'dom:mc', 'eiger:mc', - 'ault:amdv100' + valid_systems = [ + 'daint:gpu', 'daint:mc', 'dom:gpu', 'dom:mc', 'eiger:mc', + 'ault:amdv100' ] valid_prog_environs = [ 'PrgEnv-gnu', 'PrgEnv-cray', 'PrgEnv-intel', 'PrgEnv-pgi' @@ -178,6 +178,7 @@ def parse_output(self): '''Extract the data from the affinity tool.''' re_aff_cpus = r'CPU affinity: \[\s+(?P[\d+\s+]+)\]' + def parse_cpus(x): return sorted([int(xi) for xi in x.split()]) @@ -467,7 +468,7 @@ def consume_cpu_set(self): next(iter(cpus_present)), by='socket' ) if (not all(cpu in cpuset_by_socket for cpu in cpus_present) and - len(cpuset_by_socket)==len(cpus_present)): + len(cpuset_by_socket) == len(cpus_present)): raise SanityError( f'socket {socket_number} not filled in order' ) From 91ee6660e0271a70df30514e41d32afd9f583a21 Mon Sep 17 00:00:00 2001 From: "Javier J. 
Otero Perez" Date: Fri, 5 Mar 2021 16:55:37 +0100 Subject: [PATCH 4/6] Move pipeline changes to another branch --- reframe/core/pipeline.py | 1 - 1 file changed, 1 deletion(-) diff --git a/reframe/core/pipeline.py b/reframe/core/pipeline.py index 71d89f6190..41d052dc5c 100644 --- a/reframe/core/pipeline.py +++ b/reframe/core/pipeline.py @@ -1303,7 +1303,6 @@ def run(self): self.job.num_tasks = self.num_tasks self.job.num_tasks_per_node = self.num_tasks_per_node self.job.num_tasks_per_core = self.num_tasks_per_core - self.job.num_tasks_per_socket = self.num_tasks_per_socket self.job.num_cpus_per_task = self.num_cpus_per_task self.job.use_smt = self.use_multithreading self.job.time_limit = self.time_limit From 0849e92a5fa0c0f87d290f0bb54cdb1361e30f07 Mon Sep 17 00:00:00 2001 From: Javier Otero Date: Mon, 8 Mar 2021 11:37:14 +0100 Subject: [PATCH 5/6] Update test for pilatus --- cscs-checks/prgenv/affinity_check.py | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/cscs-checks/prgenv/affinity_check.py b/cscs-checks/prgenv/affinity_check.py index b30975e7ce..37f57141b5 100644 --- a/cscs-checks/prgenv/affinity_check.py +++ b/cscs-checks/prgenv/affinity_check.py @@ -42,7 +42,8 @@ class to figure out the processor's topology. The content of this reference system = variable(dict, value={}) valid_systems = [ - 'daint:gpu', 'daint:mc', 'dom:gpu', 'dom:mc', 'eiger:mc', + 'daint:gpu', 'daint:mc', 'dom:gpu', 'dom:mc', + 'eiger:mc', 'pilatus:mc' 'ault:amdv100' ] valid_prog_environs = [ @@ -63,9 +64,13 @@ class to figure out the processor's topology. The content of this reference 'daint:gpu': 'topo_dom_gpu.json', 'daint:mc': 'topo_dom_mc.json', 'eiger:mc': 'topo_eiger_mc.json', + 'pilatus:mc': 'topo_eiger_mc.json', 'ault:amdv100': 'topo_ault_amdv100.json', }) + # Reference topology file as required variable + topo_file = variable(str) + maintainers = ['RS', 'SK'] tags = {'production', 'scs', 'maintenance', 'craype'} @@ -80,8 +85,13 @@ def set_sanity(self): @rfm.run_before('compile') def set_topo_file(self): + '''Set the topo_file variable. + + If not present in the topology dict, leave it as required. + ''' cp = self.current_partition.fullname - self.topo_file = self.topology[cp] + if cp in self.topology: + self.topo_file = self.topology[cp] # FIXME: Update the hook below once the PR #1773 is merged. @rfm.run_after('compile') @@ -539,7 +549,7 @@ class OneTaskPerNumaNode(AffinityTestBase): Multithreading is disabled. ''' - valid_systems = ['eiger:mc'] + valid_systems = ['eiger:mc', 'pilatus:mc'] use_multithreading = False num_cpus_per_task = required From 9f82dbb80cca9a56f8cfa70ac2d58d2bbeca992e Mon Sep 17 00:00:00 2001 From: Javier Otero Date: Mon, 8 Mar 2021 12:01:50 +0100 Subject: [PATCH 6/6] Add pilatus to the affinity tests --- cscs-checks/prgenv/affinity_check.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cscs-checks/prgenv/affinity_check.py b/cscs-checks/prgenv/affinity_check.py index 37f57141b5..bab96aeb47 100644 --- a/cscs-checks/prgenv/affinity_check.py +++ b/cscs-checks/prgenv/affinity_check.py @@ -43,7 +43,7 @@ class to figure out the processor's topology. 
     valid_systems = [
         'daint:gpu', 'daint:mc', 'dom:gpu', 'dom:mc',
-        'eiger:mc', 'pilatus:mc'
+        'eiger:mc', 'pilatus:mc',
         'ault:amdv100'
     ]
     valid_prog_environs = [
@@ -560,7 +560,7 @@ def build_settings(self):
 
     @rfm.run_before('run')
    def set_tasks(self):
         self.num_tasks = self.num_numa_nodes
-        if self.current_partition.fullname == 'eiger:mc':
+        if self.current_partition.fullname in {'eiger:mc', 'pilatus:mc'}:
             self.num_cpus_per_task = 16
 
     @rfm.run_before('sanity')
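
A note on the `num_tasks_per_socket` attribute that PATCH 1/6 adds to the
pipeline (and PATCH 4/6 moves out of this series): on the scheduler side such
an attribute would naturally translate into Slurm's `--ntasks-per-socket`
option. The sketch below only illustrates that mapping; the helper name and
the way the job object is accessed are assumptions made for illustration, not
ReFrame's actual scheduler-backend API.

    def emit_ntasks_per_socket(job):
        '''Hypothetical helper: translate a per-socket task count into a
        Slurm preamble line; returns an empty string when it is unset.'''
        if getattr(job, 'num_tasks_per_socket', None) is None:
            return ''

        return f'#SBATCH --ntasks-per-socket={job.num_tasks_per_socket}'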
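
The sanity hooks above all rely on `get_sibling_cpus(cpu, by=...)` to expand
a single CPU ID into the set of CPUs sharing its core, socket or NUMA node,
as read from the `topo_*.json` reference files ("output of lscpu -e"). Below
is a minimal standalone sketch of such a lookup, assuming the JSON resembles
`lscpu -e --json` output; the 'cpus', 'cpu', 'core', 'socket' and 'node' keys
are assumptions about the files' layout rather than their confirmed schema.

    import json

    def sibling_cpus(topo_file, cpu, by='socket'):
        '''Return the IDs of all CPUs sharing a core/socket/node with `cpu`.'''
        with open(topo_file) as fp:
            entries = json.load(fp)['cpus']

        # Find which core/socket/node the reference CPU belongs to...
        group = next(e[by] for e in entries if int(e['cpu']) == cpu)
        # ...and collect every CPU that shares it.
        return {int(e['cpu']) for e in entries if e[by] == group}

With a helper of this shape, the OneTaskPerSocket check boils down to
asserting that each task's affinity set is a subset of
`sibling_cpus(topo_file, aff_set[0], by='socket')` and that subtracting the
per-socket sets exhausts the node's full CPU set.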
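
Finally, the "padding" trick in OneTaskPerNumaNode is easiest to picture as
the launch it effectively produces. On an Eiger-like node with eight NUMA
nodes of sixteen physical cores each (an assumption consistent with the
hard-coded `num_cpus_per_task = 16` above), the run would amount to
`srun --ntasks=8 --cpus-per-task=16 --hint=nomultithread ./affinity/affinity`,
giving each rank a CPU mask that spans exactly one NUMA node.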