
# Text Classification PoC v2
- SentenceTransformer: `intfloat/multilingual-e5-small`（入力に `query: ` プレフィックスを付与）で高品質な埋め込みを取得。
- 5-fold Stratified CV + hold-out で SVM/LogReg/MLP/LightGBM/Bagging を比較し、Optuna で主要モデルをチューニング。
- カテゴリ名の埋め込みとのコサイン類似度や確信度モニタリング、類似問い合わせ抽出など運用要件をPoC化。


In [None]:

# Purpose: Import dependencies, set constants/logging, and configure deterministic behavior for reproducibility.
from __future__ import annotations

import logging
from pathlib import Path
from typing import Callable, Dict, Iterable, List, Tuple

import numpy as np
import optuna
import pandas as pd
import polars as pl
from lightgbm import LGBMClassifier
from sentence_transformers import SentenceTransformer
from sklearn import metrics, model_selection, preprocessing
from sklearn.ensemble import BaggingClassifier, StackingClassifier, VotingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.neural_network import MLPClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import LinearSVC

# Seed used as the default random_state/seed by the splitters, estimators, and Optuna samplers below.
RANDOM_SEED = 42
# Input CSV; the path is relative to the working directory.
DATA_PATH = Path("data/text_classification_samples_200.csv")
# Model identifier handed to SentenceTransformer by build_embedder.
EMBED_MODEL_NAME = "intfloat/multilingual-e5-small"
# Prefix prepended to every text before encoding (see normalize_for_instruction).
E5_INSTRUCTION = "query: "

# Seed NumPy's global RNG; components that take an explicit random_state receive RANDOM_SEED separately.
np.random.seed(RANDOM_SEED)
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
# NOTE(review): enables Polars' global string cache for the whole session — confirm it is still needed.
pl.enable_string_cache()


In [None]:

# Purpose: Provide data loading helpers built on top of Polars LazyFrame for scalable ingestion.
def load_lazy_dataset(csv_path: Path) -> pl.LazyFrame:
    """Open *csv_path* as a Polars lazy scan; rows are only read once the frame is collected."""
    lazy_scan = pl.scan_csv(csv_path)
    return lazy_scan


def preview_lazyframe(lazy_frame: pl.LazyFrame, sample_size: int = 5) -> pl.DataFrame:
    """Collect only the first *sample_size* rows of *lazy_frame* for a quick look at the data."""
    head_query = lazy_frame.head(sample_size)
    return head_query.collect()


def materialize_dataset(lazy_frame: pl.LazyFrame) -> pd.DataFrame:
    """Execute the lazy query and hand the result back as a pandas DataFrame."""
    polars_frame = lazy_frame.collect()
    return polars_frame.to_pandas()


def encode_labels(
    df: pd.DataFrame,
    label_column: str = "category_label",
    new_column: str = "category_id",
) -> Tuple[pd.DataFrame, preprocessing.LabelEncoder]:
    """Add an integer-coded copy of *label_column* as *new_column*.

    The input frame is left untouched; a copy carrying the extra column is
    returned together with the fitted encoder so ids can be mapped back to
    their original labels.
    """
    fitted_encoder = preprocessing.LabelEncoder()
    codes = fitted_encoder.fit_transform(df[label_column])
    result = df.copy()
    result[new_column] = codes
    return result, fitted_encoder


def summarize_categories(
    df: pd.DataFrame,
    category_name_col: str = "category_name",
) -> pd.DataFrame:
    """Tabulate how often each category occurs and its share of all rows.

    Returns a frame with the category column, a ``count`` column, and a
    ``ratio`` column (count divided by the total count), most frequent first.
    """
    counts = df[category_name_col].value_counts().rename("count")
    table = counts.to_frame()
    table["ratio"] = table["count"] / table["count"].sum()
    table = table.reset_index()
    # Some pandas versions label the former index "index" after reset_index;
    # normalize it to the category column name (no-op when already named).
    return table.rename(columns={"index": category_name_col})


In [None]:

