# Pipeline

Implementación de dos modelos de transformers para la categorización de sentimientos y emociones.

In [1]:
#!pip install xformers # se instaló la version 0.0.22

In [2]:
import concurrent.futures
from transformers import pipeline
import pandas as pd
import torch
from app.main import data_feed, data_preparation
from tools.feed import data_info

2023-10-08 16:43:40.641989: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [3]:
model_name_i = "clasificador de sentimientos"
model_path_i = "cardiffnlp/twitter-xlm-roberta-base-sentiment"
## sentiment_task = pipeline("sentiment-analysis", model=model_path_i, tokenizer=model_path_i)

model_name_ii = "clasificador de emociones"
model_path_ii = "daveni/twitter-xlm-roberta-emotion-es"
## emotion_task = pipeline("sentiment-analysis", model=model_path_ii, tokenizer=model_path_ii)

In [4]:
archivo = "corrupcion-oct.csv"

if archivo.endswith(".csv"):
    nombre = archivo[:-4]
else:
    nombre = archivo
    archivo += ".csv"

## procesamiento de la entrada de elastic search
df_cruda = data_feed.main(archivo)
df_limpia = data_preparation.main(df_cruda)
df = data_preparation.preprocesamiento(df_limpia, quitar=False)

text_corpus = [str(sentence) for sentence in df.content.to_list()]


    Resumen del procesamiento de df:

       index          columna  Nan  pct_nan   dtype  count  pct_reg  count_unique
0      5  spark_sentiment    0      0.0  object   6031    100.0             4
1      4    spark_emotion    0      0.0  object   6031    100.0             8
2      1           author    0      0.0  object   6031    100.0          2911
3      0       @timestamp    0      0.0  object   6031    100.0          3083
4      3            token    0      0.0  object   6031    100.0          3439
5      2          content    0      0.0  object   6031    100.0          4999
    
1. Limpieza del content

Longitud del batch de contenidos: 6031
Tamaño de la muestra neta: 5338


In [5]:
data_info(df)

Unnamed: 0,index,columna,Nan,pct_nan,dtype,count,pct_reg,count_unique
0,5,spark_sentiment,0,0.0,object,5338,100.0,4
1,4,spark_emotion,0,0.0,object,5338,100.0,8
2,7,hashtags,4792,89.77,object,546,10.23,329
3,6,usuarios_mencionados,343,6.43,object,4995,93.57,2139
4,1,author,0,0.0,object,5338,100.0,2910
5,0,@timestamp,0,0.0,object,5338,100.0,3082
6,3,token,0,0.0,object,5338,100.0,3438
7,2,content,0,0.0,object,5338,100.0,4997


In [6]:
# Preparación de los training sets
m = 8
nx = len(text_corpus) // m

# Construimos la muestra de batches
m_examples = []
m_datasets = []
for i in range(0, len(text_corpus), nx):
    if i == 0:
        i_antiguo = i
        continue
    m_examples.append(text_corpus[i_antiguo:i])
    m_datasets.append(df.iloc[i_antiguo:i])
    i_antiguo = i

# reiniciar indices de cada uno
for i, dataset in enumerate(m_datasets):
    m_datasets[i] = dataset.reset_index(drop=True)

## Módulo de ejecución de los modelos

Actualmente configurado con 4 workers en un Pool Multi-Hilos.

Resumen de ejecuciones al final del modulo.

La mejor configuracion es con 4 workers. Después de eso se achata la curva de performance de rendiemiento.

El tiempo de ejecución actual es de  4.0 min

In [7]:
cantidad_de_workers = 4

import time

start = time.time()
start_i = time.time()

In [8]:
print(f"Modelo: {model_name_i} | {model_path_i}")
model = pipeline("sentiment-analysis", model=model_path_i, tokenizer=model_path_i)

predicciones = {}
def predecir(inputs, position):
    with torch.no_grad():
        output = model(inputs, padding= True, truncation= True, max_length= 512)
        predicciones[position] = output

OUTPUT = []
for I in range(len(m_examples)):
    with concurrent.futures.ThreadPoolExecutor(max_workers=cantidad_de_workers) as executor:
        futures = {executor.submit(predecir, *(input_ids, i)): i for i, input_ids in enumerate(m_examples[I])}

        for future in concurrent.futures.as_completed(futures):
            i = futures[future]
            try:
                result = future.result()
            except Exception as e:
                print(f"Error in prediction: {e}")
                predicciones[i] = result
    
    OUTPUT.append(predicciones)

end_i = time.time()

Modelo: clasificador de sentimientos | cardiffnlp/twitter-xlm-roberta-base-sentiment


In [9]:
for i in range(len(OUTPUT)):# sabemos que es 8 porque m muestras de training sets
    # Ordenamos el output de las predicciones -> están desordenadas por que algunos hilos terminan antes que otros y se desincroniza el orden
    OUTPUT[i] = {k: v for k, v in sorted(OUTPUT[i].items())}

    # creamos objeto de predicciones en formato columna
    resultado = {}
    for value_list in OUTPUT[i].values():
        for value_dict in value_list:
            for label, score in value_dict.items():
                resultado.setdefault(label, []).append(score)

    # Actualizamos objeto output
    m_datasets[i].loc[:, "sentiment"] = pd.DataFrame(resultado).label
    m_datasets[i].loc[:, "score_sentiment"] = pd.DataFrame(resultado).score


