Skip to content

Commit

Permalink
working electrode builder
Browse files Browse the repository at this point in the history
  • Loading branch information
jmmshn committed Feb 4, 2021
1 parent 11de9c2 commit 8cb5dc7
Showing 1 changed file with 133 additions and 124 deletions.
257 changes: 133 additions & 124 deletions emmet-builders/emmet/builders/materials/electrodes.py
Expand Up @@ -4,6 +4,7 @@
from datetime import datetime
from functools import lru_cache
from itertools import groupby, chain
from pprint import pprint
from typing import Iterable, Dict, List, Any

from emmet.core.structure_group import StructureGroupDoc
Expand Down Expand Up @@ -271,127 +272,135 @@ def _remove_targets(self, rm_ids):
self.sgroups.remove_docs({"task_id": {"$in": rm_ids}})


# class InsertionElectrodeBuilder(MapBuilder):
# def __init__(
# self,
# grouped_materials: MongoStore,
# insertion_electrode: MongoStore,
# thermo: MongoStore,
# material: MongoStore,
# **kwargs,
# ):
# self.grouped_materials = grouped_materials
# self.insertion_electrode = insertion_electrode
# self.thermo = thermo
# self.material = material
# super().__init__(
# source=self.grouped_materials,
# target=self.insertion_electrode,
# query={"structure_matched": True, "has_distinct_compositions": True},
# **kwargs,
# )
#
# def get_items(self):
# """"""
#
# @lru_cache(None)
# def get_working_ion_entry(working_ion):
# with self.thermo as store:
# working_ion_docs = [*store.query({"chemsys": working_ion})]
# best_wion = min(
# working_ion_docs, key=lambda x: x["thermo"]["energy_per_atom"]
# )
# return best_wion
#
# def modify_item(item):
# self.logger.debug(
# f"Looking for {len(item['grouped_ids'])} task_ids in the Thermo DB."
# )
# with self.thermo as store:
# thermo_docs = [
# *store.query(
# {
# "$and": [
# {"task_id": {"$in": item["grouped_ids"]}},
# {"_sbxn": {"$in": ["core"]}},
# ]
# },
# properties=["task_id", "_sbxn", "thermo"],
# )
# ]
#
# with self.material as store:
# material_docs = [
# *store.query(
# {
# "$and": [
# {"task_id": {"$in": item["grouped_task_ids"]}},
# {"_sbxn": {"$in": ["core"]}},
# ]
# },
# properties=["task_id", "structure"],
# )
# ]
#
# self.logger.debug(f"Found for {len(thermo_docs)} Thermo Documents.")
# working_ion_doc = get_working_ion_entry(item["working_ion"])
# return {
# "task_id": item["task_id"],
# "working_ion_doc": working_ion_doc,
# "entry_data": item["entry_data"],
# "thermo_docs": thermo_docs,
# "material_docs": material_docs,
# }
#
# yield from map(modify_item, super().get_items())
#
# def unary_function(self, item):
# """
# - Add volume information to each entry to create the insertion electrode document
# - Add the host structure
# - TODO parse the structures in the different materials documents and create a simple migration graph
# """
# entries = [tdoc_["thermo"]["entry"] for tdoc_ in item["thermo_docs"]]
# entries = list(map(ComputedEntry.from_dict, entries))
# working_ion_entry = ComputedEntry.from_dict(
# item["working_ion_doc"]["thermo"]["entry"]
# )
# working_ion = working_ion_entry.composition.reduced_formula
# decomp_energies = {
# d_["task_id"]: d_["thermo"]["e_above_hull"] for d_ in item["thermo_docs"]
# }
# for ient in entries:
# if (
# Composition(item["entry_data"][ient.entry_id]["composition"])
# != ient.composition
# ):
# raise RuntimeError(
# f"In {item['task_id']}: the compositions for task {ient.entry_id} are matched between the StructureGroup DB and the Thermo DB "
# )
# ient.data["volume"] = item["entry_data"][ient.entry_id]["volume"]
# ient.data["decomposition_energy"] = decomp_energies[ient.entry_id]
#
# failed = False
# try:
# ie = InsertionElectrode.from_entries(entries, working_ion_entry)
# except:
# failed = True
#
# if failed or ie.num_steps < 1:
# res = {"task_id": item["task_id"], "has_step": False}
# else:
# res = {"task_id": item["task_id"], "has_step": True}
# res.update(ie.get_summary_dict())
# res["InsertionElectrode"] = ie.as_dict()
# least_wion_ent = min(
# entries, key=lambda x: x.composition.get_atomic_fraction(working_ion)
# )
# mdoc_ = next(
# filter(
# lambda x: x["task_id"] == least_wion_ent.entry_id,
# item["material_docs"],
# )
# )
# host_structure = Structure.from_dict(mdoc_["structure"])
# res["host_structure"] = host_structure.as_dict()
# return res
class InsertionElectrodeBuilder(MapBuilder):
def __init__(
self,
grouped_materials: MongoStore,
insertion_electrode: MongoStore,
thermo: MongoStore,
material: MongoStore,
query: dict = None,
**kwargs,
):
self.grouped_materials = grouped_materials
self.insertion_electrode = insertion_electrode
self.thermo = thermo
self.material = material
qq_ = {} if query is None else query
qq_.update({"structure_matched": True, "has_distinct_compositions": True})
super().__init__(
source=self.grouped_materials,
target=self.insertion_electrode,
query=qq_,
**kwargs,
)

