In [None]:
import os, subprocess, ast
from munch import Munch
from materialbuilder.plotting import CPK_COLORS
from materialbuilder import graphbuilder, matbuilder, topologies, transformations
from rdkit.Chem import AllChem as Chem
import torch
import pandas as pd
import numpy as np

# Job Details

### I wouldnt look at the comments too much yet unless you want to take a huge deepdive into the code
### *job name* will be used to create the appropriate directory trees for all training. At the end of this notebook, you can look at AuTopologyPipeline/train/job_name to see the created files, in this case AuTopologyPipeline/train/2311_opls_gLiTfsi_Artur
### *template smiles* is how you tell AuTopology what are the species you will be getting parameters for

### *node_type* will define how many graph convolutions for atomic environment differentiation (how many covalent nearest neighbors you will consider)


In [None]:
job_details = Munch()
job_details.job_name = "240904_opls_test"
job_details.device = "cpu"

# glyme4.TFSI.Li
job_details.template_smiles = (
    "COCCOCCOCCOCCOC.O=S(=O)([N-]S(=O)(=O)C(F)(F)F)C(F)(F)F.[Li+]"
)
job_details.transformations = [
    ["OneHotEncoding", "node", "atomic_num", "one_hot_atomic_num"],
    ["GraphDistinctNodes", "one_hot_atomic_num", "one_hot_graph_distinct_r1", 1],
    ["GraphDistinctNodes", "one_hot_atomic_num", "one_hot_graph_distinct_r2", 2],
    ["GraphDistinctNodes", "one_hot_atomic_num", "one_hot_graph_distinct_r3", 3],
    ["GraphDistinctNodes", "one_hot_atomic_num", "one_hot_graph_distinct_r4", 4],
]

job_details.base_node_type = "one_hot_atomic_num"
job_details.node_type = "one_hot_graph_distinct_r3"

if "pcff" in job_details.job_name:
    job_details.forcefield_class2 = True
else:
    job_details.forcefield_class2 = False

job_details.terms = ["bond", "angle", "dihedral", "improper", "pair"]
job_details.use_1_4_pairs = True
job_details.pair_cutoff = None

In [None]:
job_details.WORKDIR = os.path.join(
    os.path.abspath('.'), '../train', job_details.job_name, 'template')
if not os.path.exists(job_details.WORKDIR):
    os.makedirs(job_details.WORKDIR)

# Create Template Dataset (Part 1)

In [None]:
template_dataset = graphbuilder.GraphDataset()
ensemble = graphbuilder.Ensemble()
geometry, rdkit_mol = matbuilder.geometry_from_smiles(
    smiles=job_details.template_smiles, dim=2, return_rdkit_mol=True)
#add geometry to ensemble.parent_graph and ensemble.graphs if first geom, else add only to ensemble.child_graphs 
#(do not add children_graphs to ensembel.graphs)
ensemble.Add(geometry)
#add ensemble to dataset, which will make template_dataset.container_batch have the parent_graph, and self.ensembles 
#have the ensemble at index ensemble.parent_graph['prop']['ensemble']
template_dataset.AddEnsemble(ensemble)
#datset.Close() is just a call to dataset.container_batch.Close(), which is just a call to dataset.container_batch._finalize()
#this calls graph_base._initialize_data, which primes the dataset.container_batch._data to have 'node' and other keys
#then, it calls graph_base._add_data_rom_graph, which will fill the container_batch._data with the info from all the container_batch.graphs
#(which is only ever 1 long??)
#finally, it calls graph_base._transform_data(join=True), which will cat all the lists in _data together 
template_dataset.Close()

# Write to MOL file using rdkit_mol

In [None]:
path = os.path.join(job_details.WORKDIR, 'template.mol')
file = open(path, 'w')
file.write(Chem.MolToMolBlock(rdkit_mol))
file.close()

# Use OpenBabel to convert MOL to PDB

In [None]:
mol_path = os.path.join(job_details.WORKDIR, 'template.mol')
pdb_path = os.path.join(job_details.WORKDIR, 'template.pdb')
subprocess.call(['obabel', mol_path, '-O', pdb_path]);

# Create Template Dataset (Part 2)