# Purpose: Open the dataset as a lazy scan and preview its first rows; the full table is
# materialized and label-encoded in a later cell.
lazy_dataset = load_lazy_dataset(DATA_PATH)
preview_lazyframe(lazy_dataset, sample_size=5)


In [None]:

# Purpose: Materialize the dataset, attach numeric labels, and summarize category balance for reference.
# records_df and label_encoder are reused as module-level state by the cells below.
records_df, label_encoder = encode_labels(materialize_dataset(lazy_dataset))
category_summary = summarize_categories(records_df)
# Bare expression: shown as the cell output when run in a notebook.
category_summary


In [None]:

# Purpose: Define embedding helpers tailored for instruction-tuned E5 family models.
def normalize_for_instruction(text: str, instruction: str = E5_INSTRUCTION) -> str:
    """Strip *text*, turn each newline into a single space, and prepend *instruction*."""
    # split/join on "\n" is a one-for-one newline-to-space substitution.
    single_line = " ".join(text.strip().split("\n"))
    return "{}{}".format(instruction, single_line)


def build_embedder(model_name: str = EMBED_MODEL_NAME) -> SentenceTransformer:
    """Instantiate the SentenceTransformer identified by *model_name*."""
    model = SentenceTransformer(model_name)
    return model


def embed_texts(
    embedder: SentenceTransformer,
    texts: Iterable[str],
    instruction: str = E5_INSTRUCTION,
    batch_size: int = 32,
    normalize_embeddings: bool = True,
) -> np.ndarray:
    """Encode *texts* into a float32 array, one embedding per input text.

    Each text is first passed through ``normalize_for_instruction`` so it
    carries the *instruction* prefix. ``normalize_embeddings`` is forwarded
    to the encoder unchanged.
    """
    prefixed: List[str] = []
    for raw_text in texts:
        prefixed.append(normalize_for_instruction(raw_text, instruction))

    encode_options = {
        "batch_size": batch_size,
        "show_progress_bar": False,
        "normalize_embeddings": normalize_embeddings,
    }
    raw_vectors = embedder.encode(prefixed, **encode_options)
    return np.asarray(raw_vectors, dtype=np.float32)


In [None]:

# Purpose: Instantiate the embedder and transform all texts into dense vectors.
# Assumes records_df has a "text" column of strings — TODO confirm against the CSV schema.
embedder = build_embedder()
text_embeddings = embed_texts(embedder, records_df["text"])
# Bare expression: shown as the cell output when run in a notebook.
text_embeddings.shape


In [None]:

# Purpose: Build category-level embeddings and cosine-similarity features to enrich the model input space.
def build_category_embeddings(
    embedder: SentenceTransformer,
    category_texts: Iterable[str],
    instruction: str = E5_INSTRUCTION,
) -> np.ndarray:
    """Encode every category text into a normalized float32 embedding.

    Args:
        embedder: Model whose ``encode`` method produces the vectors.
        category_texts: Category names/descriptions, one per class.
        instruction: Prefix applied to each text via ``normalize_for_instruction``.

    Returns:
        A float32 array with one row per category text, in input order.

    Raises:
        ValueError: If *category_texts* is empty.
    """
    prepared = [normalize_for_instruction(text, instruction) for text in category_texts]
    if not prepared:
        # Without this guard the encoder would be called with batch_size=0,
        # which is not a usable batch size.
        raise ValueError("category_texts must contain at least one category")
    vectors = embedder.encode(
        prepared,
        # All categories are encoded in a single batch.
        batch_size=len(prepared),
        show_progress_bar=False,
        normalize_embeddings=True,
    )
    # Same dtype contract as embed_texts, so both feature blocks stack cleanly.
    return np.asarray(vectors, dtype=np.float32)


