# Model predykcji wysokiego stresu (HIGH)

Notebook uporządkowany: wczytanie danych → preprocessing → modele → walidacja CV → tuning ETC → interpretacja cech → zapis artefaktów.


In [11]:

import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split, RepeatedStratifiedKFold, cross_validate, RandomizedSearchCV
from sklearn.compose import ColumnTransformer, make_column_selector
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.metrics import (
    balanced_accuracy_score, f1_score, roc_auc_score,
    confusion_matrix, classification_report, make_scorer
)

from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import ExtraTreesClassifier
from sklearn.inspection import permutation_importance

import joblib


## 1) Konfiguracja

In [12]:
class Config:
    excel_path: str = "prawidlowy_excel.xlsx"
    sheet_name: str | None = None

    stress_high_threshold: float = 8.0
    test_size: float = 0.35
    random_state: int = 42

cfg = Config()


## 2) Funkcje pomocnicze + testy jednostkowe (czytelne, z liczbami)

In [13]:
import re
import pandas as pd
def find_stress_column(df: pd.DataFrame) -> str:
    """Return the label of the column that holds the stress score.

    A column qualifies when its lower-cased name contains ``"stres"``.
    Among the qualifying columns, the first one whose name also mentions
    ``"poziom"``, ``"skali"`` or ``"skala"`` wins; if none does, the first
    qualifying column (in DataFrame order) is returned.

    Raises:
        ValueError: if no column name contains ``"stres"``.
    """
    # Pair every column label with its lower-cased text form once.
    lowered = [(col, str(col).lower()) for col in df.columns]
    matches = [(col, low) for col, low in lowered if "stres" in low]
    if not matches:
        raise ValueError("Nie znalazłem kolumny stresu (musi zawierać 'stres').")

    scale_hints = ("poziom", "skali", "skala")
    for col, low in matches:
        if any(hint in low for hint in scale_hints):
            return col

    # No name hinted at a level/scale: fall back to the first match.
    return matches[0][0]

def coerce_numeric_commas(series: pd.Series) -> pd.Series:
    """Parse a column of text numbers that may use a decimal comma.

    Every value is rendered as text, commas are swapped for dots and
    surrounding whitespace is removed. The texts ``"nan"``, ``"None"``,
    ``"none"`` and the empty string are treated as missing. Anything that
    still cannot be parsed as a number becomes NaN.
    """
    text = series.astype(str)
    text = text.str.replace(",", ".", regex=False)
    text = text.str.strip()

    # Textual stand-ins for "no value" produced by the str conversion above.
    missing_markers = dict.fromkeys(("nan", "None", "none", ""), np.nan)
    text = text.replace(missing_markers)

    return pd.to_numeric(text, errors="coerce")

def make_target_from_stress(stress_num: pd.Series, thr: float) -> pd.Series:
    """Turn numeric stress scores into ``"HIGH"`` / ``"NOT_HIGH"`` labels.

    A score is ``"HIGH"`` when it is greater than or equal to ``thr``.
    The result keeps the index of ``stress_num``.

    Note: a NaN score compares as False against the threshold and is
    therefore labelled ``"NOT_HIGH"``; callers that care should drop
    missing scores first.
    """
    is_high = stress_num >= thr
    labels = np.where(is_high, "HIGH", "NOT_HIGH")
    return pd.Series(labels, index=stress_num.index)

def make_preprocess():
    """Build a fresh, unfitted ColumnTransformer for the feature table.

    Numeric columns are imputed with the column median. All other columns
    are imputed with their most frequent value and then one-hot encoded;
    categories unseen during fitting are ignored at transform time.
    Columns matched by neither selector are dropped.
    """
    numeric_branch = Pipeline(steps=[("imp", SimpleImputer(strategy="median"))])

    categorical_branch = Pipeline(
        steps=[
            ("imp", SimpleImputer(strategy="most_frequent")),
            ("ohe", OneHotEncoder(handle_unknown="ignore")),
        ]
    )

    # Columns are picked by dtype at fit time, so the same transformer
    # definition works for any feature table.
    branches = [
        ("num", numeric_branch, make_column_selector(dtype_include=np.number)),
        ("cat", categorical_branch, make_column_selector(dtype_exclude=np.number)),
    ]
    return ColumnTransformer(transformers=branches, remainder="drop")

