In [None]:
# -*- coding: utf-8 -*-

import os
import re
import time
import numpy as np
import pandas as pd
from pathlib import Path
from typing import List, Tuple

# =================== Config ===================
PATH_XLSX = r"C:\**\**\**\input.xlsx"
SHEET_NAME = 0

ID_COL_CANDIDATES = ["Name", "ID", "Molecule", "Mol", "SMILES"]

MISSING_COL_DROP_THRES = 0.30
LOW_VAR_THRES = 1e-8
CORR_THRES = 0.90

FINGERPRINT_NAME_PATTERNS = [
    r"fingerprint", r"\bfp\b", r"maccs", r"pubchem", r"ecfp", r"fcfp",
    r"atompair", r"torsion", r"avalon", r"estatefp", r"krfp", r"subfpc"
]
BINARY_AS_FP_MIN_UNIQUE = 2
BINARY_RATE_TOL = 0.99

WRITE_SUMMARY_SHEET = True

def ts() -> str:
    return time.strftime("%Y%m%d_%H%M%S")


def load_df(path, sheet):
    return pd.read_excel(path, sheet_name=sheet)


def split_id(df: pd.DataFrame, keys: List[str]) -> Tuple[pd.DataFrame, pd.DataFrame]:
    id_cols = []
    for c in df.columns:
        c_str = str(c)
        for k in keys:
            if re.search(rf"{re.escape(k)}", c_str, flags=re.I):
                id_cols.append(c)
                break
    id_cols = list(dict.fromkeys(id_cols))
    return (
        df[id_cols].copy() if id_cols else pd.DataFrame(index=df.index),
        df.drop(columns=id_cols, errors="ignore"),
    )


def detect_fp_cols(df_num: pd.DataFrame) -> List[str]:
    fp = set()
    name_re = re.compile("|".join(FINGERPRINT_NAME_PATTERNS), flags=re.I) if FINGERPRINT_NAME_PATTERNS else None

    # 1) Column-name heuristics
    if name_re:
        for c in df_num.columns:
            if name_re.search(str(c)):
                fp.add(c)

    # 2) Binary / near-binary heuristics
    for c in df_num.columns:
        s = df_num[c]
        if pd.api.types.is_numeric_dtype(s):
            vals = s.dropna().unique()
            if len(vals) <= BINARY_AS_FP_MIN_UNIQUE and set(np.round(vals, 6)).issubset({0, 1}):
                fp.add(c)
                continue
            if s.notna().any():
                in_01 = s.dropna().isin([0, 1]).mean()
                if in_01 >= BINARY_RATE_TOL:
                    fp.add(c)

    return sorted(fp)


def drop_high_missing(df_num: pd.DataFrame, thres: float):
    miss = df_num.isna().mean()
    drop_cols = miss[miss > thres].index.tolist()
    return df_num.drop(columns=drop_cols, errors="ignore"), drop_cols


def drop_low_var(df_num: pd.DataFrame, thres: float):
    var = df_num.var(skipna=True)
    drop_cols = var[var <= thres].index.tolist()
    return df_num.drop(columns=drop_cols, errors="ignore"), drop_cols


def corr_prune(df_num: pd.DataFrame, thres: float):
    df_imp = df_num.copy()
    for c in df_imp.columns:
        med = df_imp[c].median(skipna=True)
        df_imp[c] = df_imp[c].fillna(med)

    corr = df_imp.corr().abs()
    upper = corr.where(np.triu(np.ones(corr.shape), k=1).astype(bool))

    pairs = (
        upper.stack()
        .reset_index()
        .rename(columns={"level_0": "c1", "level_1": "c2", 0: "r"})
        .sort_values("r", ascending=False)
    )

    kept = set(df_num.columns)
    drops_pairs = []
    na_ratio = df_num.isna().mean().to_dict()
    variance = df_imp.var().to_dict()

    def worse_col(a, b):
        # Higher NA ratio is worse; higher variance is better (very rough heuristic).
        score = lambda x: -(na_ratio.get(x, 0.0)) * 2.0 + variance.get(x, 0.0)
        return a if score(a) < score(b) else b

    for _, row in pairs.iterrows():
        if row["r"] <= thres:
            break
        a, b = row["c1"], row["c2"]
        if a in kept and b in kept:
            w = worse_col(a, b)
            k = b if w == a else a  # kept one
            kept.remove(w)
            drops_pairs.append((k, w))

    kept_cols = [c for c in df_num.columns if c in kept]
    return df_num[kept_cols].copy(), drops_pairs


