In [13]:
import os
import glob
import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, r2_score, accuracy_score, roc_auc_score
import joblib

# ----------------------
# Настройки
# ----------------------
DATA_DIR = r"C:\Users\Professional\Documents\VirusNeutr\parquet"
PARQUET_PATTERN = os.path.join(DATA_DIR, "*.parquet")
FILES = sorted(glob.glob(PARQUET_PATTERN))

# Ограничения для cell-level набора (чтобы не перегружать память)
MAX_TOTAL_CELLS_FOR_CELL_MODEL = 200_000
MAX_CELLS_PER_SAMPLE = 3000

# Random state
RANDOM_STATE = 42

# Feature completeness threshold (fraction of samples that must have a non-null value)
FEATURE_COMPLETE_THRESHOLD = 0.6  # 60% (настройка)

# ----------------------
# Функции помощники
# ----------------------
def drop_rows_with_nonfinite_numeric(df):
    """
    Удаляет строки, где хотя бы в одном числовом столбце присутствует NaN/inf/-inf.
    Возвращает очищенный df и количество удалённых строк.
    """
    num_cols = df.select_dtypes(include=[np.number]).columns
    if len(num_cols) == 0:
        return df.copy(), 0
    mask_finite = np.isfinite(df[num_cols].values).all(axis=1)
    removed = (~mask_finite).sum()
    return df.loc[mask_finite].copy(), int(removed)

# ----------------------
# 1) Сбор summary по всем файлам (с удалением строк с inf)
# ----------------------
summary_rows = []
total_rows_before = 0
total_rows_after = 0
total_rows_removed = 0

print(f"Найдено файлов: {len(FILES)}")
for p in tqdm(FILES, desc="Processing files for summary"):
    try:
        df = pd.read_parquet(p)
    except Exception as e:
        print(f"Warning: не удалось прочитать {p}: {e}")
        continue

    n_before = len(df)
    total_rows_before += n_before

    # Удаляем строки с inf/NaN в числовых колонках
    df_clean, removed = drop_rows_with_nonfinite_numeric(df)
    n_after = len(df_clean)
    total_rows_after += n_after
    total_rows_removed += removed

    # соберём summary
    summ = {
        "sample_file": os.path.basename(p),
        "n_cells_original": int(n_before),
        "n_cells_clean": int(n_after),
        "n_rows_removed": int(removed),
    }

    # процент инфицированных (если есть колонка label)
    if "label" in df_clean.columns:
        try:
            summ["pct_infected_true"] = float(100.0 * df_clean["label"].astype(bool).mean())
        except Exception:
            summ["pct_infected_true"] = None
    else:
        summ["pct_infected_true"] = None

    # статистики для всех числовых колонок на чистом df
    num_cols = df_clean.select_dtypes(include=[np.number]).columns
    for col in num_cols:
        series = df_clean[col]
        if series.dropna().empty:
            summ[f"{col}_median"] = None
            summ[f"{col}_mean"] = None
            summ[f"{col}_mad"] = None
            summ[f"{col}_p90"] = None
            summ[f"{col}_p99"] = None
        else:
            summ[f"{col}_median"] = float(series.median())
            summ[f"{col}_mean"] = float(series.mean())
            summ[f"{col}_mad"] = float((series - series.median()).abs().median())
            summ[f"{col}_p90"] = float(series.quantile(0.90))
            summ[f"{col}_p99"] = float(series.quantile(0.99))

    summary_rows.append(summ)

summary_df = pd.DataFrame(summary_rows)
summary_path = os.path.join(DATA_DIR, "summary_clean.parquet")
summary_df.to_parquet(summary_path, index=False)
print(f"Summary saved to: {summary_path}")
print(f"Total rows before: {total_rows_before}, after cleaning: {total_rows_after}, removed: {total_rows_removed}")


Найдено файлов: 1435


Processing files for summary:   0%|          | 0/1435 [00:00<?, ?it/s]

Processing files for summary: 100%|██████████| 1435/1435 [01:03<00:00, 22.57it/s]


Summary saved to: C:\Users\Professional\Documents\VirusNeutr\parquet\summary_clean.parquet
Total rows before: 48487827, after cleaning: 48463283, removed: 24544


In [14]:
# ----------------------
# 2) Sample-level модель (изменённый блок: выбираем только "полные" фичи + impute)
# ----------------------
# Отберём строки с доступной целевой метрикой
train_summary = summary_df.dropna(subset=["pct_infected_true"]).copy()
print("Samples with true label:", len(train_summary))

if len(train_summary) < 5:
    print("Недостаточно образцов с true меткой для обучения sample-level модели. Пропускаем.")
else:
    # Выбираем фичи: все колонки кроме sample_file и целевой
    exclude = {"sample_file", "pct_infected_true"}
    candidate_features = [c for c in train_summary.columns if c not in exclude]

    # 2.1 Рассчитываем completeness для каждой candidate feature
    completeness = train_summary[candidate_features].notna().mean(axis=0)
    # оставляем те фичи, у которых completeness >= порога
    thresh = FEATURE_COMPLETE_THRESHOLD
    selected_features = completeness[completeness >= thresh].index.tolist()

    # Если после фильтрации мало фич — понижаем порог автоматически, пока не найдём хотя бы 5 фич или не упадём до 0.1
    min_features_needed = 5
    min_thresh_allowed = 0.1
    while len(selected_features) < min_features_needed and thresh > min_thresh_allowed:
        thresh = max(thresh - 0.1, min_thresh_allowed)
        selected_features = completeness[completeness >= thresh].index.tolist()

    dropped_features = [f for f in candidate_features if f not in selected_features]
    print(f"Selected {len(selected_features)} features (threshold={thresh:.2f}). Dropped {len(dropped_features)} features due to low completeness.")
    if dropped_features:
        print("Example dropped features:", dropped_features[:10])

    if len(selected_features) == 0:
        print("Нет доступных фичей после фильтрации по полноте. Пропускаем обучение.")
    else:
        # Формируем X из выбранных фич
        X_df = train_summary[selected_features].apply(pd.to_numeric, errors='coerce')

        # Заполним NaN медианой каждой колонки
        X_imputed = X_df.fillna(X_df.median())

        # Убеждаемся, что остались только конечные значения
        mask_finite = np.isfinite(X_imputed.values).all(axis=1)
        n_dropped_rows = (~mask_finite).sum()
        if n_dropped_rows > 0:
            print(f"Dropping {n_dropped_rows} rows with non-finite values even после импутации.")
        X_final = X_imputed.loc[mask_finite].reset_index(drop=True)
        y_final = train_summary.loc[mask_finite, "pct_infected_true"].values.astype(float)

        if X_final.shape[0] < 5:
            print("После очистки и импутации слишком мало строк для обучения. Пропускаем.")
        else:
            # Train/test split
            X_tr, X_te, y_tr, y_te = train_test_split(X_final.values, y_final, test_size=0.2, random_state=RANDOM_STATE)

            # Обучаем RandomForest
            rf = RandomForestRegressor(n_estimators=300, random_state=RANDOM_STATE, n_jobs=-1)
            rf.fit(X_tr, y_tr)

            y_pred = rf.predict(X_te)
            print("Sample-level model results:")
            print("  MAE:", mean_absolute_error(y_te, y_pred))
            print("  R2 :", r2_score(y_te, y_pred))

            # Сохраняем модель и фичи
            joblib.dump({"model": rf, "feature_cols": selected_features}, os.path.join(DATA_DIR, "rf_sample_model_cleaned.joblib"))
            print("Saved sample-level model to rf_sample_model_cleaned.joblib")

