In [16]:
import pandas as pd
import glob
from tqdm.notebook import tqdm, trange
from typing import List, Tuple, Set, Union, Callable
import re
from rdkit import Chem
from joblib import Parallel, delayed
from multiprocessing import cpu_count
from rdkit.DataStructs.cDataStructs import ExplicitBitVect

In [2]:
df = pd.read_csv('coremof-mofid.txt', sep=';')
df

Unnamed: 0,mofid,mof_name
0,[Co].[O-]C(=O)c1ccncc1 MOFid-v1.rtl.cat0,ABAVIJ_clean
1,[Co].[O-]C(=O)c1ccncc1 MOFid-v1.rtl.cat0,ABAVOP_clean
2,Cl[Mn][Mn]Cl.[O-]C(=O)c1cc(cc(c1)C(=O)[O-])C(=...,ABAYIO_clean
3,Cl[Co][Co]Cl.[O-]C(=O)c1cc(cc(c1)C(=O)[O-])C(=...,ABAYOU_clean
4,[O]S(c1ccc(cc1)C1=C2C=CC(=C(c3ccc(cc3)S([O])([...,ABEFUL_clean
...,...,...
14021,[O]P(=O)(c1ccccc1)c1ccccc1.[Zn][OH][Zn] MOFid-...,ncomms13008_ncomms13008-s4_clean
14022,CC1=NC(=N[N]1)CC1=N[N]C(=N1)C.CC1=NN=C([N]1)CC...,ncomms7350-s2_clean
14023,[O-]C(=O)c1ccc(cc1)[Si](c1ccc(cc1)C(=O)[O-])(c...,om701262m-file004_clean
14024,[Fe].[Na].[O-]C(=O)C(=O)[O-] MOFid-v1.ERROR.cat5,sciadv.1600621_S1_Crystal_Structures_clean


In [3]:
df['mofid'].iloc[24]

'CC(=O)O[Pb].[K].[N]=[C]SS[C]=[N].[O-]C(=O)C.[Pb] MOFid-v1.ERROR.cat0'

In [4]:
metals = []
with open('metals.txt') as f:
    metals = f.read().splitlines()
halides = ['F', 'Cl', 'Br', 'I']

In [5]:
def process_topology(s: str) -> str:
    query = re.compile('MOFid-v1.(.*).cat')
    result = query.search(s)
    if result is not None:
        return result.group(1)
    else:
        return None

def process_metal_and_organic(s: str) -> Tuple[List[str], List[str]]:
    metal_nodes = []
    organic_linkers = []
    items = s.split('.')
    for i in items:
        for m in metals:
            if m in i:
                metal_nodes.append(i)
                break
    for i in items:
        if i not in metal_nodes:
            organic_linkers.append(i)
    return metal_nodes, organic_linkers

def process_mofid(s: str) -> Tuple[List[str], List[str], str]:
    """
    Returns three things in a tuple from a MOFid:
    1. list of metal nodes
    2. list of organic linkers
    3. topology string
    """
    s1, s2 = s.split(' ')
    metal_nodes, organic_linkers = process_metal_and_organic(s1)
    topology = process_topology(s2)
    return metal_nodes, organic_linkers, topology

In [6]:
process_mofid(df['mofid'].iloc[1])

(['[Co]'], ['[O-]C(=O)c1ccncc1'], 'rtl')

In [7]:
[process_mofid(i) for i in tqdm(df['mofid'])]

  0%|          | 0/14026 [00:00<?, ?it/s]

[(['[Co]'], ['[O-]C(=O)c1ccncc1'], 'rtl'),
 (['[Co]'], ['[O-]C(=O)c1ccncc1'], 'rtl'),
 (['Cl[Mn][Mn]Cl'], ['[O-]C(=O)c1cc(cc(c1)C(=O)[O-])C(=O)[O-]'], 'tbo'),
 (['Cl[Co][Co]Cl'], ['[O-]C(=O)c1cc(cc(c1)C(=O)[O-])C(=O)[O-]'], 'tbo'),
 (['[Tb]'],
  ['[O]S(c1ccc(cc1)C1=C2C=CC(=C(c3ccc(cc3)S([O])([O])[O])C3=NC(=C(C4=CC=C(C(=C5N=C1C=C5)c1ccc(cc1)S([O])([O])[O])[N]4)c1ccc(cc1)S([O])([O])[O])C=C3)[N]2)([O])[O]'],
  'bcu,eft'),
 (['Cl[Cu]', '[Cu]'], ['[O-]C(=O)c1cc(cc(c1)C(=O)[O-])C(=O)[O-]'], 'the'),
 (['[Cu]'], ['n1ccc(cc1)c1cc(c2ccncc2)c(cc1c1ccncc1)c1ccncc1'], 'ubo,TIMEOUT'),
 (['[La]'], ['[O-]C(=O)C(=O)[O-]', '[O-]C(=O)c1ncnc(c1)C(=O)[O-]'], 'ERROR'),
 (['[In]'],
  ['[O-]C(=O)c1ccc(cc1)c1ccc(cc1)C(=O)[O-]', '[O-]C(=O)c1ccc(s1)C(=O)[O-]'],
  'UNKNOWN'),
 (['[Ce]'], ['[O-]C(=O)C(=O)[O-]', '[O-]C(=O)c1ncnc(c1)C(=O)[O-]'], 'ERROR'),
 (['[Pr]'], ['[O-]C(=O)C(=O)[O-]', '[O-]C(=O)c1ncnc(c1)C(=O)[O-]'], 'ERROR'),
 (['[Nd]'], ['[O-]C(=O)C(=O)[O-]', '[O-]C(=O)c1ncnc(c1)C(=O)[O-]'], 'ERROR'),
 (['[Sm

In [8]:
s = '[Cu][Cu]'
query = re.compile('\[(.*?)\]')
query.findall(s)

['Cu', 'Cu']

In [9]:
def canonicalize_smiles(smi: str) -> Union[None, str]:
    # TODO: I think there's an RDKit builtin function for doing this already; maybe use that instead
    mol = Chem.MolFromSmiles(smi, sanitize=False)
    if mol is None:
        return None
    else:
        return Chem.MolToSmiles(mol)

def process_topology(s: str) -> Set[str]:
    query = re.compile('MOFid-v1.(.*).cat')
    result = query.search(s)
    if result is not None:
        return set(result.group(1).split(','))
    else:
        return None

def process_metal_and_organic(s: str) -> Tuple[Set[str], Set[str], Set[str], Set[str]]:
    metal_nodes = set()
    metal_node_types = set()
    metal_list = set()
    organic_linkers = set()
    items = s.split('.')
    bracket_query = re.compile('\[(.*?)\]')
    for i in items:
        has_metal = False
        
        results = bracket_query.findall(i)
            
        # search for metals 
        for n in results:
            if n in metals:
                has_metal = True
                metal_list.add(n)
                metal_nodes.add(i)
        
        # add metal node type
        if has_metal:
            for m in metals:
                i = i.replace('[%s]'%m, '[M]')
            for h in halides:
                i = i.replace(h, '$')
            metal_node_types.add(i)
        else:
            organic_linkers.add(canonicalize_smiles(i))
            
    return metal_nodes, organic_linkers, metal_list, metal_node_types

def process_mofid(s: str) -> dict:
    """
    Returns five things in a tuple from a MOFid:
    1. set of metal nodes with explicit metals
    2. set of organic linkers
    3. set of topologies (usually only 1, but some have multiple?)
    4. set of metals found
    5. set of metal node motifs
    """
    s1, s2 = s.split(' ')
    metal_nodes, organic_linkers, metal_list, metal_node_types = process_metal_and_organic(s1)
    topology = process_topology(s2)
#     return metal_nodes, organic_linkers, topology, metal_list, metal_node_types
    return {
        'metal nodes': metal_nodes,
        'organic linkers': organic_linkers,
        'topology': topology,
        'metal list': metal_list,
        'metal node types': metal_node_types
    }

In [10]:
processed_mofids = [process_mofid(i) for i in tqdm(df['mofid'])]

  0%|          | 0/14026 [00:00<?, ?it/s]

In [11]:
processed_mofids

[({'[Co]'}, {'O=C([O-])c1ccncc1'}, {'rtl'}, {'Co'}, {'[M]'}),
 ({'[Co]'}, {'O=C([O-])c1ccncc1'}, {'rtl'}, {'Co'}, {'[M]'}),
 ({'Cl[Mn][Mn]Cl'},
  {'O=C([O-])c1cc(C(=O)[O-])cc(C(=O)[O-])c1'},
  {'tbo'},
  {'Mn'},
  {'$[M][M]$'}),
 ({'Cl[Co][Co]Cl'},
  {'O=C([O-])c1cc(C(=O)[O-])cc(C(=O)[O-])c1'},
  {'tbo'},
  {'Co'},
  {'$[M][M]$'}),
 ({'[Tb]'},
  {'OS(O)(O)c1ccc(C2=C3C=CC(=C(c4ccc(S(O)(O)O)cc4)C4=NC(=C(c5ccc(S(O)(O)O)cc5)C5=CC=C(N5)C(c5ccc(S(O)(O)O)cc5)=C5C=CC2=N5)C=C4)N3)cc1'},
  {'bcu', 'eft'},
  {'Tb'},
  {'[M]'}),
 ({'Cl[Cu]', '[Cu]'},
  {'O=C([O-])c1cc(C(=O)[O-])cc(C(=O)[O-])c1'},
  {'the'},
  {'Cu'},
  {'$[M]', '[M]'}),
 ({'[Cu]'},
  {'c1cc(c2cc(c3ccncc3)c(c3ccncc3)cc2c2ccncc2)ccn1'},
  {'TIMEOUT', 'ubo'},
  {'Cu'},
  {'[M]'}),
 ({'[La]'},
  {'O=C([O-])C(=O)[O-]', 'O=C([O-])c1cc(C(=O)[O-])ncn1'},
  {'ERROR'},
  {'La'},
  {'[M]'}),
 ({'[In]'},
  {'O=C([O-])c1ccc(C(=O)[O-])s1', 'O=C([O-])c1ccc(c2ccc(C(=O)[O-])cc2)cc1'},
  {'UNKNOWN'},
  {'In'},
  {'[M]'}),
 ({'[Ce]'},
  {'O=C([O-])C

In [12]:
def collect_set(list_of_sets) -> List[str]:
    return list(set().union(*[l for l in list_of_sets if l is not None]))

metal_nodes, organic_linkers, topologies, metal_lists, metal_node_types = tuple(zip(*processed_mofids))
all_metal_nodes = collect_set(metal_nodes)
all_organic_linkers = collect_set(organic_linkers)
all_topologies = collect_set(topologies)
all_metal_list = collect_set(metal_lists)
all_metal_node_types = collect_set(metal_node_types)

In [13]:
len(all_metal_nodes), len(all_organic_linkers), len(all_topologies), len(all_metal_list), len(all_metal_node_types)

(1505, 4533, 377, 60, 811)

In [17]:
def fp_smiles(s: str, **kwargs):
    mol = Chem.MolFromSmiles(s, sanitize=False)
    return Chem.RDKFingerprint(mol, **kwargs)

def metal_node_type_fp(s: str):
    return fp_smiles(s.replace('[M]', '[Zn]').replace('$', 'Cl'), maxPath=3)

all_metal_node_type_fps = [metal_node_type_fp(i) for i in all_metal_node_types]

In [None]:
def one_hot_fp(s: str, unique: List[str]):
    fp = ExplicitBitVect(len(unique))
    fp.SetBit(unique.index(s))
    return fp

In [18]:
def union_bv(list_of_bv: List[ExplicitBitVect], bv=None):
    if len(list_of_bv) == 0:
        return bv
    if bv is None:
        bv = list_of_bv.pop()
        return union_bv(list_of_bv, bv)
    else:
        new_bv = list_of_bv.pop()
        return union_bv(list_of_bv, bv|new_bv)
    
def collective_fp(list_to_fp: Set[str], fingerprint_fn: Callable):
    return union_bv(
        [fingerprint_fn(i) for i in list_to_fp]
    )

collective_fp({'C1=NNN(CCCn2ncnn2)N1', 'c1nnn(CCCn2ncnn2)n1'}, fp_smiles)

<rdkit.DataStructs.cDataStructs.ExplicitBitVect at 0x7f546976d210>

In [None]:
class MOFCollection:
    def __init__(self, all_mofids: List[str]):
        processed_mofids = [process_mofid(i) for i in all_mofids]