Skip to content

Commit

Permalink
Add RW lock support. (#285)
Browse files Browse the repository at this point in the history
This decouples the taskset used for compilation (even supporting multiple compilation processes per CPU)
from the evaluation process, which is now better isolated.
This allows setting up tasksets that let us run on multiple cores in each evaluation.
This is a prerequisite for running in parallel without prohibitive interferences.
  • Loading branch information
nicolasvasilache committed Feb 21, 2022
1 parent a2c696a commit 2b0adb8
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 5 deletions.
48 changes: 43 additions & 5 deletions python/examples/core/nevergrad_parallel_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import numpy as np
import multiprocessing as mp
import os
from prwlock import RWLock
import signal
import sys
import typing as tp
Expand Down Expand Up @@ -51,6 +52,9 @@ def ipc_state(self):


def compile_and_run_checked_mp(problem: ProblemInstance, \
lock: RWLock,
cpu_min: int,
cpu_max: int,
scheduler: NGSchedulerInterface,
proposal,
n_iters: int,
Expand All @@ -65,24 +69,39 @@ def compile_and_run_checked_mp(problem: ProblemInstance, \
"""
try:

def signal_handler(sig, frame):
while lock.nlocks > 0:
lock.release()

signal.signal(signal.SIGTERM, signal_handler)

# Construct the schedule and save the module in case we need to replay later.
def schedule_and_save(module):
scheduler.schedule(module, proposal)
# TODO: save and report on error.

f = io.StringIO()
with redirect_stdout(f):
lock.acquire_read()
problem.compile_with_schedule_builder(
entry_point_name=scheduler.entry_point_name,
fun_to_benchmark_name=scheduler.fun_to_benchmark_name,
compile_time_problem_sizes_dict= \
scheduler.build_compile_time_problem_sizes(),
schedule_builder=schedule_and_save)
lock.release()

lock.acquire_write()
# Pin process to the cpu_min-cpu_max range to allow parallel evaluations
# without prohibitive interferences.
os.system(
f'taskset -p -c {cpu_min}-{cpu_max} {os.getpid()} > /dev/null 2>&1')

throughputs = problem.run(
n_iters=n_iters,
entry_point_name=scheduler.entry_point_name,
runtime_problem_sizes_dict=problem.compile_time_problem_sizes_dict)
lock.release()

# TODO: redirect to a file if we want this information.
f.flush()
Expand All @@ -91,6 +110,9 @@ def schedule_and_save(module):
except Exception as e:
import traceback
traceback.print_exc()
while lock.nlocks > 0:
lock.release()

# TODO: save to replay errors.
print(e)
ipc_dict['result'] = IPCState(success=False, throughputs=None)
Expand All @@ -99,7 +121,17 @@ def schedule_and_save(module):
def cpu_count():
  """Return the number of CPUs this process is allowed to run on."""
  available_cpus = os.sched_getaffinity(0)
  return len(available_cpus)


def get_cpu_range_for_evaluation(parsed_args, evaluation_slot_idx):
  """Return the inclusive (cpu_min, cpu_max) CPU range for one evaluation slot.

  The available CPUs are partitioned into
  `parsed_args.num_concurrent_evaluations` equally-sized, disjoint chunks so
  that concurrent evaluations do not interfere with each other. The returned
  bounds are integers suitable for `taskset -c cpu_min-cpu_max`.
  """
  num_cpus = cpu_count()
  num_runs = parsed_args.num_concurrent_evaluations
  # Integer division: taskset expects integral CPU indices; float division
  # would produce bounds like 2.5 that taskset rejects.
  max_cpus_per_concurrent_run = num_cpus // num_runs
  cpu_min = evaluation_slot_idx * max_cpus_per_concurrent_run
  # taskset ranges are inclusive, so subtract 1 from the next slot's start
  # to keep adjacent slots disjoint; clamp to the last valid CPU index.
  cpu_max = min((evaluation_slot_idx + 1) * max_cpus_per_concurrent_run - 1,
                num_cpus - 1)
  return cpu_min, cpu_max

def ask_and_fork_process(mp_manager: mp.Manager, \
lock: RWLock,
problem_definition: ProblemDefinition,
problem_types: tp.Sequence[np.dtype],
ng_mp_evaluations: tp.Sequence[NGMPEvaluation],
Expand All @@ -125,16 +157,17 @@ def ask_and_fork_process(mp_manager: mp.Manager, \
time_left=parsed_args.timeout_per_compilation)
return

cpu_min, cpu_max = get_cpu_range_for_evaluation(parsed_args,
evaluation_slot_idx)
# Start process that compiles and runs.
ipc_dict = mp_manager.dict()
p = mp.Process(target=compile_and_run_checked_mp,
args=[
problem_instance, scheduler, proposal, parsed_args.n_iters,
ipc_dict
problem_instance, lock, cpu_min, cpu_max, scheduler,
proposal, parsed_args.n_iters, ipc_dict
])
p.start()
# Best effort pin process in a round-robin fashion.
# This is noisy so suppress its name.
# Pin process in a round-robin fashion. This is noisy so suppress its io.
f = io.StringIO()
with redirect_stdout(f):
os.system(
Expand Down Expand Up @@ -296,6 +329,9 @@ def async_optim_loop(problem_definition: ProblemDefinition, \
search_number = 0
throughputs = []
ng_mp_evaluations = [None] * parsed_args.num_compilation_processes
evaluation_locks = [
RWLock() for idx in range(parsed_args.num_concurrent_evaluations)
]

interrupted = []

Expand Down Expand Up @@ -327,9 +363,11 @@ def signal_handler(sig, frame):

# We are sure there is at least one empty slot.
compilation_number = ng_mp_evaluations.index(None)
lock_idx = compilation_number % len(evaluation_locks)

# Fill that empty slot.
ask_and_fork_process(mp_manager, problem_definition, [np.float32] * 3,
ask_and_fork_process(mp_manager, evaluation_locks[lock_idx],
problem_definition, [np.float32] * 3,
ng_mp_evaluations, compilation_number, scheduler,
parsed_args)

Expand Down
4 changes: 4 additions & 0 deletions python/examples/core/nevergrad_tuner_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ def add_argparser_tuning_arguments(parser: ArgumentParser):
type=int,
nargs='?',
default=1)
parser.add_argument('--num-concurrent-evaluations',
type=int,
nargs='?',
default=1)
parser.add_argument('--random-seed', type=int, nargs='?', default=42)
parser.add_argument('--search-budget', type=int, nargs='?', default=100)
parser.add_argument(
Expand Down

0 comments on commit 2b0adb8

Please sign in to comment.