Samples with true label: 1435
Selected 88 features (threshold=0.60). Dropped 5 features due to low completeness.
Example dropped features: ['FITC-H_median', 'FITC-H_mean', 'FITC-H_mad', 'FITC-H_p90', 'FITC-H_p99']
Sample-level model results:
  MAE: 0.6130846071769885
  R2 : 0.9441285631194766
Saved sample-level model to rf_sample_model_cleaned.joblib


In [17]:
X_df

Unnamed: 0,n_cells_original,n_cells_clean,n_rows_removed,FSC-H_median,FSC-H_mean,FSC-H_mad,FSC-H_p90,FSC-H_p99,SSC-H_median,SSC-H_mean,...,Area_to_Width_median,Area_to_Width_mean,Area_to_Width_mad,Area_to_Width_p90,Area_to_Width_p99,Time_norm_median,Time_norm_mean,Time_norm_mad,Time_norm_p90,Time_norm_p99
0,666,666,0,29596.0,1.011576e+05,23167.5,354817.5,521487.90,33548.5,70477.848348,...,,,,,,,,,,
1,654,654,0,28520.0,1.061099e+05,22276.0,362022.5,524272.00,22885.0,55081.481651,...,,,,,,,,,,
2,50000,50000,0,181051.5,1.835631e+05,40878.0,243697.2,291474.10,128241.0,157736.193900,...,,,,,,,,,,
3,50000,50000,0,175997.0,1.650866e+05,42580.0,238747.2,284554.07,122992.0,135971.194840,...,,,,,,,,,,
4,50000,50000,0,175647.5,1.692292e+05,42533.0,238151.1,283799.20,122293.0,143916.985400,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1430,37944,37862,82,4789083.0,4.769200e+06,713619.5,6149737.3,7687625.14,265554.0,295500.257012,...,31388.759884,31412.737088,5004.545591,41283.333654,51777.858414,0.458294,0.486180,0.252038,0.908306,0.991570
1431,40000,39893,107,4800526.0,4.774798e+06,718477.0,6166968.0,7692920.84,263585.0,293657.929110,...,31638.723404,31647.468098,5083.157030,41707.107110,51880.123972,0.464877,0.487930,0.251127,0.904686,0.990898
1432,40000,39904,96,4831508.0,4.811184e+06,724216.0,6193953.3,7768334.40,264182.5,291829.083851,...,31867.325062,31949.631674,5147.668615,42052.228952,52465.496475,0.455257,0.479870,0.247533,0.901092,0.990797
1433,38145,38043,102,4846741.0,4.821535e+06,709524.0,6201318.4,7757935.32,263419.0,293500.605499,...,31758.390476,31809.718301,5064.701797,41914.880710,52446.521542,0.470803,0.495108,0.251720,0.912228,0.991377


In [18]:
# ----------------------
# 3) Cell-level модель (если есть label)
#    Собираем подпоследовательности клеток из файлов, предварительно отбрасывая строки с inf
# ----------------------
cell_frames = []
collected = 0
print("\nCollecting cells for cell-level model (dropping rows with inf/NaN)...")
for p in tqdm(FILES, desc="Collecting cells"):
    try:
        df = pd.read_parquet(p)
    except Exception as e:
        print(f"Warning: не удалось прочитать {p}: {e}")
        continue

    if "label" not in df.columns:
        continue

    # очистка строк с inf/NaN
    df_clean, removed = drop_rows_with_nonfinite_numeric(df)
    if df_clean.empty:
        continue

    # подвыборка, чтобы не захватить слишком много от одного файла
    take = min(len(df_clean), MAX_CELLS_PER_SAMPLE)
    sample_part = df_clean.sample(n=take, random_state=RANDOM_STATE) if len(df_clean) > take else df_clean
    cell_frames.append(sample_part)
    collected += len(sample_part)
    if collected >= MAX_TOTAL_CELLS_FOR_CELL_MODEL:
        break

print(f"Collected approx. {collected} cells from {len(cell_frames)} files.")

if collected < 100:
    print("Недостаточно размеченных клеток для обучения cell-level модели. Пропускаем.")
else:
    cell_all = pd.concat(cell_frames, ignore_index=True)
    # ещё раз удалить строки с нечисловыми в числовых колонках (страховка)
    cell_all, removed2 = drop_rows_with_nonfinite_numeric(cell_all)
    print(f"After final cleaning, cell dataset shape: {cell_all.shape} (removed {removed2} more rows)")

    # Формируем фичи и таргет
    if "label" not in cell_all.columns:
        print("В собранных данных нет label — пропускаем cell-level модель.")
    else:
        y_cell = cell_all["label"].astype(int).values
        # используем все числовые колонки кроме label
        feat_cols_cell = [c for c in cell_all.select_dtypes(include=[np.number]).columns if c != "label"]
        X_cell_df = cell_all[feat_cols_cell].apply(pd.to_numeric, errors='coerce')
        # отбрасываем строки с NaN/inf (если остались)
        mask_finite_cell = np.isfinite(X_cell_df.values).all(axis=1)
        X_cell_df = X_cell_df.loc[mask_finite_cell].reset_index(drop=True)
        y_cell = y_cell[mask_finite_cell]

        print(f"Cell-level feature matrix: {X_cell_df.shape}, positives: {y_cell.sum()}, negatives: {len(y_cell)-y_cell.sum()}")

        if X_cell_df.shape[0] < 100:
            print("После очистки слишком мало клеток для обучения — пропускаем.")
        else:
            # Train/test
            Xc_tr, Xc_te, yc_tr, yc_te = train_test_split(
                X_cell_df.values, y_cell, test_size=0.2, random_state=RANDOM_STATE, stratify=y_cell
            )

            clf = RandomForestClassifier(n_estimators=300, random_state=RANDOM_STATE, n_jobs=-1, class_weight="balanced")
            clf.fit(Xc_tr, yc_tr)

            yc_pred = clf.predict(Xc_te)
            yc_prob = clf.predict_proba(Xc_te)[:, 1]

            print("Cell-level model results:")
            print("  Accuracy:", accuracy_score(yc_te, yc_pred))
            try:
                print("  ROC AUC :", roc_auc_score(yc_te, yc_prob))
            except Exception as e:
                print("  ROC AUC : couldn't compute:", e)

            joblib.dump({"model": clf, "features": feat_cols_cell}, os.path.join(DATA_DIR, "rf_cell_model_cleaned.joblib"))
            print("Saved cell-level model to rf_cell_model_cleaned.joblib")

print("\nAll done.")


Collecting cells for cell-level model (dropping rows with inf/NaN)...


Collecting cells:   5%|▍         | 68/1435 [00:00<00:18, 74.25it/s]


Collected approx. 202320 cells from 69 files.
After final cleaning, cell dataset shape: (202320, 15) (removed 0 more rows)
Cell-level feature matrix: (202320, 14), positives: 8075, negatives: 194245
Cell-level model results:
  Accuracy: 0.9980476472914195
  ROC AUC : 0.9992816754111956
Saved cell-level model to rf_cell_model_cleaned.joblib

All done.


# 2nd way