def concat_similarity_features(
    text_vectors: np.ndarray,
    category_vectors: np.ndarray,
    *,
    normalize: bool = False,
) -> np.ndarray:
    """Append text-to-category similarity columns to the text embeddings.

    The similarity is the dot product ``text_vectors @ category_vectors.T``.
    That equals cosine similarity only when both inputs are already
    L2-normalized; pass ``normalize=True`` to get true cosine similarity for
    inputs that are not.

    Args:
        text_vectors: Embeddings of the texts, one row per text.
        category_vectors: Embeddings of the categories, one row per category.
        normalize: When True, rows of both inputs are scaled to unit length
            before the product (all-zero rows stay zero). The returned
            embedding columns are always the original, unscaled
            ``text_vectors``.

    Returns:
        ``text_vectors`` with one extra column per category.
    """
    left, right = text_vectors, category_vectors
    if normalize:
        left_norms = np.linalg.norm(left, axis=-1, keepdims=True)
        right_norms = np.linalg.norm(right, axis=-1, keepdims=True)
        # Dividing by 1 instead of 0 keeps all-zero rows at zero similarity.
        left = left / np.where(left_norms == 0, 1.0, left_norms)
        right = right / np.where(right_norms == 0, 1.0, right_norms)
    similarities = left @ right.T
    return np.hstack([text_vectors, similarities])


# Category texts are the label encoder's classes, so similarity column j lines up with class id j.
category_embeddings = build_category_embeddings(embedder, label_encoder.classes_)
augmented_embeddings = concat_similarity_features(text_embeddings, category_embeddings)
# Bare expression: shown as the cell output when run in a notebook.
augmented_embeddings.shape


In [None]:

# Purpose: Create modeling utilities for CV + holdout evaluation on multiple classifiers, including LightGBM & bagging.
def build_model_registry(random_state: int = RANDOM_SEED) -> Dict[str, Callable[[], object]]:
    """Map model names to zero-argument factories that build fresh, unfitted estimators.

    Every model except LightGBM is wrapped in a pipeline that standardizes
    the features first. All estimators share *random_state*.
    """

    def scaled(classifier: object) -> Pipeline:
        # Shared "scaler" -> "clf" pipeline shape used by the non-tree models.
        return Pipeline([("scaler", StandardScaler()), ("clf", classifier)])

    def logistic() -> LogisticRegression:
        return LogisticRegression(max_iter=4000, random_state=random_state)

    def linear_svm() -> Pipeline:
        return scaled(LinearSVC(random_state=random_state))

    def logistic_regression() -> Pipeline:
        return scaled(logistic())

    def mlp_classifier() -> Pipeline:
        network = MLPClassifier(
            hidden_layer_sizes=(384,),
            activation="relu",
            max_iter=1500,
            random_state=random_state,
        )
        return scaled(network)

    def lightgbm() -> LGBMClassifier:
        return LGBMClassifier(
            n_estimators=600,
            learning_rate=0.05,
            subsample=0.9,
            colsample_bytree=0.9,
            reg_lambda=0.1,
            random_state=random_state,
            n_jobs=-1,
            verbosity=-1,
        )

    def bagging_log_reg() -> Pipeline:
        bagger = BaggingClassifier(
            estimator=logistic(),
            n_estimators=15,
            max_samples=0.85,
            bootstrap=True,
            random_state=random_state,
            n_jobs=None,
        )
        return scaled(bagger)

    # Insertion order is the order in which callers iterate the registry.
    return {
        "linear_svm": linear_svm,
        "logistic_regression": logistic_regression,
        "mlp_classifier": mlp_classifier,
        "lightgbm": lightgbm,
        "bagging_log_reg": bagging_log_reg,
    }


def cross_validate_models(
    model_builders: Dict[str, Callable[[], object]],
    features: np.ndarray,
    labels: np.ndarray,
    cv_splits: int = 5,
    seed: int = RANDOM_SEED,
) -> pd.DataFrame:
    """Score every registered model with shuffled stratified K-fold CV.

    Returns one row per model with the mean and standard deviation of
    accuracy and macro-F1 across folds, sorted by mean macro-F1 (best first).
    """
    folds = model_selection.StratifiedKFold(
        n_splits=cv_splits,
        shuffle=True,
        random_state=seed,
    )

    def summarize(name: str, builder: Callable[[], object]) -> Dict[str, float]:
        # Build a fresh estimator per model and collect its per-fold scores.
        fold_scores = model_selection.cross_validate(
            builder(),
            features,
            labels,
            cv=folds,
            scoring=["accuracy", "f1_macro"],
            n_jobs=None,
        )
        accuracy = fold_scores["test_accuracy"]
        macro_f1 = fold_scores["test_f1_macro"]
        return {
            "name": name,
            "cv_accuracy_mean": accuracy.mean(),
            "cv_accuracy_std": accuracy.std(),
            "cv_macro_f1_mean": macro_f1.mean(),
            "cv_macro_f1_std": macro_f1.std(),
        }

    table = pd.DataFrame([summarize(name, builder) for name, builder in model_builders.items()])
    ranked = table.sort_values("cv_macro_f1_mean", ascending=False)
    return ranked.reset_index(drop=True)


