In [28]:
import numpy as np
import pandas as pd
import re
import string
import nltk

from collections import Counter

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import RidgeClassifier
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.metrics import classification_report
from sklearn.pipeline import Pipeline

nltk.download('stopwords')

[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/marioreyes/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

In [54]:
class AuthorStyleTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, lowercase=True, remove_punctuation=False,
                 remove_numbers=False, min_word_length=1,
                 use_stylometric=True, use_ngram=False, use_matrix=False,
                 ngram_sizes=(2,), max_features=1000, matrix_mode='binary'):
        """
        Scikit-learn transformer for author style analysis with combined feature types.

        Args:
            lowercase: Whether to convert text to lowercase
            remove_punctuation: Whether to remove punctuation
            remove_numbers: Whether to remove numbers
            min_word_length: Minimum length of words to keep

            use_stylometric: Whether to include stylometric features
            use_ngram: Whether to include n-gram features
            use_matrix: Whether to include document-term matrix features

            ngram_sizes: Tuple of n-gram sizes to extract
            max_features: Maximum number of features per feature type
            matrix_mode: Representation mode for matrix ('binary', 'count', 'freq', 'tfidf')
        """
        self.lowercase = lowercase
        self.remove_punctuation = remove_punctuation
        self.remove_numbers = remove_numbers
        self.min_word_length = min_word_length

        self.use_stylometric = use_stylometric
        self.use_ngram = use_ngram
        self.use_matrix = use_matrix

        self.ngram_sizes = ngram_sizes
        self.max_features = max_features
        self.matrix_mode = matrix_mode

        self.punctuation_translator = str.maketrans('', '', string.punctuation)

        # Features data structures
        self.vocabulary_ = None
        self.feature_names_ = None
        self.ngram_vocab_ = {}
        self.doc_freq_ = None
        self.num_docs_ = 0
        self.stylo_feature_names_ = None
        self.feature_indices_ = {}  # Track where each feature type begins in the combined array
        self.nltk_stop_words = set(nltk.corpus.stopwords.words("english"))

    def tokenize(self, text):
        """
        Tokenize a text sequence into words.

        Args:
            text: Input text sequence

        Returns:
            list: List of tokens
        """
        # Apply preprocessing options
        if self.lowercase:
            text = text.lower()

        if self.remove_punctuation:
            text = text.translate(self.punctuation_translator)

        if self.remove_numbers:
            text = re.sub(r'\d+', '', text)

        # Split into tokens
        tokens = text.split()

        # Filter by minimum length
        tokens = [token for token in tokens if len(token) >= self.min_word_length and token not in self.nltk_stop_words]

        return tokens

    def extract_ngrams(self, text, n=2):
        """
        Extract n-grams from text.

        Args:
            text: Input text
            n: Size of n-grams

        Returns:
            list: List of n-grams
        """
        tokens = self.tokenize(text)
        units = tokens
        ngrams = []
        for i in range(len(units) - n + 1):
            ngram = units[i:i + n]
            ngrams.append(' '.join(ngram))

        return ngrams

    def extract_stylometric_features(self, text):
        """
        Extract stylometric features from text.

        Args:
            text: Input text

        Returns:
            dict: Dictionary of stylometric features
        """
        original_text = text
        tokens = self.tokenize(text)

        # Calculate features
        features = {}

        # Average word length
        if tokens:
            features['avg_word_length'] = sum(len(token) for token in tokens) / len(tokens)
        else:
            features['avg_word_length'] = 0

        # Sentence features
        sentences = re.split(r'[.!?]+', original_text)
        sentences = [s.strip() for s in sentences if s.strip()]

        if sentences:
            # Average sentence length (in words)
            features['avg_sentence_length'] = sum(len(s.split()) for s in sentences) / len(sentences)

            # Sentence length variation
            mean_len = features['avg_sentence_length']
            variance = sum((len(s.split()) - mean_len) ** 2 for s in sentences) / len(sentences)
            features['sentence_length_variance'] = variance
        else:
            features['avg_sentence_length'] = 0
            features['sentence_length_variance'] = 0

        # Lexical features
        if tokens:
            # Lexical diversity (unique words / total words)
            features['lexical_diversity'] = len(set(tokens)) / len(tokens)

            # Hapax legomena (words occurring only once)
            word_counts = Counter(tokens)
            features['hapax_percentage'] = sum(1 for w, c in word_counts.items() if c == 1) / len(tokens)
        else:
            features['lexical_diversity'] = 0
            features['hapax_percentage'] = 0

        # Punctuation density
        punctuation_count = sum(1 for char in original_text if char in string.punctuation)
        features['punctuation_density'] = punctuation_count / len(original_text) if original_text else 0

        # N-gram features
        if tokens:
            # Word n-grams
            for n in self.ngram_sizes:
                if len(tokens) >= n:
                    word_ngrams = self.extract_ngrams(text, n=n)
                    word_ngram_counts = Counter(word_ngrams)

                    # Top n-grams frequency
                    top_ngrams = word_ngram_counts.most_common(5)
                    for i, (ngram, count) in enumerate(top_ngrams):
                        features[f'word_{n}gram_{i + 1}'] = count / len(word_ngrams) if word_ngrams else 0

                    # N-gram diversity
                    features[f'word_{n}gram_diversity'] = len(word_ngram_counts) / len(
                        word_ngrams) if word_ngrams else 0

        return features

    def fit(self, X, y=None):
        """
        Fit the transformer to the data.

        Args:
            X: List of text samples
            y: Ignored (included for scikit-learn compatibility)

        Returns:
            self: The fitted transformer
        """
        self.num_docs_ = len(X)
        all_feature_names = []
        current_index = 0

        # Fit stylometric features if enabled
        if self.use_stylometric:
            sample_features = self.extract_stylometric_features(X[0])
            self.stylo_feature_names_ = list(sample_features.keys())
            all_feature_names.extend(self.stylo_feature_names_)
            self.feature_indices_['stylometric'] = (current_index, current_index + len(self.stylo_feature_names_))
            current_index += len(self.stylo_feature_names_)

        # Fit n-gram features if enabled
        if self.use_ngram:
            for n in self.ngram_sizes:
                all_ngrams = []

                for text in X:
                    text_ngrams = self.extract_ngrams(text, n=n)
                    all_ngrams.extend(text_ngrams)

                ngram_counts = Counter(all_ngrams)
                top_ngrams = ngram_counts.most_common(self.max_features)
                self.ngram_vocab_[n] = {ngram: idx for idx, (ngram, _) in enumerate(top_ngrams)}

                ngram_feature_names = [f"word_{n}gram_{ngram}" for ngram in self.ngram_vocab_[n].keys()]
                all_feature_names.extend(ngram_feature_names)

                self.feature_indices_[f'ngram_{n}'] = (current_index, current_index + len(ngram_feature_names))
                current_index += len(ngram_feature_names)

        # Fit matrix features if enabled
        if self.use_matrix:
            all_tokens = []
            for text in X:
                all_tokens.extend(self.tokenize(text))

            word_counts = Counter(all_tokens)
            most_common = word_counts.most_common(self.max_features)
            self.vocabulary_ = {word: idx for idx, (word, _) in enumerate(most_common)}

            matrix_feature_names = [f"term_{word}" for word in self.vocabulary_.keys()]
            all_feature_names.extend(matrix_feature_names)

            self.feature_indices_['matrix'] = (current_index, current_index + len(matrix_feature_names))
            current_index += len(matrix_feature_names)

            # For TF-IDF, calculate document frequencies
            if self.matrix_mode == 'tfidf':
                self.doc_freq_ = Counter()
                for text in X:
                    tokens = self.tokenize(text)
                    unique_tokens = set(t for t in tokens if t in self.vocabulary_)
                    for token in unique_tokens:
                        self.doc_freq_[token] += 1

        self.feature_names_ = all_feature_names
        return self

    def transform(self, X):
        """
        Transform the input data.

        Args:
            X: List of text samples

        Returns:
            numpy.ndarray: Transformed features
        """
        if not self.feature_names_:
            raise ValueError("Transformer must be fitted before transform")

        n_samples = len(X)
        n_features = len(self.feature_names_)
        result = np.zeros((n_samples, n_features))

        # Transform stylometric features if enabled
        if self.use_stylometric:
            start_idx, end_idx = self.feature_indices_['stylometric']
            for i, text in enumerate(X):
                features = self.extract_stylometric_features(text)
                for j, feature_name in enumerate(self.stylo_feature_names_):
                    result[i, start_idx + j] = features.get(feature_name, 0)

        # Transform n-gram features if enabled
        if self.use_ngram:
            for n in self.ngram_sizes:
                start_idx, end_idx = self.feature_indices_[f'ngram_{n}']

                for i, text in enumerate(X):
                    text_ngrams = self.extract_ngrams(text, n=n)
                    text_ngram_counts = Counter(text_ngrams)

                    for ngram, count in text_ngram_counts.items():
                        if ngram in self.ngram_vocab_[n]:
                            j = self.ngram_vocab_[n][ngram]
                            result[i, start_idx + j] = count

        # Transform matrix features if enabled
        if self.use_matrix:
            start_idx, end_idx = self.feature_indices_['matrix']

            for i, text in enumerate(X):
                tokens = self.tokenize(text)
                token_counts = Counter(t for t in tokens if t in self.vocabulary_)

                for token, count in token_counts.items():
                    if token in self.vocabulary_:
                        j = self.vocabulary_[token]

                        if self.matrix_mode == 'binary':
                            result[i, start_idx + j] = 1
                        elif self.matrix_mode == 'count':
                            result[i, start_idx + j] = count
                        elif self.matrix_mode == 'freq':
                            result[i, start_idx + j] = count / len(tokens) if tokens else 0
                        elif self.matrix_mode == 'tfidf':
                            # TF (term frequency) * IDF (inverse document frequency)
                            tf = count / len(tokens) if tokens else 0
                            idf = np.log(self.num_docs_ / (1 + self.doc_freq_[token]))
                            result[i, start_idx + j] = tf * idf

        return result

    def fit_transform(self, X, y=None):
        """
        Fit the transformer to the data and transform it.

        Args:
            X: List of text samples
            y: Ignored (included for scikit-learn compatibility)

        Returns:
            numpy.ndarray: Transformed features
        """
        return self.fit(X, y).transform(X)

    def get_feature_names_out(self):
        """
        Get output feature names for transformation.

        Returns:
            list: Feature names
        """
        if not self.feature_names_:
            raise ValueError("Transformer must be fitted before getting feature names")

        return np.array(self.feature_names_)

