# Text Processing:
---

In [None]:
#limpiar cache spark
#spark.catalog.clearCache()

In [1]:
import sparknlp

spark = sparknlp.start()

24/11/09 21:19:03 WARN Utils: Your hostname, ubuntu-IdeaPad-Gaming-3-15IAH7 resolves to a loopback address: 127.0.1.1; using 192.168.1.16 instead (on interface enp48s0)
24/11/09 21:19:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/ubuntu/.local/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
com.johnsnowlabs.nlp#spark-nlp_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-004f582b-459f-4066-9fbe-0fee4b5c8bf9;1.0
	confs: [default]
	found com.johnsnowlabs.nlp#spark-nlp_2.12;5.5.1 in central
	found com.typesafe#config;1.4.2 in central
	found org.rocksdb#rocksdbjni;6.29.5 in central
	found com.amazonaws#aws-java-sdk-s3;1.12.500 in central
	found com.amazonaws#aws-java-sdk-kms;1.12.500 in central
	found com.amazonaws#aws-java-sdk-core;1.12.500 in central
	found commons-logging#commons-logging;1.1.3 in central
	found commons-codec#commons-codec;1.15 in central
	found org.apache.httpcomponents#httpclient;4.5.13 in central
	found org.apache.httpcomponents#httpcore;4.4.13 in central
	found software.amazon.ion#ion-java;1.0.2 in central
	found joda-time#joda-time;2.8.1 in central
	found com.amazonaws#jmespath-java;1.12.500 in centra

## Librerias:

In [2]:
# PySpark SQL Modules
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit, when, trim, split, udf, expr
from pyspark.sql.types import StringType, ArrayType
from sparknlp.annotator import *

# PySpark ML Modules
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import Tokenizer, StopWordsRemover

# Other Libraries
import os

## Lectura de Datos
---

In [3]:
# Ruta al archivo JSON
file_path = '/home/ubuntu/Documents/sparknlp-complaints/complaints.json'

In [4]:
# Cargar archivo JSON en Spark DataFrame
df_tickets = spark.read.json(file_path)

# Desanidar las columnas dentro de _source
df_tickets_flat = df_tickets.select(
    "_id", "_index", "_score",
    col("_source.tags").alias("tags"),
    col("_source.zip_code").alias("zip_code"),
    col("_source.complaint_id").alias("complaint_id"),
    col("_source.issue").alias("issue"),
    col("_source.date_received").alias("date_received"),
    col("_source.state").alias("state"),
    col("_source.consumer_disputed").alias("consumer_disputed"),
    col("_source.product").alias("product"),
    col("_source.company_response").alias("company_response"),
    col("_source.company").alias("company"),
    col("_source.submitted_via").alias("submitted_via"),
    col("_source.date_sent_to_company").alias("date_sent_to_company"),
    col("_source.company_public_response").alias("company_public_response"),
    col("_source.sub_product").alias("sub_product"),
    col("_source.timely").alias("timely"),
    col("_source.complaint_what_happened").alias("complaint_what_happened"),
    col("_source.sub_issue").alias("sub_issue"),
    col("_source.consumer_consent_provided").alias("consumer_consent_provided"),
    "_type"
)

                                                                                

## Limpieza de Datos
---

In [5]:
# Seleccionar la columnas
df_filtered = df_tickets_flat.select('complaint_what_happened','product','sub_product')

# Concatenar las columnas 'category' y 'sub_category' con un '+' en medio
df_filtered = df_filtered.withColumn('category', F.concat_ws('+', F.col('product'), F.col('sub_product')))

# Eliminar la columna 'sub_category'
df_filtered = df_filtered.drop('product','sub_product')

# Reemplazar cadenas vacías por nulos
df_filtered = df_filtered.withColumn("complaint_what_happened", when(trim(col("complaint_what_happened")) == "", lit(None)).otherwise(col("complaint_what_happened")))        

# Eliminar filas con nulos en columnas especificadas
df_filtered = df_filtered.dropna(subset=["complaint_what_happened", "category"])

df_filtered.show()

+-----------------------+--------------------+
|complaint_what_happened|            category|
+-----------------------+--------------------+
|   Good morning my n...|Debt collection+C...|
|   I upgraded my XXX...|Credit card or pr...|
|   Chase Card was re...|Credit reporting,...|
|   On XX/XX/2018, wh...|Credit reporting,...|
|   my grand son give...|Checking or savin...|
|   Can you please re...|Credit reporting,...|
|   With out notice J...|Checking or savin...|
|   During the summer...|Vehicle loan or l...|
|   On XXXX XX/XX/201...|Money transfer, v...|
|   I have a Chase cr...|Credit card or pr...|
|   mishandling of th...|Vehicle loan or l...|
|   I have reached ou...|Credit reporting,...|
|   I opened an accou...|Checking or savin...|
|   To whom it may co...|Checking or savin...|
|   My chase amazon c...|Credit card or pr...|
|   I opened the savi...|Checking or savin...|
|   XXXX XXXX a sofa,...|Checking or savin...|
|   My card went miss...|Checking or savin...|
|   Chase sen

                                                                                

