Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 126 additions & 88 deletions cscs-checks/prgenv/affinity_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,40 +41,57 @@ class to figure out the processor's topology. The content of this reference
# }
system = variable(dict, value={})

valid_systems = [
'daint:gpu', 'daint:mc', 'dom:gpu', 'dom:mc',
'eiger:mc', 'pilatus:mc',
'ault:amdv100'
]
valid_prog_environs = [
'PrgEnv-gnu', 'PrgEnv-cray', 'PrgEnv-intel', 'PrgEnv-pgi'
]
build_system = 'Make'

# The github URL can not be specifid as `self.sourcedir` as that
# would prevent the src folder from being copied to stage which is
# necessary since these tests need files from it.
sourcesdir = os.path.join('src/affinity_ref')
prebuild_cmds = ['git clone https://github.com/vkarak/affinity']

# Dict with the partition's topology - output of "lscpu -e"
topology = variable(dict, value={
'dom:gpu': 'topo_dom_gpu.json',
'dom:mc': 'topo_dom_mc.json',
'daint:gpu': 'topo_dom_gpu.json',
'daint:mc': 'topo_dom_mc.json',
'eiger:mc': 'topo_eiger_mc.json',
'pilatus:mc': 'topo_eiger_mc.json',
'ault:amdv100': 'topo_ault_amdv100.json',
})

# Reference topology file as required variable
topo_file = variable(str)

maintainers = ['RS', 'SK']
tags = {'production', 'scs', 'maintenance', 'craype'}

def __init__(self):
self.valid_systems = ['daint:gpu', 'daint:mc',
'dom:gpu', 'dom:mc', 'eiger:mc',
'ault:amdv100']
self.valid_prog_environs = [
'PrgEnv-gnu', 'PrgEnv-cray', 'PrgEnv-intel', 'PrgEnv-pgi'
]
self.build_system = 'Make'
# FIXME: These two right now cannot be set in the class body.
self.executable = './affinity/affinity'
self.build_system.options = ['-C affinity', 'MPI=1']

# The github URL can not be specifid as `self.sourcedir` as that
# would prevent the src folder from being copied to stage which is
# necessary since these tests need files from it.
self.sourcesdir = os.path.join('src/affinity_ref')
self.prebuild_cmds = ['git clone https://github.com/vkarak/affinity']
self.executable = './affinity/affinity'
@rfm.run_before('sanity')
def set_sanity(self):
self.sanity_patterns = self.assert_consumed_cpu_set()
self.maintainers = ['RS', 'SK']
self.tags = {'production', 'scs', 'maintenance', 'craype'}

# Dict with the partition's topology - output of "lscpu -e"
self.topology = {
'dom:gpu': 'topo_dom_gpu.json',
'dom:mc': 'topo_dom_mc.json',
'daint:gpu': 'topo_dom_gpu.json',
'daint:mc': 'topo_dom_mc.json',
'eiger:mc': 'topo_eiger_mc.json',
'ault:amdv100': 'topo_ault_amdv100.json',
}

@rfm.run_before('compile')
def set_topo_file(self):
'''Set the topo_file variable.

If not present in the topology dict, leave it as required.
'''
cp = self.current_partition.fullname
self.topo_file = self.topology[cp]
if cp in self.topology:
self.topo_file = self.topology[cp]

# FIXME: Update the hook below once the PR #1773 is merged.
@rfm.run_after('compile')
Expand Down Expand Up @@ -171,6 +188,7 @@ def parse_output(self):
'''Extract the data from the affinity tool.'''

re_aff_cpus = r'CPU affinity: \[\s+(?P<cpus>[\d+\s+]+)\]'

def parse_cpus(x):
return sorted([int(xi) for xi in x.split()])

Expand Down Expand Up @@ -219,10 +237,7 @@ class AffinityOpenMPBase(AffinityTestBase):

omp_bind = variable(str)
omp_proc_bind = variable(str, value='spread')

def __init__(self):
super().__init__()
self.num_tasks = 1
num_tasks = 1

@property
def ncpus_per_task(self):
Expand Down Expand Up @@ -251,10 +266,7 @@ class OneThreadPerLogicalCoreOpenMP(AffinityOpenMPBase):
'''Pin each OMP thread to a different logical core.'''

