Skip to content

Commit

Permalink
Merge a96a407 into fb02493
Browse files Browse the repository at this point in the history
  • Loading branch information
montoyjh committed Feb 19, 2018
2 parents fb02493 + a96a407 commit 1ee59c6
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 6 deletions.
53 changes: 48 additions & 5 deletions fireworks/core/launchpad.py
Expand Up @@ -13,6 +13,8 @@
import time
import traceback
from collections import OrderedDict, defaultdict
from itertools import chain
from tqdm import tqdm

from pymongo import MongoClient
from pymongo import DESCENDING, ASCENDING
Expand Down Expand Up @@ -313,6 +315,47 @@ def add_wf(self, wf, reassign_all=True):
self.m_logger.info('Added a workflow. id_map: {}'.format(old_new))
return old_new

def bulk_add_wfs(self, wfs):
"""
Adds a list of workflows to the fireworks database
using insert_many for both the fws and wfs, is
more efficient than adding them one at a time.
Args:
wfs ([Workflow]): list of workflows or fireworks
Returns:
None
"""
# Make all fireworks workflows
wfs = [Workflow.from_firework(wf) if isinstance(wf, Firework)
else wf for wf in wfs]

# Initialize new firework counter, starting from the next fw id
total_num_fws = sum([len(wf.fws) for wf in wfs])
new_fw_counter = self.fw_id_assigner.find_one_and_update(
{}, {'$inc': {'next_fw_id': total_num_fws}})['next_fw_id']
for wf in tqdm(wfs):
# Reassign fw_ids and increment the counter
old_new = dict(zip(
wf.id_fw.keys(),
range(new_fw_counter, new_fw_counter + len(wf.fws))))
wf._reassign_ids(old_new)
new_fw_counter += len(wf.fws)

# Set root fws to READY
for fw_id in wf.root_fw_ids:
wf.id_fw[fw_id].state = 'READY'
wf.fw_states[fw_id] = 'READY'

# Insert all fws and wfs, do workflows first so fws don't
# get checked out prematurely
self.workflows.insert_many(wf.to_db_dict() for wf in wfs)
all_fws = chain.from_iterable(wf.fws for wf in wfs)
self.fireworks.insert_many(fw.to_db_dict() for fw in all_fws)
return None

def append_wf(self, new_wf, fw_ids, detour=False, pull_spec_mods=True):
"""
Append a new workflow on top of an existing workflow.
Expand Down Expand Up @@ -1319,17 +1362,17 @@ def rerun_fw(self, fw_id, rerun_duplicates=True, recover_launch=None, recover_mo
"fw_id": {"$ne": fw_id}}, {"fw_id": 1}):
duplicates.append(d['fw_id'])
duplicates = list(set(duplicates))

# Launch recovery
if recover_launch is not None:
recovery = self.get_recovery(fw_id, recover_launch)
recovery.update({'_mode': recover_mode})
set_spec = {'$set': {'spec._recovery': recovery}}
if recover_mode == 'prev_dir':
prev_dir = self.get_launch_by_id(recovery.get('_launch_id')).launch_dir
set_spec['$set']['spec._launch_dir'] = prev_dir
set_spec['$set']['spec._launch_dir'] = prev_dir
self.fireworks.find_one_and_update({"fw_id": fw_id}, set_spec)

# If no launch recovery specified, unset the firework recovery spec
else:
set_spec = {"$unset":{"spec._recovery":""}}
Expand All @@ -1355,7 +1398,7 @@ def rerun_fw(self, fw_id, rerun_duplicates=True, recover_launch=None, recover_mo
r = self.rerun_fw(f, rerun_duplicates=False, recover_launch=recover_launch,
recover_mode=recover_mode)
reruns.extend(r)

return reruns

def get_recovery(self, fw_id, launch_id='last'):
Expand Down Expand Up @@ -1540,7 +1583,7 @@ def recover_offline(self, launch_id, ignore_errors=False, print_errors=False):
})
if f:
self._refresh_wf(fw_id)

if 'checkpoint' in offline_data:
m_launch.touch_history(checkpoint=offline_data['checkpoint'])
self.launches.find_one_and_replace({'launch_id': m_launch.launch_id},
Expand Down
20 changes: 19 additions & 1 deletion fireworks/core/tests/test_launchpad.py
Expand Up @@ -54,7 +54,8 @@ def setUp(self):


def tearDown(self):
self.lp.reset(password=None,require_password=False)
self.lp.reset(password=None, require_password=False,
max_reset_wo_password=1000)
# Delete launch locations
if os.path.exists(os.path.join('FW.json')):
os.remove('FW.json')
Expand Down Expand Up @@ -113,6 +114,23 @@ def test_add_wf(self):
self.assertEqual(len(fw_ids), 3)
self.lp.reset('',require_password=False)

def test_add_wfs(self):
ftask = ScriptTask.from_str('echo "lorem ipsum"')
wfs = []
for _ in range(50):
# Add two workflows with 3 and 5 simple fireworks
wf3 = Workflow([Firework(ftask, name='lorem') for _ in range(3)],
name='lorem wf')
wf5 = Workflow([Firework(ftask, name='lorem') for _ in range(5)],
name='lorem wf')
wfs.extend([wf3, wf5])
self.lp.bulk_add_wfs(wfs)
num_fws_total = sum([len(wf.fws) for wf in wfs])
distinct_fw_ids = self.lp.fireworks.distinct('fw_id', {'name': 'lorem'})
self.assertEqual(len(distinct_fw_ids), num_fws_total)
num_wfs_in_db = len(self.lp.get_wf_ids({"name": "lorem wf"}))
self.assertEqual(num_wfs_in_db, len(wfs))


class LaunchPadDefuseReigniteRerunArchiveDeleteTest(unittest.TestCase):

Expand Down

0 comments on commit 1ee59c6

Please sign in to comment.