In [1]:
import sys
import os

# Set the project root (adjust this path as needed)
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) if '__file__' in globals() else os.path.abspath("..")
sys.path.insert(0, project_root)

In [2]:
import os
import numpy as np
import pandas as pd
from sklearn.model_selection import KFold
from sklearn.metrics import mean_squared_error
from sklearn.ensemble import RandomForestRegressor
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
from transformers import AutoModel, AutoTokenizer
from tqdm import tqdm
from models.autoencoder import FingerprintAutoencoder
from rdkit.Chem import rdFingerprintGenerator
from typing import Iterator, List, Tuple
from rdkit import Chem
from rdkit.Chem import rdFingerprintGenerator
import json


SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)
device = torch.device("cpu")

### 2. Experiment Setup

- **Feature types**:
  1. molformer  
  2. fingerprint  
  3. compressed_fp_5  
  4. compressed_fp_10  
  5. molformer+fp  
  6. molformer+compressed_fp_5  
  7. molformer+compressed_fp_10  

- **Prediction heads**:
  1. mlp_1  
  2. mlp_2  
  3. mlp_3  
  4. random_forest  

- **CV**: 5-fold


In [3]:
FEATURE_EXTRACTORS = [
    "molformer",
    "fingerprint",
    "compressed_fp_5",
    "compressed_fp_10",
    "molformer+fp",
    "molformer+compressed_fp_5",
    "molformer+compressed_fp_10"
]

PREDICTION_HEADS = ["mlp_1", "mlp_2", "mlp_3", "random_forest"]

COMPRESSED_FP_PATHS = {
    "compressed_fp_5": "/data/users/pjajoria/model_checkpoints/autoencoder/compression_5",
    "compressed_fp_10": "/data/users/pjajoria/model_checkpoints/autoencoder/compression_10",
    "molformer+compressed_fp_5": "/data/users/pjajoria/model_checkpoints/autoencoder/compression_5",
    "molformer+compressed_fp_10": "/data/users/pjajoria/model_checkpoints/autoencoder/compression_10",
}

N_SPLITS = 5


## Dataset

In [4]:
DATA_PATH = "/nethome/pjajoria/Github/thesis/data/processed/lipo.csv"

df = pd.read_csv(DATA_PATH)
print(f"Loaded {len(df)} molecules")
# df.head()
# TODO: Scafold splitting


Loaded 4200 molecules


## Feature Extractors

In [5]:
FPGEN = rdFingerprintGenerator.GetMorganGenerator(radius=2, fpSize=2048)
def smis2torch_fp(smiles: List[str]):
    # torch uses optimized dot products for float32 but not for int
    # or bool
    fps = np.zeros((len(smiles), 2048), dtype=np.float32)
    for i, smi in enumerate(smiles):
        mol = Chem.MolFromSmiles(smi)
        fps[i, :] = FPGEN.GetFingerprintAsNumPy(mol)

    return torch.from_numpy(fps)


In [6]:
# All Models
molformer = AutoModel.from_pretrained("ibm/MoLFormer-XL-both-10pct", deterministic_eval=True, trust_remote_code=True).to(device)
molformer_tokenizer = AutoTokenizer.from_pretrained("ibm/MoLFormer-XL-both-10pct", trust_remote_code=True)


def load_compressed_autoencoders():
    input_dim = 2048
    compression_levels = [5, 10]
    base_path = "/data/users/pjajoria/model_checkpoints/autoencoder/compression_{}"

    compressed_fps = {}
    for comp in compression_levels:
        latent_dim = int(input_dim * (comp / 100))
        model = FingerprintAutoencoder(input_dim=input_dim, latent_dim=latent_dim)
        ckpt_path = f"{base_path.format(comp)}/ae_final.pth"
        model.load_state_dict(torch.load(ckpt_path, map_location=device))
        model.to(device)
        model.eval()
        compressed_fps[str(comp)] = model

    return compressed_fps

compressed_fp_models = load_compressed_autoencoders()

  model.load_state_dict(torch.load(ckpt_path, map_location=device))


In [7]:

def extract_features(smiles_list, feature_type):
    """
    Returns np.ndarray of shape (N, D) for given smiles_list.
    """
    if feature_type == "molformer":
        # Load model directly
        inputs = molformer_tokenizer(smiles_list, padding=True, return_tensors="pt").to(device)
        with torch.no_grad():
            outputs = molformer(**inputs)
        return outputs.pooler_output
    elif feature_type == "fingerprint":
        return smis2torch_fp(smiles_list).to(device)

    elif feature_type.startswith("compressed_fp"):
        compression = feature_type.split("_")[-1]
        return compressed_fp_models[compression].encoder(smis2torch_fp(smiles_list).to(device))
    elif feature_type.startswith("molformer+"):
        # split e.g. "molformer+compressed_fp_5"
        base, extra = feature_type.split("+")
        feat1 = extract_features(smiles_list, base)
        feat2 = extract_features(smiles_list, extra)
        return torch.concatenate([feat1, feat2], axis=1)

    else:
        raise ValueError(f"Unknown feature type: {feature_type}")


### Testing all the models forward pass

In [8]:
smiles_list = ["CCO", "CC(=O)O", "c1ccccc1", "CCN(CC)CC", "C#N"]

compressed_fp_models = load_compressed_autoencoders()

# Test each feature type
feature_types = [
    "molformer",
    "fingerprint",
    "compressed_fp_5",
    "compressed_fp_10",
    "molformer+fingerprint",
    "molformer+compressed_fp_5",
    "molformer+compressed_fp_10"
]

# Test and print output shapes
for ft in feature_types:
    try:
        features = extract_features(smiles_list, ft)
        print(f"{ft}: {features.shape}")
    except Exception as e:
        print(f"{ft}: Failed with error -> {e}")

  model.load_state_dict(torch.load(ckpt_path, map_location=device))


molformer: torch.Size([5, 768])
fingerprint: torch.Size([5, 2048])
compressed_fp_5: torch.Size([5, 102])
compressed_fp_10: torch.Size([5, 204])
molformer+fingerprint: torch.Size([5, 2816])
molformer+compressed_fp_5: torch.Size([5, 870])
molformer+compressed_fp_10: torch.Size([5, 972])


## Prediction Heads

- **MLP**: 1-, 2-, or 3-layer feedforward
- **RF**: scikit-learn RandomForestRegressor


In [9]:
class MLP(nn.Module):
    def __init__(self, input_dim, hidden_dims):
        super().__init__()
        layers = []
        dims = [input_dim] + hidden_dims + [1]
        for i in range(len(dims)-2):
            layers.append(nn.Linear(dims[i], dims[i+1]))
            layers.append(nn.ReLU())
        layers.append(nn.Linear(dims[-2], dims[-1]))
        self.net = nn.Sequential(*layers)
    def forward(self, x):
        return self.net(x)

def get_model(model_type, input_dim):
    if model_type == "mlp_1":
        return MLP(input_dim, [128])
    if model_type == "mlp_2":
        return MLP(input_dim, [256, 128])
    if model_type == "mlp_3":
        return MLP(input_dim, [512, 256, 128])
    if model_type == "random_forest":
        return RandomForestRegressor(n_estimators=100, random_state=SEED)
    raise ValueError(model_type)


## Run k-Fold CV & Collect Metrics

In [None]:
from tqdm.notebook import tqdm  # for notebook-friendly progress bars

results = []

kf = KFold(n_splits=N_SPLITS, shuffle=True, random_state=SEED)

# Outer progress bar for total number of (feat x head) runs
total_runs = len(FEATURE_EXTRACTORS) * len(PREDICTION_HEADS)
outer_pbar = tqdm(total=total_runs, desc="Feature+Head combos")