omp_bind = 'threads'

def __init__(self):
super().__init__()
self.descr = 'Pin one OMP thread per CPU.'
descr = 'Pin one OMP thread per CPU.'

@property
def num_omp_threads(self):
Expand All @@ -280,10 +292,7 @@ class OneThreadPerPhysicalCoreOpenMP(AffinityOpenMPBase):
'''Pin each OMP thread to a different physical core.'''

omp_bind = 'cores'

def __init__(self):
super().__init__()
self.descr = 'Pin one OMP thread per core.'
descr = 'Pin one OMP thread per core.'

@property
def num_omp_threads(self):
Expand Down Expand Up @@ -311,10 +320,8 @@ def consume_cpu_set(self):
class OneThreadPerPhysicalCoreOpenMPnomt(OneThreadPerPhysicalCoreOpenMP):
'''Only one cpu per core booked without multithread.'''

def __init__(self):
super().__init__()
self.descr = 'Pin one OMP thread per core wo. multithreading.'
self.use_multithreading = False
use_multithreading = False
descr = 'Pin one OMP thread per core wo. multithreading.'

@property
def ncpus_per_task(self):
Expand All @@ -332,10 +339,7 @@ class OneThreadPerSocketOpenMP(AffinityOpenMPBase):
'''Pin each OMP thread to a different socket.'''

omp_bind = 'sockets'

def __init__(self):
super().__init__()
self.descr = 'Pin one OMP thread per socket.'
descr = 'Pin one OMP thread per socket.'

@property
def num_omp_threads(self):
Expand Down Expand Up @@ -366,11 +370,8 @@ class OneTaskPerSocketOpenMPnomt(AffinityOpenMPBase):

omp_bind = 'sockets'
omp_proc_bind = 'close'

def __init__(self):
super().__init__()
self.descr = 'One task per socket - wo. multithreading.'
self.use_multithreading = False
descr = 'One task per socket - wo. multithreading.'
use_multithreading = False

@property
def num_omp_threads(self):
Expand Down Expand Up @@ -423,29 +424,24 @@ class OneTaskPerSocketOpenMP(OneTaskPerSocketOpenMPnomt):
and the number of OMP threads.
'''

def __init__(self):
super().__init__()
self.descr = 'One task per socket - w. multithreading.'
self.use_multithreading = True
descr = 'One task per socket - w. multithreading.'
use_multithreading = True

@property
def num_omp_threads(self):
return int(self.num_cpus/self.num_sockets)


@rfm.simple_test
class ConsecutiveNumaFilling(AffinityTestBase):
'''Fill the NUMA nodes with the tasks in consecutive order.
class ConsecutiveSocketFilling(AffinityTestBase):
'''Fill the sockets with the tasks in consecutive order.

This test uses as many tasks as physical cores available in a node.
Multithreading is disabled.
'''

cpu_bind = 'rank'

def __init__(self):
super().__init__()
self.use_multithreading = False
use_multithreading = False

@rfm.run_before('run')
def set_tasks(self):
Expand All @@ -456,10 +452,10 @@ def set_tasks(self):
def consume_cpu_set(self):
'''Check that all physical cores have been used in the right order.'''
task_count = 0
for numa_number in range(self.num_numa_nodes):
# Keep track of the CPUs present in this NUMA node
for socket_number in range(self.num_sockets):
# Keep track of the CPUs present in this socket
cpus_present = set()
for task_number in range(int(self.num_tasks/self.num_numa_nodes)):
for task_number in range(int(self.num_tasks/self.num_sockets)):
# Get the list of CPUs with affinity
affinity_set = self.aff_cpus[task_count]

Expand All @@ -477,14 +473,14 @@ def consume_cpu_set(self):

task_count += 1