# ===== TESTY (verbose, z obliczeniami) =====
def _test_find_stress_column_verbose():
    """Check find_stress_column on two positive cases and one failure case.

    Prints the inspected columns and the chosen column for each case and
    asserts the expected choice; the last case must raise ValueError.
    """
    print("TEST: find_stress_column\n" + "-" * 35)

    # (input frame, column expected to be chosen)
    cases = [
        (
            pd.DataFrame({"Poziom stresu (skala 1-10)": [1, 2], "wiek": [20, 21]}),
            "Poziom stresu (skala 1-10)",
        ),
        (pd.DataFrame({"stres": [1, 2], "x": [0, 1]}), "stres"),
    ]
    for frame, expected in cases:
        chosen = find_stress_column(frame)
        print("Kolumny:", list(frame.columns))
        print("Wybrana:", chosen)
        assert chosen == expected
        print("OK\n")

    # A frame without any stress-like column has to be rejected.
    try:
        find_stress_column(pd.DataFrame({"abc": [1]}))
    except ValueError as err:
        print("Oczekiwany wyjątek:", repr(err))
        print("OK\n")
    else:
        raise AssertionError("Powinien być wyjątek, a nie był.")

def _test_coerce_numeric_commas_verbose():
    """Check coerce_numeric_commas on a small mixed sample and print it.

    Covers a decimal comma, a plain integer text, unparsable text, None
    and the empty string.
    """
    print("TEST: coerce_numeric_commas\n" + "-" * 35)

    raw = pd.Series(["1,5", "2", "abc", None, ""])
    parsed = coerce_numeric_commas(raw)
    print(pd.DataFrame({"in": raw, "out": parsed}))

    # The first two entries are real numbers ...
    for position, expected in ((0, 1.5), (1, 2.0)):
        assert np.isclose(parsed.iloc[position], expected)
    # ... the remaining three must come back as missing.
    for position in (2, 3, 4):
        assert np.isnan(parsed.iloc[position])

    print("OK\n")

def _test_make_target_from_stress_verbose():
    """Check make_target_from_stress just below, at and above the threshold."""
    print("TEST: make_target_from_stress\n" + "-" * 35)

    threshold = 8.0
    scores = pd.Series([7.9, 8.0, 10.0])
    labels = make_target_from_stress(scores, threshold)
    print(pd.DataFrame({"stress": scores, "thr": threshold, "class": labels}))

    # The threshold itself counts as HIGH.
    assert list(labels) == ["NOT_HIGH", "HIGH", "HIGH"]
    print("OK\n")

# Run the helper self-checks right away. Each one asserts its expectations,
# so the final message is printed only when all three have passed.
_test_find_stress_column_verbose()
_test_coerce_numeric_commas_verbose()
_test_make_target_from_stress_verbose()

print("OK — testy funkcji pomocniczych przeszły.")


TEST: find_stress_column
-----------------------------------
Kolumny: ['Poziom stresu (skala 1-10)', 'wiek']
Wybrana: Poziom stresu (skala 1-10)
OK

Kolumny: ['stres', 'x']
Wybrana: stres
OK

Oczekiwany wyjątek: ValueError("Nie znalazłem kolumny stresu (musi zawierać 'stres').")
OK

TEST: coerce_numeric_commas
-----------------------------------
     in  out
0   1,5  1.5
1     2  2.0
2   abc  NaN
3  None  NaN
4        NaN
OK

TEST: make_target_from_stress
-----------------------------------
   stress  thr     class
0     7.9  8.0  NOT_HIGH
1     8.0  8.0      HIGH
2    10.0  8.0      HIGH
OK

OK — testy funkcji pomocniczych przeszły.


## 3) Wczytanie i przygotowanie danych (X, y)

In [16]:
import os

# Fail early with an explicit message when the workbook is not where
# cfg.excel_path says it is.
if not os.path.exists(cfg.excel_path):
    raise FileNotFoundError(f"Nie widzę pliku: {cfg.excel_path}. Ustaw cfg.excel_path poprawnie.")