In [1]:
# train_in_memory_fixed.py
import os
import numpy as np
import pandas as pd
import joblib
import logging
from sklearn.model_selection import StratifiedShuffleSplit, GroupShuffleSplit, train_test_split
from sklearn.metrics import roc_auc_score, average_precision_score, accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from tqdm import tqdm

# LightGBM
import lightgbm as lgb

# ---------------- USER CONFIG ----------------
PARQUET_PATH = r"E:\\parquet\\merged_processed.parquet"
MODEL_OUT = r"E:\\parquet\\lgbm_inmemory_booster.joblib"
TEST_SIZE = 0.15
RANDOM_STATE = 42
EARLY_STOPPING_ROUNDS = 50
NUM_BOOST_ROUND = 2000
VERBOSE = 100   # how often to log eval during training
DROP_COLS = ['FITC-H']  # drop FITC-H as requested
# base lgb params (will add scale_pos_weight later)
LGB_PARAMS = {
    "objective": "binary",
    "boosting_type": "gbdt",
    "learning_rate": 0.05,
    "num_leaves": 127,
    "verbosity": -1,
    "n_jobs": -1,
    # 'metric' will be set in train call via valid_sets (or we can set here)
    "metric": "auc"
}
# ----------------------------------------------

# Logging
logger = logging.getLogger("train_in_memory_fixed")
logger.setLevel(logging.INFO)
ch = logging.StreamHandler()
ch.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
logger.addHandler(ch)

def detect_sample_id_column(df):
    candidates = ['sample_file','sample','sample_id','filename','file','orig_file','source_file']
    for c in candidates:
        if c in df.columns:
            return c
    for c in df.columns:
        s = str(c).lower()
        if 'sample' in s or 'file' in s or 'filename' in s:
            return c
    return None

def prepare_data(df, drop_cols=None, label_col='label'):
    df = df.loc[:, ~df.columns.str.startswith('__')].copy()

    if drop_cols:
        for c in drop_cols:
            if c in df.columns:
                df.drop(columns=[c], inplace=True)
                logger.info(f"Dropped column: {c}")

    numeric = df.select_dtypes(include=[np.number]).columns.tolist()
    if label_col in numeric:
        numeric.remove(label_col)
    feature_cols = sorted([c for c in numeric if not str(c).startswith("__")])

    if label_col not in df.columns:
        logger.warning(f"Label column '{label_col}' not found — creating zeros.")
        df[label_col] = 0

    lbl = df[label_col]
    if lbl.dtype == 'bool':
        df[label_col] = lbl.astype(int)
    else:
        df[label_col] = lbl.replace({'True':1,'TRUE':1,'true':1,'False':0,'FALSE':0,'false':0})
        df[label_col] = pd.to_numeric(df[label_col], errors='coerce').fillna(0).astype(int)

    for c in feature_cols:
        df[c] = pd.to_numeric(df[c], errors='coerce')
    df[feature_cols] = df[feature_cols].replace([np.inf, -np.inf], np.nan)
    medians = df[feature_cols].median()
    df[feature_cols] = df[feature_cols].fillna(medians)

    return df, feature_cols

def main():
    logger.info("Loading parquet into memory: %s", PARQUET_PATH)
    df = pd.read_parquet(PARQUET_PATH)
    logger.info("Loaded shape: %s", df.shape)

    sample_col = detect_sample_id_column(df)
    if sample_col is not None:
        logger.info("Detected sample-id column: %s — group-aware split will be used.", sample_col)
    else:
        logger.info("No sample-id column detected — will perform stratified split by label (possible leakage).")

    df, feature_cols = prepare_data(df, drop_cols=DROP_COLS, label_col='label')
    logger.info("Using %d features. Example: %s", len(feature_cols), feature_cols[:10])

    n_total = len(df)
    n_pos = int(df['label'].sum())
    pos_rate = n_pos / max(1, n_total)
    logger.info("Total rows=%d, positives=%d, positive rate=%.6f", n_total, n_pos, pos_rate)

    X = df[feature_cols].values.astype(np.float32)
    y = df['label'].values.astype(np.int8)
    logger.info("Built X shape=%s, y shape=%s", X.shape, y.shape)

    # split
    if sample_col is not None:
        groups = df[sample_col].values
        splitter = GroupShuffleSplit(n_splits=1, test_size=TEST_SIZE, random_state=RANDOM_STATE)
        train_idx, val_idx = next(splitter.split(X, y, groups))
        logger.info("GroupShuffleSplit done.")
    else:
        if len(np.unique(y)) > 1 and np.min(np.bincount(y)) > 1:
            sss = StratifiedShuffleSplit(n_splits=1, test_size=TEST_SIZE, random_state=RANDOM_STATE)
            train_idx, val_idx = next(sss.split(X, y))
            logger.info("Stratified split done.")
        else:
            train_idx, val_idx = train_test_split(np.arange(len(y)), test_size=TEST_SIZE, random_state=RANDOM_STATE)
            logger.info("Random split done.")

    X_train, X_val = X[train_idx], X[val_idx]
    y_train, y_val = y[train_idx], y[val_idx]
    logger.info("Train shape: %s, Val shape: %s", X_train.shape, X_val.shape)
    logger.info("Train pos rate=%.6f Val pos rate=%.6f", y_train.mean(), y_val.mean())

    # compute scale_pos_weight
    n_pos_tr = int(y_train.sum())
    n_neg_tr = len(y_train) - n_pos_tr
    if n_pos_tr == 0:
        scale_pos_weight = 1.0
        logger.warning("No positives in train: scale_pos_weight set to 1.0")
    else:
        scale_pos_weight = max(1.0, n_neg_tr / max(1.0, n_pos_tr))
    logger.info("scale_pos_weight = %.3f", scale_pos_weight)

    # prepare lgb datasets
    dtrain = lgb.Dataset(X_train, label=y_train, feature_name=feature_cols)
    dval = lgb.Dataset(X_val, label=y_val, reference=dtrain, feature_name=feature_cols)

    # update params with scale_pos_weight
    params = dict(LGB_PARAMS)
    params['scale_pos_weight'] = scale_pos_weight

    # callbacks: try to use early stopping / logging; if not available fallback to training without callbacks
    callbacks = []
    try:
        callbacks.append(lgb.early_stopping(EARLY_STOPPING_ROUNDS))
        callbacks.append(lgb.log_evaluation(VERBOSE))
    except Exception as e:
        logger.warning("LightGBM callbacks not available or failed to create: %s. Will still try training.", e)
        callbacks = None

    logger.info("Start lgb.train with params: %s", {k: params[k] for k in ['learning_rate','num_leaves','scale_pos_weight'] if k in params})
    try:
        if callbacks:
            booster = lgb.train(params, dtrain,
                                num_boost_round=NUM_BOOST_ROUND,
                                valid_sets=[dtrain, dval],
                                valid_names=['train','valid'],
                                callbacks=callbacks)
        else:
            booster = lgb.train(params, dtrain,
                                num_boost_round=NUM_BOOST_ROUND,
                                valid_sets=[dtrain, dval],
                                valid_names=['train','valid'])
    except TypeError as e:
        logger.warning("lgb.train() raised TypeError: %s — trying without callbacks/valid_sets", e)
        booster = lgb.train(params, dtrain, num_boost_round=NUM_BOOST_ROUND)

    # predictions on validation
    logger.info("Predicting on validation set...")
    probs_val = booster.predict(X_val, num_iteration=booster.best_iteration or None)
    try:
        roc = roc_auc_score(y_val, probs_val)
    except Exception:
        roc = float('nan')
    try:
        pr = average_precision_score(y_val, probs_val)
    except Exception:
        pr = float('nan')

    preds_val = (probs_val >= 0.5).astype(int)
    acc = accuracy_score(y_val, preds_val)
    prec = precision_score(y_val, preds_val, zero_division=0)
    rec = recall_score(y_val, preds_val, zero_division=0)
    f1 = f1_score(y_val, preds_val, zero_division=0)
    cm = confusion_matrix(y_val, preds_val)

    logger.info("Validation metrics: ROC AUC=%.6f, PR AUC=%.6f", roc, pr)
    logger.info("Val acc=%.6f precision=%.6f recall=%.6f f1=%.6f", acc, prec, rec, f1)
    logger.info("Confusion matrix:\n%s", cm)

    # feature importance by gain
    try:
        names = booster.feature_name()
        gains = booster.feature_importance(importance_type='gain')
        fi = sorted(zip(names, gains), key=lambda x: x[1], reverse=True)
        logger.info("Top features by gain:")
        for name, g in fi[:20]:
            logger.info("  %s: %f", name, g)
    except Exception as e:
        logger.warning("Couldn't extract feature importances: %s", e)

    # save booster + feature list
    logger.info("Saving booster model and feature list to: %s", MODEL_OUT)
    joblib.dump({'booster': booster, 'features': feature_cols}, MODEL_OUT)

    logger.info("Done.")