def holdout_report_for_model(
    model,
    features: np.ndarray,
    labels: np.ndarray,
    label_names: Iterable[str],
    test_size: float = 0.2,
    seed: int = RANDOM_SEED,
) -> Tuple[Dict[str, float], str]:
    """Fit *model* on a stratified train split and score it on the held-out rest.

    Args:
        model: Estimator exposing ``fit`` and ``predict``. It is fitted in
            place, so the caller's object is trained on the train split
            after this call.
        features: Feature matrix, one row per sample.
        labels: Class label per sample.
        label_names: Display names for the classes, in the sorted order of
            the distinct values in *labels*.
        test_size: Fraction of the data held out for evaluation.
        seed: Random state of the split.

    Returns:
        ``({"accuracy": ..., "macro_f1": ...}, report)`` where *report* is
        the text of sklearn's classification report.
    """
    X_train, X_test, y_train, y_test = model_selection.train_test_split(
        features,
        labels,
        test_size=test_size,
        stratify=labels,
        random_state=seed,
    )
    model.fit(X_train, y_train)
    preds = model.predict(X_test)
    metrics_dict = {
        "accuracy": metrics.accuracy_score(y_test, preds),
        "macro_f1": metrics.f1_score(y_test, preds, average="macro"),
    }
    report = metrics.classification_report(
        y_test,
        preds,
        # Pin the label set to every class in the full data. Otherwise the
        # report infers it from the test fold alone, and a class missing from
        # both y_test and preds would shift or break the name alignment.
        labels=np.unique(labels),
        target_names=list(label_names),
    )
    return metrics_dict, report


In [None]:

# Purpose: Execute cross validation across all models and inspect their ranking.
# Models are trained on the augmented features (embeddings plus category-similarity columns).
model_registry = build_model_registry()
cv_results = cross_validate_models(model_registry, augmented_embeddings, records_df["category_id"].values)
# Bare expression: shown as the cell output when run in a notebook.
cv_results


In [None]:

# Purpose: Sanity-check logistic regression, bagged logistic regression, and LightGBM on a hold-out split.
# The three model names are fixed; cv_results only determines the order in which they are evaluated.
ranked_models = cv_results["name"].tolist()
selected = [name for name in ranked_models if name in {"logistic_regression", "bagging_log_reg", "lightgbm"}]
reports: List[Dict[str, object]] = []
for name in selected:
    # Build a fresh, unfitted estimator for each hold-out run.
    model = model_registry[name]()
    metrics_dict, report = holdout_report_for_model(
        model,
        augmented_embeddings,
        records_df["category_id"].values,
        label_encoder.classes_,
    )
    reports.append({"name": name, **metrics_dict, "report": report})
# Bare expression: shown as the cell output when run in a notebook.
reports


In [None]:

# Purpose: Use Optuna to tune the Logistic Regression pipeline and push macro-F1 higher via CV.
def tune_logistic_with_optuna(
    features: np.ndarray,
    labels: np.ndarray,
    n_trials: int = 25,
    seed: int = RANDOM_SEED,
) -> optuna.Study:
    """Search logistic-regression hyperparameters with Optuna, maximizing CV macro-F1.

    Each trial scores a StandardScaler + LogisticRegression pipeline with
    5-fold shuffled stratified CV. The finished study is returned so callers
    can read ``best_value`` and ``best_params``.
    """
    # A seeded splitter yields the same folds for every trial.
    folds = model_selection.StratifiedKFold(n_splits=5, shuffle=True, random_state=seed)

    def sample_pipeline(trial: optuna.Trial) -> Pipeline:
        # Suggestions are drawn in a fixed order: C, fit_intercept, class_weight, tol.
        hyperparams = {
            "C": trial.suggest_float("C", 1e-2, 10.0, log=True),
            "fit_intercept": trial.suggest_categorical("fit_intercept", [True, False]),
            "class_weight": trial.suggest_categorical("class_weight", [None, "balanced"]),
            "tol": trial.suggest_float("tol", 1e-5, 1e-2, log=True),
        }
        classifier = LogisticRegression(max_iter=5000, random_state=seed, **hyperparams)
        return Pipeline([("scaler", StandardScaler()), ("clf", classifier)])

    def objective(trial: optuna.Trial) -> float:
        fold_scores = model_selection.cross_val_score(
            sample_pipeline(trial),
            features,
            labels,
            cv=folds,
            scoring="f1_macro",
            n_jobs=None,
        )
        return float(fold_scores.mean())

    sampler = optuna.samplers.TPESampler(seed=seed)
    study = optuna.create_study(direction="maximize", sampler=sampler)
    study.optimize(objective, n_trials=n_trials, show_progress_bar=False)
    return study


# Run the logistic-regression search on the augmented features with the default trial budget.
log_reg_study = tune_logistic_with_optuna(augmented_embeddings, records_df["category_id"].values)
# Bare expression: best CV macro-F1 and the parameters that produced it.
log_reg_study.best_value, log_reg_study.best_params


In [None]:

# Purpose: Tune LightGBM hyperparameters with Optuna to explore boosted-tree capacity.
def tune_lightgbm_with_optuna(
    features: np.ndarray,
    labels: np.ndarray,
    n_trials: int = 30,
    seed: int = RANDOM_SEED,
) -> optuna.Study:
    """Search LightGBM hyperparameters with Optuna, maximizing CV macro-F1.

    Each trial scores a 600-tree LGBMClassifier with 5-fold shuffled
    stratified CV. Parameters are recorded in the study under LightGBM's
    native names (``feature_fraction``, ``bagging_fraction``,
    ``bagging_freq``, ...), while the classifier receives the three sampling
    ones through its ``colsample_bytree`` / ``subsample`` /
    ``subsample_freq`` arguments.
    """
    # A seeded splitter yields the same folds for every trial.
    folds = model_selection.StratifiedKFold(n_splits=5, shuffle=True, random_state=seed)

    def sample_model(trial: optuna.Trial) -> LGBMClassifier:
        # Suggestions are drawn in a fixed order, one statement per parameter.
        num_leaves = trial.suggest_int("num_leaves", 8, 64)
        max_depth = trial.suggest_int("max_depth", 3, 12)
        learning_rate = trial.suggest_float("learning_rate", 0.01, 0.2, log=True)
        feature_fraction = trial.suggest_float("feature_fraction", 0.6, 1.0)
        bagging_fraction = trial.suggest_float("bagging_fraction", 0.6, 1.0)
        bagging_freq = trial.suggest_int("bagging_freq", 1, 7)
        min_child_samples = trial.suggest_int("min_child_samples", 5, 30)
        lambda_l1 = trial.suggest_float("lambda_l1", 1e-3, 10.0, log=True)
        lambda_l2 = trial.suggest_float("lambda_l2", 1e-3, 10.0, log=True)
        return LGBMClassifier(
            n_estimators=600,
            subsample=bagging_fraction,
            subsample_freq=bagging_freq,
            colsample_bytree=feature_fraction,
            random_state=seed,
            n_jobs=-1,
            verbosity=-1,
            num_leaves=num_leaves,
            max_depth=max_depth,
            learning_rate=learning_rate,
            min_child_samples=min_child_samples,
            lambda_l1=lambda_l1,
            lambda_l2=lambda_l2,
        )

    def objective(trial: optuna.Trial) -> float:
        fold_scores = model_selection.cross_val_score(
            sample_model(trial),
            features,
            labels,
            cv=folds,
            scoring="f1_macro",
            n_jobs=None,
        )
        return float(fold_scores.mean())

    sampler = optuna.samplers.TPESampler(seed=seed)
    study = optuna.create_study(direction="maximize", sampler=sampler)
    study.optimize(objective, n_trials=n_trials, show_progress_bar=False)
    return study