# read_excel may hand back a dict of DataFrames instead of a single frame
# (pandas does this when sheet_name is None); in that case only the first
# sheet is used.
raw = pd.read_excel(cfg.excel_path, sheet_name=cfg.sheet_name, engine="openpyxl")
df = raw[list(raw.keys())[0]].copy() if isinstance(raw, dict) else raw.copy()
# Drop rows in which every cell is empty.
df = df.dropna(how="all")

# Locate the stress column and parse it to numbers; rows whose stress value
# cannot be parsed are removed from both the table and the score series.
stress_col = find_stress_column(df)
stress_num = coerce_numeric_commas(df[stress_col])
mask = stress_num.notna()

df = df.loc[mask].copy()
stress_num = stress_num.loc[mask].copy()

# Binary target: "HIGH" when the score reaches cfg.stress_high_threshold.
y = make_target_from_stress(stress_num, cfg.stress_high_threshold)
y.name = "stress_class"

# The features are everything except the column the target was derived from.
X = df.drop(columns=[stress_col], errors="ignore").copy()

# Convert text columns holding comma-decimal numbers: a column is replaced by
# its parsed version when at least 70% of its rows parse as numbers. The
# share is taken over all rows (missing cells count as non-numeric), and the
# entries that do not parse become NaN in the converted column.
for c in X.columns:
    if X[c].dtype == "object":
        maybe = coerce_numeric_commas(X[c])
        if maybe.notna().mean() >= 0.70:
            X[c] = maybe

print("Dane X:", X.shape, " | y:", y.shape)
print("Rozkład klas:\n", y.value_counts())
print("Kolumna stresu:", stress_col)


FileNotFoundError: Nie widzę pliku: prawidlowy_excel.xlsx. Ustaw cfg.excel_path poprawnie.

## 4) Podział train/test (holdout)

In [None]:
# Stratified holdout split: stratify=y keeps the HIGH / NOT_HIGH proportions
# similar in both parts; size and seed come from the shared config.
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=cfg.test_size,
    random_state=cfg.random_state,
    stratify=y
)

# Show the resulting shapes and per-class counts of both parts.
print("Train:", X_train.shape, "Test:", X_test.shape)
print("Train klasy:\n", y_train.value_counts())
print("Test  klasy:\n", y_test.value_counts())


## 5) Modele bazowe: Logistic Regression vs ExtraTrees (holdout)

In [None]:
# F1 computed with "HIGH" as the positive class; reused by the CV scoring,
# the hyper-parameter search and the permutation importance.
f1_high_scorer = make_scorer(f1_score, pos_label="HIGH")

# Baseline 1: logistic regression with class re-weighting.
# NOTE(review): make_preprocess() only imputes numeric columns (no scaling);
# consider whether LogisticRegression should get scaled inputs.
pipe_lr = Pipeline(steps=[
    ("preprocess", make_preprocess()),
    ("model", LogisticRegression(max_iter=5000, class_weight="balanced", random_state=cfg.random_state))
])

# Baseline 2: ExtraTrees with hand-picked starting parameters. Each pipeline
# gets its own preprocessing instance, so the two never share fitted state.
pipe_etc = Pipeline(steps=[
    ("preprocess", make_preprocess()),
    ("model", ExtraTreesClassifier(
        n_estimators=600,
        random_state=cfg.random_state,
        n_jobs=1,
        class_weight="balanced",
        min_samples_leaf=2
    ))
])

def eval_holdout(pipe, name):
    """Fit ``pipe`` on the training split and print its holdout metrics.

    Uses the notebook-level ``X_train``, ``y_train``, ``X_test`` and
    ``y_test``. Prints balanced accuracy, F1 for the "HIGH" class, the
    confusion matrix (HIGH first, then NOT_HIGH) and the full
    classification report. ``pipe`` is fitted in place; nothing is returned.
    """
    y_hat = pipe.fit(X_train, y_train).predict(X_test)

    class_order = ["HIGH", "NOT_HIGH"]
    balanced_acc = round(float(balanced_accuracy_score(y_test, y_hat)), 4)
    f1_high = round(float(f1_score(y_test, y_hat, pos_label="HIGH")), 4)
    confusion = confusion_matrix(y_test, y_hat, labels=class_order)

    print(f"\n{name} — HOLDOUT")
    print("BA:", balanced_acc, " | F1(HIGH):", f1_high)
    print("CM (rows=true [HIGH, NOT_HIGH], cols=pred [HIGH, NOT_HIGH]):\n", confusion)
    print(classification_report(y_test, y_hat))

