In [1]:
# Instalar dependencias
!pip install pandas nltk scikit-learn vaderSentiment pytest
!pip install vaderSentiment
!pip install catboost



## **Construir el Pipeline de Datos**
Este enfoque construye un pipeline de procesamiento, verifica la calidad de los datos con controles clave y usa pruebas de unidad para asegurar que cada función del pipeline funcione correctamente. Esta estructura garantiza que el pipeline de procesamiento de datos esté bien diseñado, probado y listo para su uso en modelos de análisis de sentimiento.

El pipeline procesará los datos desde su estado original hasta el estado final, listos para su uso en un modelo de clasificación de sentimiento.


In [22]:
# my_pipeline.py

import pandas as pd
import re
import string
import numpy as np
import os
import kagglehub
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import PorterStemmer
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import precision_score, recall_score, f1_score
import nltk
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding, GlobalAveragePooling1D, Dense
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.callbacks import EarlyStopping


# Inicializar componentes necesarios
nltk.download('stopwords')
nltk.download('punkt')
stop_words = set(stopwords.words('english'))
stemmer = PorterStemmer()
analyzer = SentimentIntensityAnalyzer()
vectorizer = TfidfVectorizer(max_features=1000)

# Descargar y cargar datos de Kaggle con una muestra limitada
def load_data():
    path = kagglehub.dataset_download("kritanjalijain/amazon-reviews")
    train_file = os.path.join(path, 'train.csv')
    test_file = os.path.join(path, 'test.csv')

    columns = ['polaridad', 'titulo', 'texto']
    df_train = pd.read_csv(train_file, names=columns).head(5000)
    df_test = pd.read_csv(test_file, names=columns).head(1000)

    df_train = concat_columns(df_train, 'texto', 'titulo', 'texto')
    df_test = concat_columns(df_test, 'texto', 'titulo', 'texto')
    df_train['polaridad'] = df_train['polaridad'].map({1:0, 2:1})
    df_test['polaridad'] = df_test['polaridad'].map({1:0, 2:1})

    return df_train, df_test

# Funciones auxiliares
def concat_columns(df, col1, col2, new_col):
    df[new_col] = df[col1].apply(str) + ' ' + df[col2].apply(str)
    df.drop(col2, axis=1, inplace=True)
    return df

def clean_text(text):
    text = text.lower()  # Convertir a minúsculas
    text = re.sub(r'http\S+|www\S+', '', text)  # Eliminar URLs (http, https y www)
    text = re.sub(r'@\w+', '', text)  # Eliminar menciones (@usuario)
    text = text.translate(str.maketrans('', '', string.punctuation))  # Eliminar signos de puntuación
    return text.strip()  # Eliminar espacios adicionales al inicio y final

def tokenize_and_stem(text):
    tokens = [stemmer.stem(word) for word in word_tokenize(text) if word not in stop_words]
    return tokens

def preprocess_data(df):
    df['texto_limpio'] = df['texto'].apply(clean_text)
    df['tokens'] = df['texto_limpio'].apply(tokenize_and_stem)
    return df

def sentiment_analysis(df):
    df['compound'] = df['texto_limpio'].apply(lambda x: analyzer.polarity_scores(x)['compound'])
    df['sentiment'] = df['compound'].apply(lambda score: 'Positivo' if score > 0.05 else 'Negativo' if score < -0.05 else 'Neutral')
    return df

# Función para vectorizar el texto
def vectorize_text(df_train, df_test):
    X_train = vectorizer.fit_transform(df_train['texto_limpio'])
    X_test = vectorizer.transform(df_test['texto_limpio'])
    return X_train, X_test


# Función para configurar, entrenar y evaluar el modelo
def train_model(df_train, df_test):
    # Parámetros del preprocesamiento
    max_words = 10000
    max_len = 200

    # Tokenización y preprocesamiento de texto
    tokenizer = Tokenizer(num_words=max_words)
    tokenizer.fit_on_texts(df_train['texto_limpio'])

    X_train = pad_sequences(tokenizer.texts_to_sequences(df_train['texto_limpio']), maxlen=max_len)
    X_test = pad_sequences(tokenizer.texts_to_sequences(df_test['texto_limpio']), maxlen=max_len)

    y_train = df_train['polaridad'].values
    y_test = df_test['polaridad'].values

    # Configuración del modelo
    embedding_dim = 16
    model = Sequential([
        Embedding(input_dim=max_words, output_dim=embedding_dim, input_length=max_len),
        GlobalAveragePooling1D(),
        Dense(16, activation='relu'),
        Dense(1, activation='sigmoid')
    ])

    # Compilar el modelo
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

    # Configurar EarlyStopping para evitar sobreajuste
    early_stop = EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True)

    # Entrenar el modelo
    model.fit(X_train, y_train, epochs=10, batch_size=32, validation_split=0.2, callbacks=[early_stop])

    # Evaluar en el conjunto de prueba
    loss, accuracy = model.evaluate(X_test, y_test, verbose=0)
    print(f"Test Accuracy: {accuracy:.4f}")
    print(f"Test Loss: {loss:.4f}")

    # Obtener predicciones en el conjunto de prueba
    y_pred = (model.predict(X_test) > 0.5).astype("int32")

    # Calcular métricas adicionales
    precision = precision_score(y_test, y_pred)
    recall = recall_score(y_test, y_pred)
    f1 = f1_score(y_test, y_pred)

    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")