if __name__ == "__main__":
    main()

2025-12-02 09:43:11,478 INFO Loading parquet into memory: E:\\parquet\\merged_processed.parquet
2025-12-02 09:43:19,421 INFO Loaded shape: (48463283, 15)
2025-12-02 09:43:19,422 INFO No sample-id column detected — will perform stratified split by label (possible leakage).
2025-12-02 09:43:22,614 INFO Dropped column: FITC-H
2025-12-02 09:43:53,234 INFO Using 13 features. Example: ['FITC-A', 'FITC_asinh', 'FITC_log10', 'FITC_pct', 'FITC_raw', 'FITC_z_robust', 'FSC-A', 'FSC-H', 'FSC_ratio', 'SSC-A']
2025-12-02 09:43:53,286 INFO Total rows=48463283, positives=3520061, positive rate=0.072634
2025-12-02 09:43:57,152 INFO Built X shape=(48463283, 13), y shape=(48463283,)
2025-12-02 09:44:11,049 INFO Stratified split done.
2025-12-02 09:44:17,590 INFO Train shape: (41193790, 13), Val shape: (7269493, 13)
2025-12-02 09:44:17,614 INFO Train pos rate=0.072634 Val pos rate=0.072634
2025-12-02 09:44:17,636 INFO scale_pos_weight = 12.768
2025-12-02 09:44:17,637 INFO Start lgb.train with params: {'le

Training until validation scores don't improve for 50 rounds
[100]	train's auc: 0.993302	valid's auc: 0.993309
[200]	train's auc: 0.995023	valid's auc: 0.994988
[300]	train's auc: 0.995685	valid's auc: 0.995618
[400]	train's auc: 0.996097	valid's auc: 0.995992
[500]	train's auc: 0.996407	valid's auc: 0.996266
[600]	train's auc: 0.99663	valid's auc: 0.996457
[700]	train's auc: 0.996828	valid's auc: 0.996632
[800]	train's auc: 0.997008	valid's auc: 0.996783


KeyboardInterrupt: 

# 3d way

In [1]:
# %%cell
# Конфигурация + импорты
PARQUET_PATH = r"E:\\parquet\\merged_processed.parquet"  # <- ваш путь
LABEL_COL = "label"
DROP_COL = "FITC-H"

# Настройки для потоковой обработки / подвыборки
VAL_FRAC = 0.05           # доля для валидации (при полном чтении) или размер валидации при потоковой (как доля)
MAX_VAL_SAMPLES_PER_CLASS = 200_000  # ограничение для валидационного набора на класс в режиме стрима
TRAIN_SUBSAMPLE_FRAC = 0.1  # если памяти нет, можно обучаться на подвыборке 10% (если выбрано)

# Параметры LightGBM (можно править)
LGB_PARAMS = {
    'objective': 'binary',
    'metric': 'auc',
    'boosting_type': 'gbdt',
    'learning_rate': 0.05,
    'num_leaves': 31,
    'max_bin': 127,
    'feature_fraction': 0.7,
    'bagging_fraction': 0.7,
    'bagging_freq': 1,
    'num_threads': 31,   # подберите под вашу машину
    'verbose': -1,
    'device': 'gpu',
}

# Импорты
import os, time, math, tempfile, joblib, logging, tracemalloc
from pathlib import Path
from collections import defaultdict
import numpy as np
import pandas as pd

import psutil
from tqdm.notebook import tqdm

# Optional libs: polars (fast parquet), pyarrow (stream)
try:
    import polars as pl
except Exception as e:
    raise RuntimeError("polars required: pip install polars") from e

try:
    import pyarrow.dataset as ds
    import pyarrow as pa
except Exception:
    ds = None

# ML libs
from sklearn.model_selection import train_test_split
from sklearn.linear_model import SGDClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import roc_auc_score, accuracy_score, precision_recall_fscore_support
from sklearn.utils import shuffle

try:
    import lightgbm as lgb
    LGB_AVAILABLE = True
except Exception:
    LGB_AVAILABLE = False

# Logging
LOGFILE = "training_notebook.log"
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler(), logging.FileHandler(LOGFILE, mode='a')]
)
logger = logging.getLogger("flow_training")
logger.info("Notebook training script started")


2025-12-02 11:43:25,236 - INFO - Notebook training script started


In [2]:
# %%cell
# Утилиты

def memory_info():
    mem = psutil.virtual_memory()
    return {"total_GB": mem.total / 1e9, "available_GB": mem.available / 1e9, "used_GB": mem.used / 1e9}

def estimate_memory_needed(n_rows, n_cols, bytes_per_value=4, overhead_factor=6.0):
    """
    Очень грубая оценка памяти (в байтах) для pandas/polars в памяти.
    bytes_per_value: 4 for float32
    overhead_factor: эмпирический множитель (pandas сильнее расходует память)
    """
    base = n_rows * n_cols * bytes_per_value
    return base * overhead_factor / 1e9  # в GB

def df_preview(path, n=5):
    # быстрый preview через polars
    return pl.read_parquet(path, n_rows=n).to_pandas()

logger.info(f"System memory: {memory_info()}")

2025-12-02 11:43:25,253 - INFO - System memory: {'total_GB': 68.623040512, 'available_GB': 58.087292928, 'used_GB': 10.535747584}


In [3]:
# %%cell
# Быстрая оценка числа строк и столбцов без загрузки всего файла
# Polars предоставляет scanning metadata, или используем pyarrow dataset

def get_parquet_shape(path):
    try:
        # polars scan to get height (fast metadata read)
        lf = pl.scan_parquet(path)
        # materialize schema and n_rows
        # polars currently can't give n_rows directly from scan, so fallback to pyarrow
    except Exception:
        lf = None
    # fallback via pyarrow
    if ds is not None:
        dataset = ds.dataset(path, format="parquet")
        n_rows = 0
        cols = None
        for frag in dataset.get_fragments():
            try:
                md = frag.metadata
                n_rows += md.num_rows
            except Exception:
                # fallback: smaller cost - read fragment table
                pass
        # columns via sampling small read
        sample = pl.read_parquet(path, n_rows=1)
        cols = sample.columns
        return int(n_rows) if n_rows>0 else None, cols
    else:
        # worst case read tiny chunk
        tmp = pl.read_parquet(path, n_rows=10)
        return None, tmp.columns