# Fit both baselines on the training split and print their holdout metrics.
eval_holdout(pipe_lr, "LogisticRegression")
eval_holdout(pipe_etc, "ExtraTreesClassifier")


## 6) Stabilna ocena: RepeatedStratifiedKFold 5×5 (CV)

In [None]:
# 5 folds repeated 5 times = 25 fits per evaluated model; the fixed seed
# makes the fold assignment reproducible.
cv_rep = RepeatedStratifiedKFold(n_splits=5, n_repeats=5, random_state=cfg.random_state)

# Metrics collected in every fold. The keys become the "test_<key>" entries
# in the cross_validate / search results.
# NOTE(review): "roc_auc" is the built-in scorer and is not told which label
# is positive; presumably it picks one from the fitted classes — confirm that
# this is acceptable for the string labels used here.
scoring = {
    "bal_acc": "balanced_accuracy",
    "f1_high": f1_high_scorer,
    "roc_auc": "roc_auc"
}

def cv_report(pipe, name):
    """Cross-validate ``pipe`` on the full data set and print mean ± std.

    Uses the notebook-level ``X``, ``y``, ``cv_rep`` and ``scoring``.
    Reports balanced accuracy, F1 for "HIGH" and ROC-AUC, each as the mean
    over all folds with the sample standard deviation (``ddof=1``).

    Returns:
        The raw result dictionary produced by ``cross_validate``.
    """
    out = cross_validate(
        pipe, X, y,
        cv=cv_rep,
        scoring=scoring,
        n_jobs=1,
        return_train_score=False,
    )

    # (printed label, key in the cross_validate result)
    layout = (
        ("BA      ", "test_bal_acc"),
        ("F1(HIGH)", "test_f1_high"),
        ("ROC-AUC ", "test_roc_auc"),
    )
    per_metric = [(label, out[key]) for label, key in layout]

    print(f"\n{name} (Repeated 5×5 CV)")
    for label, fold_scores in per_metric:
        print(f"{label}: {fold_scores.mean():.4f} ± {fold_scores.std(ddof=1):.4f}")
    return out

# Repeated-CV estimates for both baselines; the raw fold scores are kept
# in out_lr / out_etc.
out_lr = cv_report(pipe_lr, "LOGISTIC REGRESSION")
out_etc = cv_report(pipe_etc, "ETC (wstępne parametry)")


## 7) Strojenie hiperparametrów ETC (RandomizedSearchCV na Repeated CV)

In [None]:
# Search space for the ExtraTrees step. The "model__" prefix routes every
# entry to the pipeline step named "model".
param_dist = {
    "model__n_estimators": [400, 800, 1200],
    "model__max_depth": [None, 5, 8, 12, 16, 20],
    "model__min_samples_split": [2, 5, 10, 20, 40],
    "model__min_samples_leaf": [1, 2, 4, 8, 12, 20],
    "model__max_features": ["sqrt", "log2", 0.3, 0.5, 0.8, None],
    "model__bootstrap": [False, True],
    "model__class_weight": [None, "balanced"],
}

# 40 random configurations, each scored with the repeated stratified CV.
# All three metrics are recorded; the winner is chosen (and refitted) by
# F1 on the "HIGH" class.
search = RandomizedSearchCV(
    estimator=pipe_etc,
    param_distributions=param_dist,
    n_iter=40,
    scoring=scoring,
    refit="f1_high",
    cv=cv_rep,
    n_jobs=1,
    random_state=cfg.random_state,
    verbose=1,
    return_train_score=False
)

# Tune on the training split only. Searching on the full X, y would let the
# holdout rows influence the chosen hyper-parameters, which makes every later
# metric computed on X_test / y_test optimistically biased.
search.fit(X_train, y_train)

best_pipe = search.best_estimator_

print("\nNajlepszy wynik (CV) F1(HIGH):", round(float(search.best_score_), 4))
print("Najlepsze parametry:")
for k, v in search.best_params_.items():
    print(" -", k, "=", v)

# Stability report for the tuned pipeline (cv_report cross-validates on the
# full X, y, like the baseline reports, so the numbers are comparable).
out_best = cv_report(best_pipe, "BEST ETC")