def main():
    raw = load_df(PATH_XLSX, SHEET_NAME)
    raw.columns = [str(c) for c in raw.columns]

    # Convert non-ID columns to numeric
    id_keys_lower = [k.lower() for k in ID_COL_CANDIDATES]
    for col in raw.columns:
        col_str = str(col)
        if not any(k in col_str.lower() for k in id_keys_lower):
            raw[col] = pd.to_numeric(raw[col], errors="coerce")

    out_dir = Path(os.path.dirname(PATH_XLSX) or ".")
    out_xlsx = out_dir / f"descriptors_dropna_{ts()}.xlsx"
    out_csv = out_dir / f"descriptors_dropna_{ts()}.csv"

    logs = {}

    id_df, feat_df = split_id(raw, ID_COL_CANDIDATES)
    logs["ID_Columns"] = list(id_df.columns)

    num_cols = [c for c in feat_df.columns if pd.api.types.is_numeric_dtype(feat_df[c])]
    non_num = [c for c in feat_df.columns if c not in num_cols]

    X = feat_df[num_cols].copy()
    logs["Dropped_NonNumeric"] = non_num

    fp_cols = detect_fp_cols(X)
    X = X.drop(columns=fp_cols, errors="ignore")
    logs["Dropped_FingerprintLike"] = fp_cols

    X, dropped_miss = drop_high_missing(X, MISSING_COL_DROP_THRES)
    logs["Dropped_HighMissing"] = dropped_miss

    X, dropped_lowvar = drop_low_var(X, LOW_VAR_THRES)
    logs["Dropped_LowVariance"] = dropped_lowvar

    if X.shape[1] >= 2:
        X, corr_drops = corr_prune(X, CORR_THRES)
    else:
        corr_drops = []
    logs["Dropped_CorrPairs(kept->dropped)"] = [f"{k} -> {d}" for (k, d) in corr_drops]

    # Drop ALL rows with any NaN (no imputation)
    kept_idx = X.dropna(axis=0).index
    X_clean = X.loc[kept_idx].reset_index(drop=True)
    id_sync = id_df.loc[kept_idx].reset_index(drop=True) if not id_df.empty else id_df
    removed_rows = X.shape[0] - X_clean.shape[0]

    # Export
    with pd.ExcelWriter(out_xlsx, engine="openpyxl") as w:
        pd.concat([id_sync, X_clean], axis=1).to_excel(w, sheet_name="descriptors_no_missing", index=False)
        pd.DataFrame({"Kept_Columns": X_clean.columns.tolist()}).to_excel(w, sheet_name="kept_columns", index=False)

        # Logs
        for k, v in logs.items():
            pd.DataFrame({k: v}).to_excel(w, sheet_name=k[:31], index=False)

        # Optional summary sheet (safe to delete/disable)
        if WRITE_SUMMARY_SHEET:
            summary = {
                "n_rows_raw": [raw.shape[0]],
                "n_rows_before_dropna": [X.shape[0]],
                "n_rows_after_dropna": [X_clean.shape[0]],
                "n_rows_removed": [removed_rows],
                "n_cols_raw": [raw.shape[1]],
                "n_numeric_candidate_cols": [len(num_cols)],
                "n_dropped_non_numeric": [len(non_num)],
                "n_dropped_fingerprint_like": [len(fp_cols)],
                "n_dropped_high_missing": [len(dropped_miss)],
                "n_dropped_low_variance": [len(dropped_lowvar)],
                "n_dropped_corr_pairs": [len(corr_drops)],
                "n_final_descriptors": [X_clean.shape[1]],
            }
            pd.DataFrame(summary).to_excel(w, sheet_name="summary", index=False)

    pd.concat([id_sync, X_clean], axis=1).to_csv(out_csv, index=False, encoding="utf-8-sig")

    print("Filtering completed (all rows containing NaN were removed).")
    print("Excel:", out_xlsx)
    print("CSV  :", out_csv)


if __name__ == "__main__":
    main()


In [None]:

import os
from pathlib import Path
import glob
import time
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

IN_DIR = r"C:\**\**\**\PaDEL"
INPUT_GLOB = "structure_descriptors_no_NaN_*.xlsx"
SHEET_NAME = "structure_descriptors_complete"

ID_COL_CANDIDATES = ["Name", "ID", "Molecule", "Mol", "SMILES"]

TARGET_N   = 35   
N_VAR      = 35   
N_PCA      = 35  
VAR_EXPLAIN = 0.95 

def ts():
    return time.strftime("%Y%m%d_%H%M%S")

def latest_file_by_glob(folder, pattern):
    candidates = sorted(Path(folder).glob(pattern), key=lambda p: p.stat().st_mtime, reverse=True)
    if not candidates:
raise FileNotFoundError(
    f"No matching files found under {folder}: {pattern}"
)
    return str(candidates[0])

