diff --git a/emmet-builders/emmet/builders/vasp/materials.py b/emmet-builders/emmet/builders/vasp/materials.py index fe6f10608b..9608ae1935 100644 --- a/emmet-builders/emmet/builders/vasp/materials.py +++ b/emmet-builders/emmet/builders/vasp/materials.py @@ -91,6 +91,36 @@ def ensure_indexes(self): self.task_validation.ensure_index("task_id") self.task_validation.ensure_index("valid") + def prechunk(self, number_splits: int) -> Iterable[Dict]: + """Prechunk the materials builder for distributed computation""" + temp_query = dict(self.query) + temp_query["state"] = "successful" + if len(self.settings.BUILD_TAGS) > 0 and len(self.settings.EXCLUDED_TAGS) > 0: + temp_query["$and"] = [ + {"tags": {"$in": self.settings.BUILD_TAGS}}, + {"tags": {"$nin": self.settings.EXCLUDED_TAGS}}, + ] + elif len(self.settings.BUILD_TAGS) > 0: + temp_query["tags"] = {"$in": self.settings.BUILD_TAGS} + + self.logger.info("Finding tasks to process") + all_tasks = { + doc[self.tasks.key] + for doc in self.tasks.query(temp_query, [self.tasks.key]) + } + processed_tasks = { + t_id + for d in self.materials.query({}, ["task_ids"]) + for t_id in d.get("task_ids", []) + } + to_process_tasks = all_tasks - processed_tasks + to_process_forms = self.tasks.distinct( + "formula_pretty", {self.tasks.key: {"$in": list(to_process_tasks)}} + ) + + for formula_chunk in grouper(to_process_forms, number_splits): + yield {"formula_pretty": {"$in": list(formula_chunk)}} + def get_items(self) -> Iterator[List[Dict]]: """ Gets all items to process into materials documents. diff --git a/emmet-builders/emmet/builders/vasp/thermo.py b/emmet-builders/emmet/builders/vasp/thermo.py index b7f5df8def..e1f23f4538 100644 --- a/emmet-builders/emmet/builders/vasp/thermo.py +++ b/emmet-builders/emmet/builders/vasp/thermo.py @@ -74,6 +74,21 @@ def ensure_indexes(self): self.thermo.ensure_index("material_id") self.thermo.ensure_index("last_updated") + def prechunk(self, number_splits: int) -> Iterable[Dict]: + updated_chemsys = self.get_updated_chemsys() + new_chemsys = self.get_new_chemsys() + + affected_chemsys = self.get_affected_chemsys(updated_chemsys | new_chemsys) + + # Remove overlapping chemical systems + to_process_chemsys = set() + for chemsys in updated_chemsys | new_chemsys | affected_chemsys: + if chemsys not in to_process_chemsys: + to_process_chemsys |= chemsys_permutations(chemsys) + + for chemsys_chunk in grouper(to_process_chemsys, number_splits): + yield {"chemsys": {"$in": list(chemsys_chunk)}} + def get_items(self) -> Iterator[List[Dict]]: """ Gets whole chemical systems of entries to process