In [1]:
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.pretrained import PretrainedPipeline
import sparknlp

In [2]:
import time 
t0 = time.time()

In [3]:
import pandas as pd

In [4]:
spark = sparknlp.start(gpu=True)

In [5]:
spark

# Explainer

Explain_document applies tokenization, lemmatization, part of speech, NER (Names-Entity Recognition) and embeddings.

In [6]:
# pipeline = PretrainedPipeline('explain_document_lg', lang='es')
# pipeline = PretrainedPipeline('explain_document_sm', lang='es') # Mejores resultados 

In [7]:
text = ["""\
Por otro lado, en 1926 Werner Heisenberg, Pascual Jordan y Max Born \
profundizaron en el estudio del problema del cuerpo negro: el comportamiento \
de la radiación electromagnética dentro de una cavidad, en ausencia de \
partículas cargadas. Esto constituyó el primer ejemplo de una teoría cuántica \
de campos, en este caso aplicando las reglas de cuantización al campo \
electromagnético. En sus resultados, la radiación se comportaba como un \
conjunto de partículas —los fotones—, en consonancia con la hipótesis de los \
cuantos de luz, formulada por Einstein en 1905.\
""",
"""\
Este es el segundo texto, creado por Juan Carlos Gonzalez para intentar \
probar el desempeno de la libreria Spark-NLP en la ejecucion de tareas \
de procesamiento de lenguage natural (NLP) en el mes de abril de 2020.\
"""]

In [8]:
# results = pipeline.annotate(text)

In [9]:
# lvl = 1

In [10]:
# print(list(results[lvl].keys()))

In [11]:
# results[lvl]['entities']

In [12]:
# list(zip(results[lvl]['token'] , results[lvl]['lemma'], results[lvl]['pos'], results[lvl]['ner']))[:]

# Part of Speech

In [13]:
# texto = """Este es el segndo texto, creado por Juan Carlos Gonzalez para intntar\
# probar el desempeno de la libreria Spark-NLP en la ejecucion de tareas \
# de procesamiento de lenguage natural (NLP) en el mes de abril de 2020. Para \
# efectos de entendmiento, esta es la segunda frase del texto.
# """

In [14]:
# text_df = spark.createDataFrame(pd.DataFrame({'text': text}))

In [15]:
# text_df.collect()

In [16]:
df = pd.read_csv('datasets/CorpusDemo.csv')
df = df[['CONTENT']].drop_duplicates().reset_index(drop=True)

In [17]:
# sample_text = df.groupby('CONTENT').count().reset_index()[['CONTENT']].iloc[3039]['CONTENT']

In [18]:
import re
def clean_text(text):
    text = re.sub('[#@][^\t\n\r\f\v\s]*', " ", text)
    text = re.sub('[^\w\d\:\/\.\-\_\,\(\)]', " ", text)
    text = re.sub('(http|www)[^\t\n\r\f\v\s]*', " ", text)
    text = re.sub('\s+', ' ', text)
    return text

In [19]:
text_df = spark.createDataFrame(df)
text_df.show(10)

+--------------------+
|             CONTENT|
+--------------------+
|Los proyectos #Fr...|
|Un pequeño pero c...|
|Colombia ha debid...|
|Es una Mierda #Fr...|
|"El planeta sufre...|
|🔶🔸Descargue aqu...|
|Pilotos de #frack...|
|#IMPORTANTE  Los ...|
|Los que creen que...|
|Interesante artíc...|
+--------------------+
only showing top 10 rows



In [20]:
from pyspark.sql import functions as F

udf_clean_text = F.udf(lambda x: clean_text(x))

In [21]:
text_df = text_df.select(udf_clean_text(F.col('CONTENT')).alias('CONTENT'))

### Custom lemmatizer

In [22]:
from es_lemmatizer import lemmatize
import spacy

nlp = spacy.load("es_core_news_sm")
nlp.add_pipe(lemmatize, after="tagger")


try:
    doc = nlp(''.join(text_df.select('CONTENT').rdd.flatMap(lambda x: x).collect()))
except:
    doc = nlp(text)