n_rows_est, cols = get_parquet_shape(PARQUET_PATH)
logger.info(f"Estimated rows: {n_rows_est}, columns: {cols}")

2025-12-02 11:43:25,402 - INFO - Estimated rows: 48463283, columns: ['FSC-H', 'SSC-H', 'FITC-H', 'FSC-A', 'SSC-A', 'FITC-A', 'label', 'FITC_raw', 'FITC_log10', 'FITC_asinh', 'FITC_pct', 'FITC_z_robust', 'FSC_ratio', 'SSC_ratio', 'gmm_prob_infected', '__index_level_0__']


In [4]:
# %%cell
# Решение: если хватает памяти — full load, иначе streaming fallback

# получим точное число строк, если polars может быстро посчитать
try:
    # polars can compute n_rows via read_parquet with n_rows=0 ? => we'll try reading statistics
    # but if file is large, better to avoid full scan; rely on earlier estimate if available
    if n_rows_est is None:
        # attempt one fast metadata call via polars (may still be inexpensive)
        n_rows_est = pl.read_parquet(PARQUET_PATH, n_rows=0).shape[0]
except Exception:
    pass

n_cols = len(cols) if cols is not None else 15
if n_rows_est is None:
    # fallback assume large (conservative)
    n_rows_est = 48_463_283

# estimated memory needed (GB)
est_needed_gb = estimate_memory_needed(n_rows_est, n_cols, bytes_per_value=4, overhead_factor=6.0)
avail = memory_info()['available_GB']
logger.info(f"Estimated memory needed (GB): {est_needed_gb:.2f}, available (GB): {avail:.2f}")

USE_FULL_LOAD = avail > est_needed_gb * 1.5  # safety margin

logger.info(f"USE_FULL_LOAD = {USE_FULL_LOAD}")

2025-12-02 11:43:25,418 - INFO - Estimated memory needed (GB): 18.61, available (GB): 58.04
2025-12-02 11:43:25,419 - INFO - USE_FULL_LOAD = True


In [12]:
# %%cell
# Full load + LightGBM training (исправленная версия)
if USE_FULL_LOAD:
    t0 = time.time()
    logger.info("Loading full parquet into memory with Polars...")
    df_pl = pl.read_parquet(PARQUET_PATH)
    logger.info(f"Loaded Polars DF: rows={df_pl.height}, cols={len(df_pl.columns)} in {time.time()-t0:.1f}s")
    
    # Drop column FITC-H if present
    if DROP_COL in df_pl.columns:
        df_pl = df_pl.drop(DROP_COL)
        logger.info(f"Dropped column {DROP_COL}")
    
    # Cast numeric columns to float32 where possible, label to int8
    def cast_float32(pl_df, label_col=LABEL_COL):
        cols = pl_df.columns
        casts = []
        for c in cols:
            if c == label_col:
                casts.append(pl.col(c).cast(pl.Int8).alias(c))
            else:
                # Попытка кастить все остальные колонки в Float32 (если это невозможно, результат будет null)
                casts.append(pl.col(c).cast(pl.Float32).alias(c))
        return pl_df.with_columns(casts)
    
    df_pl = cast_float32(df_pl, LABEL_COL)
    
    # Convert to pandas for sklearn/lightgbm (polars -> pandas uses memory; but we already checked memory)
    df = df_pl.to_pandas()
    # df = df.dropna()
    logger.info("Converted to pandas DataFrame (in-memory)")
    
    # Split features/labels
    X = df.drop(columns=[LABEL_COL, "__index_level_0__"])
    y = df[LABEL_COL].astype(int)
    del df_pl  # free polars object if still referenced

    # Stratified split
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=VAL_FRAC, stratify=y, random_state=42)
    logger.info(f"Train size: {X_train.shape}, Val size: {X_val.shape}")
    
    # Optionally subsample training for faster experiments
    # X_train, y_train = sklearn.utils.shuffle(X_train, y_train, random_state=42)
    
    # Train LightGBM
    if LGB_AVAILABLE:
        lgb_train = lgb.Dataset(X_train, label=y_train)
        lgb_val = lgb.Dataset(X_val, label=y_val, reference=lgb_train)
        evals_result = {}
        logger.info("Starting LightGBM training (in-memory). GPU auto-detect not enforced here.")
        t0 = time.time()
        # callbacks: early stopping + periodic logging + record evals into dict
        callbacks = [lgb.early_stopping(100), lgb.log_evaluation(50), lgb.record_evaluation(evals_result)]
        gbm = lgb.train(
            LGB_PARAMS,
            lgb_train,
            num_boost_round=2000,
            valid_sets=[lgb_train, lgb_val],
            valid_names=['train', 'valid'],
            callbacks=callbacks
        )
        logger.info(f"LightGBM finished in {(time.time()-t0)/60:.2f} min. Best iter: {getattr(gbm, 'best_iteration', None)}")
        # Save model
        model_path = "lgb_model_inmemory.txt"
        gbm.save_model(model_path)
        logger.info(f"Saved model to {model_path}")
        
        # Eval metrics
        y_pred = gbm.predict(X_val, num_iteration=gbm.best_iteration)
        auc = roc_auc_score(y_val, y_pred)
        y_bin = (y_pred >= 0.5).astype(int)
        acc = accuracy_score(y_val, y_bin)
        prec, rec, f1, _ = precision_recall_fscore_support(y_val, y_bin, average='binary')
        logger.info(f"Val metrics - AUC: {auc:.5f}, Acc: {acc:.4f}, Prec: {prec:.4f}, Rec: {rec:.4f}, F1: {f1:.4f}")
        
        # Save training history (if recorded)
        try:
            train_auc = evals_result['train']['auc']
            valid_auc = evals_result['valid']['auc']
            hist_df = pd.DataFrame({'train_auc': train_auc, 'valid_auc': valid_auc})
            hist_df.to_csv("lgb_training_history.csv", index=False)
            logger.info("Saved training history to lgb_training_history.csv")
        except Exception:
            logger.warning("Could not record evals_result history; skipping save.")
        
        # SHAP explanation on a small sample (expensive) - run only on subset
        try:
            import shap
            sample_idx = np.random.choice(X_val.shape[0], size=min(2000, X_val.shape[0]), replace=False)
            explainer = shap.TreeExplainer(gbm)
            shap_values = explainer.shap_values(X_val.iloc[sample_idx])
            # plot summary (in notebook)
            shap.summary_plot(shap_values, X_val.iloc[sample_idx], show=True)
            logger.info("SHAP summary plotted for a sample of validation set")
        except Exception as e:
            logger.warning(f"SHAP not executed: {e}")
    else:
        logger.warning("LightGBM not available. Consider pip install lightgbm or use fallback SGD.")