# Run the LightGBM search on the augmented features with the default trial budget.
lightgbm_study = tune_lightgbm_with_optuna(augmented_embeddings, records_df["category_id"].values)
# Bare expression: best CV macro-F1 and the parameters that produced it.
lightgbm_study.best_value, lightgbm_study.best_params


In [None]:

# Purpose: Build stacking/voting ensembles from the Optuna-tuned base models (non-bagging) and evaluate them.
def build_tuned_logistic(best_params: Dict[str, object], seed: int = RANDOM_SEED) -> Pipeline:
    """Build a StandardScaler + LogisticRegression pipeline from tuned parameters.

    Only ``C``, ``fit_intercept``, ``class_weight`` and ``tol`` are read from
    *best_params*; any that are missing fall back to the defaults below.
    Other keys are ignored.
    """
    fallbacks = {"C": 1.0, "fit_intercept": True, "class_weight": None, "tol": 1e-4}
    # Later entries win, so tuned values override the fallbacks.
    resolved = {**fallbacks, **best_params}
    classifier = LogisticRegression(
        C=resolved["C"],
        fit_intercept=resolved["fit_intercept"],
        class_weight=resolved["class_weight"],
        tol=resolved["tol"],
        max_iter=5000,
        random_state=seed,
    )
    return Pipeline([("scaler", StandardScaler()), ("clf", classifier)])


def build_tuned_lightgbm(best_params: Dict[str, object], seed: int = RANDOM_SEED) -> LGBMClassifier:
    """Build a 600-tree LGBMClassifier from tuned parameters.

    ``feature_fraction``, ``bagging_fraction`` and ``bagging_freq`` are
    renamed to the classifier's ``colsample_bytree``, ``subsample`` and
    ``subsample_freq`` arguments (with defaults when absent). Every other
    entry of *best_params* is forwarded as-is. *best_params* itself is not
    modified.
    """
    remaining = dict(best_params)
    sampling = {
        "colsample_bytree": remaining.pop("feature_fraction", 0.9),
        "subsample": remaining.pop("bagging_fraction", 0.9),
        "subsample_freq": remaining.pop("bagging_freq", 1),
    }
    return LGBMClassifier(
        n_estimators=600,
        random_state=seed,
        n_jobs=-1,
        verbosity=-1,
        **sampling,
        **remaining,
    )


def build_stacking_ensemble(
    log_reg: Pipeline,
    lgbm: LGBMClassifier,
    seed: int = RANDOM_SEED,
) -> StackingClassifier:
    """Stack the two base models under a logistic-regression meta-learner.

    The meta-learner sees only the base models' outputs
    (``passthrough=False``), not the original features.
    """
    meta_learner = LogisticRegression(max_iter=4000, random_state=seed)
    base_learners = [("logreg", log_reg), ("lgbm", lgbm)]
    stack = StackingClassifier(
        estimators=base_learners,
        final_estimator=meta_learner,
        stack_method="auto",
        passthrough=False,
        n_jobs=None,
    )
    return stack


def build_voting_ensemble(
    log_reg: Pipeline,
    lgbm: LGBMClassifier,
    weights: Iterable[float] = (0.6, 0.4),
) -> VotingClassifier:
    """Combine the two base models in a weighted soft-voting ensemble.

    Args:
        log_reg: Logistic-regression pipeline, registered as ``"logreg"``.
        lgbm: LightGBM classifier, registered as ``"lgbm"``.
        weights: Vote weights in ``(logreg, lgbm)`` order. The default
            reproduces the previously hard-coded 0.6 / 0.4 split.

    Returns:
        An unfitted ``VotingClassifier`` using soft voting.

    Raises:
        ValueError: If *weights* does not contain exactly two values.
    """
    vote_weights = list(weights)
    if len(vote_weights) != 2:
        # Fail at construction time rather than later during fit.
        raise ValueError("weights must contain exactly two values (logreg, lgbm)")
    return VotingClassifier(
        estimators=[("logreg", log_reg), ("lgbm", lgbm)],
        voting="soft",
        weights=vote_weights,
        n_jobs=None,
    )