In [55]:
def build_author_style_pipeline(classifier='ridge', param_grid=None, cv=3, n_jobs=-1):
    """
    Build a pipeline for author style classification.

    Args:
        classifier: Type of classifier ('ridge' or 'forest')
        param_grid: Optional parameter grid (uses default if None)
        cv: Number of cross-validation folds
        n_jobs: Number of parallel jobs for grid search

    Returns:
        Fitted GridSearchCV object
    """
    # Define the feature extractor
    feature_extractor = AuthorStyleTransformer(
        lowercase=True,
        remove_punctuation=True,
        min_word_length=3,
        use_stylometric=True,
        use_ngram=True,
        use_matrix=True,
        ngram_sizes=(2, 3, ),
        max_features=20000,
        matrix_mode='tfidf'
    )

    # Create classifier
    if classifier == 'ridge':
        clf = RidgeClassifier(random_state=42)
    elif classifier == 'forest':
        clf = RandomForestClassifier(random_state=42)
    else:
        raise ValueError(f"Unsupported classifier: {classifier}")

    # Create pipeline
    pipeline = Pipeline([
        ('features', feature_extractor),
        ('classifier', clf)
    ])

    # Define default parameter grids if none provided
    if param_grid is None:
        if classifier == 'ridge':
            param_grid = {
                'features__use_stylometric': [True],
                'features__matrix_mode': ['tfidf'],
                'features__matrix_mode': [(1, 2, ), (2, 3, )]
                
            }
        elif classifier == 'forest':
            param_grid = {
                'features__use_stylometric': [True],
                'features__matrix_mode': ['tfidf'],
                'features__matrix_mode': [(1, 2, 3, )],
                'classifier__class_weight': ['balanced'],
                'classifier__n_estimators': [100, 200],
                'classifier__max_depth': [10, 20]
            }

    # Create grid search
    grid_search = GridSearchCV(
        pipeline,
        param_grid,
        cv=cv,
        n_jobs=n_jobs,
        verbose=1,
        scoring='accuracy'
    )

    return grid_search