2025-12-02 11:47:03,363 - INFO - Loading full parquet into memory with Polars...
2025-12-02 11:47:04,961 - INFO - Loaded Polars DF: rows=48463283, cols=16 in 1.6s
2025-12-02 11:47:04,964 - INFO - Dropped column FITC-H
2025-12-02 11:47:06,834 - INFO - Converted to pandas DataFrame (in-memory)
2025-12-02 11:47:28,765 - INFO - Train size: (46040118, 13), Val size: (2423165, 13)
2025-12-02 11:47:28,849 - INFO - Starting LightGBM training (in-memory). GPU auto-detect not enforced here.


Training until validation scores don't improve for 100 rounds
[50]	train's auc: 0.988644	valid's auc: 0.988651
[100]	train's auc: 0.990668	valid's auc: 0.990658
[150]	train's auc: 0.991816	valid's auc: 0.991814
[200]	train's auc: 0.992409	valid's auc: 0.992389
[250]	train's auc: 0.992837	valid's auc: 0.992807
[300]	train's auc: 0.993151	valid's auc: 0.993124
[350]	train's auc: 0.993377	valid's auc: 0.993366
[400]	train's auc: 0.993578	valid's auc: 0.993565
[450]	train's auc: 0.993561	valid's auc: 0.993546


LightGBMError: Check failed: (best_split_info.left_count) > (0) at D:\a\1\s\lightgbm-python\src\treelearner\serial_tree_learner.cpp, line 846 .


In [None]:
import seaborn as sns
sns.set_theme(style="ticks")

# df = sns.load_dataset("penguins")
sns.pairplot(df[:30000], hue="label")

ValueError: input must be categorical

In [7]:
pip install datashader

Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com
Collecting datashader
  Downloading datashader-0.18.2-py3-none-any.whl.metadata (7.6 kB)
Collecting colorcet (from datashader)
  Downloading colorcet-3.1.0-py3-none-any.whl.metadata (6.3 kB)
Collecting multipledispatch (from datashader)
  Downloading multipledispatch-1.0.0-py3-none-any.whl.metadata (3.8 kB)
Collecting numba (from datashader)
  Downloading numba-0.62.1-cp312-cp312-win_amd64.whl.metadata (2.9 kB)
Collecting param (from datashader)
  Downloading param-2.3.1-py3-none-any.whl.metadata (22 kB)
Collecting pyct (from datashader)
  Downloading pyct-0.6.0-py3-none-any.whl.metadata (7.2 kB)
Collecting toolz (from datashader)
  Downloading toolz-1.1.0-py3-none-any.whl.metadata (5.1 kB)
Collecting xarray (from datashader)
  Downloading xarray-2025.11.0-py3-none-any.whl.metadata (12 kB)
Collecting llvmlite<0.46,>=0.45.0dev0 (from numba->datashader)
  Downloading llvmlite-0.45.1-cp312-cp312-win_amd64.whl.metadat


[notice] A new release of pip is available: 25.1.1 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [1]:
# analyze_folder.py
"""
End-to-end pipeline for flow cytometry parquet files:
 - merge parquet files into one merged.parquet (streaming, pyarrow)
 - clean rows with non-finite numeric values
 - (optionally) add simple derived features if missing (FITC_log10, FITC_asinh, ratios)
 - build cell-level classifier (if 'label' exists)
 - build sample-level regressor (if sample-level true percent exists in files)
 - run inference for all files and save summary Excel + plots
 - save models and metadata (feature lists, scaler)
 
Requirements:
  pip install pandas pyarrow scikit-learn joblib matplotlib tqdm openpyxl
Run:
  python analyze_folder.py
"""

import os
import glob
import sys
import math
from tqdm import tqdm
import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from sklearn.ensemble import RandomForestClassifier, RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, roc_auc_score, mean_absolute_error, r2_score
from sklearn.preprocessing import RobustScaler
import joblib
import matplotlib.pyplot as plt

# -----------------------
# USER CONFIGURATION
# -----------------------
DATA_DIR = r"C:\\Users\\Professional\\Documents\\VirusNeutr\\parquet"   # <- папка с parquet
MERGED_PATH = os.path.join(DATA_DIR, "merged.parquet")
SUMMARY_PATH = os.path.join(DATA_DIR, "summary_from_merged.parquet")
EXCEL_OUT = os.path.join(DATA_DIR, "analysis_summary.xlsx")
CELL_MODEL_PATH = os.path.join(DATA_DIR, "cell_level_rf.joblib")
SAMPLE_MODEL_PATH = os.path.join(DATA_DIR, "sample_level_rf.joblib")
PLOTS_DIR = os.path.join(DATA_DIR, "plots")
RANDOM_STATE = 42

# Limits to avoid OOM
MAX_TRAIN_CELLS = 300_000
MAX_CELLS_PER_SAMPLE = 5_000

# Feature engineering defaults (if missing)
ASINH_COFACTOR = 1000.0
LOG_SHIFT = 1.0

# ensure output dirs
os.makedirs(PLOTS_DIR, exist_ok=True)

# -----------------------
# Helpers
# -----------------------
def list_parquets(data_dir):
    return sorted(glob.glob(os.path.join(data_dir, "*.parquet")))

def drop_rows_with_nonfinite_numeric(df):
    num_cols = df.select_dtypes(include=[np.number]).columns
    if len(num_cols) == 0:
        return df.copy(), 0
    mask = np.isfinite(df[num_cols].values).all(axis=1)
    removed = (~mask).sum()
    return df.loc[mask].copy(), int(removed)

def safe_add_feature_columns(df):
    """
    Add basic derived features if they aren't present:
     - FITC_raw (from FITC-A or B530-A)
     - FITC_log10
     - FITC_asinh
     - FITC_pct (rank pct)
     - FSC_ratio = FSC-A / FSC-H
     - SSC_ratio = SSC-A / SSC-H
    Operates in-place on a copy and returns df.
    """
    df = df.copy()
    # detect FITC column
    fitc_candidates = ['FITC-A', 'B530-A', 'FITC_raw', 'FITC']
    fitc_col = None
    for c in fitc_candidates:
        if c in df.columns:
            fitc_col = c
            break
    # if FITC not found, try FITC-A with case-insensitive search
    if fitc_col is None:
        for c in df.columns:
            if str(c).lower().startswith('fitc') or 'b530' in str(c).lower():
                fitc_col = c
                break

    if fitc_col is not None:
        df['FITC_raw'] = pd.to_numeric(df[fitc_col], errors='coerce').astype(float)
    elif 'FITC_raw' not in df.columns:
        df['FITC_raw'] = np.nan

    # Clean raw: clip negatives to 0
    df['FITC_raw'] = df['FITC_raw'].where(df['FITC_raw'] >= 0.0, other=0.0)

    # log10 + shift
    if 'FITC_log10' not in df.columns:
        with np.errstate(divide='ignore', invalid='ignore'):
            df['FITC_log10'] = np.log10(df['FITC_raw'].fillna(0.0) + float(LOG_SHIFT))
    # asinh
    if 'FITC_asinh' not in df.columns:
        df['FITC_asinh'] = np.arcsinh(df['FITC_raw'].fillna(0.0) / float(ASINH_COFACTOR))
    # pct rank within sample (works per DataFrame)
    if 'FITC_pct' not in df.columns:
        try:
            df['FITC_pct'] = df['FITC_log10'].rank(pct=True, na_option='bottom')
        except Exception:
            df['FITC_pct'] = np.nan

    # ratios
    eps = 1e-9
    if 'FSC-A' in df.columns and 'FSC-H' in df.columns and 'FSC_ratio' not in df.columns:
        df['FSC_ratio'] = pd.to_numeric(df['FSC-A'], errors='coerce') / (pd.to_numeric(df['FSC-H'], errors='coerce') + eps)
    if 'SSC-A' in df.columns and 'SSC-H' in df.columns and 'SSC_ratio' not in df.columns:
        df['SSC_ratio'] = pd.to_numeric(df['SSC-A'], errors='coerce') / (pd.to_numeric(df['SSC-H'], errors='coerce') + eps)

    return df