In [6]:
# Definir una UDF para aplicar todas las transformaciones en Spark
def clean_text_spark(df, text_column):
    # Convertir el texto a minúsculas
    df = df.withColumn(text_column, F.lower(F.col(text_column)))
    
    # Eliminar texto en corchetes []
    df = df.withColumn(text_column, F.regexp_replace(F.col(text_column), r'\[.*?\]', ''))
    
    # Eliminar puntuación
    df = df.withColumn(text_column, F.regexp_replace(F.col(text_column), r'[^\w\s]', ''))
    
    # Eliminar palabras que contienen números
    df = df.withColumn(text_column, F.regexp_replace(F.col(text_column), r'\b\w*\d\w*\b', ''))
    
    # Eliminar espacios en blanco adicionales
    df = df.withColumn(text_column, F.trim(F.col(text_column)))
    
    return df

# Aplicar la función de limpieza
df_cleanedxx = clean_text_spark(df_filtered, 'complaint_what_happened')

# Mostrar el resultado limpio
df_cleanedxx.show()

+-----------------------+--------------------+
|complaint_what_happened|            category|
+-----------------------+--------------------+
|   good morning my n...|Debt collection+C...|
|   i upgraded my xxx...|Credit card or pr...|
|   chase card was re...|Credit reporting,...|
|   on  while trying ...|Credit reporting,...|
|   my grand son give...|Checking or savin...|
|   can you please re...|Credit reporting,...|
|   with out notice j...|Checking or savin...|
|   during the summer...|Vehicle loan or l...|
|   on xxxx  i made a...|Money transfer, v...|
|   i have a chase cr...|Credit card or pr...|
|   mishandling of th...|Vehicle loan or l...|
|   i have reached ou...|Credit reporting,...|
|   i opened an accou...|Checking or savin...|
|   to whom it may co...|Checking or savin...|
|   my chase amazon c...|Credit card or pr...|
|   i opened the savi...|Checking or savin...|
|   xxxx xxxx a sofa ...|Checking or savin...|
|   my card went miss...|Checking or savin...|
|   chase sen

In [7]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import re

# Define a UDF to remove "xxxx" and any extra spaces left by the removal
def remove_xxxx(text):
    # Remove "xxxx" and any spaces left behind
    return re.sub(r"\s*xxxx\s*", "", text)

# Register the UDF with Spark
remove_xxxx_udf = udf(remove_xxxx, StringType())

# Apply the UDF to the 'complaint_what_happened' column
df_cleaned = df_cleanedxx.withColumn("complaint_what_happened", remove_xxxx_udf("complaint_what_happened"))

# Verify the column after cleaning
df_cleaned.show()

[Stage 3:>                                                          (0 + 1) / 1]

+-----------------------+--------------------+
|complaint_what_happened|            category|
+-----------------------+--------------------+
|   good morning my n...|Debt collection+C...|
|   i upgraded mycard...|Credit card or pr...|
|   chase card was re...|Credit reporting,...|
|   on  while trying ...|Credit reporting,...|
|   my grand son give...|Checking or savin...|
|   can you please re...|Credit reporting,...|
|   with out notice j...|Checking or savin...|
|   during the summer...|Vehicle loan or l...|
|   oni made a  payme...|Money transfer, v...|
|   i have a chase cr...|Credit card or pr...|
|   mishandling of th...|Vehicle loan or l...|
|   i have reached ou...|Credit reporting,...|
|   i opened an accou...|Checking or savin...|
|   to whom it may co...|Checking or savin...|
|   my chase amazon c...|Credit card or pr...|
|   i opened the savi...|Checking or savin...|
|   a sofa love seat ...|Checking or savin...|
|   my card went miss...|Checking or savin...|
|   chase sen

                                                                                

## Lemmatization, Stopwords, Pos Tagging, CountVectorizer.
---

In [14]:
# Import necessary packages
import sparknlp
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml.clustering import LDA
from pyspark.sql.functions import udf, col, concat_ws, split
from pyspark.sql.types import ArrayType, StringType
from sparknlp.base import DocumentAssembler, Finisher
from sparknlp.annotator import (
    Tokenizer,
    LemmatizerModel,
    PerceptronModel,
    StopWordsCleaner,
    NGramGenerator
)

# Start Spark session with Spark NLP
spark = sparknlp.start()

# Load your data into a DataFrame (assuming 'df_cleaned' is your input DataFrame)
# df_cleaned = spark.read.csv('your_data.csv', header=True, inferSchema=True)