def train_and_evaluate(X, y, classifier='ridge', test_size=0.2):
    """
    Train and evaluate the author style classification pipeline.

    Args:
        X: List of text samples
        y: List of corresponding labels
        classifier: Type of classifier ('ridge' or 'forest')
        test_size: Proportion of data to use for testing

    Returns:
        Fitted GridSearchCV object and performance metrics
    """
    # Split data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=42, stratify=y
    )

    # Build and fit pipeline
    grid_search = build_author_style_pipeline(classifier=classifier)
    grid_search.fit(X_train, y_train)

    # Get best model
    best_model = grid_search.best_estimator_

    # Make predictions
    y_pred = best_model.predict(X_test)

    # Evaluate
    report = classification_report(y_test, y_pred, output_dict=True)

    print(f"Best parameters: {grid_search.best_params_}")
    print(f"Best cross-validation score: {grid_search.best_score_:.4f}")
    print("\nClassification Report:")
    print(classification_report(y_test, y_pred))

    # Extract feature importance if using RandomForest
    if classifier == 'forest':
        feature_names = best_model.named_steps['features'].get_feature_names_out()
        importances = best_model.named_steps['classifier'].feature_importances_
        indices = np.argsort(importances)[::-1]

        print("\nTop 10 most important features:")
        for i in range(min(10, len(feature_names))):
            print(f"{feature_names[indices[i]]}: {importances[indices[i]]:.4f}")

    return grid_search, report