# Ensure all CPUs belong to the same NUMA node
cpuset_by_numa = self.get_sibling_cpus(
next(iter(cpus_present)), by='node'
# Ensure all CPUs belong to the same socket
cpuset_by_socket = self.get_sibling_cpus(
next(iter(cpus_present)), by='socket'
)
if (not all(cpu in cpuset_by_numa for cpu in cpus_present) and
len(cpuset_by_numa)==len(cpus_present)):
if (not all(cpu in cpuset_by_socket for cpu in cpus_present) and
len(cpuset_by_socket) == len(cpus_present)):
raise SanityError(
f'numa node {numa_number} not filled in order'
f'socket {socket_number} not filled in order'
)

else:
Expand All @@ -493,54 +489,96 @@ def consume_cpu_set(self):


@rfm.simple_test
class AlternateNumaFilling(AffinityTestBase):
'''Numa nodes are filled in a round-robin fashion.
class AlternateSocketFilling(AffinityTestBase):
'''Sockets are filled in a round-robin fashion.

This test uses as many tasks as physical cores available in a node.
Multithreading is disabled.
'''

def __init__(self):
super().__init__()
self.use_multithreading = False
use_multithreading = False

@rfm.run_before('run')
def set_tasks(self):
self.num_tasks = int(self.num_cpus/self.num_cpus_per_core)
self.num_cpus_per_task = 1
self.num_tasks_per_numa = int(self.num_tasks/self.num_numa_nodes)
self.num_tasks_per_socket = int(self.num_tasks/self.num_sockets)

@rfm.run_before('sanity')
def consume_cpu_set(self):
'''Check that consecutive tasks are round-robin pinned to numa nodes.'''
'''Check that consecutive tasks are round-robin pinned to sockets.'''

# Get a set per numa node to keep track of the CPUs
numa_nodes = [set() for s in range(self.num_numa_nodes)]
# Get a set per socket to keep track of the CPUs
sockets = [set() for s in range(self.num_sockets)]
task_count = 0
for task in range(self.num_tasks_per_numa):
for s in range(self.num_numa_nodes):
for task in range(self.num_tasks_per_socket):
for s in range(self.num_sockets):
# Get the list of CPUs with affinity
affinity_set = self.aff_cpus[task_count]

# Only 1 CPU per affinity set is allowed
if ((len(affinity_set) > 1) or
(any(cpu in numa_nodes[s] for cpu in affinity_set)) or
(any(cpu not in self.numa_nodes[s] for cpu in affinity_set))):
(any(cpu in sockets[s] for cpu in affinity_set)) or
(any(cpu not in self.sockets[s] for cpu in affinity_set))):
raise SanityError(
f'incorrect affinity set for task {task_count}'
)

else:
numa_nodes[s].update(
sockets[s].update(
self.get_sibling_cpus(affinity_set[0], by='core')
)

task_count += 1

# Check that all numa nodes have the same CPU count
if not all(len(s) == (task+1)*2 for s in numa_nodes):
# Check that all sockets have the same CPU count
if not all(len(s) == (task+1)*2 for s in sockets):
self.cpu_set.add(-1)

# Decrement the NUMA nodes from the CPU set
for s in numa_nodes:
# Decrement the socket set from the CPU set
for s in sockets:
self.cpu_set -= s


@rfm.simple_test
class OneTaskPerNumaNode(AffinityTestBase):
'''Place a task on each NUMA node.

The trick here is to "pad" the tasks with --cpus-per-task.
The same could be done to target any cache level instead.
Multithreading is disabled.
'''

valid_systems = ['eiger:mc', 'pilatus:mc']
use_multithreading = False
num_cpus_per_task = required

@rfm.run_before('compile')
def build_settings(self):
self.build_system.options += ['OPENMP=0']

@rfm.run_before('run')
def set_tasks(self):
self.num_tasks = self.num_numa_nodes
if self.current_partition.fullname in {'eiger:mc', 'pilatus:mc'}:
self.num_cpus_per_task = 16

@rfm.run_before('sanity')
def consume_cpu_set(self):
'''Check that each task lives in a different NUMA node.'''

if len(self.aff_cpus) != self.num_numa_nodes:
raise SanityError(
'number of tasks does not match the number of numa nodes'
)

for numa_node, aff_set in enumerate(self.aff_cpus):
cpuset_by_numa = self.get_sibling_cpus(aff_set[0], by='node')
if (len(aff_set) != self.num_cpus_per_task or
any(cpu not in cpuset_by_numa for cpu in aff_set)):
raise SanityError(
f'incorrect affinity set for numa node {numa_node}'
)
else:
# Decrement the current NUMA node from the available CPU set
self.cpu_set -= cpuset_by_numa