In [1]:
import warnings
warnings.filterwarnings('ignore')

In [2]:
import numpy as np
import subprocess
import tempfile
import os
import sys
from collections import defaultdict
from typing import Union

class Miner:
    def __init__(self, s: int,  miner_path="./miner", base_file_name: Union[None, str] = None, verbose: bool = False):
        self.s = s
        self.miner_path = miner_path
        self.verbose = verbose
        self.base_file_name = tempfile.mktemp() if not base_file_name else base_file_name
        self.input_file_name = self.base_file_name + "_input"
        self.substructure_file_name = self.base_file_name + "_output.txt"
        self.identifiers_file_name = self.base_file_name + "_ids.txt"


    def run(self, x: list[str], **kwargs) -> dict[str, list[str]]:
        self.save_input_to_file(x)
        command = self.build_command(**kwargs)
        success = self.run_miner(command)
        if success is True:
            result = self.parse_output_but_cooler()
            result = self.populate_missing_data(result, x)
            return result
        return {}

    def save_input_to_file(self, x: list[str]) -> None:
        with open(self.input_file_name, 'w') as input_file:
            for smile in x:
                line = f"{smile},0,{smile}\n"
                input_file.write(line)

    def build_command(self, **kwargs) -> list[str]:
        command = [self.miner_path, self.input_file_name]
        for key, value in kwargs.items():
            if isinstance(value, bool):
                if value:
                    command.append(f"-{key}")
            else:
                command.append(f"-{key}{value}")

        command.append(f"-s{self.s}")
        command.append(self.substructure_file_name)
        command.append(self.identifiers_file_name)

        return command

    def run_miner(self, command: list[str]) -> bool:
        try:
            result = subprocess.run(command, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, text=True)
            result.check_returncode()
        except subprocess.CalledProcessError as e:
            print("Error running MoSS:", e.stderr)
            return False
        except Exception as e:
            print("An error occurred:", str(e))
            return False
        return True

    def parse_output_but_cooler(self) -> dict[str, list[str]]:
        result = defaultdict(list)
        with open(self.substructure_file_name, "r") as substructure_file, open(self.identifiers_file_name, "r") as identifiers_file:
            substucture_lines = substructure_file.read().strip().split("\n")[1:]
            molecular_lines = identifiers_file.read().strip().split("\n")[1:]

            assert(len(substucture_lines) == len(molecular_lines))

            for substructure_line, molecular_line in zip(substucture_lines, molecular_lines):
                entities = molecular_line.split(':')[1].split(',')
                smile = substructure_line.split(',')[1]
                for entity in entities:
                    result[entity].append(smile)

        return dict(result)

    def populate_missing_data(self, results: dict[str, list[str]], initial_data: list[str]):
        for molecule in initial_data:
            if molecule not in results:
                results[molecule] = [] # No common substructure has been found for this particular molecule
        return results

In [3]:
import time
def test_dataset(x, y, s, clf, name="test"):
    start_time = time.time()
    mols_train, mols_test, y_train, y_test = scaffold_train_test_split(x, y, test_size=0.2)

    train_values_map = {molecule: prediction for molecule, prediction in zip(mols_train, y_train)}
    test_values_map = {molecule: prediction for molecule, prediction in zip(mols_test, y_test)}
    
    miner = Miner(s=s, base_file_name=name)
    mols_train_processed = miner.run(mols_train, jS=True)

    vocabulary = list(set([substructure for x in mols_train_processed.values() for substructure in x]))
    mlb = MultiLabelBinarizer(classes=vocabulary)
    mlb.fit(vocabulary)

    mols_train_encoded = {key: (mlb.transform([value])[0], train_values_map[key]) for key, value in mols_train_processed.items()}

    mols_test_processed = miner.run(mols_test, jS=True)
    mols_test_encoded = {key: (mlb.transform([value])[0], test_values_map[key]) for key, value in mols_test_processed.items()}

    X_train = np.array([x[0] for x in mols_train_encoded.values()])
    y_train = np.array([x[1] for x in mols_train_encoded.values()])

    X_test = np.array([x[0] for x in mols_test_encoded.values()])
    y_test = np.array([x[1] for x in mols_test_encoded.values()])

    clf.fit(X_train, y_train)

    y_pred = clf.predict_proba(X_test)[:, 1]
    auroc = roc_auc_score(y_test, y_pred)

    end_time = time.time() - start_time

    return auroc

In [4]:
from skfp.datasets.moleculenet import load_bace, load_bbbp, load_hiv
from skfp.model_selection import scaffold_train_test_split
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.metrics import roc_auc_score
from sklearn.ensemble import RandomForestClassifier

In [5]:
x, y = load_bace()

clf = RandomForestClassifier(random_state=10)
results_bace = [(test_dataset(x, y, s, clf, "bace"), s) for s in range(25, 2, -1)]
print(f"highest score: {max(results_bace)}")

highest score: (0.5344040801717966, 19)


In [6]:
x, y = load_bbbp()

clf = RandomForestClassifier(random_state=10)
results_bbbp = [(test_dataset(x, y, s, clf, "bbbp"), s) for s in range(15, 2, -1)]
print(f"highest score: {max(results_bbbp)}")



highest score: (0.6567611316469765, 3)


In [7]:
x, y = load_hiv()

clf = RandomForestClassifier(random_state=10)
results_hiv = [(test_dataset(x, y, s, clf, "hiv"), s) for s in range(15, 2, -1)]
print(f"highest score: {max(results_hiv)}")



highest score: (0.6346849656893324, 8)


In [8]:
from sklearn.linear_model import LogisticRegression
x, y = load_bace()

clf = LogisticRegression()
results_bace = [(test_dataset(x, y, s, clf, "bace"), s) for s in range(25, 2, -1)]
print(f"highest score: {max(results_bace)}")

highest score: (0.5407569792412312, 13)


In [9]:
x, y = load_bbbp()

clf = LogisticRegression()
results_bbbp = [(test_dataset(x, y, s, clf, "bbbp"), s) for s in range(15, 2, -1)]
print(f"highest score: {max(results_bbbp)}")



highest score: (0.6691792901838564, 3)


In [10]:
x, y = load_hiv()

clf = LogisticRegression()
results_hiv = [(test_dataset(x, y, s, clf, "hiv"), s) for s in range(15, 2, -1)]
print(f"highest score: {max(results_hiv)}")



highest score: (0.631786076462084, 8)