# Ejecutar el pipeline completo
if __name__ == "__main__":
    df_train, df_test = load_data()
    df_train = preprocess_data(df_train)
    df_test = preprocess_data(df_test)
    train_model(df_train, df_test)



[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


Epoch 1/10




[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 6ms/step - accuracy: 0.5160 - loss: 0.6924 - val_accuracy: 0.5710 - val_loss: 0.6858
Epoch 2/10
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 4ms/step - accuracy: 0.5282 - loss: 0.6857 - val_accuracy: 0.6330 - val_loss: 0.6653
Epoch 3/10
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 5ms/step - accuracy: 0.5896 - loss: 0.6527 - val_accuracy: 0.7040 - val_loss: 0.6238
Epoch 4/10
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 5ms/step - accuracy: 0.7686 - loss: 0.5628 - val_accuracy: 0.8070 - val_loss: 0.5177
Epoch 5/10
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 6ms/step - accuracy: 0.8229 - loss: 0.4616 - val_accuracy: 0.8450 - val_loss: 0.4441
Epoch 6/10
[1m125/125[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 4ms/step - accuracy: 0.8800 - loss: 0.3708 - val_accuracy: 0.8360 - val_loss: 0.4033
Epoch 7/10
[1m125/125[0m [32m━━━━━━━

## **Pruebas de Unidad para Validar el Pipeline**
A continuación, se define un conjunto de pruebas de unidad para validar las transformaciones de texto y las salidas de cada etapa del pipeline.



In [23]:
# Pruebas unitarias

def test_load_data():
    df_train, df_test = load_data()
    assert not df_train.empty, "El DataFrame de entrenamiento está vacío."
    assert not df_test.empty, "El DataFrame de prueba está vacío."

def test_clean_text():
    raw_text = "This is a test! Check out https://example.com @user"
    expected = "this is a test check out"
    result = clean_text(raw_text)
    assert result == expected, f"La función clean_text no limpia correctamente el texto. Resultado: {result}"

def test_tokenize_and_stem():
    text = "this is a simple text example"
    expected_tokens = ["simpl", "text", "exampl"]
    assert tokenize_and_stem(text) == expected_tokens, "La función tokenize_and_stem no funciona correctamente."

def test_preprocess_data():
    df_train, _ = load_data()
    df_processed = preprocess_data(df_train)
    assert 'texto_limpio' in df_processed.columns, "La columna 'texto_limpio' falta en el DataFrame procesado."
    assert 'tokens' in df_processed.columns, "La columna 'tokens' falta en el DataFrame procesado."

def test_sentiment_analysis():
    data = {'texto_limpio': ["I love this!", "I hate this!", "It's ok."]}
    df = pd.DataFrame(data)
    df = sentiment_analysis(df)
    assert set(df['sentiment']) == {"Positivo", "Negativo", "Neutral"}, "La función sentiment_analysis no clasifica correctamente los sentimientos."

# Ejecutar las pruebas
test_load_data()
test_clean_text()
test_tokenize_and_stem()
test_preprocess_data()
test_sentiment_analysis()

print("Todas las pruebas pasaron correctamente.")


Todas las pruebas pasaron correctamente.


## **Ejecutar Controles de Calidad**
Los controles de calidad aseguran que los datos procesados cumplan con los estándares esperados.





In [24]:
# Función para controles de calidad en los datos procesados
def quality_checks(df):
    # 1. Verificar que no haya valores nulos
    assert df.isnull().sum().sum() == 0, "Hay valores nulos en el dataframe procesado."

    # 2. Verificar que polaridad tenga solo valores binarios (0 y 1)
    assert set(df['polaridad'].unique()).issubset({0, 1}), "La columna 'polaridad' contiene valores no binarios."

    # 3. Comprobar columnas necesarias en el dataframe
    required_columns = ['texto', 'polaridad', 'texto_limpio', 'tokens', 'compound', 'sentiment']
    for col in required_columns:
        assert col in df.columns, f"La columna {col} falta en el dataframe."

    print("Todos los controles de calidad se aprobaron.")

# Pipeline completo de datos
def data_pipeline():
    # Cargar datos de Kaggle y aplicar muestra
    df_train, df_test = load_data()

    # Preprocesamiento y análisis de sentimiento
    df_train = preprocess_data(df_train)
    df_test = preprocess_data(df_test)
    df_train = sentiment_analysis(df_train)
    df_test = sentiment_analysis(df_test)

    # Vectorización del texto
    X_train, X_test = vectorize_text(df_train, df_test)
    y_train, y_test = df_train['polaridad'], df_test['polaridad']

    return X_train, X_test, y_train, y_test, df_train, df_test

# Ejecutar el pipeline y los controles de calidad
X_train, X_test, y_train, y_test, df_train, df_test = data_pipeline()
quality_checks(df_train)
quality_checks(df_test)


Todos los controles de calidad se aprobaron.
Todos los controles de calidad se aprobaron.
