In [None]:

import re
import unicodedata
import numpy as np
import pandas as pd
from joblib import dump
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression

# --- Input / output locations -------------------------------------------
# CSV read by _read_train (expects "text" and "decade" columns).
TRAIN_PATH = "../train.csv"
# CSV read by _stream_eval (expects "id" plus a text column).
EVAL_PATH = "../eval.csv"
# Destination of the id/decade predictions CSV.
SUBMISSION = "submission.csv"
# Destination of the joblib dump holding the fitted vectorizer and model.
MODEL_PATH = "model.pkl"

# --- TF-IDF settings (forwarded to TfidfVectorizer with analyzer="char") ---
# Passed as ngram_range.
CHAR_NGRAMS = (2, 6)
# Passed as min_df.
MIN_DF = 6
# Passed as max_df.
MAX_DF = 0.60
# Passed as max_features.
MAXFEAT_CHAR = 300_000
# Passed as strip_accents.
STRIP_ACCENTS = None

# --- Logistic regression settings (forwarded to LogisticRegression) -------
LR_C = 2.3
LR_PENALTY = "l2"
LR_CLASS_WEIGHT = "balanced"
LR_MAX_ITER = 12000
# Seeds both the holdout shuffling RNG and the model's random_state.
RANDOM_SEED = 42
# Fraction of each label's samples moved into the holdout split.
HOLDOUT_FRAC = 0.15


In [None]:
# Opt-in switch for the word-level experiment; when False none of the
# optional NLP dependencies below are imported or downloaded.
ENABLE_WORD_EXPERIMENT = False

# Bind the name unconditionally so it exists (as None) whether or not the
# experiment is enabled or the Stanza pipeline could be built.
nlp_es = None

if ENABLE_WORD_EXPERIMENT:
    import warnings

    # NLTK resources are best-effort: report the failure instead of hiding it.
    try:
        import nltk
        for _nltk_resource in ("punkt", "stopwords", "wordnet"):
            nltk.download(_nltk_resource, quiet=True)
    except Exception as exc:
        warnings.warn(f"NLTK setup skipped: {exc!r}")

    # Stanza Spanish pipeline; on any failure nlp_es stays None.
    try:
        import stanza
        try:
            stanza.download("es", processors="tokenize,pos,lemma", verbose=False)
        except Exception as exc:
            # The pipeline construction below is still attempted after a
            # failed download, exactly as before; only the silence is removed.
            warnings.warn(f"Stanza model download failed: {exc!r}")
        nlp_es = stanza.Pipeline("es", processors="tokenize,pos,lemma",
                                 tokenize_no_ssplit=False, use_gpu=False, verbose=False)
    except Exception as exc:
        nlp_es = None
        warnings.warn(f"Stanza pipeline unavailable: {exc!r}")


In [None]:

def _normalize_basic(text):
    text = unicodedata.normalize("NFKC", str(text)).lower()
    text = re.sub(r"-\s*\n", "", text)
    text = re.sub(r"\s*\n\s*", " ", text)
    text = re.sub(r"\s+", " ", text).strip()
    return text

def _clean_text_list(text_list):
    cleaned = []
    for item in text_list:
        cleaned.append(_normalize_basic(item))
    return cleaned

def _force_decade_3digits(value):
    s = str(abs(int(value)))
    if len(s) >= 3:
        return int(s[:3])
    return int(s)

def _read_train(path):
    texts = []
    labels = []
    for chunk in pd.read_csv(path, chunksize=8000, encoding="utf-8-sig",
                             engine="python", on_bad_lines="skip", dtype={"text": str, "decade": str}):
        if "text" in chunk and "decade" in chunk:
            subset = chunk[["text", "decade"]].dropna()
            for t in subset["text"].tolist():
                texts.append(str(t))
            for y in subset["decade"].tolist():
                labels.append(_force_decade_3digits(int(y)))
    if len(texts) == 0:
        raise ValueError("Train vacío tras carga.")
    df = pd.DataFrame({"text": texts, "decade": labels})
    return df[["text", "decade"]]

def _stream_eval(path, needed_text_key="text"):
    ids = []
    texts = []
    header = pd.read_csv(path, nrows=0, encoding="utf-8-sig", engine="python", on_bad_lines="skip").columns.tolist()
    if "id" not in header:
        raise ValueError(f"eval.csv debe tener 'id'. Columnas: {header}")
    if needed_text_key in header:
        key = needed_text_key
    else:
        if "text" in header:
            key = "text"
        else:
            if "texto" in header:
                key = "texto"
            else:
                key = None
    if not key:
        raise ValueError(f"eval.csv debe tener '{needed_text_key}' o 'text'/'texto'. Columnas: {header}")
    for chunk in pd.read_csv(path, chunksize=8000, encoding="utf-8-sig",
                             engine="python", on_bad_lines="skip", dtype=str):
        if "id" in chunk and key in chunk:
            subset = chunk[["id", key]].dropna()
            for v in subset["id"].tolist():
                ids.append(v)
            for v in subset[key].astype(str).tolist():
                texts.append(v)
    return ids, texts


In [None]:

# Load and normalise the full training set.
df_train = _read_train(TRAIN_PATH)
texts_all = _clean_text_list(df_train["text"].tolist())
labels_all = df_train["decade"].to_numpy()
n_samples = len(labels_all)

# Per-label (stratified) holdout: shuffle each label's indices with a seeded
# RNG and move the first HOLDOUT_FRAC of them into the test mask.
rng = np.random.default_rng(RANDOM_SEED)
unique_labels = np.unique(labels_all)
test_mask = np.zeros(n_samples, dtype=bool)
for label in unique_labels:
    idx_label = np.where(labels_all == label)[0]
    rng.shuffle(idx_label)
    n_test_label = max(1, int(round(HOLDOUT_FRAC * len(idx_label))))
    # Always leave at least one sample of the label for training; without
    # this clamp a label with a single sample ended up only in the holdout
    # and the model was evaluated on a class it had never seen.
    n_test_label = min(n_test_label, len(idx_label) - 1)
    test_mask[idx_label[:n_test_label]] = True

train_mask = ~test_mask
train_indices = np.where(train_mask)[0]
test_indices = np.where(test_mask)[0]

texts_train = [texts_all[i] for i in train_indices]
labels_train = labels_all[train_mask]

texts_test = [texts_all[i] for i in test_indices]
labels_test = labels_all[test_mask]

# The two splits must partition the data exactly.
assert len(texts_train) + len(texts_test) == n_samples


In [None]:

# Character n-gram TF-IDF features; every tunable comes from the config cell.
vectorizer = TfidfVectorizer(
    analyzer="char",
    ngram_range=CHAR_NGRAMS,
    min_df=MIN_DF,
    max_df=MAX_DF,
    sublinear_tf=True,
    dtype=np.float32,
    max_features=MAXFEAT_CHAR,
    strip_accents=STRIP_ACCENTS
)
# Fit the vocabulary on the training split only, then reuse it for the holdout.
X_train = vectorizer.fit_transform(texts_train)
X_test = vectorizer.transform(texts_test)

model = LogisticRegression(
    solver="saga",
    penalty=LR_PENALTY,
    C=LR_C,
    class_weight=LR_CLASS_WEIGHT,
    max_iter=LR_MAX_ITER,
    n_jobs=-1,
    random_state=RANDOM_SEED
)
model.fit(X_train, labels_train)

# Holdout accuracy = fraction of exact label matches.
acc_holdout = float((model.predict(X_test) == labels_test).mean())
# Report the actual STRIP_ACCENTS value rather than a hard-coded "None", so
# the log line stays truthful when the config constant is changed.
print(f"[HOLDOUT] acc={acc_holdout:.4f} (CHAR; C={LR_C}, min_df={MIN_DF}, max_df={MAX_DF}, maxfeat={MAXFEAT_CHAR}, strip={STRIP_ACCENTS})")


[HOLDOUT] acc=0.2840 (CHAR; C=2.3, min_df=6, max_df=0.6, maxfeat=300000, strip=None)


In [None]:

# Refit the same vectorizer and model objects on the complete training set
# (train + holdout) and persist both in a single joblib file.
X_all = vectorizer.fit_transform(texts_all)
model.fit(X_all, labels_all)
dump(dict(vectorizer=vectorizer, model=model), MODEL_PATH)


['model.pkl']

In [None]:

# Predict the evaluation set and write the submission CSV. A missing
# evaluation file is reported instead of raising.
try:
    eval_ids, eval_texts = _stream_eval(EVAL_PATH, needed_text_key="text")
    eval_texts_clean = _clean_text_list(eval_texts)
    X_eval = vectorizer.transform(eval_texts_clean)
    eval_preds = model.predict(X_eval)
    # Predictions are passed through the same 3-digit rule used for the labels.
    eval_preds_3 = [_force_decade_3digits(pred) for pred in eval_preds]
    submission_frame = pd.DataFrame({"id": eval_ids, "decade": eval_preds_3})
    submission_frame.to_csv(SUBMISSION, index=False, encoding="utf-8")
    print(f"Finish submission: {SUBMISSION} ({len(eval_ids)} filas)")
except FileNotFoundError:
    print("No se encontró eval.csv; no se generó submission.")


Finish submission: submission.csv (3490 filas)