# Rebuild both base models from the best Optuna parameters found above.
tuned_log_reg = build_tuned_logistic(log_reg_study.best_params)
tuned_lgbm = build_tuned_lightgbm(lightgbm_study.best_params)
# NOTE(review): both ensembles receive the same base-model objects; presumably sklearn
# clones them when fitting, so they do not share fitted state — confirm.
stacking_model = build_stacking_ensemble(tuned_log_reg, tuned_lgbm)
voting_model = build_voting_ensemble(tuned_log_reg, tuned_lgbm)
# Both ensembles use the helper's default test_size and seed, i.e. the same hold-out split.
stacking_metrics, stacking_report = holdout_report_for_model(
    stacking_model,
    augmented_embeddings,
    records_df["category_id"].values,
    label_encoder.classes_,
)
voting_metrics, voting_report = holdout_report_for_model(
    voting_model,
    augmented_embeddings,
    records_df["category_id"].values,
    label_encoder.classes_,
)
# Bare expression: shown as the cell output when run in a notebook.
stacking_metrics, voting_metrics


In [None]:

# Purpose: Print the hold-out classification reports of the stacking and voting ensembles
# (per-class precision/recall/F1) for qualitative inspection.
print("=== Stacking ensemble report ===")
print(stacking_report)
print("=== Voting ensemble report ===")
print(voting_report)


In [None]:

# Purpose: Fit the tuned logistic model on all data to compute per-sample confidence scores and flag low-confidence predictions.
def compute_confidence_table(
    model: Pipeline,
    features: np.ndarray,
    labels: np.ndarray,
    label_names: Iterable[str],
    threshold: float = 0.8,
    *,
    metadata: pd.DataFrame | None = None,
) -> Tuple[pd.DataFrame, Pipeline]:
    """Fit *model* on all rows and tabulate each row's prediction and confidence.

    The model is fitted and scored on the same rows, so the confidences are
    in-sample values rather than estimates of held-out performance.

    Args:
        model: Estimator exposing ``fit`` and ``predict_proba``; fitted in place.
        features: Feature matrix, one row per sample.
        labels: Integer class id per sample.
        label_names: Display names, indexable by class id.
        threshold: Rows whose top probability is below this value are
            flagged in ``low_confidence``.
        metadata: Frame providing the ``id``, ``text`` and ``category_label``
            columns, row-aligned with *features*. Defaults to the
            module-level ``records_df`` for backward compatibility.

    Returns:
        ``(table, fitted_model)`` where *table* has the columns ``id``,
        ``text``, ``true_label``, ``pred_label``, ``pred_label_id``,
        ``confidence`` and ``low_confidence``.

    Raises:
        ValueError: If the metadata frame and *features* differ in length.
    """
    source = records_df if metadata is None else metadata
    if len(source) != len(features):
        raise ValueError(
            f"metadata has {len(source)} rows but features has {len(features)}"
        )
    # Materialize once: label_names is only promised to be iterable.
    names = list(label_names)

    fitted = model.fit(features, labels)
    probs = fitted.predict_proba(features)
    best_columns = probs.argmax(axis=1)
    # Probability columns follow the estimator's classes_, which need not be
    # 0..n-1 (e.g. when a class has no samples); map positions back to ids.
    class_ids = getattr(fitted, "classes_", None)
    pred_ids = best_columns if class_ids is None else np.asarray(class_ids)[best_columns]
    confidence = probs.max(axis=1)
    df = pd.DataFrame(
        {
            "id": source["id"],
            "text": source["text"],
            "true_label": source["category_label"],
            "pred_label": [names[class_id] for class_id in pred_ids],
            "pred_label_id": pred_ids,
            "confidence": confidence,
        }
    )
    df["low_confidence"] = df["confidence"] < threshold
    return df, fitted


