Merge pull request #237 from materialsintelligence/aflow_adapter
Add AFLOW functionality and improve graph algorithm
clegaspi committed May 15, 2019
2 parents 759b193 + 00080fb commit f57e460
Showing 26 changed files with 2,309 additions and 137 deletions.
128 changes: 81 additions & 47 deletions propnet/core/graph.py
@@ -4,15 +4,15 @@

import logging
from collections import defaultdict
from itertools import product
from itertools import product, chain, repeat
from chronic import Timer, timings, clear
from pandas import DataFrame
from collections import deque
import concurrent.futures
from functools import partial
from multiprocessing import cpu_count
import copy

import numpy as np
import networkx as nx

from propnet.core.materials import CompositeMaterial
@@ -66,11 +66,11 @@ class Graph(object):
"""

def __init__(self,
models: Dict[str, Model]=None,
composite_models: Dict[str, CompositeModel]=None,
symbol_types: Dict[str, Symbol]=None,
parallel: bool=False,
max_workers: int=None) -> None:
models: Dict[str, Model] = None,
composite_models: Dict[str, CompositeModel] = None,
symbol_types: Dict[str, Symbol] = None,
parallel: bool = False,
max_workers: int = None) -> None:
"""
Creates a graph instance.
@@ -569,7 +569,8 @@ def get_paths(self, start_property, end_property):
tree = self.required_inputs_for_property(end_property)
return tree.get_paths_from(start_property)

def get_degree_of_separation(self, start_property: Union[str, Symbol], end_property: Union[str, Symbol]) -> int:
def get_degree_of_separation(self, start_property: Union[str, Symbol],
end_property: Union[str, Symbol]) -> Union[int, None]:
"""
Returns the minimum number of models separating two properties.
Returns 0 if the start_property and end_property are equal.
@@ -642,30 +643,46 @@ def generate_input_sets(props, this_quantity_pool):
return product(*aggregated_symbols)

@staticmethod
def get_input_sets_for_model(model, fixed_quantity, quantity_pool):
def get_input_sets_for_model(model, new_quantities, old_quantities):
"""
Generates all of the valid input sets for a given model from the newly
generated quantities and the quantities accumulated in previous
iterations, keeping only sets that include at least one new quantity
Args:
model (Model): model for which to evaluate valid input sets
fixed_quantity (Quantity): quantity which must be included
in all input sets
quantity_pool ({symbol: {Quantity}}): dict of quantity sets
keyed by symbol from which to draw additional quantities
for model inputs
new_quantities ({symbol: [Quantity]}): quantities generated
during the most recent iteration of the evaluation loop
old_quantities ({symbol: [Quantity]}): quantities generated
in previous iterations of the evaluation loop
Returns:
(iterator, int): chained iterator over the valid input sets for
the model, and the total number of input sets it will yield
"""
evaluation_lists = [c for c in model.evaluation_list
if fixed_quantity.symbol in c]

all_input_sets = []
for elist in evaluation_lists:
elist_without_fixed = elist.copy()
elist_without_fixed.remove(fixed_quantity.symbol)
input_sets = Graph.generate_input_sets(elist_without_fixed, quantity_pool)
for input_set in input_sets:
all_input_sets.append(list(input_set) + [fixed_quantity])
return all_input_sets
OLD = 0
NEW = 1
source_map = {OLD: old_quantities, NEW: new_quantities}
n_input_sets = 0

for symbols_to_evaluate in model.evaluation_list:
sources_by_symbol = []
for symbol in symbols_to_evaluate:
symbol_sources = []
if symbol in old_quantities.keys():
symbol_sources.append(OLD)
if symbol in new_quantities.keys():
symbol_sources.append(NEW)
sources_by_symbol.append(symbol_sources)
source_combinations = list(product(*sources_by_symbol))

for sources in source_combinations:
if all(s == OLD for s in sources):
continue
symbols_to_combine = [source_map[source][symbol]
for source, symbol in zip(sources, symbols_to_evaluate)]
n_input_sets += np.prod([len(v) for v in symbols_to_combine])
all_input_sets.append(product(*symbols_to_combine))

return chain.from_iterable(all_input_sets), n_input_sets
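
The loop above tags each symbol with the pools that can supply it (OLD,
NEW), takes the cartesian product of those source choices, and discards
the all-OLD combination, since any input set drawn entirely from the old
pool was already evaluated in a previous iteration. A minimal standalone
sketch of the same bookkeeping, with hypothetical symbol and quantity
names:

from itertools import chain, product

OLD, NEW = 0, 1

def input_sets_skipping_all_old(symbols, old_pool, new_pool):
    # For each symbol, record which pools (old and/or new) can supply it
    sources_by_symbol = [[src for src, pool in ((OLD, old_pool), (NEW, new_pool))
                          if symbol in pool]
                         for symbol in symbols]
    source_map = {OLD: old_pool, NEW: new_pool}
    all_input_sets = []
    for sources in product(*sources_by_symbol):
        if all(s == OLD for s in sources):
            continue  # drawn entirely from old quantities: already evaluated
        pools = [source_map[s][symbol] for s, symbol in zip(sources, symbols)]
        all_input_sets.append(product(*pools))
    return chain.from_iterable(all_input_sets)

# Hypothetical pools: 'band_gap' gained a new value this iteration
old_pool = {'band_gap': ['Eg_old'], 'volume': ['V_old']}
new_pool = {'band_gap': ['Eg_new']}
print(list(input_sets_skipping_all_old(['band_gap', 'volume'], old_pool, new_pool)))
# [('Eg_new', 'V_old')]; the all-old set ('Eg_old', 'V_old') is skipped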

def generate_models_and_input_sets(self, new_quantities, quantity_pool):
"""
@@ -682,15 +699,24 @@ def generate_models_and_input_sets(self, new_quantities, quantity_pool):
sets, yielded lazily as (model, input_set) pairs along with the
total number of input sets
"""
models_and_input_sets = []
n_total_input_sets = 0
new_qs_by_symbol = defaultdict(list)
for quantity in new_quantities:
for model in self._input_to_model[quantity.symbol]:
input_sets = self.get_input_sets_for_model(
model, quantity, quantity_pool)
models_and_input_sets += [
tuple([model] + sorted(list(input_set), key=lambda x: (x.symbol.name, x.value)))
for input_set in input_sets]
# Filter for duplicates
return set(models_and_input_sets)
new_qs_by_symbol[quantity.symbol].append(quantity)

candidate_models = set()
for symbol in new_qs_by_symbol.keys():
for model in self._input_to_model[symbol]:
candidate_models.add(model)

for model in candidate_models:
input_sets, n_input_sets = self.get_input_sets_for_model(
model, new_qs_by_symbol, quantity_pool)
if n_input_sets > 0:
models_and_input_sets.append(zip(repeat(model, n_input_sets), input_sets))
n_total_input_sets += n_input_sets

return chain.from_iterable(models_and_input_sets), n_total_input_sets
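
zip(repeat(model, n_input_sets), input_sets) pairs a model with each of
its input sets without materializing anything; chaining the per-model
streams yields a single lazy sequence of (model, input_set) work items
whose length is tracked separately in n_total_input_sets. A small sketch
of the pattern, with hypothetical model names:

from itertools import chain, repeat

# Hypothetical per-model input-set streams and their known counts
streams = [('density_model', iter([('q1',), ('q2',)]), 2),
           ('band_gap_model', iter([('q3',)]), 1)]

work_items = chain.from_iterable(
    zip(repeat(model, count), input_sets)
    for model, input_sets, count in streams)

for model, input_set in work_items:
    print(model, input_set)
# density_model ('q1',)
# density_model ('q2',)
# band_gap_model ('q3',)

Since zip stops at the shorter iterable, an unbounded repeat(model) would
behave identically; the explicit count documents how many items each
model contributes.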

def derive_quantities(self, new_quantities, quantity_pool=None,
allow_model_failure=True, timeout=None):
@@ -701,7 +727,7 @@ def derive_quantities(self, new_quantities, quantity_pool=None,
Args:
new_quantities ([Quantity]): list of quantities which to
consider as new inputs to models
quantity_pool ({symbol: {Quantity}}): dict of quantity sets
quantity_pool ({symbol: [Quantity]}): dict of quantity lists
keyed by symbol from which to draw additional quantities
for model inputs
allow_model_failure (bool): True allows graph evaluation to
@@ -719,36 +745,42 @@ def derive_quantities(self, new_quantities, quantity_pool=None,
quantity pool
"""
# Update quantity pool
quantity_pool = quantity_pool or defaultdict(set)
for quantity in new_quantities:
quantity_pool[quantity.symbol].add(quantity)
quantity_pool = quantity_pool or defaultdict(list)

# Generate all of the models and input sets to be evaluated

# TODO: With the current implementation of the input set generation, etc.,
# parallelization does not provide any speed-up. Maybe we can look into
# improving the algorithm?
logger.info("Generating models and input sets for %s", new_quantities)
models_and_input_sets = self.generate_models_and_input_sets(

models_and_input_sets, n_input_sets = self.generate_models_and_input_sets(
new_quantities, quantity_pool)

input_tuples = [(v[0], v[1:]) for v in models_and_input_sets]
for quantity in new_quantities:
quantity_pool[quantity.symbol].append(quantity)

# input_tuples = [(v[0], v[1:]) for v in models_and_input_sets]

# This doesn't eliminate many input sets, and cyclic outputs will be
# caught by the cyclic filter after evaluation anyway. Pre-filtering only
# matters when the model being re-evaluated is expensive, which most of
# our models are not. Can we move this into the generation step, or just
# before evaluation, so we don't have to break the generator open?

inputs_to_calculate = list(filter(Graph._generates_noncyclic_output,
input_tuples))
# inputs_to_calculate = list(filter(Graph._generates_noncyclic_output,
# input_tuples))

# Evaluate model for each input set and add new valid quantities
if not self._parallel:
with Timer('_graph_evaluation'):
added_quantities, model_timings = Graph._run_serial(inputs_to_calculate,
added_quantities, model_timings = Graph._run_serial(models_and_input_sets,
allow_model_failure=allow_model_failure,
timeout=timeout)
else:
if self._executor is None:
self._executor = concurrent.futures.ProcessPoolExecutor(max_workers=self._max_workers)
with Timer('_graph_evaluation'):
added_quantities, model_timings = Graph._run_parallel(self._executor, self._max_workers,
inputs_to_calculate,
models_and_input_sets,
n_input_sets,
allow_model_failure=allow_model_failure,
timeout=timeout)

@@ -779,10 +811,11 @@ def _generates_noncyclic_output(input_set):
if set(model.map_variables_to_symbols(s['inputs'])) == input_symbols:
outputs = outputs.union(model.map_variables_to_symbols(s['outputs']))

model_in_all_trees = all(input_q.provenance.model_in_provenance_tree(model)
model_in_all_trees = all(input_q.provenance.model_is_in_tree(model) or
model == input_q.provenance.model
for input_q in inputs)

symbol_in_all_trees = all(all(input_q.provenance.symbol_in_provenance_tree(output)
symbol_in_all_trees = all(all(input_q.provenance.symbol_is_in_tree(output)
for input_q in inputs)
for output in outputs)
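
With the renamed helpers, the cyclicity test reads directly off the
provenance tree: an input set is skipped only when the candidate model
produced, or appears somewhere in, every input's provenance, and every
prospective output symbol already appears in every input's tree. A toy
sketch of the model check (the real ProvenanceElement API is assumed to
match the calls above):

class ToyProvenance:
    # Minimal stand-in for ProvenanceElement: the producing model plus
    # the provenances of that model's inputs
    def __init__(self, model=None, inputs=()):
        self.model = model
        self.inputs = inputs

    def model_is_in_tree(self, model):
        # True if any ancestor quantity was produced by `model`
        return any(p.model == model or p.model_is_in_tree(model)
                   for p in self.inputs)

prov = ToyProvenance('density_model', inputs=[ToyProvenance('volume_model')])
inputs = [prov]

# Mirrors model_in_all_trees above: the model that produced the quantity
# counts, as does any model deeper in the tree
print(all(q.model_is_in_tree('density_model') or q.model == 'density_model'
          for q in inputs))  # True: re-running density_model would be cyclic
print(all(q.model_is_in_tree('elastic_model') or q.model == 'elastic_model'
          for q in inputs))  # False: safe to evaluate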

@@ -823,6 +856,7 @@ def _run_serial(models_and_input_sets, allow_model_failure=True, timeout=None):

@staticmethod
def _run_parallel(executor, n_workers, models_and_input_sets,
n_total_input_sets,
allow_model_failure=True,
timeout=None):
"""
@@ -841,7 +875,7 @@ def _run_parallel(executor, n_workers, models_and_input_sets,
func = partial(Graph._evaluate_model,
allow_failure=allow_model_failure,
timeout=timeout)
chunk_size = int(len(models_and_input_sets) / n_workers) + 1
chunk_size = min(int(n_total_input_sets / n_workers) + 1, 200)
results = executor.map(func, models_and_input_sets, chunksize=chunk_size)
outputs = []
model_timings = []
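
The new chunk size bounds how many (model, input set) pairs executor.map
hands to a worker at once: small workloads are still split across all
workers, while very large workloads are capped at 200 items per chunk so
per-task memory stays flat and results stream back sooner. A sketch of
the arithmetic (200 is the cap chosen in this commit):

def chunk_size(n_total_input_sets: int, n_workers: int, cap: int = 200) -> int:
    # At least one item per chunk, never more than `cap`
    return min(n_total_input_sets // n_workers + 1, cap)

print(chunk_size(50, 8))       # 7   -> small workloads spread across workers
print(chunk_size(100_000, 8))  # 200 -> large workloads hit the cap
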
8 changes: 5 additions & 3 deletions propnet/core/materials.py
@@ -4,14 +4,16 @@

from collections import defaultdict
from itertools import chain
import warnings

from propnet.core.quantity import QuantityFactory, NumQuantity
from propnet.core.symbols import Symbol

# noinspection PyUnresolvedReferences
import propnet.symbols
from propnet.core.registry import Registry
import logging

logger = logging.getLogger(__name__)


class Material(object):
@@ -89,8 +91,8 @@ def add_default_quantities(self):
new_syms -= set(self._quantities_by_symbol.keys())
for sym in new_syms:
quantity = QuantityFactory.from_default(sym)
warnings.warn("Adding default {} quantity with value {}".format(
sym, quantity))
logger.warning("Adding default {} quantity with value {}".format(
sym, quantity))
self.add_quantity(quantity)

def remove_symbol(self, symbol):
56 changes: 37 additions & 19 deletions propnet/core/models.py
@@ -299,10 +299,37 @@ def map_variables_to_symbols(self, variables):
return remap(variables, getattr(self, "variable_symbol_map", {}))

def _convert_inputs_for_plugin(self, inputs):
return {k: v.magnitude for k, v in inputs.items()}
converted_inputs = {}
for var, quantity in inputs.items():
converted_inputs[var] = quantity.value
if self.variable_unit_map.get(var) is not None:
# The equation assumes specific units, so we strip them here;
# otherwise pint may raise an error when adding or subtracting
# quantities with mismatched dimensions
converted_inputs[var] = quantity.to(self.variable_unit_map[var]).magnitude
return converted_inputs

def _convert_outputs_from_plugin(self, outputs):
return outputs
converted_outputs = {}
for var, quantity in outputs.items():
symbol = self._variable_symbol_map[var]
unit = self.variable_unit_map.get(var) or Registry("units").get(symbol)
if unit is None:
converted_outputs[var] = quantity
else:
if isinstance(quantity, ureg.Quantity):
try:
converted_outputs[var] = quantity.to(unit)
except DimensionalityError:
# If the equation multiplies by constants with dimensions,
# we'll end up with an output with incorrect dimensions.
# This forces the unit conversion until we can fix inclusion of constants
# TODO: Fix when we add support for constants with dimensions
converted_outputs[var] = ureg.Quantity(quantity.magnitude,
units=unit)
else:
converted_outputs[var] = ureg.Quantity(quantity, units=unit)
return converted_outputs
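
Together with _convert_inputs_for_plugin above, this implements a
strip-then-reattach pattern around plug_in: inputs are converted to the
units the equation assumes and reduced to bare magnitudes, and outputs
get their assumed units restored, falling back to forced reattachment
when .to() raises a DimensionalityError. A minimal pint sketch of both
branches, with hypothetical units:

import pint

ureg = pint.UnitRegistry()

assumed_unit = 'angstrom'                    # unit the equation assumes
q_in = ureg.Quantity(2.0, 'nanometer')

magnitude = q_in.to(assumed_unit).magnitude  # strip units before plug_in
result = magnitude ** 3                      # plain-number equation, e.g. a volume

try:
    q_out = ureg.Quantity(result).to('angstrom ** 3')
except pint.DimensionalityError:
    # plug_in returned a bare number, so the conversion above fails and
    # we force the assumed output unit, as in the except branch above
    q_out = ureg.Quantity(result, units='angstrom ** 3')

print(q_out)  # 8000.0 angstrom ** 3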

def evaluate(self, symbol_quantity_dict, allow_failure=True):
"""
@@ -355,8 +382,10 @@ def evaluate(self, symbol_quantity_dict, allow_failure=True):

# Plug in and check constraints
try:
with PrintToLogger():
out: dict = self.plug_in(input_variable_value_dict)
with PrintToLogger(level="DEBUG") as plog:
with np.errstate(all='log'):
np.seterrcall(plog)
out: dict = self.plug_in(input_variable_value_dict)
except Exception as err:
if allow_failure:
return {"successful": False,
Expand All @@ -367,15 +396,15 @@ def evaluate(self, symbol_quantity_dict, allow_failure=True):
return {"successful": False,
"message": "Constraints not satisfied"}

provenance = ProvenanceElement(
model=self.name, inputs=list(input_variable_quantity_dict.values()),
source="propnet")

out = self._convert_outputs_from_plugin(out)
out = self.map_variables_to_symbols(out)

symbol_unit_map = self.map_variables_to_symbols(self.variable_unit_map)
for symbol, value in out.items():
provenance = ProvenanceElement(
model=self.name, inputs=list(input_variable_quantity_dict.values()),
source="propnet")

try:
quantity = QuantityFactory.create_quantity(
symbol, value,
@@ -837,17 +866,6 @@ def __setstate__(self, state):
self.__dict__.update(state)
self._generate_lambdas()

def _convert_inputs_for_plugin(self, inputs):
converted_inputs = {}
for var, quantity in inputs.items():
converted_inputs[var] = quantity.value
if var in self.variable_unit_map.keys():
# Units are being assumed by equation and we need to strip them
# or pint might get angry if it has to add or subtract quantities
# with unmatched dimensions
converted_inputs[var] = quantity.to(self.variable_unit_map[var]).magnitude
return converted_inputs

def _convert_outputs_from_plugin(self, outputs):
converted_outputs = {}
for var, quantity in outputs.items():
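
The evaluate hunk earlier in this file also reroutes numpy
floating-point warnings into the logger: np.errstate(all='log') switches
every floating-point error to the 'log' policy, and np.seterrcall
registers the destination, which only needs a write() method. The real
code passes the PrintToLogger context object; the logger wiring below is
an assumption for the sketch:

import logging

import numpy as np

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('propnet.sketch')

class LogWriter:
    # np.seterrcall accepts any object exposing write(msg)
    def write(self, msg):
        logger.debug('numpy: %s', msg.strip())

with np.errstate(all='log'):
    np.seterrcall(LogWriter())
    np.array([1.0]) / 0.0  # 'divide by zero encountered' goes to the log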