In [1]:
from pathlib import Path
from collections import defaultdict
from itertools import combinations
import dask.dataframe as dd
import dask
import pandas as pd

In [2]:
out_path = Path("../../Molecular_database/HAC_8")
pa = list(out_path.glob("*.parquet"))
clas = defaultdict(list)
for p in pa:
    db = p.stem.split("_")[1].split("_")[0].strip("db")
    clas[db].append(p)
comb = list(combinations(clas, 2))

In [4]:
clas.keys()

dict_keys(['002', '003', '014', '004', '006', '010', '012', '013', '011', '007', '005', '008'])

In [5]:
from dask.distributed import Client
client = Client()
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 8,Total memory: 31.08 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:44147,Workers: 0
Dashboard: http://127.0.0.1:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B

0,1
Comm: tcp://127.0.0.1:44293,Total threads: 2
Dashboard: http://127.0.0.1:43929/status,Memory: 7.77 GiB
Nanny: tcp://127.0.0.1:45147,
Local directory: /tmp/dask-scratch-space/worker-_7zlqogg,Local directory: /tmp/dask-scratch-space/worker-_7zlqogg

0,1
Comm: tcp://127.0.0.1:34813,Total threads: 2
Dashboard: http://127.0.0.1:38335/status,Memory: 7.77 GiB
Nanny: tcp://127.0.0.1:38863,
Local directory: /tmp/dask-scratch-space/worker-vxnqsnvb,Local directory: /tmp/dask-scratch-space/worker-vxnqsnvb

0,1
Comm: tcp://127.0.0.1:41825,Total threads: 2
Dashboard: http://127.0.0.1:40327/status,Memory: 7.77 GiB
Nanny: tcp://127.0.0.1:33977,
Local directory: /tmp/dask-scratch-space/worker-3ba2cgrr,Local directory: /tmp/dask-scratch-space/worker-3ba2cgrr

0,1
Comm: tcp://127.0.0.1:43417,Total threads: 2
Dashboard: http://127.0.0.1:43429/status,Memory: 7.77 GiB
Nanny: tcp://127.0.0.1:35809,
Local directory: /tmp/dask-scratch-space/worker-ld8ip_0c,Local directory: /tmp/dask-scratch-space/worker-ld8ip_0c




In [11]:
dedup_dfs = {}

lazy_results = []
previous = []
smiles_col="SMILES"
# Build all lazy computations first
for db_id, path in clas.items():
    print(db_id)
    df = dd.read_parquet(path, columns=[smiles_col])
    
    df_dedup = df.drop_duplicates(subset=[smiles_col])
    unique = df_dedup.map_partitions(len).sum()
    dedup_dfs[db_id] = df_dedup
    # Collect both lazy results for single batch compute
    lazy_results.append(unique)
    
# Compute all totals and uniques at once
computed_values = dask.compute(*lazy_results)

# Assign results back in the same order
counts = dict(zip(clas.keys(), computed_values))

002
003
014
004
006
010
012
013
011
007
005
008


In [6]:
def modify_counts(counts):
    names = {}
    n = []
    for x in counts:
        if len(x.split("_")) > 1:
            n.append(x)
            if x.split("_")[0] not in names:
                names[x.split("_")[0]] = 0
            names[x.split("_")[0]] += counts[x]

    counts.update(names)
    for x in n:
        del counts[x]
    return counts

In [7]:
pd.Series(counts).to_list()

[]

In [None]:
clas = ["001", "002", "003", "004"] 
clas2 = ["001", "003", "002", "004"] 
comb = [sorted(x) for x in combinations(clas, 2)]
comb2 = sorted(combinations(clas2, 2))
comb2

[('001', '002'),
 ('001', '003'),
 ('001', '004'),
 ('002', '004'),
 ('003', '002'),
 ('003', '004')]

In [22]:
[sorted(x) for x in combinations(clas2, 2)]

[['001', '003'],
 ['001', '002'],
 ['001', '004'],
 ['002', '003'],
 ['003', '004'],
 ['002', '004']]

In [21]:
comb

[('001', '002'),
 ('001', '003'),
 ('001', '004'),
 ('002', '003'),
 ('002', '004'),
 ('003', '004')]

In [8]:
pairs = comb
for db1, db2 in pairs:
    print(db1, db2)
    

