# Taller 3

**Autor:** Juan Pablo Gaviria

## Detección de Tópicos

Se realizará la detección de tópicos con PySpark, utilizando LDA (Latent Dirichlet Allocation).

### Configuración Spark

In [25]:
# Importacion de dependencias
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.ml.feature import Tokenizer,CountVectorizer,IDF
from pyspark.sql.functions import *
from pyspark.sql.types import *
import re
from pyspark.ml.feature import StopWordsRemover
import nltk
from nltk.stem import LancasterStemmer
from nltk.stem import WordNetLemmatizer
import numpy as np
import pyLDAvis
from pyspark.ml.clustering import LDA

In [2]:
# Create (or reuse) the local Spark session for this notebook.
# Building through SparkSession.builder...getOrCreate() makes the cell safe to
# re-run: constructing SparkContext(...) directly raises an error when a
# context is already active in the process.
spark = (
    SparkSession.builder
    .master('local')
    .appName('Covid-Topics-Detection')
    .getOrCreate()
)
# Keep `sc` available for later cells that use the SparkContext directly.
sc = spark.sparkContext

In [3]:
# Read the metadata CSV: the first row is the header and column types are inferred.
# NOTE(review): the path is an absolute, machine-specific location; the CSV
# parser options (multiLine, escape) are left at their defaults — confirm that
# quoted fields containing newlines or commas are parsed as intended.
initialDataSet = spark.read.csv(
    '/home/jgaviria/Workspace/DataScience/datasets/metadata.csv',
    header=True,
    inferSchema=True,
)
initialDataSet.show(5)

+--------+--------------------+--------+--------------------+--------------------+--------+---------+-------+--------------------+------------+--------------------+--------------+------+----------------+--------+--------------------+--------------------+--------------------+-----+
|cord_uid|                 sha|source_x|               title|                 doi|   pmcid|pubmed_id|license|            abstract|publish_time|             authors|       journal|mag_id|who_covidence_id|arxiv_id|      pdf_json_files|      pmc_json_files|                 url|s2_id|
+--------+--------------------+--------+--------------------+--------------------+--------+---------+-------+--------------------+------------+--------------------+--------------+------+----------------+--------+--------------------+--------------------+--------------------+-----+
|ug7v899j|d1aafb70c066a2068...|     PMC|Clinical features...|10.1186/1471-2334...|PMC35282| 11472636|  no-cc|OBJECTIVE: This r...|  2001-07-04|Madani, Tar

### Preparación de Texto

In [4]:
# Remove the columns that will not be used in the analysis
unusedColumns = [
    'sha', 'doi', 'pmcid', 'pubmed_id', 'license', 'mag_id',
    'who_covidence_id', 'arxiv_id', 'pdf_json_files', 'pmc_json_files',
    'url', 's2_id',
]
dataSet = initialDataSet.drop(*unusedColumns)
dataSet.show(10)
# Last expression of the cell: displays the number of rows
dataSet.count()