# Fit the tuned logistic pipeline on every row and flag predictions below 0.85 confidence
# (stricter than the helper's 0.8 default).
confidence_df, tuned_log_reg_fitted = compute_confidence_table(
    tuned_log_reg,
    augmented_embeddings,
    records_df["category_id"].values,
    label_encoder.classes_,
    threshold=0.85,
)
# Bare expression: the ten least confident predictions.
confidence_df.sort_values("confidence").head(10)


In [None]:

# Purpose: List low-confidence cases so reviewers can manually re-label them.
# .copy() detaches the subset so later edits do not touch confidence_df.
low_confidence_cases = confidence_df.query("low_confidence").copy()
# Bare expression: first ten flagged rows, in confidence_df order (not sorted by confidence).
low_confidence_cases[["id", "text", "pred_label", "confidence"]].head(10)


In [None]:

# Purpose: Within each predicted class, surface highly similar texts to batch manual review.
def find_similar_within_prediction(
    base_embeddings: np.ndarray,
    ids: Iterable[int],
    texts: Iterable[str],
    predicted_labels: Iterable[str],
    top_k: int = 3,
    min_similarity: float = 0.9,
) -> List[Dict[str, object]]:
    """List, per sample, its most similar samples that share the predicted label.

    Args:
        base_embeddings: Embedding matrix, one row per sample.
        ids: Integer sample ids, row-aligned with *base_embeddings*.
        texts: Sample texts, row-aligned with *base_embeddings*.
        predicted_labels: Predicted label per sample, row-aligned as well.
        top_k: Maximum number of neighbors kept per sample.
        min_similarity: Neighbors with cosine similarity below this value
            are dropped.

    Returns:
        One dict per sample that has at least one qualifying neighbor, with
        the keys ``id``, ``pred_label``, ``text`` and ``neighbors``.
        ``neighbors`` is sorted by descending similarity. An empty input
        yields an empty list.

    Raises:
        ValueError: If the four inputs do not all have the same length.
    """
    id_array = np.asarray(list(ids))
    text_array = np.asarray(list(texts))
    label_array = np.asarray(list(predicted_labels))
    n_samples = len(base_embeddings)
    if not (len(id_array) == len(text_array) == len(label_array) == n_samples):
        raise ValueError(
            "base_embeddings, ids, texts and predicted_labels must have the same length"
        )
    if n_samples == 0:
        # Nothing to compare; also avoids handing an empty matrix to sklearn.
        return []

    cos = cosine_similarity(base_embeddings)
    groups: List[Dict[str, object]] = []
    for idx, row in enumerate(cos):
        # Same predicted label, not below the threshold, and not the sample itself.
        keep = (label_array == label_array[idx]) & ~(row < min_similarity)
        keep[idx] = False
        candidates = np.flatnonzero(keep)
        if candidates.size == 0:
            continue
        # Stable sort: ties keep ascending row order, as with a stable descending sort.
        order = np.argsort(-row[candidates], kind="stable")[:top_k]
        neighbors = [
            {
                "neighbor_id": int(id_array[candidate_idx]),
                "similarity": float(row[candidate_idx]),
                "neighbor_text": text_array[candidate_idx],
            }
            for candidate_idx in candidates[order]
        ]
        groups.append(
            {
                "id": int(id_array[idx]),
                "pred_label": label_array[idx],
                "text": text_array[idx],
                "neighbors": neighbors,
            }
        )
    return groups


# Similarity is measured on the plain text embeddings (not the augmented features);
# grouping uses the labels predicted in confidence_df.
similar_groups = find_similar_within_prediction(
    text_embeddings,
    records_df["id"],
    records_df["text"],
    confidence_df["pred_label"],
    top_k=3,
    min_similarity=0.92,
)
# Bare expression: first five groups, shown as the cell output.
similar_groups[:5]


In [None]:

# Purpose: Materialize similar-group output as a DataFrame for downstream tooling.
# NOTE(review): "neighbors" is a list of dicts per row; it presumably stays a single
# list-valued column rather than being expanded — confirm this is what consumers expect.
similar_groups_df = pd.json_normalize(similar_groups, sep=".")
similar_groups_df.head(10)
