Stop using slot manager
Bo Peng committed Sep 20, 2018
1 parent 81da3a3 commit abc4100
Showing 5 changed files with 30 additions and 113 deletions.
38 changes: 16 additions & 22 deletions src/sos/actions.py
@@ -30,7 +30,7 @@
from .syntax import SOS_ACTION_OPTIONS
from .targets import (UnknownTarget, executable, file_target, fileMD5, path,
paths, sos_targets, textMD5)
from .utils import (SlotManager, StopInputGroup, TerminateExecution,
from .utils import (StopInputGroup, TerminateExecution,
TimeoutInterProcessLock, env, get_traceback, short_repr,
transcribe)

@@ -640,7 +640,7 @@ def stop_if(expr, msg=''):
#


def downloadURL(URL, dest, decompress=False, index=None, slot=None):
def downloadURL(URL, dest, decompress=False, index=None):
dest = os.path.abspath(os.path.expanduser(dest))
dest_dir, filename = os.path.split(dest)
#
@@ -712,21 +712,21 @@ def downloadURL(URL, dest, decompress=False, index=None, slot=None):
message + ': \033[32m signature calculated\033[0m')
prog.update()
prog.close()
return True, slot
return True
elif env.config['sig_mode'] == 'ignore':
prog.set_description(
message + ': \033[32m use existing\033[0m')
prog.update()
prog.close()
return True, slot
return True
elif env.config['sig_mode'] == 'default':
prog.update()
if sig.validate():
prog.set_description(
message + ': \033[32m Validated\033[0m')
prog.update()
prog.close()
return True, slot
return True
else:
prog.set_description(
message + ':\033[91m Signature mismatch\033[0m')
@@ -784,7 +784,7 @@ def downloadURL(URL, dest, decompress=False, index=None, slot=None):
os.remove(dest_tmp)
except OSError:
pass
return False, slot
return False
except Exception as e:
prog.set_description(message + f':\033[91m {e}\033[0m')
prog.update()
@@ -793,7 +793,7 @@ def downloadURL(URL, dest, decompress=False, index=None, slot=None):
os.remove(dest_tmp)
except OSError:
pass
return False, slot
return False
#
os.rename(dest_tmp, dest)
decompressed = 0
@@ -810,7 +810,7 @@ def downloadURL(URL, dest, decompress=False, index=None, slot=None):
if os.path.isdir(os.path.join(dest_dir, name)):
continue
elif not os.path.isfile(os.path.join(dest_dir, name)):
return False, slot
return False
else:
#sig.add(os.path.join(dest_dir, name))
decompressed += 1
@@ -825,7 +825,7 @@ def downloadURL(URL, dest, decompress=False, index=None, slot=None):
files = [x.name for x in tar.getmembers() if x.isfile()]
for name in files:
if not os.path.isfile(os.path.join(dest_dir, name)):
return False, slot
return False
else:
#sig.add(os.path.join(dest_dir, name))
decompressed += 1
@@ -873,13 +873,13 @@ def downloadURL(URL, dest, decompress=False, index=None, slot=None):
if env.verbosity > 2:
sys.stderr.write(get_traceback())
env.logger.error(f'Failed to download: {e}')
return False, slot
return False
finally:
# if something went wrong, still remove the temporary file
if os.path.isfile(dest_tmp):
os.remove(dest_tmp)
sig.write_sig()
return os.path.isfile(dest), slot
return os.path.isfile(dest)


@SoS_Action(acceptable_args=['URLs', 'dest_dir', 'dest_file', 'decompress', 'max_jobs'])
@@ -936,24 +936,18 @@ def download(URLs, dest_dir='.', dest_file=None, decompress=False, max_jobs=5):
with mp.Pool(processes=max_jobs) as pool:
for idx, (url, uh, filename) in enumerate(zip(urls, url_hash, filenames)):
# if there are a lot of files, start the downloads
sm = SlotManager(name=uh).acquire(1, max_jobs, wait=True)
succ[idx] = pool.apply_async(downloadURL, (url, filename,
decompress, idx, uh), callback=lambda x: SlotManager(name=x[1]).release(1))
decompress, idx, uh))
succ = [x.get() if isinstance(x, mp.pool.AsyncResult)
else x for x in succ]
else:
try:
sm = SlotManager(name=url_hash[0])
sm.acquire(1, max_jobs, wait=True)
succ[0] = downloadURL(urls[0], filenames[0],
decompress=decompress, slot=url_hash[0])
finally:
sm.release(1)
#
succ[0] = downloadURL(urls[0], filenames[0],
decompress=decompress)

# for su, url in zip(succ, urls):
# if not su:
# env.logger.warning('Failed to download {}'.format(url))
failed = [y for x, y in zip(succ, urls) if not x[0]]
failed = [y for x, y in zip(succ, urls) if not x]
if failed:
if len(urls) == 1:
raise RuntimeError(f'Failed to download {urls[0]}')
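
Note on the change above: downloadURL now returns a plain boolean instead of a (success, slot) tuple, and download no longer acquires or releases SlotManager slots around each pool.apply_async call; the size of the multiprocessing pool is the only concurrency limit. A minimal sketch of the resulting dispatch pattern, with illustrative names (fetch, urls, dests) rather than the actual SoS code:

    import multiprocessing as mp

    def fetch(url, dest):
        # stand-in for downloadURL: fetch url into dest and report success
        return True          # a bare bool, no (success, slot) tuple to unpack

    if __name__ == '__main__':
        urls = ['http://example.com/a', 'http://example.com/b']
        dests = ['a.bin', 'b.bin']
        with mp.Pool(processes=5) as pool:       # pool size bounds concurrency
            results = [pool.apply_async(fetch, (u, d)) for u, d in zip(urls, dests)]
            succ = [r.get() for r in results]    # no release callback needed
        failed = [u for ok, u in zip(succ, urls) if not ok]
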
4 changes: 3 additions & 1 deletion src/sos/signatures.py
@@ -346,8 +346,10 @@ def run(self):
req_socket.send_pyobj(self.step_signatures.get(*msg[2:]))
else:
env.logger.warning(f'Unknown request {msg}')
elif msg[0] == 'clients':
elif msg[0] == 'nprocs':
req_socket.send_pyobj(self._num_clients)
else:
raise RuntimeError(f'Unrecognized request {msg}')

if monitor_socket in socks:
evt = recv_monitor_message(monitor_socket)
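
The new 'nprocs' request lets executors ask the signature server how many worker processes are currently registered, replacing the file-based slot counting removed elsewhere in this commit. A minimal sketch of the request/reply exchange, assuming pyzmq and the send_pyobj/recv_pyobj calls used above (socket names and the reply value are illustrative):

    import zmq

    ctx = zmq.Context()
    rep = ctx.socket(zmq.REP)                 # plays the signature server's role
    rep.bind('inproc://sig_server')
    req = ctx.socket(zmq.REQ)                 # plays env.signature_req_socket's role
    req.connect('inproc://sig_server')

    req.send_pyobj(['nprocs'])                # client asks for the worker count
    msg = rep.recv_pyobj()
    if msg[0] == 'nprocs':
        rep.send_pyobj(3)                     # e.g. 3 registered clients
    print(req.recv_pyobj())                   # -> 3
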
21 changes: 10 additions & 11 deletions src/sos/step_executor.py
@@ -32,7 +32,7 @@
UnknownTarget, dynamic, file_target, path, paths, remote,
sos_targets, sos_step)
from .tasks import MasterTaskParams, TaskParams, TaskFile
from .utils import (SlotManager, StopInputGroup, TerminateExecution, ArgumentError, env,
from .utils import (StopInputGroup, TerminateExecution, ArgumentError, env,
expand_size, format_HHMMSS, get_traceback, short_repr)

__all__ = []
@@ -1165,7 +1165,8 @@ def prepare_task(self):

def wait_for_results(self):
if self.concurrent_substep:
sm = SlotManager()
env.signature_req_socket.send_pyobj(['nprocs'])
nProcs = env.signature_req_socket.recv_pyobj()
nMax = env.config.get(
'max_procs', max(int(os.cpu_count() / 2), 1))
if nMax > self.worker_pool._processes - 1 and len(self._substeps) > nMax:
@@ -1177,15 +1178,14 @@ def wait_for_results(self):
self.proc_results = [x.get()
for x in self.proc_results]
break
if sm.available(nMax) > 0:
extra = sm.acquire(nPending - 1, nMax)
if nMax > nProcs:
extra = max(min(nMax - nProcs, nPending), 0)
if extra > 0:
self.worker_pool.grow(extra)
env.logger.debug(f'Expand pool by {extra} slots')
time.sleep(1)
else:
self.proc_results = [x.get() for x in self.proc_results]
sm.release(self.worker_pool._processes - 1)
self.worker_pool.close()
self.worker_pool.join()
self.worker_pool = None
@@ -1491,11 +1491,11 @@ def run(self):
env.logger.debug(
'Input groups are executed sequentially because of existence of nested workflow.')
else:
sm = SlotManager()
# because the master process pool will count one worker in (step)
gotten = sm.acquire(len(self._substeps) - 1,
env.config.get('max_procs', max(int(os.cpu_count() / 2), 1)))
env.logger.debug(
env.signature_req_socket.send_pyobj(['nprocs'])
nProcs = env.signature_req_socket.recv_pyobj()
nMax = env.config.get('max_procs', max(int(os.cpu_count() / 2), 1))
gotten = max(min(nMax - nProcs, len(self._substeps) - 1), 0)
env.logger.trace(
f'Using process pool with size {gotten+1}')
self.worker_pool = Pool(gotten + 1)

@@ -1840,7 +1840,6 @@ def file_only(targets):
while self.worker_pool:
try:
self.worker_pool.terminate()
SlotManager().release(self.worker_pool._processes - 1)
self.worker_pool = None
except KeyboardInterrupt:
continue
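
wait_for_results and run now size the substep worker pool from the server's 'nprocs' answer instead of acquiring slots: the pool can grow by at most the number of globally free process slots, and never by more than the substeps still pending. A sketch of that arithmetic with illustrative names:

    def extra_workers(n_max, n_procs, n_pending):
        # grow by the free global slots, capped by the pending substeps
        return max(min(n_max - n_procs, n_pending), 0)

    assert extra_workers(n_max=8, n_procs=5, n_pending=10) == 3  # limited by free slots
    assert extra_workers(n_max=8, n_procs=5, n_pending=2) == 2   # limited by pending work
    assert extra_workers(n_max=4, n_procs=6, n_pending=2) == 0   # already over-subscribed
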
62 changes: 0 additions & 62 deletions src/sos/utils.py
@@ -999,68 +999,6 @@ def sos_handle_parameter_(key, defvalue):
# else:
# return True


class SlotManager(object):
#
# A slot file writes the number of USED slots.
#
def __init__(self, reset=False, name=None):
# if a name is not given, the slot will be workflow dependent
self.name = name if name else env.config['master_id']
self.lock_file = os.path.join(env.temp_dir, f'slot_{self.name}.lck')
self.slot_file = os.path.join(env.temp_dir, f'slot_{self.name}.slot')
if reset or not os.path.isfile(self.lock_file):
with fasteners.InterProcessLock(self.lock_file):
self._write_slot(0)

def _read_slot(self):
with open(self.slot_file, 'r') as slot:
return int(slot.read())

def _write_slot(self, val):
with open(self.slot_file, 'w') as slot:
slot.write(str(val))

def available(self, max_slots=10):
with fasteners.InterProcessLock(self.lock_file):
slots = self._read_slot()
return max_slots - slots

def acquire(self, num=None, max_slots=10, force=False, wait=False):
# if num == None, request as many as possible slots
if num is None:
num = max_slots
while True:
with fasteners.InterProcessLock(self.lock_file):
slots = self._read_slot()
# return all available slots
avail = max_slots - slots
if avail >= num or not wait:
ret = num if force else max(min(num, avail), 0)
self._write_slot(ret + slots)
env.logger.debug(
f'{self.name}: {num} slots requested {ret} returned ({slots} active, force={force})')
return ret
# if not enough is available, wait
env.logger.debug(
f'{self.name}: {num} slots requested {avail} available, waiting for more slots')
time.sleep(1)

def release(self, num):
# if slot manager is not initialized (e.g. for interactive use), do not track it.
if not os.path.isfile(self.slot_file):
return 0
with fasteners.InterProcessLock(self.lock_file):
slots = self._read_slot()
if slots < num:
env.logger.warning(
f'{self.name}: Releasing {num} slots from {slots} available ones. Please report this bug to SoS developers.')
self._write_slot(max(0, slots - num))
env.logger.debug(
f'{self.name}: {num} slots released from {slots} active, {slots - num} remain')
return max(0, slots - num)


class TimeoutInterProcessLock(fasteners.InterProcessLock):
#
# #871
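
The deleted SlotManager coordinated workers across processes with an on-disk counter guarded by a fasteners inter-process file lock; the rest of this commit replaces it with a single 'nprocs' query to the signature server, so no lock or slot files are left to go stale. For context, a minimal self-contained illustration of the file-lock counter pattern the class implemented (file names and the helper are illustrative, not the ones SoS used):

    import os
    import tempfile

    import fasteners

    lock_file = os.path.join(tempfile.gettempdir(), 'demo_slots.lck')
    slot_file = os.path.join(tempfile.gettempdir(), 'demo_slots.txt')

    def change_slots(delta, max_slots=8):
        # read-modify-write the shared counter under an inter-process lock
        with fasteners.InterProcessLock(lock_file):
            used = int(open(slot_file).read()) if os.path.isfile(slot_file) else 0
            used = min(max(used + delta, 0), max_slots)
            with open(slot_file, 'w') as f:
                f.write(str(used))
            return used

    print(change_slots(+2))   # acquire two slots
    print(change_slots(-2))   # release them
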
18 changes: 1 addition & 17 deletions src/sos/workflow_executor.py
@@ -33,7 +33,7 @@
from .targets import (BaseTarget, RemovedTarget, UnavailableLock,
UnknownTarget, file_target, path, paths,
sos_step, sos_targets, sos_variable, textMD5)
from .utils import (Error, SlotManager, WorkflowDict, env, get_traceback,
from .utils import (Error, WorkflowDict, env, get_traceback,
load_config_files, load_var, pickleable, save_var,
short_repr)

@@ -276,10 +276,6 @@ def __init__(self, max_workers: int, master: bool = True) -> None:

# process pool that is used to hold temporarily unused processes.
self.pool = []

self.slot_manager = SlotManager(reset=master)
self.last_num_procs = None

self.max_workers = max_workers

def execute(self, runnable: Union[SoS_Node, dummy_node], config: Dict[str, Any], args: Any, spec: Any) -> None:
@@ -303,18 +299,6 @@ def add_placeholder_worker(self, runnable, pipe):
def all_busy(self) -> bool:
n = len([x for x in self.procs if x and not x.is_pending()
and not x.in_status('failed')])
if self.last_num_procs is None:
if n > 0:
self.slot_manager.acquire(n, self.max_workers)
self.last_num_procs = n
elif n != self.last_num_procs:
if self.last_num_procs > n:
self.slot_manager.release(self.last_num_procs - n)
else:
# we force the increase of numbers because the increment is observed
self.slot_manager.acquire(
n - self.last_num_procs, self.max_workers, force=True)
self.last_num_procs = n
return n >= self.max_workers

def all_done_or_failed(self) -> bool:
