-
Notifications
You must be signed in to change notification settings - Fork 62
/
electrodes.py
391 lines (348 loc) · 13.9 KB
/
electrodes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
import math
import operator
from collections import namedtuple
from datetime import datetime
from functools import lru_cache
from itertools import chain, groupby
from pprint import pprint
from typing import Any, Dict, Iterable, List
from maggma.builders import Builder, MapBuilder
from maggma.stores import MongoStore
from monty.json import MontyEncoder
from numpy import unique
from pymatgen.analysis.structure_matcher import ElementComparator, StructureMatcher
from pymatgen.apps.battery.insertion_battery import InsertionElectrode
from pymatgen.core import Composition, Structure
from pymatgen.entries.computed_entries import ComputedEntry, ComputedStructureEntry
from emmet.core.electrode import InsertionElectrodeDoc
from emmet.core.structure_group import StructureGroupDoc
from emmet.core.utils import jsanitize
# Module authorship metadata.
__author__ = "Jimmy Shen"
__email__ = "jmmshn@lbl.gov"
def s_hash(el):
    """Grouping key: return the entry's ``comp_delith`` data field.

    NOTE(review): presumably the delithiated composition string — confirm
    against whatever populates ``el.data``.
    """
    return el.data["comp_delith"]
# MatDoc = namedtuple("MatDoc", ["material_id", "structure", "formula_pretty", "framework"])

# Elements treated as redox-active for electrode screening: a chemical system
# must contain at least one of these (see the base query in
# StructureGroupBuilder.get_items) to be considered.
REDOX_ELEMENTS = [
    "Ti",
    "V",
    "Cr",
    "Mn",
    "Fe",
    "Co",
    "Ni",
    "Cu",
    "Nb",
    "Mo",
    "Sn",
    "Sb",
    "W",
    "Re",
    "Bi",
    "C",
    "Hf",
]

# Candidate working-ion species. Systems containing any working ion other
# than the one being built are excluded via the "$nin" clause in
# StructureGroupBuilder.get_items.
WORKING_IONS = ["Li", "Be", "Na", "Mg", "K", "Ca", "Rb", "Sr", "Cs", "Ba"]

# Fields pulled from the materials store for each material document.
MAT_PROPS = [
    "structure",
    "material_id",
    "formula_pretty",
]

# Spacegroup sub-field names.
# NOTE(review): not referenced anywhere in this module's visible code —
# possibly consumed elsewhere; verify before removing.
sg_fields = ["number", "hall_number", "international", "hall", "choice"]
def generic_groupby(list_in, comp=operator.eq):
    """
    Group a list of unsortable objects
    Args:
        list_in: A list of generic objects
        comp: (Default value = operator.eq) The comparator
    Returns:
        [int] list of labels for the input list
    """
    labels = [None] * len(list_in)
    next_label = 0
    for idx, assigned in enumerate(labels):
        # Skip anything already placed in a group by an earlier pass.
        if assigned is not None:
            continue
        labels[idx] = next_label
        for jdx in range(idx + 1, len(labels)):
            if not comp(list_in[idx], list_in[jdx]):
                continue
            if labels[jdx] is None:
                # First time jdx matches anything: adopt idx's label.
                labels[jdx] = labels[idx]
            else:
                # jdx already belongs to an earlier group: merge idx into it
                # and reclaim the label we just handed out.
                labels[idx] = labels[jdx]
                next_label -= 1
        next_label += 1
    return labels
class StructureGroupBuilder(Builder):
    """Group materials documents into StructureGroupDoc documents of
    structures that match once the working-ion species is ignored."""

    def __init__(
        self,
        materials: MongoStore,
        sgroups: MongoStore,
        working_ion: str,
        query: dict = None,
        ltol: float = 0.2,
        stol: float = 0.3,
        angle_tol: float = 5.0,
        check_newer: bool = True,
        **kwargs,
    ):
        """
        Aggregate materials entries into sgroups that are topotactically similar to each other.
        This is an incremental builder that ensures that each materials id belongs to one
        StructureGroupDoc document.

        Args:
            materials (Store): Store of materials documents that contains the structures
            sgroups (Store): Store of grouped material ids
            working_ion: working-ion species, passed to
                StructureGroupDoc.from_ungrouped_structure_entries as the ignored species
            query (dict): dictionary to limit materials to be analyzed ---
                only applied to the materials when we need to group structures
                the phase diagram is still constructed with the entire set
            ltol: fractional length tolerance forwarded to the structure grouping
            stol: site tolerance forwarded to the structure grouping
            angle_tol: angle tolerance (degrees) forwarded to the structure grouping
            check_newer: if True, compare source and target timestamps per chemsys
                and skip or nuke instead of always yielding work
        """
        self.materials = materials
        self.sgroups = sgroups
        self.working_ion = working_ion
        self.query = query if query else {}
        self.ltol = ltol
        self.stol = stol
        self.angle_tol = angle_tol
        self.check_newer = check_newer
        super().__init__(sources=[materials], targets=[sgroups], **kwargs)

    def prechunk(self, number_splits: int) -> Iterable[Dict]:
        """
        TODO can implement this for distributed runs by adding filters
        """
        pass

    def get_items(self):
        """
        Summary of the steps:
        - query the materials database for different chemical systems that satisfies the base query
          "contains redox element and working ion"
        - Get the full chemsys list of interest
        - The main loop is over all these chemsys. within the main loop:
            - get newest timestamp for the material documents (max_mat_time)
            - get the oldest timestamp for the target documents (min_target_time)
            - if min_target_time is < max_mat_time then nuke all the target documents
        """
        other_wions = list(set(WORKING_IONS) - {self.working_ion})
        # All potentially interesting chemsys must contain the working ion,
        # at least one redox element, and no *other* working-ion species.
        base_query = {
            "$and": [
                self.query.copy(),
                {"elements": {"$in": REDOX_ELEMENTS}},
                {"elements": {"$in": [self.working_ion]}},
                {"elements": {"$nin": other_wions}},
            ]
        }
        self.logger.debug(f"Initial Chemsys QUERY: {base_query}")
        # get a chemsys that only contains the working ion since the working ion
        # must be present for there to be voltage steps
        all_chemsys = self.materials.distinct("chemsys", criteria=base_query)
        # Contains the working ion but not ONLY the working ion
        all_chemsys = [
            *filter(
                lambda x: self.working_ion in x and len(x) > 1,
                [chemsys_.split("-") for chemsys_ in all_chemsys],
            )
        ]
        self.logger.debug(
            f"Performing initial checks on {len(all_chemsys)} chemical systems containing redox elements with or without the Working Ion."
        )
        self.total = len(all_chemsys)
        for chemsys_l in all_chemsys:
            chemsys = "-".join(sorted(chemsys_l))
            # Same framework without the working ion (the de-ionized endpoint).
            chemsys_wo = "-".join(sorted(set(chemsys_l) - {self.working_ion}))
            # Pull both the ion-containing and ion-free chemical systems.
            chemsys_query = {
                "$and": [
                    {"chemsys": {"$in": [chemsys_wo, chemsys]}},
                    self.query.copy(),
                ]
            }
            self.logger.debug(f"QUERY: {chemsys_query}")
            all_mats_in_chemsys = list(
                self.materials.query(
                    criteria=chemsys_query,
                    properties=MAT_PROPS + [self.materials.last_updated_field],
                )
            )
            self.logger.debug(
                f"Found {len(all_mats_in_chemsys)} materials in {chemsys_wo}"
            )
            if self.check_newer:
                all_target_docs = list(
                    self.sgroups.query(
                        criteria={"chemsys": chemsys},
                        properties=[
                            "group_id",
                            self.sgroups.last_updated_field,
                            "material_ids",
                        ],
                    )
                )
                self.logger.debug(
                    f"Found {len(all_target_docs)} Grouped documents in {chemsys_wo}"
                )
                mat_times = [
                    mat_doc[self.materials.last_updated_field]
                    for mat_doc in all_mats_in_chemsys
                ]
                max_mat_time = max(mat_times, default=datetime.min)
                self.logger.debug(
                    f"The newest material doc was generated at {max_mat_time}."
                )
                # NOTE(review): the target docs were queried with
                # self.sgroups.last_updated_field but are read back here with
                # self.materials.last_updated_field — this only works when both
                # stores share the same field name; confirm or use sgroups'.
                target_times = [
                    g_doc[self.materials.last_updated_field]
                    for g_doc in all_target_docs
                ]
                # min() yields the OLDEST group doc, despite the log wording below.
                min_target_time = min(target_times, default=datetime.max)
                self.logger.debug(
                    f"The newest GROUP doc was generated at {min_target_time}."
                )
                mat_ids = set(
                    [mat_doc["material_id"] for mat_doc in all_mats_in_chemsys]
                )
                # If any material id is missing or if any material id has been updated
                target_ids = set()
                for g_doc in all_target_docs:
                    target_ids |= set(g_doc["material_ids"])
                self.logger.debug(
                    f"There are {len(mat_ids)} material ids in the source database vs {len(target_ids)} in the target database."
                )
                if mat_ids == target_ids and max_mat_time < min_target_time:
                    # Up to date: yield a placeholder so progress accounting
                    # still counts one item per chemical system.
                    yield None
                elif len(target_ids) == 0:
                    self.logger.info(
                        f"No documents in chemsys {chemsys} in the target database."
                    )
                else:
                    self.logger.info(
                        f"Nuking all {len(target_ids)} documents in chemsys {chemsys} in the target database."
                    )
                    self._remove_targets(list(target_ids))
            else:
                # NOTE(review): under this structure, materials are only yielded
                # for grouping when check_newer is False; with check_newer=True
                # the stale/empty branches nuke or log but yield nothing for a
                # rebuild — confirm this is the intended incremental workflow.
                yield {"chemsys": chemsys, "materials": all_mats_in_chemsys}

    def update_targets(self, items: List):
        """Flatten processed group lists and write them to the sgroups store,
        stamping each document's last-updated field."""
        items = list(filter(None, chain.from_iterable(items)))
        if len(items) > 0:
            self.logger.info("Updating {} sgroups documents".format(len(items)))
            for struct_group_dict in items:
                struct_group_dict[self.sgroups.last_updated_field] = datetime.utcnow()
            self.sgroups.update(docs=items, key=["material_id"])
        else:
            self.logger.info("No items to update")

    def _entry_from_mat_doc(self, mdoc):
        # Note since we are just structure grouping we don't need to be careful with energy or correction
        # All of the energy analysis is left to other builders
        d_ = {
            "entry_id": mdoc["material_id"],
            "structure": mdoc["structure"],
            "energy": -math.inf,
            "correction": -math.inf,
        }
        return ComputedStructureEntry.from_dict(d_)

    def process_item(self, item: Any) -> Any:
        """Group one chemsys worth of material docs into StructureGroupDoc dicts."""
        if item is None:
            # Placeholder emitted by get_items for an up-to-date chemsys.
            return None
        entries = [*map(self._entry_from_mat_doc, item["materials"])]
        s_groups = StructureGroupDoc.from_ungrouped_structure_entries(
            entries=entries,
            ignored_species=[self.working_ion],
            ltol=self.ltol,
            stol=self.stol,
            angle_tol=self.angle_tol,
        )
        return [sg.dict() for sg in s_groups]

    def _remove_targets(self, rm_ids):
        # Delete every group doc that references any of the given material ids.
        self.sgroups.remove_docs({"material_ids": {"$in": rm_ids}})
class InsertionElectrodeBuilder(MapBuilder):
    """Map structure-group documents to InsertionElectrodeDoc documents.

    Only groups that are structure-matched and have distinct compositions are
    considered; energies are pulled from the thermo store.
    """

    def __init__(
        self,
        grouped_materials: MongoStore,
        insertion_electrode: MongoStore,
        thermo: MongoStore,
        query: dict = None,
        **kwargs,
    ):
        """
        Args:
            grouped_materials: source store of StructureGroupDoc documents
            insertion_electrode: target store for insertion-electrode documents
            thermo: store of thermo documents (entries, energy_above_hull)
            query: optional extra criteria applied to the source store
            **kwargs: forwarded to MapBuilder
        """
        self.grouped_materials = grouped_materials
        self.insertion_electrode = insertion_electrode
        self.thermo = thermo
        # BUGFIX: copy the caller's query instead of mutating it in place —
        # the original `qq_ = query` then `qq_.update(...)` injected the
        # structure_matched/has_distinct_compositions keys into the caller's dict.
        qq_ = {} if query is None else dict(query)
        qq_.update({"structure_matched": True, "has_distinct_compositions": True})
        super().__init__(
            source=self.grouped_materials,
            target=self.insertion_electrode,
            query=qq_,
            **kwargs,
        )

    def get_items(self):
        """Yield source group items augmented with thermo docs and the
        working-ion reference document."""

        @lru_cache()
        def get_working_ion_entry(working_ion):
            # Lowest energy-per-atom elemental doc for the working ion;
            # cached because every item shares the same working ion.
            with self.thermo as store:
                working_ion_docs = [*store.query({"chemsys": working_ion})]
            best_wion = min(working_ion_docs, key=lambda x: x["energy_per_atom"])
            return best_wion

        def modify_item(item):
            self.logger.debug(
                f"Looking for {len(item['material_ids'])} material_id in the Thermo DB."
            )
            with self.thermo as store:
                thermo_docs = [
                    *store.query(
                        {
                            "$and": [
                                {"material_id": {"$in": item["material_ids"]}},
                            ]
                        },
                        properties=[
                            "material_id",
                            "_sbxn",
                            "thermo",
                            "entries",
                            "energy_type",
                            "energy_above_hull",
                        ],
                    )
                ]
            self.logger.debug(f"Found for {len(thermo_docs)} Thermo Documents.")
            if len(item["ignored_species"]) != 1:
                raise ValueError(
                    "Insertion electrode can only be defined for one working ion species"
                )
            working_ion_doc = get_working_ion_entry(item["ignored_species"][0])
            return {
                "material_id": item["material_id"],
                "working_ion_doc": working_ion_doc,
                "working_ion": item["ignored_species"][0],
                "thermo_docs": thermo_docs,
            }

        yield from map(modify_item, super().get_items())

    def unary_function(self, item):
        """
        - Add volume information to each entry to create the insertion electrode document
        - Add the host structure
        """
        # Rehydrate the entries selected by each thermo doc's energy_type.
        entries = [
            tdoc_["entries"][tdoc_["energy_type"]] for tdoc_ in item["thermo_docs"]
        ]
        entries = list(map(ComputedStructureEntry.from_dict, entries))
        working_ion_entry = ComputedEntry.from_dict(
            item["working_ion_doc"]["entries"][item["working_ion_doc"]["energy_type"]]
        )
        working_ion = working_ion_entry.composition.reduced_formula
        decomp_energies = {
            d_["material_id"]: d_["energy_above_hull"] for d_ in item["thermo_docs"]
        }
        # The entry with the smallest working-ion fraction defines the host
        # framework; remove the working ion to obtain the bare host structure.
        least_wion_ent = min(
            entries, key=lambda x: x.composition.get_atomic_fraction(working_ion)
        )
        host_structure = least_wion_ent.structure.copy()
        host_structure.remove_species([item["working_ion"]])
        for ient in entries:
            ient.data["volume"] = ient.structure.volume
            ient.data["decomposition_energy"] = decomp_energies[ient.entry_id]
        ie = InsertionElectrodeDoc.from_entries(
            grouped_entries=entries,
            working_ion_entry=working_ion_entry,
            task_id=item["material_id"],
            host_structure=host_structure,
        )
        if ie is None:
            # from_entries signalled failure; record why instead of raising.
            return {"failed_reason": "unable to create InsertionElectrode document"}
        return jsanitize(ie.dict())