for feat in FEATURE_EXTRACTORS:
    X = extract_features(df["smiles"].tolist(), feat)  # (N, D)
    y = df["target"].values

    for head in PREDICTION_HEADS:
        fold_metrics = []

        # Optional: progress bar for folds if folds are large
        # fold_pbar = tqdm(total=N_SPLITS, desc=f"Fold ({feat}, {head})")

        for fold, (train_idx, test_idx) in enumerate(kf.split(X)):
            X_tr, y_tr = X[train_idx], y[train_idx]
            X_te, y_te = X[test_idx], y[test_idx]

            model = get_model(head, input_dim=X.shape[1])

            if head.startswith("mlp"):
                EPOCHS = 20
                BATCH_SIZE = 64
                LR = 1e-3

                model = model.to(device)
                model.train()

                ds = TensorDataset(torch.from_numpy(X_tr).float(), torch.from_numpy(y_tr).float().unsqueeze(1))
                loader = DataLoader(ds, batch_size=BATCH_SIZE, shuffle=True)

                optimizer = torch.optim.Adam(model.parameters(), lr=LR)
                criterion = nn.MSELoss()

                # Epoch progress bar
                epoch_pbar = tqdm(range(EPOCHS), desc=f"Training {feat}+{head} fold {fold+1}", leave=False)

                for epoch in epoch_pbar:
                    running_loss = 0.0
                    for xb, yb in loader:
                        xb, yb = xb.to(device), yb.to(device)
                        optimizer.zero_grad()
                        preds = model(xb)
                        loss = criterion(preds, yb)
                        loss.backward()
                        optimizer.step()
                        running_loss += loss.item()

                    avg_loss = running_loss / len(loader)
                    epoch_pbar.set_postfix(loss=f"{avg_loss:.4f}")

                model.eval()
                with torch.no_grad():
                    preds = model(torch.from_numpy(X_te).float().to(device)).cpu().numpy().squeeze()
            else:
                model.fit(X_tr, y_tr)
                preds = model.predict(X_te)

            rmse = mean_squared_error(y_te, preds, squared=False)
            r2   = r2_score(y_te, preds)
            mae  = mean_absolute_error(y_te, preds)

            fold_metrics.append({
                "rmse": rmse,
                "r2":   r2,
                "mae":  mae
            })

            # fold_pbar.update(1)

        # fold_pbar.close()

        rmse_vals = [m["rmse"] for m in fold_metrics]
        r2_vals   = [m["r2"]   for m in fold_metrics]
        mae_vals  = [m["mae"]  for m in fold_metrics]

        result_entry = {
            "feature": feat,
            "head":    head,
            "rmse_mean": np.mean(rmse_vals),
            "rmse_std":  np.std(rmse_vals),
            "r2_mean":   np.mean(r2_vals),
            "r2_std":    np.std(r2_vals),
            "mae_mean":  np.mean(mae_vals),
            "mae_std":   np.std(mae_vals),
        }
        results.append(result_entry)

        print(f"{feat} + {head}:  "
              f"RMSE {result_entry['rmse_mean']:.3f}±{result_entry['rmse_std']:.3f},  "
              f"R² {result_entry['r2_mean']:.3f}±{result_entry['r2_std']:.3f},  "
              f"MAE {result_entry['mae_mean']:.3f}±{result_entry['mae_std']:.3f}")

        outer_pbar.update(1)

outer_pbar.close()

os.makedirs(output_dir, exist_ok=True)
with open(output_dir / "results.json", "w") as f:
    json.dump(results, f, indent=2)

Feature+Head combos:   0%|          | 0/28 [00:00<?, ?it/s]

## Plots

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Suppose your `results` list contains one entry per fold:
# [
#   {"feature": ..., "head": ..., "r2": 0.75, "mae": 0.45},
#   ...
# ]
df = pd.DataFrame(results)

# 1. Melt into “long” form so we can Facet by metric
df_long = df.melt(
    id_vars=["feature", "head"],
    value_vars=["r2", "mae"],
    var_name="metric",
    value_name="value"
)

# 2. Set up one subplot per head
heads = df_long["head"].unique()
n_heads = len(heads)
fig, axes = plt.subplots(1, n_heads, figsize=(4*n_heads, 6), sharey=False)

for ax, head in zip(axes, heads):
    sub = df_long[df_long["head"] == head]
    # 3. Draw a boxplot of value by feature, with hue=metric
    #    (so R2 and MAE appear next to each other for each feature)
    pd.plotting.boxplot(
        sub, 
        by=["feature", "metric"], 
        column="value",
        ax=ax,
        rot=45,
        grid=False,
        showmeans=True
    )
    ax.set_title(head)
    ax.get_figure().suptitle("")  # remove default “Boxplot grouped by ...” title
    ax.set_xlabel("Feature extractor")
    ax.set_ylabel("Score")

plt.tight_layout()
plt.show()