custom_lemm = {}
for token in doc:
    if token.lemma_ not in custom_lemm:
        custom_lemm[token.lemma_] = [str(token)]
    else:
        if str(token) not in custom_lemm[token.lemma_]:
            custom_lemm[token.lemma_].append(str(token))
        
        
keys = list(custom_lemm.keys())
vals = ['\t'.join(entry) for entry in list(custom_lemm.values())]

In [23]:
from pyspark.sql.types import *
from pyspark.sql.functions import regexp_extract
from pyspark.sql import functions as F

tr0 = time.time()

cSchema = StructType([StructField("text_token", StringType())])
token_df = spark.createDataFrame([[key] for key in keys], schema=cSchema)

es_CO = spark.read.csv('es_CO_level0.csv').withColumnRenamed('_c0', 'dic_token')

mispeled_tokens = token_df.join(es_CO, token_df['text_token'] == es_CO['dic_token'], how='left_anti')
mispeled_tokens.show()
mispeled_tokens = mispeled_tokens.withColumn("text_token",regexp_extract(mispeled_tokens['text_token'], '[^0-9|\W]+.+',0))\
    .filter(mispeled_tokens['text_token'].isNotNull())
mispeled_tokens = mispeled_tokens.filter(mispeled_tokens['text_token'] != '')
mispeled_tokens.show()

mispeled_tokens_x_esCO = es_CO.crossJoin(mispeled_tokens)
# print(mispeled_tokens_x_esCO.count())

levershtein_df = mispeled_tokens_x_esCO.withColumn("levenshtein", F.levenshtein(F.col("dic_token"), F.col("text_token"))).filter("levenshtein < 2")
res_filt = levershtein_df.groupby('text_token').count().filter("count == 1").select('text_token')
res_filt.show()
levershteinPandas = levershtein_df.join(res_filt, ['text_token'], how='right').select('text_token', 'dic_token')#.toPandas()
levershteinPandas.show()
spellChecker = levershteinPandas.rdd.map(lambda x: {x['text_token']: x['dic_token']}).collect()
spellChecker = {k:v for x in spellChecker for k,v in x.items()}
# spellChecker = spellChecker.collect()

# spellChecker = {x: y for x, y in zip(levershteinPandas['text_token'].tolist(), levershteinPandas['dic_token'].tolist())}
# print(spellChecker)

for num, key in enumerate(keys):
    if key in list(spellChecker.keys()):
        keys[num] = spellChecker[key]

# print(keys)
print(time.time() - tr0)