In [None]:
# ## template_dataset = graphbuilder.GraphDataset()
# #now we are adding the geometery with dataset.AddGraph(geometry) rather than ensemble.Add(geometry) as before.
# #dataset.AddGraph(geometry) will first add a graph._data['prop']['ensemble'] info to specify which ensemble it belongs to
# #then the graph is added to the container_batch using container_batch.Add(geometry). this adds the graph to
# #container_batch.graphs.append(geometry), which up to this point only contains the single parentgraph, and no others added by ensemble.Add(geometry)
# template_dataset.AddGraph(geometry)
# #this close will cat the info of these new geometries from dataset.AddGraph with the parent graph info using _transform_data(join=True)
# template_dataset.Close()

for trans in job_details.transformations:  # "one_hot_graph_distinct_r2"
    # this will create the different node types based on graph neighborhood. this runs transformation.ApplyTo(template_dataset.container_batch)
    template_dataset.AddTransformation(
        eval("transformations." + trans[0])(*tuple(trans[1:])), job_details.device
    )
if "base_node_type" in job_details.keys():
    template_dataset.DefineBaseNodeTypes(job_details.base_node_type)
if "node_type" in job_details.keys():
    template_dataset.DefineNodeTypes(job_details.node_type)
# these dataset.AddTopology() calls create the topology types based on node type using Graph.AddTopology()
# this uses each Topology.apply_to()
if "bond" in job_details.terms:
    template_dataset.AddTopology(topologies.BondTopology(), job_details.device)
if "angle" in job_details.terms:
    template_dataset.AddTopology(topologies.AngleTopology(), job_details.device)
if "dihedral" in job_details.terms:
    template_dataset.AddTopology(topologies.DihedralTopology(), job_details.device)
if "improper" in job_details.terms:
    template_dataset.AddTopology(topologies.ImproperTopology(), job_details.device)
if "pair" in job_details.terms:
    template_dataset.AddTopology(
        topologies.PairTopology(
            job_details.use_1_4_pairs,
            job_details.forcefield_class2,
            job_details.pair_cutoff,
        ),
        job_details.device,
    )

In [None]:
path = os.path.join(job_details.WORKDIR, 'template.py')
torch.save(template_dataset, path)

# Create Custom Parameter File

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from mendeleev import element as Element

In [None]:
xyz = template_dataset.container_batch._data['node']['xyz']
bonds = template_dataset.container_batch._data['bond']['index']
types = template_dataset.container_batch._data['node']['type'].view(-1)
z = template_dataset.container_batch._data['node']['atomic_num'].view(-1)
z_t = torch.stack([z, types], axis=1).unique(dim=0).tolist()
types = types.tolist()
z = z.tolist()

In [None]:
fig = plt.figure(figsize=(5,10))
ax = fig.add_subplot(111)
ax.set_title('Atom Types')
ax.set_xlim([-5,5])
ax.set_ylim([0,100])

y = np.linspace(0,100,len(z_t)+2).tolist()[::-1]
for n in range(len(z_t)):
    element = Element(z_t[n][0]).symbol
    ax.scatter(0, y[n+1], s=500, color=CPK_COLORS[z_t[n][0]], alpha=1.0, edgecolors='black', lw=1);
    ax.annotate(element+str(z_t[n][1]), (1, y[n+1]-1), color='black', size=15)

plt.grid(False)
plt.axis('off')
plt.savefig(os.path.join(job_details.WORKDIR, 'atom_types_list.png'))  
plt.show()

fig = plt.figure(figsize=(30,30))
ax = fig.add_subplot(111)
ax.set_title(job_details.template_smiles)

x = xyz[:,0].view(-1).tolist()
y = xyz[:,1].view(-1).tolist()

z_t = torch.stack([template_dataset.container_batch._data['node']['atomic_num'].view(-1), template_dataset.container_batch._data['node']['type'].view(-1)], axis=1)
z_t = z_t.tolist()
for bond in bonds:
    bond = bond.view(-1).tolist()
    ax.plot([x[bond[0]], x[bond[1]]], [y[bond[0]], y[bond[1]]], c='black', zorder=0, lw=1)

for n in range(len(x)):
    ax.scatter(x[n], y[n], s=1000, color=CPK_COLORS[z[n]], alpha=1.0, edgecolors='black', lw=1);
    ax.annotate(str(z_t[n][1]), (x[n]-0.25, y[n]-0.025), color='black', size=75)

plt.grid(False)
plt.axis('off')
print(job_details.WORKDIR)
plt.savefig(os.path.join(job_details.WORKDIR, 'atom_types.png'))  

plt.show()

