Skip to content

Commit

Permalink
Completed support for CoulombMatrix without preprocessing.
Browse files Browse the repository at this point in the history
  • Loading branch information
muammar committed Jan 14, 2020
1 parent 3dce822 commit 3bf7e1a
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 69 deletions.
9 changes: 8 additions & 1 deletion ml4chem/features/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ def restack_image(self, index, image, scaled_feature_space, svm):

scaled = scaled_feature_space[index][j]

if isinstance(scaled, tuple):
symbol, scaled = scaled

if isinstance(scaled, np.ndarray) is False:
scaled = scaled.compute()

Expand Down Expand Up @@ -81,7 +84,11 @@ def restack_atom(self, image_index, atom, scaled_feature_space):
The hashed key image and its corresponding features.
"""

symbol = atom.symbol
features = scaled_feature_space[image_index][atom.index]

if isinstance(features, tuple):
symbol, features = features
else:
symbol = atom.symbol

return symbol, features
119 changes: 51 additions & 68 deletions ml4chem/features/coulomb_matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ class CoulombMatrix(AtomisticFeatures, CoulombMatrixDscribe):
Number of data points per batch to use for training. Default is None.
scheduler : str
The scheduler to be used with the dask backend.
overwrite : bool
If overwrite is set to True, ml4chem will not try to load existing
databases. Default is True.
save_preprocessor : str
Save preprocessor to file.
Notes
-----
Expand All @@ -46,16 +51,27 @@ def name(cls):
"""Returns name of class"""
return cls.NAME

def __init__(
    self,
    preprocessor=None,
    batch_size=None,
    filename="features.db",
    scheduler="distributed",
    save_preprocessor="ml4chem",
    overwrite=True,
    **kwargs
):
    """Initialize the CoulombMatrix feature generator.

    Parameters
    ----------
    preprocessor : tuple, optional
        Preprocessor specification forwarded to ``Preprocessing``; when
        ``None`` no scaling of the feature space is performed.
    batch_size : int, optional
        Number of data points per batch. Default is None.
    filename : str
        Path to the features database. Default is "features.db".
    scheduler : str
        The scheduler to be used with the dask backend.
    save_preprocessor : str
        Save preprocessor to file. Default is "ml4chem".
    overwrite : bool
        If True, ml4chem will not try to load existing databases.
        Default is True.
    **kwargs
        Extra keyword arguments passed through to the dscribe
        ``CoulombMatrix`` constructor.
    """
    super(CoulombMatrix, self).__init__()

    # Delegate descriptor construction to dscribe. Permutation and
    # flattening are fixed here so ml4chem controls post-processing.
    CoulombMatrixDscribe.__init__(self, permutation="none", flatten=False, **kwargs)

    self.batch_size = batch_size
    self.filename = filename
    self.preprocessor = preprocessor
    self.scheduler = scheduler
    self.overwrite = overwrite
    self.save_preprocessor = save_preprocessor

def calculate(self, images=None, purpose="training", data=None, svm=False):
"""Calculate the features per atom in an atoms objects
Expand Down Expand Up @@ -132,32 +148,29 @@ def calculate(self, images=None, purpose="training", data=None, svm=False):
logger.info("Unique chemical elements: {}".format(unique_element_symbols))

# we make the features
if self.preprocessor is not None:
preprocessor = Preprocessing(self.preprocessor, purpose=purpose)
preprocessor.set(purpose=purpose)
preprocessor = Preprocessing(self.preprocessor, purpose=purpose)
preprocessor.set(purpose=purpose)

# We start populating computations to get atomic features.
logger.info("")
logger.info("Embarrassingly parallel computation of atomic features...")

stacked_features = []
atoms_index_map = [] # This list is used to reconstruct images from atoms.
atoms_symbols_map = [] # This list is used to reconstruct images from atoms.

if self.batch_size is None:
self.batch_size = data.get_total_number_atoms()

chunks = get_chunks(images, self.batch_size, svm=svm)

ini = end = 0
for chunk in chunks:
images_ = OrderedDict(chunk)
intermediate = []

for image in images_.items():
key, image = image
end = ini + len(image)
atoms_index_map.append(list(range(ini, end)))
ini = end
atoms_symbols_map.append(image.get_chemical_symbols())
# Use .create() class method from dscribe.
_features = dask.delayed(self.create)(image)
intermediate.append(_features)

Expand All @@ -177,67 +190,21 @@ def calculate(self, images=None, purpose="training", data=None, svm=False):
logger.info("")

if self.preprocessor is not None:
logger.info("Converting features to dask array...")
symbol = data.unique_element_symbols[purpose][0]
sample = np.zeros(len(self.GP[symbol]))
# dim = (len(stacked_features), len(sample))

