# PoC: Klasyfikacja zdań do 150 demandów z użyciem opisów labeli (bez RAG, bez OpenAI)

**Cel**: wykorzystać nową kolumnę `demand_desc` jako "prototypy klas" i klasyfikować `text` poprzez największe podobieństwo cosine do embeddingów opisów labeli.
Opcjonalnie: self-training na danych nielabelowanych.

Założenia o danych:
- kolumny: `text`, `demand_id`, `demand_desc`
- `demand_id` jest puste/NaN dla wierszy niepolabelowanych
- `demand_desc` zawiera opis znaczenia demanda (może powtarzać się w wielu wierszach dla tego samego `demand_id`)

In [None]:
!pip -q install -U "sentence-transformers>=3.0.0" "scikit-learn>=1.4.0" "pandas>=2.0.0" "numpy>=1.24.0" "tqdm" "openpyxl"

## Importy + konfiguracja

In [None]:
import os
import random
import numpy as np
import pandas as pd
import torch

from tqdm.auto import tqdm
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, classification_report

from sentence_transformers import SentenceTransformer
from sentence_transformers.util import cos_sim

SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)

device = "cuda" if torch.cuda.is_available() else "cpu"
print("Device:", device)

## Wczytanie danych (CSV lub XLSX)

In [None]:
DATA_PATH = "dataset.csv"  # albo: "dataset.xlsx"

if DATA_PATH.lower().endswith(".xlsx"):
    df = pd.read_excel(DATA_PATH)
else:
    df = pd.read_csv(DATA_PATH)

required_cols = {"text", "demand_id", "demand_desc"}
missing = required_cols - set(df.columns)
if missing:
    raise ValueError(f"Brak wymaganych kolumn: {missing}")

df["text"] = df["text"].astype(str)
df["demand_id"] = df["demand_id"].replace("", np.nan)

print("Shape:", df.shape)
print("Labeled:", df["demand_id"].notna().sum(), "Unlabeled:", df["demand_id"].isna().sum())
df.head(3)

## Mapa: demand_id -> demand_desc (prototypy klas)

In [None]:
labeled = df[df["demand_id"].notna()].copy()
unlabeled = df[df["demand_id"].isna()].copy()

label_desc = (
    labeled[["demand_id", "demand_desc"]]
    .dropna()
    .drop_duplicates(subset=["demand_id"])
    .set_index("demand_id")["demand_desc"]
    .to_dict()
)

all_labels = sorted(label_desc.keys())
print("Unique labeled classes:", len(all_labels))

missing_desc = set(labeled["demand_id"].unique()) - set(label_desc.keys())
print("Labels missing desc:", len(missing_desc))
if missing_desc:
    print("Examples:", list(missing_desc)[:10])

## Split train/val/test

Uwaga: przy 150 klasach i 3k przykładów niektóre klasy mogą być bardzo rzadkie; w razie problemów ze stratyfikacją robimy fallback.

In [None]:
X = labeled["text"].values
y = labeled["demand_id"].values

def safe_stratified_split(X, y, test_size, seed):
    try:
        return train_test_split(X, y, test_size=test_size, random_state=seed, stratify=y)
    except ValueError:
        return train_test_split(X, y, test_size=test_size, random_state=seed, stratify=None)

X_train, X_tmp, y_train, y_tmp = safe_stratified_split(X, y, test_size=0.30, seed=SEED)
X_val, X_test, y_val, y_test = safe_stratified_split(X_tmp, y_tmp, test_size=0.50, seed=SEED)

print(len(X_train), len(X_val), len(X_test))

## Model embeddingowy

Wybór startowy: `intfloat/multilingual-e5-base` (dobry dla PL/EN).  
Możesz podmienić na np. `sentence-transformers/paraphrase-multilingual-mpnet-base-v2`.

In [None]:
BASE_MODEL = "intfloat/multilingual-e5-base"
model = SentenceTransformer(BASE_MODEL, device=device)

## Embeddingi opisów labeli i predykcja przez max cosine similarity

Tu *nie* porównujemy opisu z "innym opisem" w sensie treningu na parach z tego samego rzędu.
Robimy klasyfikację: **tekst zdania** (query) porównujemy do **wszystkich opisów klas** (passage) i wybieramy klasę o największym podobieństwie.

To jest klasyfikator typu "prototypowego": opis labela jest reprezentantem klasy.