# -----------------------
# 1) Merge (stream) parquet files into merged.parquet
# -----------------------
files = list_parquets(DATA_DIR)
if len(files) == 0:
    print("No parquet files found in", DATA_DIR)
    sys.exit(1)

print(f"Found {len(files)} parquet files. Merging into {MERGED_PATH} ...")
# use first non-empty file to obtain schema
first_schema = None
writer = None
rows_written = 0
total_removed = 0
total_rows = 0

for p in tqdm(files, desc="Merging files"):
    try:
        df = pd.read_parquet(p)
    except Exception as e:
        print(f"Warning: cannot read {p}: {e}")
        continue
    total_rows += len(df)
    df_clean, removed = drop_rows_with_nonfinite_numeric(df)
    total_removed += removed
    if df_clean.empty:
        continue

    # ensure some derived features exist (so schema is consistent)
    df_clean = safe_add_feature_columns(df_clean)

    # set schema from first written
    if writer is None:
        table0 = pa.Table.from_pandas(df_clean, safe=False)
        schema = table0.schema
        writer = pq.ParquetWriter(MERGED_PATH, schema, compression='snappy', use_dictionary=True)
    # align columns to schema (add missing as NaN)
    schema_cols = [f.name for f in schema]
    for c in schema_cols:
        if c not in df_clean.columns:
            df_clean[c] = np.nan
    # reindex into same order
    df_to_write = df_clean[schema_cols]
    writer.write_table(pa.Table.from_pandas(df_to_write, schema=schema, safe=False))
    rows_written += len(df_to_write)

if writer is not None:
    writer.close()

print(f"Merging finished. total rows scanned: {total_rows}, rows written: {rows_written}, rows removed: {total_removed}")
if rows_written == 0:
    print("No rows written to merged parquet. Exiting.")
    sys.exit(1)

# -----------------------
# 2) Inspect merged schema and decide training branches
# -----------------------
pf = pq.ParquetFile(MERGED_PATH)
merged_columns = pf.schema_arrow.names
print("Merged columns (sample):", merged_columns[:40])
has_label = 'label' in merged_columns
print("Cell-level label present?:", has_label)

# -----------------------
# 3) Build cell-level dataset (subsample) and train classifier if labels exist
# -----------------------
cell_model = None
cell_feature_list = None

if has_label:
    print("Building cell-level training dataset (subsampling across files)...")
    collected = 0
    frames = []
    for p in tqdm(files, desc="Sampling files for cell-level"):
        try:
            df = pd.read_parquet(p)
        except Exception:
            continue
        df_clean, removed = drop_rows_with_nonfinite_numeric(df)
        if df_clean.empty or 'label' not in df_clean.columns:
            continue
        # ensure derived features exist
        df_clean = safe_add_feature_columns(df_clean)
        n = len(df_clean)
        take = min(n, MAX_CELLS_PER_SAMPLE)
        remaining = MAX_TRAIN_CELLS - collected
        if remaining <= 0:
            break
        take = min(take, remaining)
        sample = df_clean.sample(n=take, random_state=RANDOM_STATE) if take < n else df_clean
        frames.append(sample)
        collected += len(sample)
    if collected == 0:
        print("No labeled cells found after cleaning. Skipping cell-level training.")
    else:
        cell_df = pd.concat(frames, ignore_index=True)
        # choose numeric features except label
        num_cols = cell_df.select_dtypes(include=[np.number]).columns.tolist()
        if 'label' in num_cols:
            num_cols.remove('label')
        # drop any columns with all-NaN
        num_cols = [c for c in num_cols if not cell_df[c].isna().all()]
        X = cell_df[num_cols].astype(float).fillna(0.0).values
        y = cell_df['label'].astype(int).values
        print("Cell-level dataset shape:", X.shape, "positive rate:", np.mean(y))
        # split, train
        Xtr, Xte, ytr, yte = train_test_split(X, y, test_size=0.2, random_state=RANDOM_STATE, stratify=y)
        clf = RandomForestClassifier(n_estimators=300, class_weight='balanced', n_jobs=-1, random_state=RANDOM_STATE)
        clf.fit(Xtr, ytr)
        yp = clf.predict(Xte)
        yprob = clf.predict_proba(Xte)[:, 1] if hasattr(clf, "predict_proba") else None
        print("Cell-level accuracy:", accuracy_score(yte, yp))
        try:
            print("Cell-level ROC AUC:", roc_auc_score(yte, yprob))
        except Exception:
            pass
        # save
        joblib.dump({'model': clf, 'features': num_cols}, CELL_MODEL_PATH)
        print("Saved cell-level model to", CELL_MODEL_PATH)
        cell_model = clf
        cell_feature_list = num_cols

# -----------------------
# 4) Build sample-level summary (per-file aggregation) and train regressor
# -----------------------
print("Building sample-level summary from each original parquet file...")
summary_rows = []
for p in tqdm(files, desc="Computing sample summaries"):
    try:
        df = pd.read_parquet(p)
    except Exception:
        continue
    df_clean, removed = drop_rows_with_nonfinite_numeric(df)
    if df_clean.empty:
        continue
    # make sure features exist
    df_clean = safe_add_feature_columns(df_clean)
    row = {'sample_file': os.path.basename(p), 'n_cells': len(df_clean)}
    if 'label' in df_clean.columns:
        row['pct_infected_true'] = 100.0 * df_clean['label'].astype(bool).mean()
    else:
        row['pct_infected_true'] = np.nan
    num_cols = df_clean.select_dtypes(include=[np.number]).columns.tolist()
    for c in num_cols:
        row[f'{c}_median'] = float(df_clean[c].median()) if df_clean[c].dropna().size>0 else np.nan
        row[f'{c}_mean'] = float(df_clean[c].mean()) if df_clean[c].dropna().size>0 else np.nan
        row[f'{c}_p90'] = float(df_clean[c].quantile(0.90)) if df_clean[c].dropna().size>0 else np.nan
    summary_rows.append(row)

summary_df = pd.DataFrame(summary_rows)
summary_df.to_parquet(SUMMARY_PATH, index=False)
print("Saved sample summary to", SUMMARY_PATH)

# Train sample-level regressor if enough true labels exist
n_with_target = summary_df['pct_infected_true'].notna().sum()
print("Samples with true target:", n_with_target)
sample_model = None
sample_feature_list = None
scaler = None