002 003
002 014
002 004
002 006
002 010
002 012
002 013
002 011
002 007
002 005
002 008
003 014
003 004
003 006
003 010
003 012
003 013
003 011
003 007
003 005
003 008
014 004
014 006
014 010
014 012
014 013
014 011
014 007
014 005
014 008
004 006
004 010
004 012
004 013
004 011
004 007
004 005
004 008
006 010
006 012
006 013
006 011
006 007
006 005
006 008
010 012
010 013
010 011
010 007
010 005
010 008
012 013
012 011
012 007
012 005
012 008
013 011
013 007
013 005
013 008
011 007
011 005
011 008
007 005
007 008
005 008


In [9]:
def get_overlap(db1, db2, dedup_dfs, counts, smiles_col="SMILES", small_threshold=100_000):
    """Efficient overlap detection using merge or isin depending on size."""
    df1 = dedup_dfs[db1]
    df2 = dedup_dfs[db2]

    len1 = counts[db1]
    len2 = counts[db2]
    
    # differentiate beween smaller and bigger
    if len1 < len2:
        small, big = df1, df2
        partitions = int(len1/small_threshold)
        
    else:
        small, big = df2, df1
        partitions = int(len2/small_threshold)
        
    if len1 < small_threshold or len2 < small_threshold:
        small = small.repartition(npartitions=1)
        overlap = dd.merge(big, small, on=smiles_col, how="inner")
    else:
        # Use merge for large–large comparisons
        ob = []
        meta = small._meta
        small = small.repartition(npartitions=partitions)
        for sma in small.to_delayed():
            sma = dd.from_delayed(sma, meta=meta)
            overlap = dd.merge(big, sma, on=smiles_col, how="inner")
            ob.append(overlap)
    
        overlap = dask.dataframe.concat(ob)
    return overlap

In [12]:
from itertools import islice

def batched(iterable, n):
    it = iter(iterable)
    while batch := list(islice(it, n)):
        yield batch
        
overlaps={}
for batch in batched(pairs, 3):  # run 3 at a time
    futures = []
    for db1, db2 in batch:
        overlap = dd.merge(
            dedup_dfs[db1],
            dedup_dfs[db2],
            on=smiles_col,
            how="inner"
        )
        #overlap = get_overlap(db1, db2, dedup_dfs, counts, smiles_col=smiles_col)
        futures.append(overlap)
    
    results = dask.compute(*futures)
    for (db1, db2), res in zip(batch, results):
        overlaps[f"{db1}_{db2}"] = res

In [15]:
len(results), batch

(3, [('007', '005'), ('007', '008'), ('005', '008')])

('007', '005')
('007', '008')
('005', '008')


In [21]:
from itertools import islice

def batched(iterable, n):
    it = iter(iterable)
    while batch := list(islice(it, n)):
        yield batch
        
overlaps={}
for batch in batched(pairs, 3):  # run 3 at a time
    futures = []
    for db1, db2 in batch:
        overlap = get_overlap(db1, db2, dedup_dfs, counts, smiles_col=smiles_col)
        futures.append(overlap)
    
    results = dask.compute(*futures)
    for (db1, db2), res in zip(batch, results):
        overlaps[f"{db1}_{db2}"] = res

In [None]:
s = dedup_dfs["004"].compute()
b = dedup_dfs["003"].compute()

In [None]:
dd.merge(s, b, on=smiles_col, how="inner").compute()

In [2]:
10/0.5

20.0

In [14]:
meta = dedup_dfs["004"]._meta  # schema of a partition

lab = []
for part in dedup_dfs["004"].to_delayed():
    sma_df = part["SMILES"]
    overlap = dedup_dfs["003"][dedup_dfs["003"]["SMILES"].isin(sma_df)]
    lab.append(overlap)

In [3]:
db1, db2 = "a", "b"
counts = {db1: 30, db2: 50}
small_threshold = 50

In [4]:
all(i <= small_threshold for i in [counts[u] for u in [db1, db2]])

True

In [12]:
a = dask.dataframe.concat(lab).compute()
a

Unnamed: 0,SMILES
5,CC=CCC=CC(C)C
10,CC(C)=CC=CCC#N
32,C=CCOCC(=O)OC
0,CN(O)c1ccccc1
31,C#CCCn1ccnc1
...,...
13,CC=C(C)C=CCC#N
2,CCC=CC(C)(C)CC
11,CCCC=CCC1CO1
0,CC(=O)OC1CC(C)C1