In [31]:
data = pd.read_csv('data/train.csv', sep = ',')
print(f'Total de datos = {data.size}')

print(f'Duplicados: {data.duplicated().sum()}')
data = data.drop_duplicates()
print(f'Duplicados: {data.duplicated().sum()}')

X_ = data['text']
y_ = data['label']

Total de datos = 181452
Duplicados: 3
Duplicados: 0


(112, 60018)


In [32]:
ridge_model, ridge_report = train_and_evaluate(X_.values, y_.values, classifier='ridge')

Fitting 3 folds for each of 2 candidates, totalling 6 fits
Best parameters: {'features__matrix_mode': (1, 2), 'features__use_stylometric': True}
Best cross-validation score: 0.5801

Classification Report:
              precision    recall  f1-score   support

           0       0.56      0.87      0.68      5220
           1       0.61      0.60      0.60      3253
           2       0.76      0.26      0.38      1917
           3       0.85      0.07      0.14       889
           4       0.83      0.06      0.12       818

    accuracy                           0.59     12097
   macro avg       0.72      0.37      0.38     12097
weighted avg       0.64      0.59      0.53     12097



In [26]:
ridge_model, ridge_report = train_and_evaluate(X_.values, y_.values, classifier='forest')

Fitting 3 folds for each of 4 candidates, totalling 12 fits
Best parameters: {'classifier__class_weight': 'balanced', 'classifier__max_depth': 20, 'classifier__n_estimators': 200, 'features__matrix_mode': (1, 2, 3), 'features__use_stylometric': True}
Best cross-validation score: 0.4129

Classification Report:
              precision    recall  f1-score   support

           0       0.77      0.34      0.47      5220
           1       0.46      0.62      0.53      3253
           2       0.48      0.25      0.33      1917
           3       0.26      0.19      0.22       889
           4       0.15      0.66      0.24       818

    accuracy                           0.41     12097
   macro avg       0.42      0.41      0.36     12097
weighted avg       0.56      0.41      0.43     12097


Top 10 most important features:
punctuation_density: 0.1206
avg_sentence_length: 0.0429
sentence_length_variance: 0.0297
word_3gram_1: 0.0274
avg_word_length: 0.0273
word_2gram_4: 0.0260
word_2gram_s