In [None]:
@torch.no_grad()
def embed_texts(st_model, texts, batch_size=128):
    embs = []
    for i in tqdm(range(0, len(texts), batch_size), desc="Embedding"):
        batch = texts[i:i+batch_size]
        embs.append(st_model.encode(batch, convert_to_tensor=True, normalize_embeddings=True))
    return torch.cat(embs, dim=0)

label_texts = [f"passage: {label_desc[l]}" for l in all_labels]
label_emb = embed_texts(model, label_texts, batch_size=128)

@torch.no_grad()
def predict_labels(st_model, texts, label_emb, all_labels, batch_size=128):
    preds = []
    scores = []
    for i in tqdm(range(0, len(texts), batch_size), desc="Predict"):
        batch = [f"query: {t}" for t in texts[i:i+batch_size]]
        text_emb = st_model.encode(batch, convert_to_tensor=True, normalize_embeddings=True)
        sims = cos_sim(text_emb, label_emb)  # [B, L]
        best = torch.argmax(sims, dim=1).cpu().numpy()
        best_scores = torch.max(sims, dim=1).values.cpu().numpy()
        preds.extend([all_labels[j] for j in best])
        scores.extend(best_scores.tolist())
    return np.array(preds), np.array(scores)

val_pred, _ = predict_labels(model, X_val, label_emb, all_labels)
test_pred, _ = predict_labels(model, X_test, label_emb, all_labels)

print("VAL macro F1 :", f1_score(y_val, val_pred, average="macro"))
print("VAL micro F1 :", f1_score(y_val, val_pred, average="micro"))
print("TEST macro F1:", f1_score(y_test, test_pred, average="macro"))
print("TEST micro F1:", f1_score(y_test, test_pred, average="micro"))

### Raport (może być długi dla 150 klas)

In [None]:
print(classification_report(y_test, test_pred, digits=3, zero_division=0))

## (Opcjonalnie) Self-training na unlabeled (pseudo-labeling)

Bierzemy tylko pewne predykcje (próg podobieństwa oraz margines top1-top2), dokładamy do treningu lub do ręcznej walidacji.
W tym prostym notebooku robimy tylko **prelabeling + zapis wyników**.

In [None]:
THRESH = 0.40
MARGIN = 0.05

@torch.no_grad()
def predict_top2(st_model, texts, label_emb, all_labels, batch_size=128):
    top1_label, top1_score, top2_score = [], [], []
    for i in tqdm(range(0, len(texts), batch_size), desc="Predict top2"):
        batch = [f"query: {t}" for t in texts[i:i+batch_size]]
        text_emb = st_model.encode(batch, convert_to_tensor=True, normalize_embeddings=True)
        sims = cos_sim(text_emb, label_emb)  # [B, L]
        vals, idxs = torch.topk(sims, k=2, dim=1)
        vals = vals.cpu().numpy()
        idxs = idxs.cpu().numpy()
        top1_label.extend([all_labels[j] for j in idxs[:,0]])
        top1_score.extend(vals[:,0].tolist())
        top2_score.extend(vals[:,1].tolist())
    return np.array(top1_label), np.array(top1_score), np.array(top2_score)

if len(unlabeled) > 0:
    ul_texts = unlabeled["text"].astype(str).tolist()
    ul_pred, ul_s1, ul_s2 = predict_top2(model, ul_texts, label_emb, all_labels)
    keep = (ul_s1 >= THRESH) & ((ul_s1 - ul_s2) >= MARGIN)
    print("Pseudo accepted:", int(keep.sum()), "out of", len(unlabeled))
else:
    print("No unlabeled rows found.")

## Zapis predykcji dla całego datasetu (labeled + unlabeled)

In [None]:
all_texts = df["text"].astype(str).tolist()
pred_all, score_all = predict_labels(model, all_texts, label_emb, all_labels)

out = df.copy()
out["pred_demand_id"] = pred_all
out["pred_score"] = score_all

OUT_PATH = "predictions.parquet"
out.to_parquet(OUT_PATH, index=False)

print("Saved:", OUT_PATH)
out.head(3)

## Co dalej (praktycznie)

1) Jeśli baseline (bez fine-tuningu) jest słaby, następnym krokiem jest fine-tuning bi-encodera na parach (text, demand_desc) z MultipleNegativesRankingLoss.  
2) Można też trenować klasyczny transformer-classifier, ale **dokleić** opis labela do wejścia (np. text + [SEP] desc) i uczyć „czy pasuje” (cross-encoder) — to bywa mocniejsze, ale wolniejsze w inferencji.  
W tym notebooku zostawiliśmy wariant najprostszy i najszybszy w PoC.