In [10]:
start_ii = time.time()

In [11]:
print(f"Modelo: {model_name_ii} | {model_path_ii}")
model = pipeline("sentiment-analysis", model=model_path_ii, tokenizer=model_path_ii)

predicciones = {}
def predecir(inputs, position):
    with torch.no_grad():
        output = model(inputs, padding= True, truncation= True, max_length= 512)
        predicciones[position] = output

OUTPUT = []
for I in range(len(m_examples)):
    with concurrent.futures.ThreadPoolExecutor(max_workers=cantidad_de_workers) as executor:
        futures = {executor.submit(predecir, *(input_ids, i)): i for i, input_ids in enumerate(m_examples[I])}

        for future in concurrent.futures.as_completed(futures):
            i = futures[future]
            try:
                result = future.result()
            except Exception as e:
                print(f"Error in prediction: {e}")
                predicciones[i] = result
    
    OUTPUT.append(predicciones)

end_ii = time.time()

Modelo: clasificador de emociones | daveni/twitter-xlm-roberta-emotion-es


In [12]:
for i in range(len(OUTPUT)):
    # Ordenamos el output de las predicciones
    OUTPUT[i] = {k: v for k, v in sorted(OUTPUT[i].items())}

    # creamos objeto de predicciones en formato columna
    resultado = {}
    for value_list in OUTPUT[i].values():
        for value_dict in value_list:
            for label, score in value_dict.items():
                resultado.setdefault(label, []).append(score)

    # Actualizamos objeto output
    m_datasets[i].loc[:, "emotion"] = pd.DataFrame(resultado).label
    m_datasets[i].loc[:, "score_emotion"] = pd.DataFrame(resultado).score

In [13]:
end = time.time()

In [14]:
# resumen de los tiempos

tiempo_modulo = end - start
tiempo_modelo_i = end_i - start_i
tiempo_modelo_ii = end_ii - start_ii

tiempo_modulo = round(tiempo_modulo, 2)
tiempo_modelo_i = round(tiempo_modelo_i, 2)
tiempo_modelo_ii = round(tiempo_modelo_ii, 2)

msg = f"""
        RESUMEN DE TIEMPOS DE EJECUCION
    workers: {cantidad_de_workers}

    Tiempo Modelos:
        - {model_name_i} = {tiempo_modelo_i} seg
        - {model_name_ii} = {tiempo_modelo_ii} seg


    Tiempo Módulo: {tiempo_modulo} seg
    Tiempo Módulo: {tiempo_modulo//60} min
"""

print(msg)


        RESUMEN DE TIEMPOS DE EJECUCION
    workers: 8

    Tiempo Modelos:
        - clasificador de sentimientos = 118.11 seg
        - clasificador de emociones = 116.92 seg


    Tiempo Módulo: 235.48 seg
    Tiempo Módulo: 3.0 min



#### Resultado de la última ejecución (los dos modelos):

**Ultima ejecución:**
- CPU times: user 2h 30min 41s, sys: 20.2 s, total: 2h 31min 2s
- Wall time: 5min 24s

**Resumen:**

2*60*60 = 7200 seg<br />
30*60 = 1800 seg<br />
41 = 41 seg<br />

- Previo a optimización: 9041 seg
- Con 4 workers = 242.8 seg
    - 1 - 4w/anterior = 1 - 242.8/9041 = 1 - 0.0268 = 0.97302
    - con 4 workers el código es 97,3% veces más rápido
- Con 8 workers = 235.48 seg
- Es apenas 7 segundos más rápido (cada 5000 datos)

## Módulo de estructuración de los resultados

In [18]:
results = pd.DataFrame()
for i in range(m):
    results = pd.concat([results, m_datasets[i]])

results = results.reset_index(drop=True)
results = results.drop(columns = ["spark_sentiment", "spark_emotion"])
data_info(results)

Unnamed: 0,index,columna,Nan,pct_nan,dtype,count,pct_reg,count_unique
0,7,score_sentiment,0,0.0,float64,5336,100.0,655
1,9,score_emotion,0,0.0,float64,5336,100.0,655
2,6,sentiment,0,0.0,object,5336,100.0,3
3,8,emotion,0,0.0,object,5336,100.0,4
4,5,hashtags,4790,89.77,object,546,10.23,329
5,4,usuarios_mencionados,343,6.43,object,4993,93.57,2138
6,1,author,0,0.0,object,5336,100.0,2908
7,0,@timestamp,0,0.0,object,5336,100.0,3082
8,3,token,0,0.0,object,5336,100.0,3438
9,2,content,0,0.0,object,5336,100.0,4995


## Módulo de descarga

In [19]:
results.to_csv("classified_corrupcion.csv")