Skip to content

Commit

Permalink
add prechunk to thermo and materials
Browse files Browse the repository at this point in the history
  • Loading branch information
shyamd committed May 27, 2021
1 parent 870dad4 commit 08db446
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 0 deletions.
30 changes: 30 additions & 0 deletions emmet-builders/emmet/builders/vasp/materials.py
Expand Up @@ -91,6 +91,36 @@ def ensure_indexes(self):
self.task_validation.ensure_index("task_id")
self.task_validation.ensure_index("valid")

def prechunk(self, number_splits: int) -> Iterable[Dict]:
"""Prechunk the materials builder for distributed computation"""
temp_query = dict(self.query)
temp_query["state"] = "successful"
if len(self.settings.BUILD_TAGS) > 0 and len(self.settings.EXCLUDED_TAGS) > 0:
temp_query["$and"] = [
{"tags": {"$in": self.settings.BUILD_TAGS}},
{"tags": {"$nin": self.settings.EXCLUDED_TAGS}},
]
elif len(self.settings.BUILD_TAGS) > 0:
temp_query["tags"] = {"$in": self.settings.BUILD_TAGS}

self.logger.info("Finding tasks to process")
all_tasks = {
doc[self.tasks.key]
for doc in self.tasks.query(temp_query, [self.tasks.key])
}
processed_tasks = {
t_id
for d in self.materials.query({}, ["task_ids"])
for t_id in d.get("task_ids", [])
}
to_process_tasks = all_tasks - processed_tasks
to_process_forms = self.tasks.distinct(
"formula_pretty", {self.tasks.key: {"$in": list(to_process_tasks)}}
)

for formula_chunk in grouper(to_process_forms, number_splits):
yield {"formula_pretty": {"$in": list(formula_chunk)}}

def get_items(self) -> Iterator[List[Dict]]:
"""
Gets all items to process into materials documents.
Expand Down
15 changes: 15 additions & 0 deletions emmet-builders/emmet/builders/vasp/thermo.py
Expand Up @@ -74,6 +74,21 @@ def ensure_indexes(self):
self.thermo.ensure_index("material_id")
self.thermo.ensure_index("last_updated")

def prechunk(self, number_splits: int) -> Iterable[Dict]:
updated_chemsys = self.get_updated_chemsys()
new_chemsys = self.get_new_chemsys()

affected_chemsys = self.get_affected_chemsys(updated_chemsys | new_chemsys)

# Remove overlapping chemical systems
to_process_chemsys = set()
for chemsys in updated_chemsys | new_chemsys | affected_chemsys:
if chemsys not in to_process_chemsys:
to_process_chemsys |= chemsys_permutations(chemsys)

for chemsys_chunk in grouper(to_process_chemsys, number_splits):
yield {"chemsys": {"$in": list(chemsys_chunk)}}

def get_items(self) -> Iterator[List[Dict]]:
"""
Gets whole chemical systems of entries to process
Expand Down

0 comments on commit 08db446

Please sign in to comment.