def split_id_cols(df: pd.DataFrame, keys):
    id_cols = []
    for c in df.columns:
        cs = str(c)
        for k in keys:
            if k.lower() in cs.lower():
                id_cols.append(c); break
    id_cols = list(dict.fromkeys(id_cols))
    return (df[id_cols].copy() if id_cols else pd.DataFrame(index=df.index),
            df.drop(columns=id_cols, errors="ignore"))

def main():
    input_path = latest_file_by_glob(IN_DIR, INPUT_GLOB)
    out_tag = ts()
    out_xlsx = Path(IN_DIR) / f"structure_descriptors_second_dedup_{out_tag}.xlsx"
    out_csv  = Path(IN_DIR) / f"structure_descriptors_second_dedup_{out_tag}.csv"

    raw = pd.read_excel(input_path, sheet_name=SHEET_NAME)
    raw.columns = [str(c) for c in raw.columns]

    id_df, X = split_id_cols(raw, ID_COL_CANDIDATES)
    X = X.apply(pd.to_numeric, errors="coerce")
    X = X.dropna(axis=0).reset_index(drop=True)

    var_series = X.var(ddof=0)
    topvar_cols = var_series.sort_values(ascending=False).head(min(N_VAR, X.shape[1])).index.tolist()

    scaler = StandardScaler(with_mean=True, with_std=True)
    Xs = scaler.fit_transform(X.values)
    n_comp = min(Xs.shape[0], Xs.shape[1])  # 样本数/特征数较小者
    pca = PCA(n_components=n_comp, svd_solver="full", random_state=0)
    Xp = pca.fit_transform(Xs)
    evr = pca.explained_variance_ratio_
    cum = np.cumsum(evr)
    k = int(np.searchsorted(cum, VAR_EXPLAIN) + 1)
    k = max(1, min(k, n_comp))

    loadings = pca.components_.T  # 形状：(n_features, n_components)

    pca_scores = (loadings[:, :k] ** 2).sum(axis=1)
    pca_scores = pd.Series(pca_scores, index=X.columns, name="PCA_Score")


    top_pca_cols = pca_scores.sort_values(ascending=False).head(min(N_PCA, X.shape[1])).index.tolist()


    union_cols = list(dict.fromkeys(top_pca_cols + topvar_cols)) 

    _p = pca_scores.reindex(union_cols).fillna(-1e9)
    _v = var_series.reindex(union_cols).fillna(-1e9)
    order = sorted(union_cols, key=lambda c: (_p[c], _v[c]), reverse=True)
    final_cols = order[:min(TARGET_N, len(order))]

    X_sel = X[final_cols].copy()
    out_df = pd.concat([id_df.reset_index(drop=True), X_sel.reset_index(drop=True)], axis=1)

    info_df = pd.DataFrame({
        "Input_File": [input_path],
        "Rows": [X.shape[0]],
        "Cols_Before": [X.shape[1]],
        "TopVar_Take": [len(topvar_cols)],
        "PCA_Take": [len(top_pca_cols)],
        "Final_Cols": [len(final_cols)],
        "PCA_Used_k": [k],
        "PCA_CumExplained": [float(cum[k-1])]
    })


    var_table = var_series.sort_values(ascending=False).rename("Variance").reset_index().rename(columns={"index":"Feature"})

    pca_table = pca_scores.sort_values(ascending=False).rename("PCA_Score").reset_index().rename(columns={"index":"Feature"})

    final_table = pd.DataFrame({
        "Feature": final_cols,
        "PCA_Score": [pca_scores[c] for c in final_cols],
        "Variance":  [var_series[c] for c in final_cols],
        "Selected_By": [("PCA" if c in top_pca_cols else "") + ("+VAR" if c in topvar_cols else "") for c in final_cols]
    })

with pd.ExcelWriter(out_xlsx, engine="openpyxl") as w:
    out_df.to_excel(w, sheet_name="Deduplicated_Data_Matrix", index=False)
    final_table.to_excel(w, sheet_name="Final_Feature_List", index=False)
    var_table.to_excel(w, sheet_name="Variance_Ranking", index=False)
    pca_table.to_excel(w, sheet_name="PCA_Scores", index=False)

    pd.DataFrame({
        "Component": np.arange(1, len(evr) + 1),
        "ExplainedVarianceRatio": evr,
        "Cumulative": cum
    }).to_excel(w, sheet_name="PCA_Explained_Variance", index=False)
    info_df.to_excel(w, sheet_name="Run_Info", index=False)

out_df.to_csv(out_csv, index=False, encoding="utf-8-sig")

print("Second-stage deduplication completed")
print("Input:", input_path)
print("Output:", out_xlsx)
print("CSV:", out_csv)
print(
    f"Final retained {len(final_cols)} features (target {TARGET_N}) | "
    f"Cumulative explained variance of first {k} PCs before PCA = {cum[k-1]:.4f}"
)

if __name__ == "__main__":
    main()
