# Text Processing:
---

In [1]:
import sparknlp

spark = sparknlp.start()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
4,application_1730817751874_0005,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



In [2]:
sc._conf.get('spark.executor.memory')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'7g'

## Librerias:

In [3]:
# PySpark SQL Modules
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit, when, trim, split, udf, expr
from pyspark.sql.types import StringType, ArrayType
from sparknlp.annotator import *

# PySpark ML Modules
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import Tokenizer, StopWordsRemover

# NLTK Modules
import nltk
from nltk.stem import WordNetLemmatizer

# Other Libraries
import os

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Lectura de Datos
---

In [4]:
# Ruta al archivo JSON
file_path = 's3://bucketspark14/complaints.json'

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Cargar archivo JSON en Spark DataFrame
df_tickets = spark.read.json(file_path)

# Desanidar las columnas dentro de _source
df_tickets_flat = df_tickets.select(
    "_id", "_index", "_score",
    col("_source.tags").alias("tags"),
    col("_source.zip_code").alias("zip_code"),
    col("_source.complaint_id").alias("complaint_id"),
    col("_source.issue").alias("issue"),
    col("_source.date_received").alias("date_received"),
    col("_source.state").alias("state"),
    col("_source.consumer_disputed").alias("consumer_disputed"),
    col("_source.product").alias("product"),
    col("_source.company_response").alias("company_response"),
    col("_source.company").alias("company"),
    col("_source.submitted_via").alias("submitted_via"),
    col("_source.date_sent_to_company").alias("date_sent_to_company"),
    col("_source.company_public_response").alias("company_public_response"),
    col("_source.sub_product").alias("sub_product"),
    col("_source.timely").alias("timely"),
    col("_source.complaint_what_happened").alias("complaint_what_happened"),
    col("_source.sub_issue").alias("sub_issue"),
    col("_source.consumer_consent_provided").alias("consumer_consent_provided"),
    "_type"
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Limpieza de Datos
---

In [6]:
# Seleccionar la columnas
df_filtered = df_tickets_flat.select('complaint_what_happened','product','sub_product')

# Concatenar las columnas 'category' y 'sub_category' con un '+' en medio
df_filtered = df_filtered.withColumn('category', F.concat_ws('+', F.col('product'), F.col('sub_product')))

# Eliminar la columna 'sub_category'
df_filtered = df_filtered.drop('product','sub_product')

# Reemplazar cadenas vacías por nulos
df_filtered = df_filtered.withColumn("complaint_what_happened", when(trim(col("complaint_what_happened")) == "", lit(None)).otherwise(col("complaint_what_happened")))        

# Eliminar filas con nulos en columnas especificadas
df_filtered = df_filtered.dropna(subset=["complaint_what_happened", "category"])

df_filtered.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------+--------------------+
|complaint_what_happened|            category|
+-----------------------+--------------------+
|   Good morning my n...|Debt collection+C...|
|   I upgraded my XXX...|Credit card or pr...|
|   Chase Card was re...|Credit reporting,...|
|   On XX/XX/2018, wh...|Credit reporting,...|
|   my grand son give...|Checking or savin...|
|   Can you please re...|Credit reporting,...|
|   With out notice J...|Checking or savin...|
|   During the summer...|Vehicle loan or l...|
|   On XXXX XX/XX/201...|Money transfer, v...|
|   I have a Chase cr...|Credit card or pr...|
|   mishandling of th...|Vehicle loan or l...|
|   I have reached ou...|Credit reporting,...|
|   I opened an accou...|Checking or savin...|
|   To whom it may co...|Checking or savin...|
|   My chase amazon c...|Credit card or pr...|
|   I opened the savi...|Checking or savin...|
|   XXXX XXXX a sofa,...|Checking or savin...|
|   My card went miss...|Checking or savin...|
|   Chase sen

In [7]:
# Definir una UDF para aplicar todas las transformaciones en Spark
def clean_text_spark(df, text_column):
    # Convertir el texto a minúsculas
    df = df.withColumn(text_column, F.lower(F.col(text_column)))
    
    # Eliminar texto en corchetes []
    df = df.withColumn(text_column, F.regexp_replace(F.col(text_column), r'\[.*?\]', ''))
    
    # Eliminar puntuación
    df = df.withColumn(text_column, F.regexp_replace(F.col(text_column), r'[^\w\s]', ''))
    
    # Eliminar palabras que contienen números
    df = df.withColumn(text_column, F.regexp_replace(F.col(text_column), r'\b\w*\d\w*\b', ''))
    
    # Eliminar espacios en blanco adicionales
    df = df.withColumn(text_column, F.trim(F.col(text_column)))
    
    return df

# Aplicar la función de limpieza
df_cleaned = clean_text_spark(df_filtered, 'complaint_what_happened')

# Mostrar el resultado limpio
df_cleaned.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------+--------------------+
|complaint_what_happened|            category|
+-----------------------+--------------------+
|   good morning my n...|Debt collection+C...|
|   i upgraded my xxx...|Credit card or pr...|
|   chase card was re...|Credit reporting,...|
|   on  while trying ...|Credit reporting,...|
|   my grand son give...|Checking or savin...|
|   can you please re...|Credit reporting,...|
|   with out notice j...|Checking or savin...|
|   during the summer...|Vehicle loan or l...|
|   on xxxx  i made a...|Money transfer, v...|
|   i have a chase cr...|Credit card or pr...|
|   mishandling of th...|Vehicle loan or l...|
|   i have reached ou...|Credit reporting,...|
|   i opened an accou...|Checking or savin...|
|   to whom it may co...|Checking or savin...|
|   my chase amazon c...|Credit card or pr...|
|   i opened the savi...|Checking or savin...|
|   xxxx xxxx a sofa ...|Checking or savin...|
|   my card went miss...|Checking or savin...|
|   chase sen

In [8]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import re

# Define a UDF to remove "xxxx" and any extra spaces left by the removal
def remove_xxxx(text):
    # Remove "xxxx" and any spaces left behind
    return re.sub(r"\s*xxxx\s*", "", text)

# Register the UDF with Spark
remove_xxxx_udf = udf(remove_xxxx, StringType())

# Apply the UDF to the 'complaint_what_happened' column
df_cleaned = df_cleaned.withColumn("complaint_what_happened", remove_xxxx_udf("complaint_what_happened"))

# Verify the column after cleaning
df_cleaned.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------+--------------------+
|complaint_what_happened|            category|
+-----------------------+--------------------+
|   good morning my n...|Debt collection+C...|
|   i upgraded mycard...|Credit card or pr...|
|   chase card was re...|Credit reporting,...|
|   on  while trying ...|Credit reporting,...|
|   my grand son give...|Checking or savin...|
|   can you please re...|Credit reporting,...|
|   with out notice j...|Checking or savin...|
|   during the summer...|Vehicle loan or l...|
|   oni made a  payme...|Money transfer, v...|
|   i have a chase cr...|Credit card or pr...|
|   mishandling of th...|Vehicle loan or l...|
|   i have reached ou...|Credit reporting,...|
|   i opened an accou...|Checking or savin...|
|   to whom it may co...|Checking or savin...|
|   my chase amazon c...|Credit card or pr...|
|   i opened the savi...|Checking or savin...|
|   a sofa love seat ...|Checking or savin...|
|   my card went miss...|Checking or savin...|
|   chase sen

## Lemmatization, Stopwords, Pos Tagging, CountVectorizer.
---

In [41]:
from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StringType
import sparknlp
from sparknlp.base import DocumentAssembler, Finisher
from sparknlp.annotator import Tokenizer as NLPTokenizer
from sparknlp.annotator import LemmatizerModel, PerceptronModel, StopWordsCleaner

# Iniciar sesión de Spark con Spark NLP
# spark = sparknlp.start()

# Document Assembler
document_assembler = DocumentAssembler() \
    .setInputCol("complaint_what_happened") \
    .setOutputCol("document")

# Tokenizer
tokenizer = NLPTokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

# Stop Words Remover
stop_words_cleaner = StopWordsCleaner() \
    .setInputCols(["token"]) \
    .setOutputCol("cleanTokens") \
    .setCaseSensitive(False)

# POS Tagger
pos_tagger = PerceptronModel.pretrained("pos_anc") \
    .setInputCols(["document", "cleanTokens"]) \
    .setOutputCol("pos")

# Lemmatizer
lemmatizer = LemmatizerModel.pretrained("lemma_antbnc") \
    .setInputCols(["cleanTokens"]) \
    .setOutputCol("lemma")

# Finisher to extract results from annotations
finisher = Finisher() \
    .setInputCols(["lemma", "pos"]) \
    .setOutputCols(["finished_lemma", "finished_pos"]) \
    .setCleanAnnotations(False)

# Custom Transformer to remove pronouns and extract nouns
class NounExtractor(Transformer):
    def __init__(self, inputColPOS=None, inputColLemma=None, outputCol=None):
        super(NounExtractor, self).__init__()
        self.inputColPOS = inputColPOS
        self.inputColLemma = inputColLemma
        self.outputCol = outputCol

    def _transform(self, dataset):
        def extract_nouns(pos_tags, lemmas):
            noun_tags = ['NN', 'NNS', 'NNP', 'NNPS']
            return [lemma for pos, lemma in zip(pos_tags, lemmas) if pos in noun_tags]
        
        extract_nouns_udf = udf(extract_nouns, ArrayType(StringType()))
        return dataset.withColumn(self.outputCol, extract_nouns_udf(col(self.inputColPOS), col(self.inputColLemma)))

# Instantiate custom transformer
noun_extractor = NounExtractor(inputColPOS="finished_pos", inputColLemma="finished_lemma", outputCol="noun_tokens")

# Build the pipeline
pipeline = Pipeline(stages=[
    document_assembler,
    tokenizer,
    stop_words_cleaner,
    pos_tagger,
    lemmatizer,
    finisher,
    noun_extractor
])

# Fit and transform the data
pipeline_model = pipeline.fit(df_cleaned)
df_final = pipeline_model.transform(df_cleaned)
df_final.select("noun_tokens", "pos").show(truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

pos_anc download started this may take some time.
Approximate size to download 3.9 MB
[OK!]
lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, StringType

# 1. Vectorización: Aplicar CountVectorizer
count_vectorizer = CountVectorizer(inputCol="noun_tokens", outputCol="features")
cv_model = count_vectorizer.fit(df_final)  # Ajuste del modelo de vectorización
df_cv = cv_model.transform(df_final)  # Transformación de los datos para obtener la columna "features"

# 2. Entrenamiento del modelo LDA
lda = LDA(k=5, maxIter=10, featuresCol="features")
lda_model = lda.fit(df_cv)

# 3. Obtener los temas
topics = lda_model.describeTopics()

# 4. Mapear índices a palabras
# Extraer vocabulario de CountVectorizer
vocab = cv_model.vocabulary

# Definir función para mapear índices a palabras
def indices_to_words(indices):
    return [vocab[idx] for idx in indices]

# Crear UDF para aplicar la función
indices_to_words_udf = udf(indices_to_words, ArrayType(StringType()))

# Añadir columna con palabras correspondientes a los índices
topics_with_words = topics.withColumn("terms", indices_to_words_udf(col("termIndices")))

# Mostrar temas con palabras en lugar de índices
topics_with_words.show()


In [None]:
topics_with_words.select("topic", "terms", "termWeights").show(truncate=False)

## (alternativa) Embeddings

In [33]:
# Importar las librerías necesarias
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from sparknlp.base import DocumentAssembler, EmbeddingsFinisher
from sparknlp.annotator import Tokenizer, WordEmbeddingsModel, SentenceEmbeddings
from pyspark.ml.feature import StringIndexer, StopWordsRemover
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.functions import col

# Definir las etapas del pipeline
document_assembler = DocumentAssembler() \
    .setInputCol("complaint_what_happened") \
    .setOutputCol("document")

tokenizer = Tokenizer() \
    .setInputCols(["document"]) \
    .setOutputCol("token")

stop_words_cleaner = StopWordsCleaner() \
    .setInputCols(["token"]) \
    .setOutputCol("cleanTokens")

# Usar embeddings ligeros
embeddings = WordEmbeddingsModel.pretrained("glove_100d") \
    .setInputCols(["document", "cleanTokens"]) \
    .setOutputCol("embeddings")

sentence_embeddings = SentenceEmbeddings() \
    .setInputCols(["document", "embeddings"]) \
    .setOutputCol("sentence_embeddings") \
    .setPoolingStrategy("AVERAGE")

# Usar EmbeddingsFinisher para extraer los embeddings
embeddings_finisher = EmbeddingsFinisher() \
    .setInputCols(["sentence_embeddings"]) \
    .setOutputCols(["finished_embeddings"]) \
    .setOutputAsVector(True) \
    .setCleanAnnotations(False)

# Construir el pipeline
pipeline = Pipeline(stages=[
    document_assembler,
    tokenizer,
    stop_words_cleaner,
    embeddings,
    sentence_embeddings,
    embeddings_finisher
])

# Ajustar el pipeline y transformar los datos
pipeline_model = pipeline.fit(df_cleaned)
df_embeddings = pipeline_model.transform(df_cleaned)
df_embeddings.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

glove_100d download started this may take some time.
Approximate size to download 145.3 MB
[OK!]
+-----------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|complaint_what_happened|            category|            document|               token|         cleanTokens|          embeddings| sentence_embeddings| finished_embeddings|
+-----------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|   good morning my n...|Debt collection+C...|[{document, 0, 44...|[{token, 0, 3, go...|[{token, 0, 3, go...|[{word_embeddings...|[{sentence_embedd...|[[0.0094272485002...|
|   i upgraded mycard...|Credit card or pr...|[{document, 0, 29...|[{token, 0, 0, i,...|[{token, 2, 9, up...|[{word_embeddings...|[{sentence_embedd...|[[0.0441840477287...|
|   chase card was re...|Credit report

In [34]:
df_embeddings.select("sentence_embeddings", "finished_embeddings").show(5, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [36]:
from pyspark.sql.functions import col, size, flatten

# Use `sentence_embeddings` as the feature column and flatten it
df_features = df_embeddings.withColumn("features", flatten(col("sentence_embeddings.embeddings")))

# Confirm that all features have the expected dimension of 100
df_features.select(size(col("features")).alias("feature_size")).groupBy("feature_size").count().show()

# Filter out any rows where the feature size is not 100
df_non_empty = df_features.filter(size(col("features")) == 100)

# Proceed with KMeans clustering
kmeans = KMeans().setK(5).setSeed(1).setFeaturesCol("features").setPredictionCol("cluster")
model = kmeans.fit(df_non_empty)
df_clustered = model.transform(df_non_empty)

# Interpret the clusters
df_clustered.groupBy("cluster", "category").count().show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+-----+
|feature_size|count|
+------------+-----+
|         100|21068|
|           0|    4|
+------------+-----+

+-------+--------------------+-----+
|cluster|            category|count|
+-------+--------------------+-----+
|      3|Debt collection+O...|    2|
|      2|Mortgage+FHA mort...|   38|
|      0|Prepaid card+Gove...|    1|
|      1|Consumer Loan+Ins...|    2|
|      2|Mortgage+Home equ...|   16|
|      4|Money transfer, v...|  172|
|      4|Mortgage+VA mortgage|   23|
|      3|Credit card or pr...|    5|
|      2|Checking or savin...|    5|
|      3|Credit reporting,...|    1|
|      1|Money transfer, v...|    3|
|      4|Payday loan, titl...|    3|
|      0|Money transfer, v...|    2|
|      4|Credit card or pr...|    5|
|      0|Consumer Loan+Veh...|    9|
|      1|Bank account or s...|    8|
|      2|Money transfer, v...|   62|
|      0|Debt collection+M...|   34|
|      4|Consumer Loan+Paw...|    1|
|      0|Money transfer, v...|   10|
+-------+-------------

In [37]:
centers = model.clusterCenters()
wssse = df_clustered.rdd.map(
    lambda row: float(sum((row["features"][i] - centers[row["cluster"]][i]) ** 2 for i in range(len(row["features"]))))
).sum()

print("Within-Cluster Sum of Squared Errors (WSSSE) = " + str(wssse))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Within-Cluster Sum of Squared Errors (WSSSE) = 14520.128747812982

In [38]:
from pyspark.ml.evaluation import ClusteringEvaluator

# Initialize the ClusteringEvaluator
evaluator = ClusteringEvaluator(featuresCol="features", predictionCol="cluster", metricName="silhouette", distanceMeasure="squaredEuclidean")

# Calculate the Silhouette Score
silhouette_score = evaluator.evaluate(df_clustered)
print("Silhouette Score = " + str(silhouette_score))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Silhouette Score = 0.09824491880211028

In [31]:
# Interpret the clusters
df_clustered.groupBy("cluster", "category").count().show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------------------+-----+
|cluster|            category|count|
+-------+--------------------+-----+
|      3|Debt collection+O...|    5|
|      2|Mortgage+FHA mort...|   77|
|      1|Consumer Loan+Ins...|   14|
|      2|Mortgage+Home equ...|   12|
|      3|Credit card or pr...|    4|
|      2|Checking or savin...|    2|
|      3|Credit reporting,...|    4|
|      1|Money transfer, v...|   20|
|      0|Money transfer, v...|    6|
|      0|Consumer Loan+Veh...|   19|
|      1|Bank account or s...|    6|
|      2|Money transfer, v...|   81|
|      0|Debt collection+M...|   22|
|      0|Money transfer, v...|    9|
|      0|Credit reporting,...|    1|
|      3|Mortgage+FHA mort...|   73|
|      2|Money transfer, v...|    2|
|      2|Money transfer, v...|   22|
|      2|Student loan+Priv...|    1|
|      3|Money transfer, v...|    5|
+-------+--------------------+-----+
only showing top 20 rows

## N-Gramas

# FIN
---