stacked_features = [
dask.array.from_delayed(lazy, dtype=float, shape=sample.shape)
for lazy in stacked_features
]

layout = {0: tuple(len(i) for i in atoms_index_map), 1: -1}
# stacked_features = dask.array.stack(stacked_features, axis=0).rechunk(layout)
stacked_features = dask.array.stack(stacked_features, axis=0).rechunk(
layout
)

logger.info(
"Shape of array is {} and chunks {}.".format(
stacked_features.shape, stacked_features.chunks
)
)

# To take advantage of dask_ml we need to convert our numpy array
# into a dask array.
raise NotImplementedError

else:
scaled_feature_space = []

# Note that dask_ml by default convert the output of .fit
# in a concrete value.
if purpose == "training":
stacked_features = preprocessor.fit(
stacked_features, scheduler=self.scheduler
)
else:
stacked_features = preprocessor.transform(stacked_features)

atoms_index_map = [client.scatter(indices) for indices in atoms_index_map]
atoms_symbols_map = [client.scatter(chunk) for chunk in atoms_symbols_map]
stacked_features = client.scatter(stacked_features, broadcast=True)

logger.info("Stacking features using atoms index map...")

for indices in atoms_index_map:
for image_index, symbols in enumerate(atoms_symbols_map):
features = client.submit(
self.stack_features, *(indices, stacked_features)
self.stack_features, *(symbols, image_index, stacked_features)
)

# features = self.stack_features(indices, stacked_features)

scaled_feature_space.append(features)

# scaled_feature_space = client.gather(scaled_feature_space)

else:
feature_space = []
atoms_index_map = [client.scatter(chunk) for chunk in atoms_index_map]
scaled_feature_space = client.gather(scaled_feature_space)

for indices in atoms_index_map:
features = client.submit(
self.stack_features, *(indices, stacked_features)
)
feature_space.append(features)
# Clean
del stacked_features

Expand All @@ -250,30 +217,39 @@ def calculate(self, images=None, purpose="training", data=None, svm=False):

for i, image in enumerate(images.items()):
restacked = client.submit(
self.restack_image, *(i, image, None, scaled_feature_space, svm)
self.restack_image, *(i, image, scaled_feature_space, svm)
)
feature_space.append(restacked)

# image = (hash, ase_image) -> tuple
for atom in image[1]:
reference_space.append(
self.restack_atom(i, atom, scaled_feature_space)
)

feature_space.append(restacked)

reference_space = dask.compute(*reference_space, scheduler=self.scheduler)

elif svm is False and purpose == "training":
for i, image in enumerate(images.items()):
restacked = client.submit(
self.restack_image, *(i, image, scaled_feature_space, svm)
)
feature_space.append(restacked)

else:
try:
for i, image in enumerate(images.items()):
restacked = client.submit(
self.restack_image, *(i, image, None, scaled_feature_space, svm)
self.restack_image, *(i, image, scaled_feature_space, svm)
)
feature_space.append(restacked)

except UnboundLocalError:
# scaled_feature_space does not exist.
for i, image in enumerate(images.items()):
restacked = client.submit(
self.restack_image, *(i, image, feature_space, None, svm)
self.restack_image, *(i, image, feature_space, svm)
)
feature_space.append(restacked)

Expand Down Expand Up @@ -317,6 +293,13 @@ def calculate(self, images=None, purpose="training", data=None, svm=False):
self.feature_space = feature_space
return self.feature_space

def stack_features(self, symbols, image_index, stacked_features):
    """Pair each chemical symbol with its computed feature vector.

    Parameters
    ----------
    symbols : list
        Chemical symbols of the atoms in one image, in atom order.
    image_index : int
        Position of the image inside ``stacked_features``.
    stacked_features : list
        Per-image lazy feature arrays; each element exposes ``.compute()``.

    Returns
    -------
    list
        ``(symbol, feature_vector)`` tuples, one per atom.
    """
    # Materialize the lazy (dask) array for this image, then zip it with
    # the symbols so downstream restacking knows which atom each row is.
    computed = stacked_features[image_index].compute()
    return [(symbol, row) for symbol, row in zip(symbols, computed)]

def to_pandas(self):
    """Return the feature space as a pandas DataFrame.

    Rows are indexed by the hashed image keys stored in
    ``self.feature_space``.

    Returns
    -------
    pandas.DataFrame
        One row per image, built from ``self.feature_space``.
    """
    # The original had this return statement duplicated (dead second
    # copy, a diff artifact); a single return is sufficient.
    return pd.DataFrame.from_dict(self.feature_space, orient="index")

0 comments on commit 3bf7e1a

Please sign in to comment.