# Document Assembler
document_assembler = DocumentAssembler() \
    .setInputCol("complaint_what_happened") \
    .setOutputCol("document")

# Tokenizer
tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

# Stop Words Remover
stop_words_cleaner = StopWordsCleaner() \
    .setInputCols(["token"]) \
    .setOutputCol("cleanTokens") \
    .setCaseSensitive(False)

# Lemmatizer (before NGramGenerator)
lemmatizer = LemmatizerModel.pretrained() \
    .setInputCols(["cleanTokens"]) \
    .setOutputCol("lemmaTokens")

# NGramGenerator (uses lemmatized tokens)
ngram = NGramGenerator() \
    .setN(2) \
    .setInputCols(["lemmaTokens"]) \
    .setOutputCol("bigrams")

# POS Tagger (uses bigrams from lemmatized tokens)
pos_tagger = PerceptronModel.pretrained() \
    .setInputCols(["document", "lemmaTokens"]) \
    .setOutputCol("pos")

# Finisher to extract results from annotations, including bigrams
finisher = Finisher() \
    .setInputCols(["lemmaTokens", "bigrams", "pos"]) \
    .setOutputCols(["finished_lemma", "finished_bigrams", "finished_pos"]) \
    .setCleanAnnotations(False)

# Custom Transformer to extract specific parts of speech
class POSExtractor(Transformer):
    def __init__(self, inputColPOS=None, inputColLemma=None, outputCol=None):
        super(POSExtractor, self).__init__()
        self.inputColPOS = inputColPOS
        self.inputColLemma = inputColLemma
        self.outputCol = outputCol

    def _transform(self, dataset):
        def extract_pos(pos_tags, lemmas):
            allowed_tags = ['NN', 'JJ', 'VB', 'RB']  # Nouns, Adjectives, Verbs, Adverbs
            return [lemma for pos, lemma in zip(pos_tags, lemmas) if pos in allowed_tags]
        
        extract_pos_udf = udf(extract_pos, ArrayType(StringType()))
        return dataset.withColumn(self.outputCol, extract_pos_udf(col(self.inputColPOS), col(self.inputColLemma)))

# Instantiate custom transformer
pos_extractor = POSExtractor(
    inputColPOS="finished_pos",
    inputColLemma="finished_lemma",
    outputCol="selected_tokens"
)

# Build the updated pipeline
pipeline = Pipeline(stages=[
    document_assembler,
    tokenizer,
    stop_words_cleaner,
    lemmatizer,
    ngram,
    pos_tagger,
    finisher,
    pos_extractor
])

# Fit and transform the data
pipeline_model = pipeline.fit(df_cleaned)
df_final = pipeline_model.transform(df_cleaned)

# Combine unigrams and bigrams
df_final = df_final.withColumn(
    "all_tokens",
    split(
        concat_ws(" ", col("selected_tokens"), concat_ws(" ", col("finished_bigrams"))),
        " "
    )
)

df_final.select("all_tokens","complaint_what_happened").show()

lemma_antbnc download started this may take some time.


24/11/09 21:47:31 WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.


Approximate size to download 907.6 KB
[OK!]
pos_anc download started this may take some time.


24/11/09 21:47:34 WARN S3AbortableInputStream: Not all bytes were read from the S3ObjectInputStream, aborting HTTP connection. This is likely an error and may result in sub-optimal behavior. Request only the bytes you need via a ranged GET or drain the input stream after use.