## 8) Wpływ zmiennych: permutation importance (na holdout)

In [None]:
# Fit the tuned pipeline on the training split only, so the holdout rows
# used below are unseen by the model.
best_pipe.fit(X_train, y_train)

# The whole pipeline is passed in, so permutation_importance shuffles the raw
# input columns of X_test one at a time and measures the drop in F1(HIGH).
perm = permutation_importance(
    best_pipe,
    X_test, y_test,
    scoring=f1_high_scorer,
    n_repeats=30,
    random_state=cfg.random_state,
    n_jobs=1
)

# There is exactly one importance per column of X_test, in X_test's column
# order. The names therefore have to come from X_test itself, not from the
# preprocessing step: its get_feature_names_out() lists the post-encoding
# features (one per one-hot category, numeric block first), which has a
# different length and order and would either fail or mislabel the scores.
feat_names = list(X_test.columns)
imp_perm = pd.DataFrame({
    "feature": feat_names,
    "importance_mean": perm.importances_mean,
    "importance_std": perm.importances_std
}).sort_values("importance_mean", ascending=False)

display(imp_perm.head(20))


In [None]:
import joblib
# Persist the fitted best pipeline (preprocessing + model) next to the
# notebook. NOTE: joblib files are pickle-based; only load them from
# sources you trust.
joblib.dump(best_pipe, "best_model.joblib")


In [None]:
from pathlib import Path
import json
import joblib
import pandas as pd
from sklearn.metrics import balanced_accuracy_score, f1_score, confusion_matrix, classification_report

def holdout_report(pipe, X_test, y_test, name="MODEL"):
    """Score an already fitted pipeline on holdout data.

    Args:
        pipe: fitted estimator exposing ``predict``.
        X_test: holdout features.
        y_test: true holdout labels ("HIGH" / "NOT_HIGH").
        name: title used in the first line of the text report.

    Returns:
        A ``(text, metrics)`` tuple: a human-readable report (balanced
        accuracy, F1 for "HIGH", confusion matrix, classification report)
        and a dict with the keys ``"balanced_accuracy"`` and ``"f1_high"``.
    """
    y_hat = pipe.predict(X_test)

    metrics = {
        "balanced_accuracy": float(balanced_accuracy_score(y_test, y_hat)),
        "f1_high": float(f1_score(y_test, y_hat, pos_label="HIGH")),
    }
    confusion = confusion_matrix(y_test, y_hat, labels=["HIGH", "NOT_HIGH"])
    per_class = classification_report(y_test, y_hat)

    # Assemble the report line by line; the trailing empty entry yields the
    # final newline.
    report_lines = [
        f"{name} — HOLDOUT",
        f"Balanced accuracy: {metrics['balanced_accuracy']:.4f}",
        f"F1(HIGH): {metrics['f1_high']:.4f}",
        "",
        "Confusion matrix (rows=true [HIGH, NOT_HIGH], cols=pred [HIGH, NOT_HIGH]):",
        str(confusion),
        "",
        "Classification report:",
        str(per_class),
        "",
    ]
    txt = "\n".join(report_lines)
    return txt, metrics

# Output folder for all artifacts (created on first run, reused afterwards).
Path("results").mkdir(exist_ok=True)

# Text report and metrics of the best model on the holdout split.
report_text, metrics = holdout_report(best_pipe, X_test, y_test, name="BEST ETC")

(Path("results") / "metrics_report.txt").write_text(report_text, encoding="utf-8")
pd.DataFrame([metrics]).to_csv("results/metrics_holdout.csv", index=False)

# Best hyper-parameters found by RandomizedSearchCV, stored as JSON.
(Path("results") / "best_params.json").write_text(
    json.dumps(search.best_params_, ensure_ascii=False, indent=2),
    encoding="utf-8"
)

# Permutation importances (the imp_perm table built earlier in the notebook):
# the top 20 rows and the complete table.
imp_perm.head(20).to_csv("results/feature_importance_top20.csv", index=False)
imp_perm.to_csv("results/feature_importance_all.csv", index=False)

# Fitted best pipeline. joblib files are pickle-based; only load them from
# sources you trust.
joblib.dump(best_pipe, "results/best_model.joblib")