if n_with_target >= 10:
    df_train = summary_df.dropna(subset=['pct_infected_true']).copy()
    # choose numeric features excluding target
    feat_candidates = [c for c in df_train.columns if c not in ['sample_file', 'pct_infected_true'] and np.issubdtype(df_train[c].dtype, np.number)]
    # drop columns with too many NaN
    completeness = df_train[feat_candidates].notna().mean(axis=0)
    # keep cols with >= 0.5 completeness (adjustable)
    keep = completeness[completeness >= 0.5].index.tolist()
    if len(keep) < 5:
        keep = completeness.sort_values(ascending=False).head(10).index.tolist()
    X_df = df_train[keep].apply(pd.to_numeric, errors='coerce')
    # impute NaNs with median
    X_df = X_df.fillna(X_df.median())
    y = df_train['pct_infected_true'].values.astype(float)
    # scale features (robust)
    scaler = RobustScaler()
    X_scaled = scaler.fit_transform(X_df.values)
    Xtr, Xte, ytr, yte = train_test_split(X_scaled, y, test_size=0.2, random_state=RANDOM_STATE)
    rfr = RandomForestRegressor(n_estimators=300, n_jobs=-1, random_state=RANDOM_STATE)
    rfr.fit(Xtr, ytr)
    yp = rfr.predict(Xte)
    print("Sample-level MAE:", mean_absolute_error(yte, yp))
    print("Sample-level R2:", r2_score(yte, yp))
    joblib.dump({'model': rfr, 'features': keep, 'scaler': scaler}, SAMPLE_MODEL_PATH)
    print("Saved sample-level model to", SAMPLE_MODEL_PATH)
    sample_model = rfr
    sample_feature_list = keep
else:
    print("Not enough samples with true target to train sample-level regressor. Skipping training.")

# -----------------------
# 5) Inference: apply models to each file and write Excel + plots
# -----------------------
print("Running inference over all files and producing Excel + plots...")
out_rows = []
for p in tqdm(files, desc="Running inference"):
    try:
        df = pd.read_parquet(p)
    except Exception:
        continue
    df_clean, removed = drop_rows_with_nonfinite_numeric(df)
    if df_clean.empty:
        continue
    df_clean = safe_add_feature_columns(df_clean)
    row = {'sample_file': os.path.basename(p), 'n_cells_clean': len(df_clean)}
    # cell-level prediction: proportion predicted infected by cell_model (if exists)
    if cell_model is not None and cell_feature_list is not None:
        # ensure columns exist
        missing = [c for c in cell_feature_list if c not in df_clean.columns]
        if missing:
            # create missing with NaN
            for c in missing:
                df_clean[c] = np.nan
        Xc = df_clean[cell_feature_list].apply(pd.to_numeric, errors='coerce').fillna(0.0).values
        try:
            probs = cell_model.predict_proba(Xc)[:, 1]
            row['cell_model_pct_infected'] = 100.0 * float(np.mean(probs))
        except Exception:
            row['cell_model_pct_infected'] = np.nan
    else:
        row['cell_model_pct_infected'] = np.nan

    # sample-level model prediction from aggregated features
    if sample_model is not None and sample_feature_list is not None and scaler is not None:
        # build feature vector from medians (as used in training)
        v = {}
        for c in sample_feature_list:
            # original training used medians/means; choose medians if present
            base_col = c.replace('_median','').replace('_mean','')
            # try to find the column in df_clean that corresponds to base_col
            if base_col in df_clean.columns:
                v[c] = df_clean[base_col].median()
            else:
                # fallback: try direct column name (maybe training used the summary col name)
                v[c] = np.nan
        xf = pd.Series(v).astype(float).fillna(pd.Series(v).median())
        Xs = scaler.transform(xf.values.reshape(1, -1))
        try:
            pred = sample_model.predict(Xs)[0]
            row['sample_model_pred_pct'] = float(pred)
        except Exception:
            row['sample_model_pred_pct'] = np.nan
    else:
        row['sample_model_pred_pct'] = np.nan

    # truth (if exists)
    if 'label' in df_clean.columns:
        row['true_pct_infected'] = float(100.0 * df_clean['label'].astype(bool).mean())
    else:
        row['true_pct_infected'] = np.nan

    out_rows.append(row)

out_df = pd.DataFrame(out_rows)
# save to excel
out_df = out_df.sort_values('sample_file').reset_index(drop=True)
out_df.to_excel(EXCEL_OUT, index=False)
print("Saved Excel summary to", EXCEL_OUT)

# simple plots: true vs predicted scatter (if available)
try:
    plt.figure(figsize=(6,6))
    mask = out_df['true_pct_infected'].notna() & out_df['sample_model_pred_pct'].notna()
    if mask.sum() > 0:
        plt.scatter(out_df.loc[mask,'true_pct_infected'], out_df.loc[mask,'sample_model_pred_pct'], s=8)
        plt.plot([0,100],[0,100],'k--',linewidth=0.8)
        plt.xlabel('True % infected')
        plt.ylabel('Predicted % infected')
        plt.title('Sample-level: true vs predicted')
        plt.grid(True, alpha=0.3)
        plt.tight_layout()
        plt.savefig(os.path.join(PLOTS_DIR,'sample_true_vs_pred.png'), dpi=150)
        plt.close()
    # histogram of predicted cell-level %
    if 'cell_model_pct_infected' in out_df.columns and out_df['cell_model_pct_infected'].notna().sum() > 0:
        plt.figure(figsize=(6,4))
        out_df['cell_model_pct_infected'].dropna().hist(bins=50)
        plt.xlabel('Cell-model % infected')
        plt.tight_layout()
        plt.savefig(os.path.join(PLOTS_DIR,'cell_model_pct_hist.png'), dpi=150)
        plt.close()
    print("Saved plots to", PLOTS_DIR)
except Exception as e:
    print("Plotting failed:", e)

print("Pipeline finished successfully.")


Found 1435 parquet files. Merging into C:\\Users\\Professional\\Documents\\VirusNeutr\\parquet\merged.parquet ...


Merging files: 100%|██████████| 1435/1435 [01:05<00:00, 22.02it/s]


Merging finished. total rows scanned: 48487827, rows written: 48463283, rows removed: 24544
Merged columns (sample): ['FSC-H', 'SSC-H', 'FITC-H', 'FSC-A', 'SSC-A', 'FITC-A', 'label', 'FITC_raw', 'FITC_log10', 'FITC_asinh', 'FITC_pct', 'FITC_z_robust', 'FSC_ratio', 'SSC_ratio', 'gmm_prob_infected', '__index_level_0__']
Cell-level label present?: True
Building cell-level training dataset (subsampling across files)...


Sampling files for cell-level:   4%|▍         | 62/1435 [00:00<00:19, 71.27it/s]


Cell-level dataset shape: (300000, 14) positive rate: 0.03927666666666667
Cell-level accuracy: 0.99825
Cell-level ROC AUC: 0.9999309569719919
Saved cell-level model to C:\\Users\\Professional\\Documents\\VirusNeutr\\parquet\cell_level_rf.joblib
Building sample-level summary from each original parquet file...


Computing sample summaries: 100%|██████████| 1435/1435 [00:46<00:00, 30.92it/s]


Saved sample summary to C:\\Users\\Professional\\Documents\\VirusNeutr\\parquet\summary_from_merged.parquet
Samples with true target: 1435
Sample-level MAE: 0.6913409797441538
Sample-level R2: 0.9365949133907789
Saved sample-level model to C:\\Users\\Professional\\Documents\\VirusNeutr\\parquet\sample_level_rf.joblib
Running inference over all files and producing Excel + plots...


Running inference: 100%|██████████| 1435/1435 [02:27<00:00,  9.73it/s]


Saved Excel summary to C:\\Users\\Professional\\Documents\\VirusNeutr\\parquet\analysis_summary.xlsx
Saved plots to C:\\Users\\Professional\\Documents\\VirusNeutr\\parquet\plots
Pipeline finished successfully.