def get_items(self):
""""""

@lru_cache(None)
def get_working_ion_entry(working_ion):
with self.thermo as store:
working_ion_docs = [*store.query({"chemsys": working_ion})]
best_wion = min(
working_ion_docs, key=lambda x: x["thermo"]["energy_per_atom"]
)
return best_wion

def modify_item(item):
self.logger.debug(
f"Looking for {len(item['grouped_ids'])} task_ids in the Thermo DB."
)
with self.thermo as store:
thermo_docs = [
*store.query(
{
"$and": [
{"task_id": {"$in": item["grouped_ids"]}},
{"_sbxn": {"$in": ["core"]}},
]
},
properties=["task_id", "_sbxn", "thermo"],
)
]

with self.material as store:
material_docs = [
*store.query(
{
"$and": [
{"task_id": {"$in": item["grouped_ids"]}},
{"_sbxn": {"$in": ["core"]}},
]
},
properties=["task_id", "structure"],
)
]

self.logger.debug(f"Found for {len(thermo_docs)} Thermo Documents.")
if len(item["ignored_species"]) != 1:
raise ValueError(
"Insertion electrode can only be defined for one working ion species"
)
working_ion_doc = get_working_ion_entry(item["ignored_species"][0])
return {
"task_id": item["task_id"],
"working_ion_doc": working_ion_doc,
"thermo_docs": thermo_docs,
"material_docs": material_docs,
}

yield from map(modify_item, super().get_items())

def unary_function(self, item):
"""
- Add volume information to each entry to create the insertion electrode document
- Add the host structure
- TODO parse the structures in the different materials documents and create a simple migration graph
"""
entries = [tdoc_["thermo"]["entry"] for tdoc_ in item["thermo_docs"]]
entries = list(map(ComputedEntry.from_dict, entries))
working_ion_entry = ComputedEntry.from_dict(
item["working_ion_doc"]["thermo"]["entry"]
)
working_ion = working_ion_entry.composition.reduced_formula
decomp_energies = {
d_["task_id"]: d_["thermo"]["e_above_hull"] for d_ in item["thermo_docs"]
}
mat_structures = {
mat_d_["task_id"]: Structure.from_dict(mat_d_["structure"])
for mat_d_ in item["material_docs"]
}

for ient in entries:
if mat_structures[ient.entry_id].composition != ient.composition:
raise RuntimeError(
f"In {item['task_id']}: the compositions for task {ient.entry_id} are matched between the StructureGroup DB and the Thermo DB "
)
ient.data["volume"] = mat_structures[ient.entry_id].volume
ient.data["decomposition_energy"] = decomp_energies[ient.entry_id]

failed = False
try:
ie = InsertionElectrode.from_entries(entries, working_ion_entry)
except Exception:
failed = True

if failed or ie.num_steps < 1:
res = {"task_id": item["task_id"], "has_step": False}
else:
res = {"task_id": item["task_id"], "has_step": True}
res.update(ie.get_summary_dict())
res["InsertionElectrode"] = ie.as_dict()
least_wion_ent = min(
entries, key=lambda x: x.composition.get_atomic_fraction(working_ion)
)
mdoc_ = next(
filter(
lambda x: x["task_id"] == least_wion_ent.entry_id,
item["material_docs"],
)
)
host_structure = Structure.from_dict(mdoc_["structure"])
res["host_structure"] = host_structure.as_dict()
return res

0 comments on commit 8cb5dc7

Please sign in to comment.