### For custom_parameter_dict, if one of the parameters is set to None, then the parameter will be learned. If the parameter is given a numeric value, then the value will NOT be learned and will remain as the set value

In [None]:
# glyme.Li.TFSI for OPLS parameterization
# litfsi comes from this (Doherty 2017): https://pubs.acs.org/doi/10.1021/acs.jctc.7b00520
custom_parameter_dict = {
    '0': {'types': [0], 'sigma': 2.500, 'epsilon': 0.030, 'charge': None}, #H, CO-()
    '1': {'types': [1], 'sigma': 2.500, 'epsilon': 0.030, 'charge': None}, #H, (CCO)
    '2': {'types': [2], 'sigma': 2.13, 'epsilon': 0.018, 'charge': None}, #Li 
    '3': {'types': [3], 'sigma': 3.500, 'epsilon': 0.066, 'charge': None}, #C, CO-()
    '4': {'types': [4], 'sigma': 3.500, 'epsilon': 0.066, 'charge': None}, #C, first CO-C*CO(CCO)
    '5': {'types': [5], 'sigma': 3.500, 'epsilon': 0.066, 'charge': None}, #C, (CCO)
    '6': {'types': [6], 'sigma': 3.500, 'epsilon': 0.066, 'charge': None}, #C, TFSI
    '7': {'types': [7], 'sigma': 3.250, 'epsilon': 0.170, 'charge': None}, #N, TFSI
    '8': {'types': [8], 'sigma': 2.900, 'epsilon': 0.140, 'charge': None}, #O, CO-(), glyme
    '9': {'types': [9], 'sigma': 2.900, 'epsilon': 0.140, 'charge': None}, #O, (CCO), glyme
    '10': {'types': [10], 'sigma': 2.960, 'epsilon': 0.210, 'charge': None}, #O=, TFSI
    '11': {'types': [11], 'sigma': 2.950, 'epsilon': 0.053, 'charge': None}, #F, TFSI
    '12': {'types': [12], 'sigma': 3.550, 'epsilon': 0.250, 'charge': None}, #S, TFSI    
}

In [None]:
df = pd.DataFrame()
df['custom_type'] = list(custom_parameter_dict.keys())
for key in ['types', 'sigma', 'epsilon', 'charge']:
    df[key] = df['custom_type'].apply(lambda c: custom_parameter_dict[c][key])
df = df.set_index('custom_type')
path = os.path.join(job_details.WORKDIR, 'template.params')
df.to_csv(path)
df = pd.read_csv(path, index_col=0)
df

# Code Validation

### now the files needed to specify the atomic environments have all been created in AuTopologyPipeline/train/job_name/template.
### The rest of this code now shows how the parameters set above in custom_parameter_dict are used to always use those paramters to compute forces in Forcefield.pair()

In [None]:
num_types = len(template_dataset.container_batch.get_data('node', 'type').unique())

In [None]:
num_types

In [None]:
_param = {}
for name in ['sigma', 'epsilon', 'charge']:
    _param[name] = torch.tensor(list(df[name].fillna(0))).to(torch.float)

In [None]:
_param

In [None]:
type_map = {}
to_type = df['types'].to_dict()
for name in _param.keys():
    to_custom_type = -1*torch.ones(num_types).to(torch.long)
    for custom_type in to_type.keys():
        if np.isnan(df[name][custom_type]):
            continue
        for t in ast.literal_eval(to_type[custom_type]):
            to_custom_type[t] = custom_type
    type_map[name] = to_custom_type.view(-1,1)

In [None]:
type_map

In [None]:
param = {}
param['sigma'] = 0.0001*torch.randn(num_types)
param['epsilon'] = 0.0001*torch.randn(num_types)
param['charge'] = 0.0001*torch.randn(num_types)

In [None]:
types = torch.arange(num_types).to(torch.long).view(-1,1)

In [None]:
types

In [None]:
PARAM = {}
for name in param.keys():
    custom_types = type_map[name][types.view(-1)]
    custom_type_mask = (custom_types<0).to(torch.float)
    PARAM[name] = 0.0
    PARAM[name] += _param[name][custom_types]*(1-custom_type_mask)
    PARAM[name] += param[name][types]*custom_type_mask

In [None]:
PARAM

In [None]:
li_mask = template_dataset._data['node']['atomic_num']==3
print()
print(template_dataset._data['node']['one_hot_graph_distinct_r1'][li_mask.view(-1)])
# print(template_dataset._data['node']['base_type'].view(-1))