+-------------+
|   text_token|
+-------------+
|        de+el|
|     sogamoso|
|            ,|
|            2|
|interconectar|
|     colombia|
|            .|
|            6|
|            7|
|          ...|
|             |
|            :|
|          xxi|
|    ecopetrol|
|         2020|
|     facebook|
|       google|
|         eeuu|
|            (|
|  refinanciar|
+-------------+
only showing top 20 rows

+-----------------+
|       text_token|
+-----------------+
|            de+el|
|         sogamoso|
|    interconectar|
|         colombia|
|              xxi|
|        ecopetrol|
|         facebook|
|           google|
|             eeuu|
|      refinanciar|
|             a+el|
|         fracking|
|       les.olvida|
|         covid_19|
|               by|
|        jerobledo|
|             paul|
|          krugman|
|colombiaecopetrol|
|               pm|
+-----------------+
only showing top 20 rows

+--------------+
|    text_token|
+--------------+
|   polombianos|
|   defendiende|

In [24]:
custom_lemm_list = [f'{key}->{val}\n' for key, val in zip(keys, vals)]

In [25]:
# custom_lemm_list

In [26]:
with open('custom_lemma.txt', 'w') as file:
    file.writelines(custom_lemm_list)

### Pipeline 1-grams

In [27]:
documentAssembler = DocumentAssembler()\
    .setInputCol("CONTENT")\
    .setOutputCol('document')

In [28]:
# sentenceDetector = SentenceDetector()\
#     .setInputCols('document')\
#     .setOutputCol('sentece')

In [29]:
tokenizer = Tokenizer()\
    .setInputCols('document')\
    .setOutputCol('token')

In [30]:
lemmatizer = Lemmatizer()\
    .setInputCols('token')\
    .setOutputCol("lemma")\
    .setDictionary('custom_lemma.txt', '->', '\t')

In [31]:
# import nltk
# nltk.download('stopwords')

from nltk.corpus import stopwords

es_stopwords = stopwords.words('spanish')

In [32]:
stopwordsCleaner = StopWordsCleaner()\
    .setInputCols('lemma')\
    .setOutputCol('1-gram')\
    .setStopWords(es_stopwords)

In [33]:
nGrammer = NGramGenerator()\
    .setInputCols('1-gram')\
    .setOutputCol('n-grams')\
    .setN(3)\
    .setEnableCumulative(True)\
    .setDelimiter('_')

In [34]:
posTagger = PerceptronModel.pretrained("pos_ud_gsd", "es")\
    .setInputCols(['document', '1-gram'])\
    .setOutputCol('posTagger')

pos_ud_gsd download started this may take some time.
Approximate size to download 5.2 MB
[OK!]


In [35]:
finisher = Finisher()\
    .setInputCols(['1-gram', 'n-grams', 'posTagger'])

In [36]:
from pyspark.ml import Pipeline
pipeline = Pipeline() \
    .setStages([documentAssembler,
#                 sentenceDetector,
                tokenizer,
                lemmatizer,
                stopwordsCleaner,
                nGrammer,
                posTagger,
                finisher])

In [37]:
spark

In [38]:
processed_texts = pipeline.fit(text_df).transform(text_df)

In [39]:
processed_texts.show()

+--------------------+--------------------+--------------------+--------------------+
|             CONTENT|     finished_1-gram|    finished_n-grams|  finished_posTagger|
+--------------------+--------------------+--------------------+--------------------+
|Los proyectos se ...|[proyecto, ubicar...|[proyecto, ubicar...|[NOUN, VERB, NOUN...|
|Un pequeño pero c...|[pequeño, complet...|[pequeño, complet...|[ADJ, ADJ, NOUN, ...|
|Colombia ha debid...|[colombia, haber,...|[colombia, haber,...|[PROPN, AUX, VERB...|
|      Es una Mierda |            [mierda]|            [mierda]|             [PROPN]|
| El planeta sufre...|[planeta, sufrir,...|[planeta, sufrir,...|[NOUN, VERB, NOUN...|
| Descargue aquí e...|[descargue, aquí,...|[descargue, aquí,...|[VERB, ADV, NOUN,...|
|Pilotos de de Eco...|[piloto, ecopetro...|[piloto, ecopetro...|[NOUN, PROPN, AUX...|
| Los pilotos de d...|[piloto, ecopetro...|[piloto, ecopetro...|[NOUN, NOUN, AUX,...|
|Los que creen que...|[crear, va, quita...|[crear, va,

### Handling n-grams

In [40]:
from pyspark.sql import functions as F
from pyspark.sql import types as T

In [41]:
# User defined function to join the list
udf_join_arr = F.udf(lambda x: ' '.join(x), T.StringType())

In [42]:
processed_texts = processed_texts.withColumn('finished_posTagger',  udf_join_arr(F.col('finished_posTagger')))

In [43]:
processed_texts.show()

+--------------------+--------------------+--------------------+--------------------+
|             CONTENT|     finished_1-gram|    finished_n-grams|  finished_posTagger|
+--------------------+--------------------+--------------------+--------------------+
|Los proyectos se ...|[proyecto, ubicar...|[proyecto, ubicar...|NOUN VERB NOUN AD...|
|Un pequeño pero c...|[pequeño, complet...|[pequeño, complet...|ADJ ADJ NOUN NOUN...|
|Colombia ha debid...|[colombia, haber,...|[colombia, haber,...|PROPN AUX VERB VE...|
|      Es una Mierda |            [mierda]|            [mierda]|               PROPN|
| El planeta sufre...|[planeta, sufrir,...|[planeta, sufrir,...|NOUN VERB NOUN AD...|
| Descargue aquí e...|[descargue, aquí,...|[descargue, aquí,...|VERB ADV NOUN ADJ...|
|Pilotos de de Eco...|[piloto, ecopetro...|[piloto, ecopetro...|NOUN PROPN AUX VE...|
| Los pilotos de d...|[piloto, ecopetro...|[piloto, ecopetro...|NOUN NOUN AUX VER...|
|Los que creen que...|[crear, va, quita...|[crear, va,

In [44]:
list([(x, y) for x,y in zip(processed_texts.select('finished_1-gram').toPandas()['finished_1-gram'].tolist()[0], processed_texts.select('finished_posTagger').toPandas()['finished_posTagger'].str.split(' ').tolist()[0])])[:4]

[('proyecto', 'NOUN'),
 ('ubicar', 'VERB'),
 ('desembocadura', 'NOUN'),
 ('de+el', 'ADJ')]

In [45]:
posDocumentAssembler = DocumentAssembler()\
    .setInputCol('finished_posTagger')\
    .setOutputCol('pos_document')

In [46]:
posTokenizer = Tokenizer()\
    .setInputCols('pos_document')\
    .setOutputCol('pos')

In [47]:
posNGrammer = NGramGenerator()\
    .setInputCols('pos')\
    .setOutputCol('pos_ngrams')\
    .setN(3)\
    .setEnableCumulative(True)\
    .setDelimiter('_')

In [48]:
posFinisher = Finisher()\
    .setInputCols(['pos', 'pos_ngrams'])

In [49]:
posPipeline = Pipeline()\
    .setStages([posDocumentAssembler,
                posTokenizer,
                posNGrammer,
                posFinisher])

In [50]:
processed_texts = posPipeline.fit(processed_texts).transform(processed_texts)

In [51]:
processed_texts.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|             CONTENT|     finished_1-gram|    finished_n-grams|  finished_posTagger|        finished_pos| finished_pos_ngrams|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Los proyectos se ...|[proyecto, ubicar...|[proyecto, ubicar...|NOUN VERB NOUN AD...|[NOUN, VERB, NOUN...|[NOUN, VERB, NOUN...|
|Un pequeño pero c...|[pequeño, complet...|[pequeño, complet...|ADJ ADJ NOUN NOUN...|[ADJ, ADJ, NOUN, ...|[ADJ, ADJ, NOUN, ...|
|Colombia ha debid...|[colombia, haber,...|[colombia, haber,...|PROPN AUX VERB VE...|[PROPN, AUX, VERB...|[PROPN, AUX, VERB...|
|      Es una Mierda |            [mierda]|            [mierda]|               PROPN|             [PROPN]|             [PROPN]|
| El planeta sufre...|[planeta, sufrir,...|[planeta, sufrir,...|NOUN VERB NOUN AD...|[NOUN, VERB, NOUN..

### Filtering POSTags

These tags mark the core part-of-speech categories.

__Alphabetical listing__:
- ADJ: adjective (noun modifiers)
- ADP: adposition (preposiciones y postposiciones, e.g., in, to, during)
- ADV: adverb (verb -sometines also adjective- modifiers)
- AUX: auxiliary 
- CCONJ: coorinating conjuction (links words without subordination)
- DET: determiner
- INTJ: interjection
- NOUN: noun
- NUM: numeral
- PART: particle
- PRON: pronoun
- PROPN: proper noun
- PUNCT: punctuation
- SCONJ: subordinating conjunction
- SYM: symbol
- VERB: verb
- X: other

Reference: https://universaldependencies.org/u/pos/

#### Filtering 1-grams

In [52]:
allowed_cats = ['NUM', 'ADJ', 'NOUN', 'PROPN', 'VERB', 'ADV', 'X']

def filter_pos(words, pos_tags):
    return [word for word, pos in zip(words, pos_tags) if pos in allowed_cats]

udf_filter_pos = F.udf(filter_pos, T.ArrayType(T.StringType()))

In [53]:
processed_texts = processed_texts.withColumn('filtered_1-gram', udf_filter_pos(F.col('finished_1-gram'), F.col('finished_pos')))

In [54]:
processed_texts.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|             CONTENT|     finished_1-gram|    finished_n-grams|  finished_posTagger|        finished_pos| finished_pos_ngrams|     filtered_1-gram|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Los proyectos se ...|[proyecto, ubicar...|[proyecto, ubicar...|NOUN VERB NOUN AD...|[NOUN, VERB, NOUN...|[NOUN, VERB, NOUN...|[proyecto, ubicar...|
|Un pequeño pero c...|[pequeño, complet...|[pequeño, complet...|ADJ ADJ NOUN NOUN...|[ADJ, ADJ, NOUN, ...|[ADJ, ADJ, NOUN, ...|[pequeño, complet...|
|Colombia ha debid...|[colombia, haber,...|[colombia, haber,...|PROPN AUX VERB VE...|[PROPN, AUX, VERB...|[PROPN, AUX, VERB...|[colombia, debido...|
|      Es una Mierda |            [mierda]|            [mierda]|               PROPN|             [PROPN]|

#### Filtering n-grams

In [55]:
# Add punctuation

filter_3 = ['NUM_ADV_VERB', 'NUM_ADV_ADJ', 'NUM_ADJ_NOUN', 'NUM_NOUN_VERB', 'NUM_NOUN_ADJ',\
            'PROPN_PROPN_PROPN', 'PROPN_VERB_PROPN', 'PROPN_VERB_NOUN', 'PROPN_VERB_ADV', 'PROPN_ADJ_VERB', 'PROPN_ADV_ADJ', 'PROPN_ADV_VERB',\
            'NOUN_PROPN_PROPN', 'NOUM_VERB_NOUN', 'NOUN_VERB_PROPN', 'NOUN_PROPN_VERB', 'NOUM_VERB_NUM', 'NOUN_NUM_VERB', 'NOUN_VERB_ADV',\
            'VERB_ADJ_PROPN', 'VERB_ADJ_NOUN', 'VERB_PROPN_PROPN', 'VERB_NOUN_NOUN', 'VERB_NOUN_PROPN', 'VERB_NOUN_ADJ', 'VERB_PROPN_NOUN', 'VERB_NUM_NOUN', 'VERB_ADV_ADV',\
            'ADJ_NOUN_VERB', 'ADJ_PROPN_VERB', 'ADJ_PROPN_PROPN', 'ADJ_NOUN_PROPN', 'ADJ_PROPN_NOUN', 'ADJ_NOUN_NOUN', 'ADJ_NOUN_ADJ', 'ADJ_PROPN_ADJ', 'ADJ_VERB_NOUN',\
            'ADV_VERB_PROPN', 'ADV_VERB_NOUN']
filter_2 = ['PROPN_PROPN', 'PROPN_NOUN', 'PROPN_ADJ', 'PROPN_VERB',\
            'NOUN_NOUN', 'NOUN_PROPN', 'NOUN_VERB', 'NOUN_ADJ',\
            'NUM_NOUN',\
            'ADJ_NOUN', 'ADJ_PROPN',\
            'ADV_VERB', 'ADV_ADJ',\
            'VERB_NOUN', 'VERB_PROPN', 'VERB_ADV', 'VERB_ADJ']

def filter_pos_ngrams(words, pos_tags):
    return [word for word, pos in zip(words, pos_tags)\
            if (len(pos.split('_')) == 3\
                and\
               pos in filter_3)\
           or (len(pos.split('_')) == 2\
              and\
              pos in filter_2)]

udf_filter_pos_combs = F.udf(filter_pos_ngrams, T.ArrayType(T.StringType()))

In [56]:
processed_texts = processed_texts.withColumn('filtered_ngrams',udf_filter_pos_combs(F.col('finished_n-grams'), F.col('finished_pos_ngrams')))
processed_texts.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|             CONTENT|     finished_1-gram|    finished_n-grams|  finished_posTagger|        finished_pos| finished_pos_ngrams|     filtered_1-gram|     filtered_ngrams|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Los proyectos se ...|[proyecto, ubicar...|[proyecto, ubicar...|NOUN VERB NOUN AD...|[NOUN, VERB, NOUN...|[NOUN, VERB, NOUN...|[proyecto, ubicar...|[proyecto_ubicar,...|
|Un pequeño pero c...|[pequeño, complet...|[pequeño, complet...|ADJ ADJ NOUN NOUN...|[ADJ, ADJ, NOUN, ...|[ADJ, ADJ, NOUN, ...|[pequeño, complet...|[completo_análisi...|
|Colombia ha debid...|[colombia, haber,...|[colombia, haber,...|PROPN AUX VERB VE...|[PROPN, AUX, VERB...|[PROPN, AUX, VERB...|[colombia, debido...|[7

In [57]:
list([(x, y) for x,y in zip(processed_texts.select('finished_1-gram').toPandas()['finished_1-gram'].tolist()[0], processed_texts.select('finished_posTagger').toPandas()['finished_posTagger'].str.split(' ').tolist()[0])])

[('proyecto', 'NOUN'),
 ('ubicar', 'VERB'),
 ('desembocadura', 'NOUN'),
 ('de+el', 'ADJ'),
 ('río', 'NOUN'),
 ('Sogamoso', 'PROPN'),
 (',', 'PUNCT'),
 ('2', 'NUM'),
 ('recurso', 'NOUN'),
 ('importante', 'ADJ'),
 ('tener', 'VERB'),
 ('zona', 'NOUN'),
 ('humedal', 'ADJ'),
 ('caño', 'ADJ'),
 ('interconectar', 'VERB'),
 ('ríos', 'PROPN'),
 ('ciénaga', 'PROPN'),
 ('aun', 'SCONJ'),
 ('habitar', 'VERB'),
 ('manatí', 'ADV')]

In [58]:
# processed_texts.select('filtered_ngrams').toPandas().filtered_ngrams.tolist()

# Joiner and Vectorizer

In [59]:
from pyspark.sql.functions import concat

processed_texts = processed_texts.withColumn('final', concat(F.col('filtered_1-gram'), F.col('filtered_ngrams')))

In [60]:
# TF: Term Frequency

from pyspark.ml.feature import CountVectorizer

tfizer = CountVectorizer(inputCol='final',\
                         outputCol='tf_features')
tf_model = tfizer.fit(processed_texts)
tf_result = tf_model.transform(processed_texts)

In [61]:
# tf_result.select('tf_features').toPandas().tf_features.tolist()

In [62]:
# IDF: Inverse Document Frequency

from pyspark.ml.feature import IDF

idfizer = IDF(inputCol='tf_features',\
              outputCol='tf_idf_features')
tfidf_result = idfizer.fit(tf_result).transform(tf_result)

In [63]:
# tfidf_result.select('tf_idf_features').toPandas().tf_idf_features.tolist()

# LDA

In [64]:
from pyspark.ml.clustering import LDA

num_topics = 6
max_iter = 10

lda = LDA(k=num_topics, maxIter=max_iter, featuresCol='tf_idf_features', seed=24)
lda_model = lda.fit(tfidf_result)

In [65]:
vocab = tf_model.vocabulary

def get_words(token_list):
    return [vocab[token_id] for token_id in token_list]

udf_to_words = F.udf(get_words, T.ArrayType(T.StringType()))

In [66]:
num_top_words = 20


topics = lda_model.describeTopics(num_top_words). withColumn('topicWords', udf_to_words(F.col('termIndices')))
topics.select('topic', 'topicWords').show()

+-----+--------------------+
|topic|          topicWords|
+-----+--------------------+
|    0|[de+el, favor, ha...|
|    1|[saber, páramo, m...|
|    2|[de+el, a+el, soc...|
|    3|[1, piloto, frack...|
|    4|[fracking, colomb...|
|    5|[de+el, a+el, ten...|
+-----+--------------------+



In [67]:
topics.toPandas().iloc[0]['topicWords']

['de+el',
 'favor',
 'hacer',
 'votar',
 'decir',
 'petróleo',
 'favor_de+el',
 'a+el',
 'de+el_petróleo',
 'contar',
 'mas',
 'nunca',
 'dos',
 'representante',
 'bueno',
 'día',
 'país',
 'actividad',
 'votar_favor',
 'matar']

# Stop Spark Context

In [68]:
spark.stop()

In [69]:
time.time()- t0

363.2160909175873