Approximate size to download 3.9 MB
[OK!]
+--------------------+-----------------------+
|          all_tokens|complaint_what_happened|
+--------------------+-----------------------+
|[good, morning, n...|   good morning my n...|
|[upgrade, mycard,...|   i upgraded mycard...|
|[chase, card, rep...|   chase card was re...|
|[try, book, atick...|   on  while trying ...|
|[grand, son, chec...|   my grand son give...|
|[please, remove, ...|   can you please re...|
|[notice, jp, morg...|   with out notice j...|
|[summer, month, e...|   during the summer...|
|[oni, make, payme...|   oni made a  payme...|
|[chase, credit, c...|   i have a chase cr...|
|[mishandle, accou...|   mishandling of th...|
|[reach, toseveral...|   i have reached ou...|
|[open, account, c...|   i opened an accou...|
|[concern, chase, ...|   to whom it may co...|
|[chase, amazon, c...|   my chase amazon c...|
|[open, save, acco...|   i opened the savi...|
|[sofa, love, seat...|   a sofa love seat ...|
|[card, go, miss, 

In [11]:
df_final.select("all_tokens", "complaint_what_happened").show(truncate=False)

                                                                                

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## COUNTVECTORIZER-IDF
---

In [18]:
import mlflow
import mlflow.spark
from pyspark.ml.feature import CountVectorizer, IDF
from mlflow.models import infer_signature

remote_server_uri = "http://127.0.0.1:5000"  # set to your server URI
mlflow.set_tracking_uri(remote_server_uri)
mlflow.set_experiment("/NLP_SPARK_COMPLAINTS")

# Start an MLflow run
with mlflow.start_run() as run:
    # Apply CountVectorizer for term frequency (TF)
    # Log the parameters
    minDF = 3
    maxDF = 0.95
    mlflow.log_param("minDF", minDF)
    mlflow.log_param("maxDF", maxDF)

    # Instantiate CountVectorizer and fit the data
    count_vectorizer = CountVectorizer(inputCol="all_tokens", outputCol="tf_features", minDF=minDF, maxDF=maxDF)
    cv_model = count_vectorizer.fit(df_final)
    df_tf = cv_model.transform(df_final)

    # Apply IDF for TF-IDF
    idf = IDF(inputCol="tf_features", outputCol="tfidf_features")
    idf_model = idf.fit(df_tf)
    df_tfidf = idf_model.transform(df_tf)

    #Signatures
    outputtidf = df_tfidf.select("tfidf_features")
    inputtidf = df_tf.select("tf_features")

    outputcv = df_tf.select("tf_features")
    inputcv = df_final.select("all_tokens")

    signaturecv = infer_signature(inputcv, outputcv)
    signatureidf = infer_signature(inputtidf, outputtidf)

    # Log the CountVectorizer and IDF models to MLflow
    mlflow.spark.log_model(cv_model, "count_vectorizer_model", registered_model_name="count_vectorizer_model", signature=signaturecv)
    mlflow.spark.log_model(idf_model, "idf_model", registered_model_name="idf_model", signature=signatureidf)

print("CountVectorizer and IDF models logged to MLflow successfully.")


24/11/09 22:07:56 ERROR Instrumentation: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "mlflow-artifacts"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.ml.util.FileSystemOverwrite.handleOverwrite(ReadWrite.scala:673)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:167)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.super$save(Pipeline.scala:344)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.$anonfun$save$4(Pipeline.scala:344)
	at org.apache.spark.ml.MLEvents.withSaveInst

CountVectorizer and IDF models logged to MLflow successfully.


### LDA

In [19]:
import mlflow
import mlflow.spark
from pyspark.sql import SparkSession
from pyspark.ml.feature import NGram, CountVectorizer, IDF, Tokenizer
from pyspark.ml.clustering import LDA
from pyspark.sql.functions import col, array_union, expr, udf
from pyspark.sql.types import IntegerType
import numpy as np
from mlflow.models import infer_signature

remote_server_uri = "http://127.0.0.1:5000"  # set to your server URI
mlflow.set_tracking_uri(remote_server_uri)
mlflow.set_experiment("/NLP_SPARK_COMPLAINTS")

# Track the experiment with MLflow
with mlflow.start_run() as run:
    # Set parameters for tracking
    k_topics = 5
    seed_value = 42
    max_iterations = 25
    
    # Log the parameters
    mlflow.log_param("k_topics", k_topics)
    mlflow.log_param("seed", seed_value)
    mlflow.log_param("max_iterations", max_iterations)

    # Log the data
    #mlflow.spark.log_table(df_tfidf, "df_tfidf")

    # Apply LDA to find latent patterns in PQRS data
    lda = LDA(k=k_topics, seed=seed_value, maxIter=max_iterations, featuresCol="tfidf_features")
    lda_model = lda.fit(df_tfidf)
    # Transform the data to assign each transaction to a topic
    df_transformed = lda_model.transform(df_tfidf)

    input = df_tfidf.select("tfidf_features")
    output = df_transformed.select("topicDistribution")

    signature = infer_signature(input, output)

    # Optionally, log metrics or results if applicable
    # For example, logging log likelihood and log perplexity:
    log_likelihood = lda_model.logLikelihood(df_tfidf)
    log_perplexity = lda_model.logPerplexity(df_tfidf)
    mlflow.log_metric("log_likelihood", log_likelihood)
    mlflow.log_metric("log_perplexity", log_perplexity)

    # Log the model
    mlflow.spark.log_model(lda_model, "lda_model", registered_model_name="lda_model_nlp", signature=signature)

    # End the MLflow run
    mlflow.end_run()

print("LDA model and parameters tracked successfully.")


24/11/09 22:18:58 ERROR Instrumentation: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "mlflow-artifacts"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.ml.util.FileSystemOverwrite.handleOverwrite(ReadWrite.scala:673)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:167)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.super$save(Pipeline.scala:344)
	at org.apache.spark.ml.PipelineModel$PipelineModelWriter.$anonfun$save$4(Pipeline.scala:344)
	at org.apache.spark.ml.MLEvents.withSaveInst

LDA model and parameters tracked successfully.


In [20]:
def get_predominant_topic(topic_distribution):
    return int(np.argmax(topic_distribution))

# Registrar la función UDF para obtener el tema predominante
get_predominant_topic_udf = udf(get_predominant_topic, IntegerType())

# Agregar columna para el tema predominante y mantener la distribución de temas
df_results = df_transformed.withColumn("tema_predominante", get_predominant_topic_udf("topicDistribution"))

# Mostrar el tema predominante y la distribución de temas para cada documento
df_results.select("topicDistribution", "tema_predominante").show()

# Obtener los términos predominantes de cada tema
topics = lda_model.describeTopics(10)  # Obtiene los 10 términos principales por tema

# Extraer los términos del vocabulario para interpretar cada tema
vocab = cv_model.vocabulary  # Supone que 'cv_model' es el CountVectorizer ajustado anteriormente

# Mostrar términos predominantes y distribución de temas para cada tema
for topic in range(5):  # Itera sobre el número de temas
    term_indices = topics.where(col("topic") == topic).select("termIndices").head()[0]
    term_weights = topics.where(col("topic") == topic).select("termWeights").head()[0]
    terms = [vocab[idx] for idx in term_indices]
    print(f"Tema {topic}:")
    print("Términos predominantes:", terms)
    print("Pesos de términos:", term_weights)
    print("\n")

24/11/09 22:19:07 WARN DAGScheduler: Broadcasting large task binary with size 1136.8 KiB
                                                                                

+--------------------+-----------------+
|   topicDistribution|tema_predominante|
+--------------------+-----------------+
|[0.13310670088417...|                2|
|[5.93507283365201...|                1|
|[0.00109799921949...|                2|
|[1.94947946482820...|                3|
|[0.12078275437047...|                3|
|[0.00855909609356...|                2|
|[1.03330507183237...|                1|
|[0.29203309824202...|                1|
|[7.40036904564260...|                4|
|[0.00260048908742...|                2|
|[0.00716337018741...|                1|
|[3.89779006149586...|                2|
|[6.57056267549551...|                1|
|[0.08751543646143...|                3|
|[0.02833382578359...|                4|
|[4.56366095543165...|                1|
|[8.90549190764894...|                4|
|[0.00205213960286...|                4|
|[0.32888814868016...|                2|
|[1.59795049544360...|                1|
+--------------------+-----------------+
only showing top

In [None]:
# Obtener los términos predominantes de cada tema
topics = lda_model.describeTopics(15)  # Obtiene los 15 términos principales por tema

# Extraer los términos del vocabulario para interpretar cada tema
vocab = cv_model.vocabulary  # Supone que 'cv_model' es el CountVectorizer ajustado anteriormente

# Mostrar términos predominantes y distribución de temas para cada tema
for topic in range(5):  # Itera sobre el número de temas
    term_indices = topics.where(col("topic") == topic).select("termIndices").head()[0]
    term_weights = topics.where(col("topic") == topic).select("termWeights").head()[0]
    terms = [vocab[idx] for idx in term_indices]
    print(f"Tema {topic}:")
    print("Términos predominantes:", terms)
    print("Pesos de términos:", term_weights)
    print("\n")

Tema 0:
Términos predominantes: ['loan', 'mortgage', 'home', 'modification', 'property', 'payment', 'foreclosure', 'sale', 'house', 'document', 'escrow', 'debt', 'tax', 'pay', 'year']
Pesos de términos: [0.013326386454560466, 0.010754017452233226, 0.006321814741680889, 0.00606103887914607, 0.005476479253614503, 0.004948724872998746, 0.004133237082838254, 0.003991688485649169, 0.0039873539402145415, 0.003938429379389897, 0.0037066256251785147, 0.003595674584562267, 0.003321002343225685, 0.003274737410922139, 0.0031771071020010653]


Tema 1:
Términos predominantes: ['check', 'deposit', 'fund', 'account', 'branch', 'bank', 'call', 'money', 'tell', 'day', 'cash', 'receive', 'hold', 'wire', 'claim']
Pesos de términos: [0.009318808323817632, 0.007125102833802727, 0.006117925697727863, 0.005091663852551009, 0.004617728330665142, 0.004576132862491412, 0.004268397994137908, 0.0038232843976937617, 0.0036081635814134197, 0.0034485011048234946, 0.003253731934923443, 0.0031586013452838215, 0.003156

- Tema 0: Hipotecas y Bienes Raíces

Descripción: Este tema se centra en términos relacionados con préstamos, hipotecas, propiedades, pagos y procesos de ejecución hipotecaria.
Categoría Propuesta: "Préstamos Hipotecarios y Propiedades"

- Tema 1: Operaciones Bancarias y Transacciones en Sucursales

Descripción: Los términos indican interacciones en sucursales bancarias, como depósitos, cheques, manejo de fondos y consultas de cuentas.
Categoría Propuesta: "Operaciones Bancarias y Sucursales"

- Tema 2: Informes de Crédito y Disputas

Descripción: Los términos están relacionados con informes de crédito, disputas, consultas y protección del consumidor.
Categoría Propuesta: "Reportes de Crédito y Protección del Consumidor"

- Tema 3: Tarjetas de Crédito y Cargos de Intereses

Descripción: Se enfoca en pagos de tarjetas de crédito, saldos, intereses y cargos por pagos atrasados.
Categoría Propuesta: "Tarjetas de Crédito y Cargos Financieros"

- Tema 4: Fraude y Disputas de Transacciones

Descripción: Este tema abarca términos sobre fraudes, disputas de transacciones y problemas con tarjetas y cuentas bancarias.
Categoría Propuesta: "Fraude y Disputas de Transacciones"


In [21]:
from pyspark.sql import functions as F

# Agrupar por `tema_predominante` y contar la cantidad de documentos en cada tema
df_topic_counts = df_results.groupBy("tema_predominante").agg(F.count("*").alias("count"))

# Mostrar la distribución de temas predominantes
df_topic_counts.orderBy("tema_predominante").show()

24/11/09 22:19:10 WARN DAGScheduler: Broadcasting large task binary with size 1130.0 KiB

+-----------------+-----+
|tema_predominante|count|
+-----------------+-----+
|                0| 3851|
|                1| 3120|
|                2| 3299|
|                3| 5640|
|                4| 5162|
+-----------------+-----+



24/11/09 22:21:35 WARN DAGScheduler: Broadcasting large task binary with size 1088.8 KiB
                                                                                

## Modelado

### Regresion Logistica

In [26]:
train_data, test_data = df_results.randomSplit([0.7, 0.3], seed=42)

In [29]:
import mlflow
import mlflow.spark
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from mlflow.types import Schema, ColSpec
from mlflow.models import ModelSignature
from mlflow.models import infer_signature

remote_server_uri = "http://127.0.0.1:5000"  # set to your server URI
mlflow.set_tracking_uri(remote_server_uri)

experiment_name = "/NLP_SPARK_COMPLAINTS_Classification1"
mlflow.create_experiment(experiment_name, artifact_location="file:///tmp/mlflow-artifacts")
mlflow.set_experiment(experiment_name)

# Inicia un nuevo experimento en MLflow
with mlflow.start_run() as run:
    # Crea el modelo de regresión logística
    lr = LogisticRegression(featuresCol='tfidf_features', labelCol='tema_predominante', maxIter=10)
    
    # Ajusta el modelo en los datos de entrenamiento
    lr_model = lr.fit(train_data)
    test_df = train_data.select("tfidf_features")
    
    # Realiza predicciones en el conjunto de prueba
    predictions = lr_model.transform(test_data)
    predictions_df = predictions.select("prediction")

    # Inferir Schema
    signature = infer_signature(test_df, predictions_df)
    
    # Evaluación de precisión
    evaluator_accuracy = MulticlassClassificationEvaluator(
        labelCol="tema_predominante", predictionCol="prediction", metricName="accuracy")
    accuracy = evaluator_accuracy.evaluate(predictions)
    print(f"Test Set Accuracy = {accuracy}")
    
    # Evaluación del puntaje F1
    evaluator_f1 = MulticlassClassificationEvaluator(
        labelCol="tema_predominante", predictionCol="prediction", metricName="f1")
    f1 = evaluator_f1.evaluate(predictions)
    print(f"Test Set F1 Score = {f1}")
    
    # Registra métricas adicionales en MLflow
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("f1_score", f1)
    
    # Registra el modelo en MLflow (ya se registra automáticamente con autolog, pero puedes hacerlo explícitamente)
    mlflow.spark.log_model(lr_model, "logistic_regression_model", registered_model_name="LogisticRegression_NLP_Model", signature=signature)

print("Modelo de Regresión Logística y métricas registradas en MLflow exitosamente.")


24/11/09 22:30:29 WARN DAGScheduler: Broadcasting large task binary with size 1249.8 KiB
24/11/09 22:32:54 WARN DAGScheduler: Broadcasting large task binary with size 1251.0 KiB
24/11/09 22:32:54 WARN DAGScheduler: Broadcasting large task binary with size 1250.4 KiB
24/11/09 22:35:23 WARN DAGScheduler: Broadcasting large task binary with size 1251.6 KiB
24/11/09 22:35:23 WARN DAGScheduler: Broadcasting large task binary with size 1250.4 KiB
24/11/09 22:35:23 WARN DAGScheduler: Broadcasting large task binary with size 1251.6 KiB
24/11/09 22:35:23 WARN DAGScheduler: Broadcasting large task binary with size 1250.4 KiB
24/11/09 22:35:24 WARN DAGScheduler: Broadcasting large task binary with size 1251.6 KiB
24/11/09 22:35:24 WARN DAGScheduler: Broadcasting large task binary with size 1250.4 KiB
24/11/09 22:35:24 WARN DAGScheduler: Broadcasting large task binary with size 1251.6 KiB
24/11/09 22:35:24 WARN DAGScheduler: Broadcasting large task binary with size 1250.4 KiB
24/11/09 22:35:24 WAR

Test Set Accuracy = 0.7992006394884092


                                                                                

Test Set F1 Score = 0.7986986601913686


Registered model 'LogisticRegression_NLP_Model' already exists. Creating a new version of this model...
2024/11/09 22:40:29 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: LogisticRegression_NLP_Model, version 3
Created version '3' of model 'LogisticRegression_NLP_Model'.
2024/11/09 22:40:29 INFO mlflow.tracking._tracking_service.client: 🏃 View run bold-bat-756 at: http://127.0.0.1:5000/#/experiments/253890346147790682/runs/3dc1590496fc4957890c64055b5ef444.
2024/11/09 22:40:29 INFO mlflow.tracking._tracking_service.client: 🧪 View experiment at: http://127.0.0.1:5000/#/experiments/253890346147790682.


Modelo de Regresión Logística y métricas registradas en MLflow exitosamente.


### Decision Tree

In [30]:
import mlflow
import mlflow.spark
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from mlflow.models import infer_signature

# Establecer el URI de seguimiento de MLflow
remote_server_uri = "http://127.0.0.1:5000"
mlflow.set_tracking_uri(remote_server_uri)

# Definir el nombre del experimento
experiment_name = "/NLP_SPARK_COMPLAINTS_Classification1"
mlflow.set_experiment(experiment_name)

# Iniciar un nuevo experimento en MLflow
with mlflow.start_run() as run:
    # Crear el modelo Decision Tree
    dt = DecisionTreeClassifier(featuresCol='tfidf_features', labelCol='tema_predominante', maxDepth=20, seed=42)
    
    # Registrar los parámetros del modelo
    mlflow.log_param("maxDepth", 20)
    mlflow.log_param("seed", 42)

    # Entrenar el modelo con los datos de entrenamiento
    dt_model = dt.fit(train_data)

    # Realizar predicciones en los datos de prueba
    dt_predictions = dt_model.transform(test_data)
    
    # Inferir el esquema de entrada y salida para el modelo
    signature = infer_signature(test_data.select("tfidf_features"), dt_predictions.select("prediction"))

    # Evaluar el modelo usando MulticlassClassificationEvaluator
    evaluator_accuracy = MulticlassClassificationEvaluator(
        labelCol="tema_predominante", predictionCol="prediction", metricName="accuracy")
    dt_accuracy = evaluator_accuracy.evaluate(dt_predictions)
    print(f"Exactitud del conjunto de prueba (Decision Tree) = {dt_accuracy}")

    evaluator_f1 = MulticlassClassificationEvaluator(
        labelCol="tema_predominante", predictionCol="prediction", metricName="f1")
    dt_f1 = evaluator_f1.evaluate(dt_predictions)
    print(f"Puntaje F1 del conjunto de prueba (Decision Tree) = {dt_f1}")

    # Registrar las métricas en MLflow
    mlflow.log_metric("accuracy", dt_accuracy)
    mlflow.log_metric("f1_score", dt_f1)
    
    # Registrar el modelo en MLflow
    mlflow.spark.log_model(dt_model, "decision_tree_model", registered_model_name="DecisionTree_NLP_Model", signature=signature)

print("Modelo de Decision Tree y métricas registradas en MLflow exitosamente.")



24/11/09 22:45:41 WARN DAGScheduler: Broadcasting large task binary with size 1189.9 KiB
24/11/09 22:48:07 WARN DAGScheduler: Broadcasting large task binary with size 1246.1 KiB
24/11/09 22:50:26 WARN DAGScheduler: Broadcasting large task binary with size 1246.2 KiB
24/11/09 22:52:53 WARN DAGScheduler: Broadcasting large task binary with size 1393.0 KiB
24/11/09 22:55:23 WARN DAGScheduler: Broadcasting large task binary with size 1756.4 KiB
24/11/09 22:57:51 WARN DAGScheduler: Broadcasting large task binary with size 1757.2 KiB
24/11/09 22:57:52 WARN DAGScheduler: Broadcasting large task binary with size 1757.9 KiB
24/11/09 22:57:53 WARN DAGScheduler: Broadcasting large task binary with size 1759.2 KiB
24/11/09 22:57:55 WARN DAGScheduler: Broadcasting large task binary with size 1761.9 KiB
24/11/09 22:57:56 WARN DAGScheduler: Broadcasting large task binary with size 1767.1 KiB
24/11/09 22:57:58 WARN DAGScheduler: Broadcasting large task binary with size 1776.2 KiB
24/11/09 22:58:00 WAR

Exactitud del conjunto de prueba (Decision Tree) = 0.5389288569144685


                                                                                

Puntaje F1 del conjunto de prueba (Decision Tree) = 0.537718916871764


Successfully registered model 'DecisionTree_NLP_Model'.
2024/11/09 23:03:42 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: DecisionTree_NLP_Model, version 1
Created version '1' of model 'DecisionTree_NLP_Model'.
2024/11/09 23:03:43 INFO mlflow.tracking._tracking_service.client: 🏃 View run nervous-koi-618 at: http://127.0.0.1:5000/#/experiments/253890346147790682/runs/777089b6bfb34cd693cc4dea985a0800.
2024/11/09 23:03:43 INFO mlflow.tracking._tracking_service.client: 🧪 View experiment at: http://127.0.0.1:5000/#/experiments/253890346147790682.


Modelo de Decision Tree y métricas registradas en MLflow exitosamente.


### Random Forest

In [None]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Crear el modelo Random Forest
rf = RandomForestClassifier(featuresCol='tfidf_features', labelCol='tema_predominante', numTrees=50, seed=42)

# Entrenar el modelo con los datos de entrenamiento
rf_model = rf.fit(train_data)

# Realizar predicciones en los datos de prueba
rf_predictions = rf_model.transform(test_data)

# Evaluar el modelo usando MulticlassClassificationEvaluator

# Exactitud (Accuracy)
evaluator_accuracy = MulticlassClassificationEvaluator(
    labelCol="tema_predominante", predictionCol="prediction", metricName="accuracy")
rf_accuracy = evaluator_accuracy.evaluate(rf_predictions)
print(f"Exactitud del conjunto de prueba (Random Forest) = {rf_accuracy}")

# Puntaje F1
evaluator_f1 = MulticlassClassificationEvaluator(
    labelCol="tema_predominante", predictionCol="prediction", metricName="f1")
rf_f1 = evaluator_f1.evaluate(rf_predictions)
print(f"Puntaje F1 del conjunto de prueba (Random Forest) = {rf_f1}")

# Recall
evaluator_recall = MulticlassClassificationEvaluator(
    labelCol="tema_predominante", predictionCol="prediction", metricName="weightedRecall")
rf_recall = evaluator_recall.evaluate(rf_predictions)
print(f"Recall del conjunto de prueba (Random Forest) = {rf_recall}")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Exactitud del conjunto de prueba (Random Forest) = 0.4545163868904876
Puntaje F1 del conjunto de prueba (Random Forest) = 0.3206838704891863
Recall del conjunto de prueba (Random Forest) = 0.4545163868904876

In [None]:
## saving
# Define la ruta en S3 para almacenar el modelo de Rndom Forest
s3_path_rf_model = "s3://bucketspark14/models/random_forest_final"

# Guardar el modelo de RF en S3
rf_model.save(s3_path_rf_model)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [35]:
## saving
# Define la ruta en S3 para almacenar el modelo de Regresión Logística
s3_path_lr_model = "/home/ubuntu/nlp-models/logisticregression"

# Guardar el modelo de Regresión Logística en S3
lr_model.save(s3_path_lr_model)

In [34]:
## saving
# Define la ruta en S3 para almacenar el modelo de DT
s3_path_dt_model = "/home/ubuntu/nlp-models/DTree"

# Guardar el modelo de DT en S3
dt_model.save(s3_path_dt_model)

In [36]:
s3_path_lda = "/home/ubuntu/nlp-models/lda_model"

# Guardar el modelo LDA en S3
lda_model.save(s3_path_lda)

In [37]:
s3_path_cv = "/home/ubuntu/nlp-models/count_vectorizer_model"
s3_path_idf = "/home/ubuntu/nlp-models/idf_model"

# Guardar el modelo CountVectorizer en S3
cv_model.save(s3_path_cv)

# Guardar el modelo IDF en S3
idf_model.save(s3_path_idf)

In [38]:
s3_path_results = "/home/ubuntu/nlp-models/results/lda_transformed_data"

# Guardar el DataFrame `df_transformed` en S3 en formato Parquet
df_transformed.write.mode("overwrite").parquet(s3_path_results)

24/11/09 23:18:18 WARN DAGScheduler: Broadcasting large task binary with size 1643.5 KiB
                                                                                

In [39]:
s3_path_results = "/home/ubuntu/nlp-models/results/cv_transformed_data"

df_tfidf.write.mode("overwrite").parquet(s3_path_results)

24/11/09 23:20:44 WARN DAGScheduler: Broadcasting large task binary with size 1042.8 KiB
                                                                                

24/11/10 01:37:07 WARN TransportChannelHandler: Exception in connection from /192.168.1.16:32791
java.io.IOException: Connection timed out
	at java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:233)
	at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223)
	at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356)
	at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:254)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1132)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:357)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:151)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOpt