# Clasificación basada en modelos de lenguaje de N-gramas

In [1]:
import re
from pathlib import Path
import numpy as np
import pandas as pd
import nltk
from sklearn.preprocessing import FunctionTransformer

nltk.download('punkt')
nltk.download('punkt_tab')

[nltk_data] Downloading package punkt to /home/lucho/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to /home/lucho/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


True

In [2]:
data_dir = Path("data/")
train_df = pd.read_csv(data_dir / "train.csv")
test_df = pd.read_csv(data_dir / "data_test.csv")

train_df.head()

Unnamed: 0,text,book,label
0,Look at his legs!”\n“Confound you handsome you...,Middlemarch,1
1,Meanwhile Mr. Pickwick had been wheeled to the...,The_Pickwick_Papers,0
2,"Rose had been in high spirits, too, and they h...",Oliver_Twist,0
3,"He\nheld his breath, and listened at the key-h...",The_Pickwick_Papers,0
4,"On hearing this most unexpected sound, Mr. Bum...",Oliver_Twist,0


In [3]:
len(train_df)

60484

In [4]:
X_train = train_df["text"]
y_train = train_df["label"]
X_test = test_df["text"]

## Procesamiento del corpus

Tokenizamos las secuencias por palabra, luego de convertir el texto a minúscula y añadimos tokens de inicio y final de secuencia. Preservamos signos de puntuación.

In [5]:
def simple_preprocessor(X: str) -> list[str]:
    """Tokenize sequence, adding <START> and <END> tokens."""
    return pd.Series([
        ["<START>"] + nltk.word_tokenize(sequence.lower()) + ["<END>"]
        for sequence in X
    ])

preprocessor = FunctionTransformer(simple_preprocessor)
X_train_tokens = preprocessor.transform(X_train)

In [6]:
for sequence in X_train_tokens[:5]:
    print(sequence)

['<START>', 'look', 'at', 'his', 'legs', '!', '”', '“', 'confound', 'you', 'handsome', 'young', 'fellows', '!', 'you', 'think', 'of', 'having', 'it', 'all', 'your', 'own', 'way', 'in', 'the', 'world', '.', '<END>']
['<START>', 'meanwhile', 'mr.', 'pickwick', 'had', 'been', 'wheeled', 'to', 'the', 'pound', ',', 'and', 'safely', 'deposited', 'therein', ',', 'fast', 'asleep', 'in', 'the', 'wheel-barrow', ',', 'to', 'the', 'immeasurable', 'delight', 'and', 'satisfaction', 'not', 'only', 'of', 'all', 'the', 'boys', 'in', 'the', 'village', ',', 'but', 'three-fourths', 'of', 'the', 'whole', 'population', ',', 'who', 'had', 'gathered', 'round', ',', 'in', 'expectation', 'of', 'his', 'waking', '.', '<END>']
['<START>', 'rose', 'had', 'been', 'in', 'high', 'spirits', ',', 'too', ',', 'and', 'they', 'had', 'walked', 'on', ',', 'in', 'merry', 'conversation', ',', 'until', 'they', 'had', 'far', 'exceeded', 'their', 'ordinary', 'bounds', '.', '<END>']
['<START>', 'he', 'held', 'his', 'breath', ',', 

## Vocabulario

In [7]:
from collections import Counter

vocabulary = Counter()
for sequence_tokens in X_train_tokens:
    vocabulary.update(sequence_tokens)

print(f"Tamaño del vocabulario: {len(vocabulary):,d}")

Tamaño del vocabulario: 46,937


In [8]:
vocabulary.most_common(20)

[(',', 167349),
 ('the', 84871),
 ('.', 68779),
 ('<START>', 60484),
 ('<END>', 60484),
 ('and', 58523),
 ('to', 53309),
 ('of', 48124),
 ('a', 41332),
 ('i', 36397),
 ('’', 31413),
 ('in', 29542),
 ('was', 24870),
 ('that', 24446),
 ('it', 23307),
 ('he', 22137),
 ('you', 20336),
 (';', 20333),
 ('his', 19836),
 ('her', 19143)]

Los tokens más comunes corresponden a pronombres, signos de puntuación, tokens de inicio y parada, además de palabras que suelen cosiderarse de parada. Como queremos crear modelos de lenguaje para cada clase, conservaremos todos estos tokens.

In [9]:
vocabulary.most_common()[:-20:-1]

[('non-existent', 1),
 ("'shout", 1),
 ('schoolhouse', 1),
 ("'walentine", 1),
 ('cock-and-bull', 1),
 ('obstruct', 1),
 ('educators', 1),
 ('plurality', 1),
 ('snuffs', 1),
 ('fast-approaching', 1),
 ('porpus', 1),
 ("'servants", 1),
 ('fingers._', 1),
 ('_wall', 1),
 ('kangaroo', 1),
 ('herefordshire', 1),
 ('bat-fowling', 1),
 ('hollyhocks', 1),
 ('jasmine-boughs', 1)]

## Modelo de ngramas

Creamos una clase que herede de `BaseEstimator`. Esta clase debe entrenar un modelo de lenguaje para cada clase del dataset. En entrenamiento se calculan los conteos para realizar las estimaciones por máxima verosimilitud y en inferencia se calcula la probabilidad de una secuencia dada para los modelos de lenguaje de cada clase y se asigna a la secuencia la clase cuyo modelo de una mayor probabilidad.

In [10]:
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.utils.validation import check_is_fitted
from sklearn.utils.multiclass import unique_labels
from typing import Iterable, Mapping, Generator


class LanguageModel:
    def __init__(self, corpus: Iterable[list[str]], vocabulary: Mapping, ngram_size: int = 2):
        self.ngram_size = ngram_size
        self.corpus = corpus
        self.vocabulary = vocabulary
        self._context_counts = {}
        self._ngram_counts = {}
        self._compute_frequencies()
    
    def _ngram_generator(self, sequence: list[str]) -> Generator[tuple[str], None, None]:
        """Generates ngrams."""
        for i in range(len(sequence) - self.ngram_size + 1):
            yield tuple(sequence[i:i+self.ngram_size])

    def _compute_frequencies(self) -> None:
        """Training the language model by computing the frequencies of the ngrams and their contexts."""
        for sequence in self.corpus:
            for ngram in self._ngram_generator(sequence):
                context = ngram[:-1]
                self._context_counts[context] = self._context_counts.get(context, 0) + 1
                self._ngram_counts[ngram] = self._ngram_counts.get(ngram, 0) + 1

    def _compute_ngram_likelihood(self, ngram: list[str]) -> float:
        """Compute the probability of an ngram using maximum likelihood estimation and Lamplace smoothing."""
        context = ngram[:-1]
        ngram_freq = self._ngram_counts.get(ngram, 0)
        context_freq = self._context_counts.get(context, 0)
        return (ngram_freq + 1) / (context_freq + len(vocabulary))

    def sequence_probability(self, sequence: list[str], logspace: bool = True) -> float:
        """Compute the probability of a sequence as the product of ngram probabilities.
        
        Log space is used for stability.
        """
        log_p = 0
        for ngram in self._ngram_generator(sequence):
            log_p += np.log(self._compute_ngram_likelihood(ngram))
        return log_p if logspace else np.exp(log_p)


class NgramLamguageModelClassifier(BaseEstimator, ClassifierMixin):
    n_gram_size: int
    classes_: Iterable
    vocabulary: Mapping
    _langmodels: Mapping[int|str, LanguageModel]

    def __init__(self, ngram_size: int = 2):
        self.ngram_size = ngram_size
        super().__init__()

    def _build_vocabulary(self, X) -> None:
        """Given an array-like object of tokenized sequence, build a vocabulary."""
        self.vocabulary = Counter()
        for sequence_tokens in X:
            self.vocabulary.update(sequence_tokens)

    def fit(self, X, y) -> "NgramLamguageModelClassifier":
        """Train a language model for each class."""
        # store classes seen during fit
        self.classes_ = unique_labels(y)
        
        # build vocabulary
        self._build_vocabulary(X)

        # train a language model for each class
        self._langmodels = {}
        for class_label in self.classes_:
            # filter text belonging to that class
            class_indexes = (y == class_label)
            class_corpus = X[class_indexes]
            # train ngram model
            class_model = LanguageModel(class_corpus, vocabulary, ngram_size=self.ngram_size)
            self._langmodels[class_label] = class_model
            
        return self
    
    def predict(self, X) -> np.array:
        """Return predicted classses for a set of inputs.
        
        Inputs must be given as an iterable of lists of tokens.
        """
        # check if fit has been called
        check_is_fitted(self)
        
        probs = self._sequence_probs(X)
        return np.argmax(probs, axis=1)
    
    def _sequence_probs(self, X) -> np.array:
        """Returns probabilities for each class as computed from the language model.
        
        Note that this does not returns probability distributions."""
        # check if fit has been called
        check_is_fitted(self)

        # sequence probabilities computed from language model
        sequences_probs = np.array([
            [
                self._langmodels.get(class_label).sequence_probability(sequence) 
                for class_label in self.classes_
            ]
            for sequence in X
        ])

        return sequences_probs

## Selección y Evaluación

Usamos grid search con modelos de unigramas hasta 5-gramas usando como criterio la métrica f1 macro.

In [11]:
from sklearn.model_selection import GridSearchCV

clf = NgramLamguageModelClassifier()
param_grid = {"ngram_size": [1,2,3,4,5]}
search = GridSearchCV(clf, param_grid, scoring="f1_macro", cv=5, n_jobs=-1)
search.fit(X_train_tokens, y_train)


In [12]:
pd.DataFrame(search.cv_results_).sort_values(by="rank_test_score")

Unnamed: 0,mean_fit_time,std_fit_time,mean_score_time,std_score_time,param_ngram_size,params,split0_test_score,split1_test_score,split2_test_score,split3_test_score,split4_test_score,mean_test_score,std_test_score,rank_test_score
0,4.187857,0.786864,11.969135,1.034715,1,{'ngram_size': 1},0.860806,0.85883,0.863676,0.864225,0.866454,0.862798,0.002679,1
2,8.499283,0.79393,15.568293,0.755789,3,{'ngram_size': 3},0.648974,0.647531,0.644652,0.662993,0.656613,0.652152,0.00671,2
3,7.601169,0.497524,14.429006,1.123506,4,{'ngram_size': 4},0.624902,0.624907,0.623978,0.635788,0.632994,0.628514,0.004891,3
1,6.378956,0.729623,16.248383,0.702373,2,{'ngram_size': 2},0.547932,0.553778,0.552875,0.562231,0.556987,0.55476,0.004731,4
4,6.711497,0.262067,9.857089,1.842717,5,{'ngram_size': 5},0.450863,0.442568,0.450937,0.455139,0.448337,0.449569,0.004126,5


## Mejor Modelo

Creamos un pipeline con el tokenizador y el mejor modelo para generar nuestras predicciones sobre el conjunto de test

In [13]:
from sklearn.pipeline import Pipeline

model = Pipeline([
    ("tokenizer", preprocessor),
    ("classifier", search.best_estimator_)
])

pred = model.predict(X_test)

In [25]:
results = pd.DataFrame.from_dict({"ID": test_df["ID"], "label": pred}, orient="columns")
results

Unnamed: 0,ID,label
0,1,0
1,2,2
2,3,0
3,4,0
4,5,0
...,...,...
5110,5111,4
5111,5112,4
5112,5113,4
5113,5114,0


In [27]:
results.to_csv(data_dir / "sumbmission.csv", index=False)