In [None]:
import pandas as pd
import os


def read_powcod_sql():
    import sqlite3
    con = sqlite3.connect('cod2205.sq')
    df = pd.read_sql_query('SELECT * FROM id',con)
    con2 = sqlite3.connect('cod2205.sq.info')
    df2 = pd.read_sql_query('SELECT * FROM info',con2)

    df.id = pd.to_numeric(df.id)
    df2.id = pd.to_numeric(df2.id)

    combined = df.merge(df2, left_on="id", right_on="id")
    return combined

if os.path.isfile("combined.pkl"):
    combined = pd.read_pickle("combined.pkl")
else:
    combined = read_powcod_sql()
    print(combined.columns)
    combined = combined[["spacegroup_x", "a", "b", "c", "alpha", "beta",
                    "gamma", "volume", "h", "k", "l", "nd", "dvalue",
                    "intensita", "type", "id"]]
    combined.to_pickle("combined.pkl")

In [None]:
min_volume = 400
max_volume = 4000
max_cell_length = 50
min_cell_angle = 60
max_cell_angle = 120
min_data_angle = 4
max_data_angle = 44
wavelength = 1.54056
min_detectable_peaks_in_range = 10

In [None]:
combined = combined[(combined.volume <= max_volume) & (combined.volume >= min_volume)]
combined = combined[(combined.a != 0) & (combined.b != 0) & (combined.c != 0)]
combined = combined[(combined.a <= max_cell_length) & (combined.b <= max_cell_length) & (combined.c <= max_cell_length)]
combined = combined[(combined.alpha >= min_cell_angle) & (combined.alpha <= max_cell_angle)]
combined = combined[(combined.beta >= min_cell_angle) & (combined.beta <= max_cell_angle)]
combined = combined[(combined.gamma >= min_cell_angle) & (combined.gamma <= max_cell_angle)]
combined = combined[(combined.nd >= min_detectable_peaks_in_range)]
combined.describe()

In [None]:
from pymatgen.symmetry import groups
import numpy as np

symbol_to_num = {}
for i, s in enumerate(np.unique(combined.spacegroup_x.to_numpy())):
    try:
        g = groups.SpaceGroup(s)
        symbol_to_num[s] = g.int_number
    except:
        symbol_to_num[s] = -1

def label_SGs(sg):
    return symbol_to_num[sg]

combined["sg_number"] = combined.spacegroup_x.apply(label_SGs)
combined = combined[combined.sg_number > 0]
combined.describe()

In [None]:
d_from_tt = lambda x: wavelength/(2*np.sin(np.deg2rad(x)/2))
tt_from_d = lambda x: 2*np.rad2deg(np.arcsin(wavelength/(2*x)))
max_d = d_from_tt(min_data_angle)
min_d = d_from_tt(max_data_angle)
print("Wavelength:", wavelength)
print(f"Minimum angle = {min_data_angle}  : Maximum d-spacing = {max_d:.3f}")
print(f"Maximum angle = {max_data_angle} : Minimum d-spacing = {min_d:.3f}")

In [None]:
def d_spacing_limits(x, min_d_spacing=1, max_d_spacing=100):
    d = list(filter(None, x.strip().split(",")))
    d = [float(i) for i in d]
    if len(d) == 0:
        return False
    else:
        if d[-1] > min_d_spacing:
            return False
        if d[0] > max_d_spacing:
            return False
        return True

combined["d_spacing_filter"] = combined.dvalue.apply(d_spacing_limits, args=(min_d, max_d))

In [None]:
combined = combined[combined["d_spacing_filter"] == True]
combined.describe()

In [None]:
def n_peaks_in_range(dspacing, intensity, min_intensity=5):
    dspacing = list(filter(None, dspacing.strip().split(",")))
    dspacing = np.array([float(i) for i in dspacing])
    intensity = list(filter(None, intensity.strip().split(",")))
    intensity = np.array([float(i) for i in intensity])
    intensity = intensity[dspacing >= min_d]
    return (intensity > min_intensity).sum()

combined["n_detectable_peaks_in_range"] = combined.apply(lambda x: n_peaks_in_range(x.dvalue, x.intensita), axis=1)
combined["n_detectable_peaks_in_range"].describe()

In [None]:
combined = combined[combined.n_detectable_peaks_in_range >= min_detectable_peaks_in_range]
combined.describe()

In [None]:
unit_cell = combined[["a","b","c","alpha","beta","gamma"]].to_numpy()
crystal_systems, crystal_systems_numeric = np.unique(combined.type.to_numpy(), return_inverse=True)
space_group_number = combined.sg_number.to_numpy()
cod_id = combined.id.to_numpy()
print(crystal_systems)

In [None]:
def get_array(x, dtype=np.float32):
    data = np.zeros(500, dtype=dtype)
    x = list(filter(None, x.strip().split(",")))
    x = np.array(x, dtype=dtype)
    data[:len(x)] = x
    return data

h = combined.h.apply(get_array, args=(np.int64,))
k = combined.k.apply(get_array, args=(np.int64,))
l = combined.l.apply(get_array, args=(np.int64,))
intensities = combined.intensita.apply(get_array, args=(np.float32,))

intensities = np.vstack(intensities)
h = np.vstack(h)
k = np.vstack(k)
l = np.vstack(l)
hkl = np.dstack([h,k,l])

In [None]:
import json
base_name = f"{min_data_angle}-{max_data_angle}-CuKa1-data_{max_volume}_"
config = {}
config["base_name"] = base_name
config["min_volume"] = min_volume
config["max_volume"] = max_volume
config["max_cell_length"] = max_cell_length
config["min_cell_angle"] = min_cell_angle
config["max_cell_angle"] = max_cell_angle
config["min_data_angle"] = min_data_angle
config["max_data_angle"] = max_data_angle
config["wavelength"] = wavelength
config["min_detectable_peaks_in_range"] = min_detectable_peaks_in_range
config["crystal_systems"] = [x[0]+" = "+str(x[1]) for x in zip(crystal_systems, np.arange(len(crystal_systems)))]

with open("data_config.json", "w") as f:
    json.dump(config, f, indent=4)
np.save(base_name+"unit_cell.npy", unit_cell)
np.save(base_name+"crystal_systems_numeric.npy", crystal_systems_numeric)
np.save(base_name+"space_group_number.npy", space_group_number)
np.save(base_name+"hkl.npy", hkl)
np.save(base_name+"intensities.npy", intensities)
np.save(base_name+"cod_id.npy", cod_id)