Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/rigdenlab/ample
Browse files · Browse the repository at this point in the history
  • Loading branch information
hlasimpk committed Aug 3, 2020
2 parents 1a437be + 3d94390 commit dc72ada
Show file tree
Hide file tree
Showing 15 changed files with 264 additions and 225 deletions.
17 changes: 8 additions & 9 deletions ample/ensembler/__init__.py
Expand Up @@ -36,6 +36,8 @@
from ample.util import pdb_edit
from ample.util import printTable

from pyjob import Script

logger = logging.getLogger(__name__)


Expand All @@ -57,15 +59,12 @@ def cluster_script(amoptd, python_path="ccp4-python"):
"""
# write out script
work_dir = amoptd['work_dir']
script_path = os.path.join(work_dir, "submit_ensemble.sh")
with open(script_path, "w") as job_script:
job_script.write(ample_util.SCRIPT_HEADER + os.linesep)
job_script.write("export CCP4_SCR=${TMPDIR}" + os.linesep) # Added by Ronan after issues on CCP4online server
job_script.write("ccp4-python -m ample.ensembler -restart_pkl {0}".format(amoptd['results_path']) + os.linesep)

# Make executable
os.chmod(script_path, 0o777)
return script_path
script = Script(directory=work_dir, stem="submit_ensemble")
script.append("export CCP4_SCR=${TMPDIR}")
script.append("ccp4-python -m ample.ensembler -restart_pkl {0}".format(amoptd['results_path']))
script.write()

return script


def create_ensembles(amoptd):
Expand Down
115 changes: 66 additions & 49 deletions ample/main.py
Expand Up @@ -8,6 +8,7 @@
import shutil
import sys
import time
import warnings

from ample import ensembler
from ample.ensembler.constants import UNMODIFIED
Expand All @@ -24,9 +25,10 @@
from ample.util import process_models
from ample.util import pyrvapi_results
from ample.util import reference_manager
from ample.util import workers_util
from ample.util import version

from pyjob.factory import TaskFactory

__author__ = "Jens Thomas, Felix Simkovic, Adam Simpkin, Ronan Keegan, and Jaclyn Bibby"
__credits__ = "Daniel Rigden, Martyn Winn, and Olga Mayans"
__email__ = "drigden@liverpool.ac.uk"
Expand Down Expand Up @@ -79,6 +81,17 @@ def monitor():
else:
monitor = None

# Highlight deprecated command line arguments
if amopt.d['submit_cluster']:
message = "-%s has been deprecated and will be removed in version %s!" % ('submit_cluster', 1.6)
warnings.warn(message, DeprecationWarning)
if amopt.d["submit_pe_lsf"]:
message = "-%s has been deprecated and will be removed in version %s! Use -submit_pe instead" % ('submit_pe_lsf', 1.6)
warnings.warn(message, DeprecationWarning)
if amopt.d["submit_pe_sge"]:
message = "-%s has been deprecated and will be removed in version %s! Use -submit_pe instead" % ('submit_pe_sge', 1.6)
warnings.warn(message, DeprecationWarning)

# Process any files we may have been given
model_results = process_models.extract_and_validate_models(amopt.d)
if model_results:
Expand Down Expand Up @@ -147,24 +160,25 @@ def monitor():
return

def benchmarking(self, optd):
if optd['submit_cluster']:
if optd['submit_qtype'] != 'local':
# Pickle dictionary so it can be opened by the job to get the parameters
ample_util.save_amoptd(optd)
script = benchmark_util.cluster_script(optd)
workers_util.run_scripts(
job_scripts=[script],
monitor=monitor,
nproc=optd['nproc'],
job_time=43200,
job_name='benchmark',
submit_cluster=optd['submit_cluster'],
submit_qtype=optd['submit_qtype'],
submit_queue=optd['submit_queue'],
submit_pe_lsf=optd['submit_pe_lsf'],
submit_pe_sge=optd['submit_pe_sge'],
submit_array=optd['submit_array'],
submit_max_array=optd['submit_max_array'],
)
with TaskFactory(
optd['submit_qtype'],
script,
cwd=optd['work_dir'],
environment=optd['submit_pe'],
run_time=43200,
name='benchmark',
nprocesses=optd['nproc'],
max_array_size=optd['submit_max_array'],
queue=optd['submit_queue'],
shell="/bin/bash",
) as task:
task.run()
task.wait(interval=5, monitor_f=monitor)

# queue finished so unpickle results
optd.update(ample_util.read_amoptd(optd['results_path']))
else:
Expand Down Expand Up @@ -255,25 +269,25 @@ def ensembling(self, optd):
msg = "ERROR! Cannot find any pdb files in: {0}".format(optd['models_dir'])
exit_util.exit_error(msg)
optd['ensemble_ok'] = os.path.join(optd['work_dir'], 'ensemble.ok')
if optd['submit_cluster']:
if optd['submit_qtype'] != 'local':
# Pickle dictionary so it can be opened by the job to get the parameters
ample_util.save_amoptd(optd)
script = ensembler.cluster_script(optd)
ensembler_timeout = ensembler.get_ensembler_timeout(optd)
workers_util.run_scripts(
job_scripts=[script],
monitor=monitor,
nproc=optd['nproc'],
job_time=ensembler_timeout,
job_name='ensemble',
submit_cluster=optd['submit_cluster'],
submit_qtype=optd['submit_qtype'],
submit_queue=optd['submit_queue'],
submit_pe_lsf=optd['submit_pe_lsf'],
submit_pe_sge=optd['submit_pe_sge'],
submit_array=optd['submit_array'],
submit_max_array=optd['submit_max_array'],
)
with TaskFactory(
optd['submit_qtype'],
script,
cwd=optd['work_dir'],
environment=optd['submit_pe'],
run_time=ensembler_timeout,
name='benchmark',
nprocesses=optd['nproc'],
max_array_size=optd['submit_max_array'],
queue=optd['submit_queue'],
shell="/bin/bash",
) as task:
task.run()
task.wait(interval=5, monitor_f=monitor)
# queue finished so unpickle results
optd.update(ample_util.read_amoptd(optd['results_path']))
else:
Expand Down Expand Up @@ -402,24 +416,27 @@ def monitor():

# Change to mrbump directory before running
os.chdir(optd['mrbump_dir'])
ok = workers_util.run_scripts(
job_scripts=optd['mrbump_scripts'],
monitor=monitor,
check_success=mrbump_util.checkSuccess,
early_terminate=optd['early_terminate'],
nproc=optd['nproc'],
job_time=mrbump_util.MRBUMP_RUNTIME,
job_name='mrbump',
submit_cluster=optd['submit_cluster'],
submit_qtype=optd['submit_qtype'],
submit_queue=optd['submit_queue'],
submit_pe_lsf=optd['submit_pe_lsf'],
submit_pe_sge=optd['submit_pe_sge'],
submit_array=optd['submit_array'],
submit_max_array=optd['submit_max_array'],
)

if not ok:

with TaskFactory(
optd['submit_qtype'],
optd['mrbump_scripts'],
cwd=bump_dir,
environment=optd['submit_pe'],
run_time=mrbump_util.MRBUMP_RUNTIME,
name="mrbump",
nprocesses=optd['nproc'],
max_array_size=optd['submit_max_array'],
queue=optd['submit_queue'],
shell="/bin/bash",
) as task:
task.run()

if optd['early_terminate']:
task.wait(interval=5, monitor_f=monitor, success_f=mrbump_util.checkSuccess)
else:
task.wait(interval=5, monitor_f=monitor)

if not task.completed:
msg = (
"An error code was returned after running MRBUMP on the ensembles!\n"
+ "For further information check the logs in directory: {0}".format(optd['mrbump_dir'])
Expand Down
6 changes: 3 additions & 3 deletions ample/modelling/__main__.py
Expand Up @@ -19,7 +19,7 @@ def process_args(args):
if args.rosetta_flagsfile:
args.rosetta_flagsfile = os.path.abspath(args.rosetta_flagsfile)
if args.nproc is None:
if args.submit_cluster:
if args.submit_qtype != 'local':
args.nproc = 1
else:
args.nproc = multiprocessing.cpu_count()
Expand All @@ -37,7 +37,7 @@ def process_args(args):
argparse_util.add_cluster_submit_options(parser)

work_dir = os.path.abspath('rosetta_modelling')
parser.set_defaults(submit_cluster=False, submit_qtype='SGE', submit_array=True, nmodels=1000, work_dir=work_dir)
parser.set_defaults(submit_qtype='SGE', submit_array=True, nmodels=1000, work_dir=work_dir)
args = parser.parse_args()
process_args(args)

Expand Down Expand Up @@ -66,7 +66,7 @@ def process_args(args):
rm.nchains = args.nchains

rm.nproc = args.nproc
rm.submit_cluster = args.submit_cluster
rm.submit_cluster = args.submit_qtype != "local"
rm.submit_qtype = args.submit_qtype
rm.submit_queue = args.submit_queue
rm.submit_array = args.submit_array
Expand Down

0 comments on commit dc72ada

Please sign in to comment.