In [4]:
smiles_to_dbs = defaultdict(set)
redun_counts = {}
for pair, df in overlaps.items():
    redun_counts[pair] = df.shape[0]
    db1, db2 = pair.split("_")
    for smi in df["SMILES"]:
        smiles_to_dbs[smi].update([db1, db2])

NameError: name 'overlaps' is not defined

In [22]:
smiles_overlap_df = pd.DataFrame({
    "SMILES": list(smiles_to_dbs.keys()),
    "Databases": [",".join(sorted(list(v))) for v in smiles_to_dbs.values()]
})
smiles_overlap_df

Unnamed: 0,SMILES,Databases
0,C=CCNc1ccco1,003004012
1,CC(C)OC(=O)CON,003004012
2,CCOCCOCCO,003004005007012
3,N#CCCOCC(N)=O,003004012
4,CCCN(C)S(C)(=O)=O,003004012
...,...,...
110508,Oc1ncc(O)c(O)n1,005007
110509,Nc1cc(=O)nc(N)[nH]1,005007
110510,C=C1C(=O)O[C@@H](C)[C@@H]1O,005007
110511,N[C@H]1CCN[C@H]1C(=O)O,005007


In [None]:
p = list(Path("tmp").glob("*"))
u = {x.name: x for x in p}
c = {x.name: list(x.glob("*.parquet")) for x in p}
for x, f in c.items():
    if not f:
        c[x] = list(u[x].glob("part_*/*.parquet"))
        

### Analize

In [1]:
from collections import defaultdict
import datamol as dm

In [15]:
new = defaultdict(int)
internal = defaultdict(int)
data = pd.read_csv("total_stats.csv", index_col=[0,1]).fillna(0)
total = data.groupby(level=1)["total_counts"].sum()
for i, n in total.to_dict().items():
    new[int(i.split("_")[0])] += n
total_before = pd.Series(new)

to = data.groupby(level=1)["internal_counts"].sum()
for i, n in to.to_dict().items():
    internal[int(i.split("_")[0])] += n

internal = pd.Series(internal)

In [None]:
dm.parallelized()

In [19]:
data.loc[7]

Unnamed: 0,total_counts,internal_counts
2,6486.0,6470.0
3,1057.0,609.0
4,224772.0,79167.0
5,879.0,756.0
7,905.0,890.0
8,20.0,20.0
10,1532.0,1335.0
12,72953.0,43280.0
13,1599.0,1386.0


In [17]:
(total_before - internal)

1     1.275154e+09
2     1.406699e+08
3     1.187102e+08
4     2.747218e+07
5     1.114570e+05
7     2.750000e+03
8     3.900000e+01
9     4.518720e+05
10    1.004790e+05
11    4.187580e+05
12    1.025171e+06
13    5.418400e+04
dtype: float64

In [6]:
data = pd.read_csv("../../total_overlaping.csv", index_col=[0,1])
data.sum() / 1000_000_000

redundant_count    1.252054
dtype: float64

In [8]:
data = pd.read_csv("final_dedup_counts.csv", index_col=[0,1])
data

Unnamed: 0,0
1,451
2,1715
3,4040
4,8591
5,18489
...,...
819,1
825,2
865,2
873,1


In [None]:
pd.read_parquet("../../Molecular_database/7/HAC7_00.parquet").shape

In [6]:
pd.read_parquet("../../Molecular_database/008/HAC=19/part.0.parquet").drop_duplicates(subset="SMILES") 

Unnamed: 0,ID,SMILES,db_id
0,NPA000112,Cc1cnc(-c2cc(O)ccc2O)c2c1C(O)CC2,008
1,NPA000129,O=C1C=C(CCC(=O)OCCc2ccccc2)CC1,008
2,NPA000159,O=C1NC(Cc2ccc(O)cc2)C(=O)N2CCCC12,008
3,NPA000190,CCCCCCCC1C(=O)c2cccc(O)c2C1O,008
4,NPA000233,CNc1ccc(-c2nc3ccccc3c(=O)[nH]2)cc1,008
...,...,...,...
630,NPA036542,CC(CO)CCCC(C)(O)c1ccc(C(=O)O)cc1,008
632,NPA036606,CC1=C(CCO)C(=O)C2=C(CC(C)(C(=O)O)C2)C1=O,008
633,NPA036646,O=C(O)c1c(-c2ccc[nH]2)[nH]c2ccccc2c1=O,008
634,NPA036648,CC(N)C(=O)NC(CC1C=C(Cl)C(=O)CC1)C(=O)O,008