+--------+--------+--------------------+--------------------+------------+--------------------+----------------+
|cord_uid|source_x|               title|            abstract|publish_time|             authors|         journal|
+--------+--------+--------------------+--------------------+------------+--------------------+----------------+
|ug7v899j|     PMC|Clinical features...|OBJECTIVE: This r...|  2001-07-04|Madani, Tariq A; ...|  BMC Infect Dis|
|02tnwd4m|     PMC|Nitric oxide: a p...|Inflammatory dise...|  2000-08-15|Vliet, Albert van...|      Respir Res|
|ejv2xln0|     PMC|Surfactant protei...|Surfactant protei...|  2000-08-25|     Crouch, Erika C|      Respir Res|
|2b73a28n|     PMC|Role of endotheli...|Endothelin-1 (ET-...|  2001-02-22|Fagan, Karen A; M...|      Respir Res|
|9785vg6d|     PMC|Gene expression i...|Respiratory syncy...|  2001-05-11|Domachowske, Jose...|      Respir Res|
|zjufx4fo|     PMC|Sequence requirem...|Nidovirus subgeno...|  2001-12-17|Pasternak, Alexan...|T

128492

In [5]:
# Build a DataFrame with only the document id and the text used for mining
# (title and abstract joined by a single space, exposed as column `texto`).
# createOrReplaceTempView replaces registerTempTable, which has been deprecated
# since Spark 2.0; the view name and the query are unchanged.
dataSet.createOrReplaceTempView('df')
text = spark.sql("select cord_uid, concat(title,' ',abstract) texto from df")
text.show(5)
print('Numero de documentos: ' + str(text.count()))

+--------+--------------------+
|cord_uid|               texto|
+--------+--------------------+
|ug7v899j|Clinical features...|
|02tnwd4m|Nitric oxide: a p...|
|ejv2xln0|Surfactant protei...|
|2b73a28n|Role of endotheli...|
|9785vg6d|Gene expression i...|
+--------+--------------------+
only showing top 5 rows

Numero de documentos: 128492


In [6]:
# Discard every row that contains a null value, then report how many documents remain
text = text.dropna()
text.show(5)
print('Numero de documentos: '+str(text.count()))

+--------+--------------------+
|cord_uid|               texto|
+--------+--------------------+
|ug7v899j|Clinical features...|
|02tnwd4m|Nitric oxide: a p...|
|ejv2xln0|Surfactant protei...|
|2b73a28n|Role of endotheli...|
|9785vg6d|Gene expression i...|
+--------+--------------------+
only showing top 5 rows

Numero de documentos: 101638


In [7]:
def arraySize(lista):
    """Return the number of elements in ``lista``.

    This function is wrapped as a Spark UDF elsewhere in the notebook to count
    the tokens of each row.

    Args:
        lista: A sized collection (e.g. a list of tokens), or ``None``.

    Returns:
        ``len(lista)``, or ``None`` when ``lista`` is ``None`` so that a null
        input propagates as null instead of raising ``TypeError``.
    """
    if lista is None:
        return None
    return len(lista)

In [8]:
# Tokenize the `texto` column into a new `tokens` column
tokenizerObj = Tokenizer(
    inputCol='texto',
    outputCol='tokens',
)
text = tokenizerObj.transform(text)
text.show(5)

+--------+--------------------+--------------------+
|cord_uid|               texto|              tokens|
+--------+--------------------+--------------------+
|ug7v899j|Clinical features...|[clinical, featur...|
|02tnwd4m|Nitric oxide: a p...|[nitric, oxide:, ...|
|ejv2xln0|Surfactant protei...|[surfactant, prot...|
|2b73a28n|Role of endotheli...|[role, of, endoth...|
|9785vg6d|Gene expression i...|[gene, expression...|
+--------+--------------------+--------------------+
only showing top 5 rows



In [9]:
# Wrap arraySize as an integer-returning UDF and add a column holding the
# total number of tokens of each document
udfArraySize = udf(arraySize, returnType=IntegerType())
text = text.withColumn(
    'numero_tokens',
    udfArraySize(col('tokens')),
)
text.show(20)

+--------+--------------------+--------------------+-------------+
|cord_uid|               texto|              tokens|numero_tokens|
+--------+--------------------+--------------------+-------------+
|ug7v899j|Clinical features...|[clinical, featur...|          277|
|02tnwd4m|Nitric oxide: a p...|[nitric, oxide:, ...|          150|
|ejv2xln0|Surfactant protei...|[surfactant, prot...|          225|
|2b73a28n|Role of endotheli...|[role, of, endoth...|           74|
|9785vg6d|Gene expression i...|[gene, expression...|          120|
|zjufx4fo|Sequence requirem...|[sequence, requir...|          186|
|ymceytj3|Crystal structure...|[crystal, structu...|          127|
|wzj2glte|Synthesis of a no...|[synthesis, of, a...|          161|
|2sfqsfm1|Structure of coro...|[structure, of, c...|          188|
|i0zym7iq|Discontinuous and...|[discontinuous, a...|          183|
|5yhe786e|Debate: Transfusi...|[debate:, transfu...|          123|
|8zchiykl|The 21st Internat...|[the, 21st, inter...|          

In [10]:
# Remove non-alphabetic characters
def noAlphaRemove_udf(x):
    """Strip non-letters from each token and keep the meaningful remainders.

    Every character outside ``A-Z``/``a-z`` is deleted from each token. Tokens
    left with fewer than two characters are discarded; the rest are returned
    lower-cased, in their original order.

    Args:
        x: Iterable of string tokens.

    Returns:
        A new list with the cleaned, lower-cased tokens.
    """
    cleaned = []
    for token in x:
        letters = re.sub(r'[^A-Za-z]+','',token)
        if len(letters) > 1:
            cleaned.append(letters.lower())
    return cleaned
# Expose the cleaner as a UDF that returns an array of strings
noAlphaRemove = udf(noAlphaRemove_udf, ArrayType(StringType()))

# Overwrite `tokens` with the cleaned tokens, then recompute the token count
# from the cleaned column
text = (
    text
    .withColumn('tokens', noAlphaRemove(col('tokens')))
    .withColumn('numero_tokens', udfArraySize(col('tokens')))
)
text.show(10)

+--------+--------------------+--------------------+-------------+
|cord_uid|               texto|              tokens|numero_tokens|
+--------+--------------------+--------------------+-------------+
|ug7v899j|Clinical features...|[clinical, featur...|          235|
|02tnwd4m|Nitric oxide: a p...|[nitric, oxide, p...|          149|
|ejv2xln0|Surfactant protei...|[surfactant, prot...|          222|
|2b73a28n|Role of endotheli...|[role, of, endoth...|           70|
|9785vg6d|Gene expression i...|[gene, expression...|          119|
|zjufx4fo|Sequence requirem...|[sequence, requir...|          180|
|ymceytj3|Crystal structure...|[crystal, structu...|          122|
|wzj2glte|Synthesis of a no...|[synthesis, of, n...|          147|
|2sfqsfm1|Structure of coro...|[structure, of, c...|          175|
|i0zym7iq|Discontinuous and...|[discontinuous, a...|          169|
+--------+--------------------+--------------------+-------------+
only showing top 10 rows



In [11]:
# Remove stop words from `tokens` into a new `noStopTokens` column
stopWordRemover = StopWordsRemover(
    inputCol='tokens',
    outputCol='noStopTokens',
)
# Drop and re-add the token count so it reflects `noStopTokens` and ends up
# as the last column of the DataFrame
text = (
    stopWordRemover.transform(text)
    .drop(col('numero_tokens'))
    .withColumn('numero_tokens', udfArraySize(col('noStopTokens')))
)
text.show(20)

+--------+--------------------+--------------------+--------------------+-------------+
|cord_uid|               texto|              tokens|        noStopTokens|numero_tokens|
+--------+--------------------+--------------------+--------------------+-------------+
|ug7v899j|Clinical features...|[clinical, featur...|[clinical, featur...|          139|
|02tnwd4m|Nitric oxide: a p...|[nitric, oxide, p...|[nitric, oxide, p...|           84|
|ejv2xln0|Surfactant protei...|[surfactant, prot...|[surfactant, prot...|          142|
|2b73a28n|Role of endotheli...|[role, of, endoth...|[role, endothelin...|           44|
|9785vg6d|Gene expression i...|[gene, expression...|[gene, expression...|           81|
|zjufx4fo|Sequence requirem...|[sequence, requir...|[sequence, requir...|          122|
|ymceytj3|Crystal structure...|[crystal, structu...|[crystal, structu...|           81|
|wzj2glte|Synthesis of a no...|[synthesis, of, n...|[synthesis, novel...|           90|
|2sfqsfm1|Structure of coro...|[

In [12]:
# Stemming
stemmer = LancasterStemmer()


def stemming_udf(x):
    """Return the stem of every token in ``x``, preserving order.

    Args:
        x: Iterable of string tokens.

    Returns:
        A new list where each token was passed through ``stemmer.stem``.
    """
    return list(map(stemmer.stem, x))
# Expose the stemmer as a UDF that returns an array of strings
stemmingFunction = udf(stemming_udf, ArrayType(StringType()))

# Stem the stop-word-free tokens into `StemmTokens` and refresh the token
# count (dropped and re-added so it stays the last column)
text = (
    text
    .withColumn('StemmTokens', stemmingFunction(col('noStopTokens')))
    .drop(col('numero_tokens'))
    .withColumn('numero_tokens', udfArraySize(col('StemmTokens')))
)
text.show(10)

+--------+--------------------+--------------------+--------------------+--------------------+-------------+
|cord_uid|               texto|              tokens|        noStopTokens|         StemmTokens|numero_tokens|
+--------+--------------------+--------------------+--------------------+--------------------+-------------+
|ug7v899j|Clinical features...|[clinical, featur...|[clinical, featur...|[clin, feat, cult...|          139|
|02tnwd4m|Nitric oxide: a p...|[nitric, oxide, p...|[nitric, oxide, p...|[nit, oxid, proin...|           84|
|ejv2xln0|Surfactant protei...|[surfactant, prot...|[surfactant, prot...|[surfact, protein...|          142|
|2b73a28n|Role of endotheli...|[role, of, endoth...|[role, endothelin...|[rol, endothelin,...|           44|
|9785vg6d|Gene expression i...|[gene, expression...|[gene, expression...|[gen, express, ep...|           81|
|zjufx4fo|Sequence requirem...|[sequence, requir...|[sequence, requir...|[sequ, requir, rn...|          122|
|ymceytj3|Crystal s

In [13]:
# Lemmatization
lematizer = WordNetLemmatizer()


def lematizer_udf(x):
    """Return the lemma of every token in ``x``, preserving order.

    Args:
        x: Iterable of string tokens.

    Returns:
        A new list where each token was passed through ``lematizer.lemmatize``.
    """
    lemmas = []
    for token in x:
        lemmas.append(lematizer.lemmatize(token))
    return lemmas
# Expose the lemmatizer as a UDF that returns an array of strings
lematizerFunction = udf(lematizer_udf, ArrayType(StringType()))

# Lemmatize `StemmTokens` into `LemaTokens` and refresh the token count
# (dropped and re-added so it stays the last column).
# NOTE(review): the input here is the already-stemmed column, not the
# original words — confirm that lemmatizing stems is the intended order.
text = (
    text
    .withColumn('LemaTokens', lematizerFunction(col('StemmTokens')))
    .drop(col('numero_tokens'))
    .withColumn('numero_tokens', udfArraySize(col('LemaTokens')))
)
text.show(10)

+--------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+
|cord_uid|               texto|              tokens|        noStopTokens|         StemmTokens|          LemaTokens|numero_tokens|
+--------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------+
|ug7v899j|Clinical features...|[clinical, featur...|[clinical, featur...|[clin, feat, cult...|[clin, feat, cult...|          139|
|02tnwd4m|Nitric oxide: a p...|[nitric, oxide, p...|[nitric, oxide, p...|[nit, oxid, proin...|[nit, oxid, proin...|           84|
|ejv2xln0|Surfactant protei...|[surfactant, prot...|[surfactant, prot...|[surfact, protein...|[surfact, protein...|          142|
|2b73a28n|Role of endotheli...|[role, of, endoth...|[role, endothelin...|[rol, endothelin,...|[rol, endothelin,...|           44|
|9785vg6d|Gene expression i...|[gene, expression...|[gene, expression...|[gen, express, ep

In [14]:
# Term-frequency vectors: fit a CountVectorizer on `LemaTokens`, limiting the
# vocabulary to 1000 terms, and write the counts to `rawFeatures`
cv = CountVectorizer(
    inputCol="LemaTokens",
    outputCol="rawFeatures",
    vocabSize=1000,
)
cvmodel = cv.fit(text)
featurizedData = cvmodel.transform(text)

In [15]:
# Bag-of-words vocabulary learned by the fitted CountVectorizer model
vocab = cvmodel.vocabulary
# Broadcast the vocabulary through the SparkContext
vocab_broadcast = sc.broadcast(vocab)
# len() returns an int; convert it explicitly, since concatenating str + int
# raises TypeError
print('BoW: ' + str(len(vocab)))

In [22]:
# Inverse document frequency: fit on the raw term counts and write the
# rescaled vectors to the `features` column
idf = IDF(
    inputCol="rawFeatures",
    outputCol="features",
)
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

In [26]:
# LDA: fit a 25-topic model with the EM optimizer and a fixed seed on the
# `features` column, then retrieve the per-topic term descriptions.
# NOTE(review): `features` holds the IDF-rescaled vectors produced above, not
# the raw counts in `rawFeatures` — confirm that training LDA on TF-IDF
# weights rather than term counts is intended.
lda = LDA(k=25, seed=123, optimizer="em", featuresCol="features")
ldamodel = lda.fit(rescaledData)
ldatopics = ldamodel.describeTopics()

In [28]:
# Apply the fitted LDA model to the rescaled documents
ldaResults = ldamodel.transform(rescaledData)

In [27]:
# Print the current local date and time when this cell is executed
import datetime
print(datetime.datetime.now())

2020-05-22 01:20:34.568225
