## Implementación del taller 3 Modelo de clasificación de Papers utilizando PySpark

Aplicación del taller número 3 del curso ST1800-Almacenamiento y recuperación de la información

#### Equipo:

1. Juan Carlos Agudelo Acevedo
2. Daian Fajardo Becerra
3. Hernan Sepulveda Jimenez

Profesor: Edwin Nelson Montoya Munera

In [None]:
#FUNCIONES LECTURA JSON

In [2]:
from py4j.java_gateway import *
port = launch_gateway()
gateway = JavaGateway(
gateway_parameters=GatewayParameters(port=port),
callback_server_parameters=CallbackServerParameters(port=0))
random = gateway.jvm.java.util.Random()

In [3]:
import os
import json
from pprint import pprint
from copy import deepcopy
import numpy as np
import pandas as pd
from tqdm.notebook import tqdm
import re

def format_name(author):
    middle_name = " ".join(author['middle'])
    
    if author['middle']:
        return " ".join([author['first'], middle_name, author['last']])
    else:
        return " ".join([author['first'], author['last']])


def format_affiliation(affiliation):
    text = []
    location = affiliation.get('location')
    if location:
        text.extend(list(affiliation['location'].values()))
    
    institution = affiliation.get('institution')
    if institution:
        text = [institution] + text
    return ", ".join(text)

def format_authors(authors, with_affiliation=False):
    name_ls = []
    
    for author in authors:
        name = format_name(author)
        if with_affiliation:
            affiliation = format_affiliation(author['affiliation'])
            if affiliation:
                name_ls.append(f"{name} ({affiliation})")
            else:
                name_ls.append(name)
        else:
            name_ls.append(name)
    
    return ", ".join(name_ls)


def format_body(body_text):
    texts = [(di['section'], di['text']) for di in body_text]
    texts_di = {di['section']: "" for di in body_text}
    
    for section, text in texts:
        texts_di[section] += text

    body = ""

    for section, text in texts_di.items():
        body += section
        body += "\n\n"
        body += text
        body += "\n\n"
    
    return body

def format_bib(bibs):
    if type(bibs) == dict:
        bibs = list(bibs.values())
    bibs = deepcopy(bibs)
    formatted = []
    
    for bib in bibs:
        bib['authors'] = format_authors(
            bib['authors'], 
            with_affiliation=False
        )
        formatted_ls = [str(bib[k]) for k in ['title', 'authors', 'venue', 'year']]
        formatted.append(", ".join(formatted_ls))

        
    return "; ".join(formatted)


def load_files(dirname):
    filenames = os.listdir(dirname)
    raw_files = []

    for filename in tqdm(filenames):
        filename = dirname + filename
        file = json.load(open(filename, 'rb'))
        raw_files.append(file)
    
    return raw_files


def generate_clean_df(all_files):
    cleaned_files = []
    
    for file in tqdm(all_files):
        features = [
            file['paper_id'],
            file['metadata']['title'],
            format_authors(file['metadata']['authors']),
            format_authors(file['metadata']['authors'], 
                           with_affiliation=True),
            format_body(file['abstract']),
            format_body(file['body_text']),
            format_bib(file['bib_entries']),
            file['metadata']['authors'],
            file['bib_entries']
        ]

        cleaned_files.append(features)

    col_names = ['paper_id', 'title', 'authors',
                 'affiliations', 'abstract', 'text', 
                 'bibliography','raw_authors','raw_bibliography']

    clean_df = pd.DataFrame(cleaned_files, columns=col_names)
    clean_df = clean_df.drop(['raw_authors', 'raw_bibliography'], axis=1)
    clean_df.head()
    
    return clean_df

In [4]:
## Path y lectura de archivos

pmc_dir = 'C:/Users/LENOVO/Documents/Maestría/Semestre I/Almacenamiento y Recuperacion de Informacion/Taller3/Script/jason_prueba2/'
pmc_files = load_files(pmc_dir)
pmc_df = generate_clean_df(pmc_files)

HBox(children=(FloatProgress(value=0.0, max=1478.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=1478.0), HTML(value='')))




In [5]:
# Creamos la columna de etiqueta
pmc_df.insert(7,'tags',"")

#### Asignación de etiquetas para los papers por método de frecuencias.

Basandonos en un analisis mixto entre palabras obtenidas del LDA, kernels de Kaggle y revisión de algunos Papers destacados, creamos un listado de posibles categorias que aparecen en gran medida en todos los papers y que podrian marcar una diferencia a la hora de buscar un paper de interés.
Tomando las etiquetas Transmisión, Incubacion, Riesgo, Infecciones, Origen, Evolución, Vacunas y Supervisión procederemos a elegir las mas representativas según la frecuencia en los papers.

In [6]:
# Etiquetas seleccionadas a partir del analisis de los papers
possible_tags=['transmission','incubation','risk','infections','origin','evolution','vaccines','surveillance']

In [7]:
# Funcion para asignar la etiqueta con mayor frecuencia por cada paper
for i, row in tqdm(pmc_df.iterrows()):
    tag={}
    for i in possible_tags:
        tag[i]=0
        #print(i)
    for word in possible_tags:
        if word in row["text"].lower():
            tag[word]=sum(1 for _ in re.finditer(r'\b%s\b' % re.escape(word), row["text"],re.IGNORECASE))
    tag={k: v for k, v in sorted(tag.items(), key=lambda item: item[1],reverse = True)}
    tag={k: tag[k] for k in list(tag)[:1]}
    tags=""
    for i in tag.items():
        if i[1]!=0:
            tags+=i[0]+", "
    tags=tags[:-2]
    row['tags']=tags   

HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))




In [8]:
pmc_df.head()

Unnamed: 0,paper_id,title,authors,affiliations,abstract,text,bibliography,tags
0,000b7d1517ceebb34e1e3e817695b6de03e2fa78,Supplementary Information An eco-epidemiologic...,"Julien Mélade, Nicolas Wieseke 4#, Beza Ramazi...","Julien Mélade (2 rue Maxime Rivière, 97490 Sai...",,\n\n- Figure S1 : Phylogeny of all sequences b...,"NDV/HQ266603/Chicken/1992, , , None; MuV/FJ375...",evolution
1,002e1edfb72cdb4ac11608f785221825d911de83,Profile of Antibodies to the Nucleocapsid Prot...,"Xuan Liu, Yulin Shi, Ping Li, Linhai Li, Yanpi...",Xuan Liu (General Hospital of Guangzhou Comman...,Abstract\n\nProfiles of antibodies to the nucl...,\n\nThe novel severe acute respiratory syndrom...,Identification of a novel coronavirus in patie...,
2,004f0f8bb66cf446678dc13cf2701feec4f36d76,Healthcare-resource-adjusted vulnerabilities t...,"Hanchu Zhou, Jiannan Yang, Kaicheng Tang, † , ...","Hanchu Zhou (City University of Hong Kong, Hon...",,Introduction\n\nThe 2019-nCoV epidemic has spr...,World Health Organizations. Novel Coronavirus ...,incubation
3,0092846a890dd9dfd5e55d0731eee0175e485c08,Supplementary appendix,,,,"\n\n-Secondary efficacy parameters, by days of...",Design and performance of the CDC real-time re...,
4,00af80743cef9bd8c04c532d74e9c67f0c9312e4,S2 Appendix: ERGM Alternating Graph Statistics,,,,"\n\nfor fixed constant weights λ i ≥ 1, (often...",connectivity and degree distributions: Exponen...,


In [9]:
# Funcion para obtener los papers que contienen exclusivamente alguna etiqueta de las anterior mencionadas

def find_papers_from_tags(df,tags):  
    new_df=pd.DataFrame(columns=["paper_id","title","authors","affiliations","abstract","text","bibliography","tags"])
    tags=tags.split(",")
    for i, row in tqdm(df.iterrows()):
        for tag in tags:
            if tag in row['tags']:
                new_df=new_df.append(row)
                break
    return new_df

#### Conclusión de asignación de etiquetas

Al comprobar las frecuencias, vemos que las etiquetas Transmisión, Incubación, Riesgo, Infeccion y Vacunas son las más representativas, cabe destacar que 5 categorias para un clasificador multinomial basado en texto representa un gran reto que decidimos tomar pues las etiquetas se solapan en gran medida haciendo asi más dificil el ejercicio posterior de clasificación, este problema puede presentarse en gran medida en la mayoria de maneras de generar las etiquetas.

In [10]:
# Papers Finales
new_df=find_papers_from_tags(pmc_df,"transmission,incubation,risk,infections,vaccines")

HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))




In [11]:
# Proporcion de etiquetas en el data frame final
new_df['tags'].value_counts(normalize=True) * 100

risk            47.090663
transmission    27.198917
infections      17.185386
incubation       4.871448
vaccines         3.653586
Name: tags, dtype: float64

In [None]:
# Preprocesamiento y Modelo en PySpark

In [12]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.session import SparkSession
import re

# Carga de funciones escenciales, sc y spark en la sesión 
sc = SparkContext()
spark = SparkSession(sc)
sql = SQLContext(sc)

# Convierto el dataframe pandas a dataframe Spark (rdd)
spark_df = sql.createDataFrame(new_df)
spark_df.printSchema()

root
 |-- paper_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- affiliations: string (nullable = true)
 |-- abstract: string (nullable = true)
 |-- text: string (nullable = true)
 |-- bibliography: string (nullable = true)
 |-- tags: string (nullable = true)



#### Etapa de Tokenization

In [13]:
## Tokenizacion
from pyspark.ml.feature import Tokenizer
tokenization=Tokenizer(inputCol='text',outputCol='tokens')
tokenized_df=tokenization.transform(spark_df)
#tokenized_df.select('tokens').show(1,False)
tokenized_df.select(['paper_id','tags','tokens']).show()

+--------------------+------------+--------------------+
|            paper_id|        tags|              tokens|
+--------------------+------------+--------------------+
|004f0f8bb66cf4466...|  incubation|[introduction, , ...|
|00c75478b9f6b815f...|        risk|[dear, editor:, ,...|
|00fddd1ce0dae8535...|        risk|[, , i, n, the, i...|
|01686b614a614913b...|        risk|[aerosols, , sali...|
|01a5049f7f6965eac...|transmission|[dear, editor, , ...|
|022fbc7bea2e25340...|        risk|[correspondence, ...|
|023782486e5eef9d9...|  infections|[robert, walgate,...|
|0244a8c0153dcbe7d...|transmission|[, , east, respir...|
|024b30561568979f5...|  infections|[introduction, , ...|
|028a4948d8e10f9ec...|        risk|[jaad, online:, n...|
|02c2c01e1908658a0...|        risk|[contents, lists,...|
|03345c814dbe72221...|transmission|[sir, model, deta...|
|0401e2a525cc6eeb5...|        risk|[, , authors, hav...|
|044d1e54d0a62dcd6...|        risk|[contents, lists,...|
|04bf954dd55e2ee7e...|    vacci

#### Remoción de Stopwords  

In [14]:
## Remoción de Stopwords
from pyspark.ml.feature import StopWordsRemover
stopword_removal=StopWordsRemover(inputCol='tokens',outputCol='refined_tokens')
refined_df=stopword_removal.transform(tokenized_df)
#refined_df2 = refined_df.select(['tokens','refined_tokens'])
refined_df2 = refined_df.select(['paper_id','tags','refined_tokens'])
#refined_df2.printSchema()
refined_df2.show()

+--------------------+------------+--------------------+
|            paper_id|        tags|      refined_tokens|
+--------------------+------------+--------------------+
|004f0f8bb66cf4466...|  incubation|[introduction, , ...|
|00c75478b9f6b815f...|        risk|[dear, editor:, ,...|
|00fddd1ce0dae8535...|        risk|[, , n, islamic, ...|
|01686b614a614913b...|        risk|[aerosols, , sali...|
|01a5049f7f6965eac...|transmission|[dear, editor, , ...|
|022fbc7bea2e25340...|        risk|[correspondence, ...|
|023782486e5eef9d9...|  infections|[robert, walgate,...|
|0244a8c0153dcbe7d...|transmission|[, , east, respir...|
|024b30561568979f5...|  infections|[introduction, , ...|
|028a4948d8e10f9ec...|        risk|[jaad, online:, n...|
|02c2c01e1908658a0...|        risk|[contents, lists,...|
|03345c814dbe72221...|transmission|[sir, model, deta...|
|0401e2a525cc6eeb5...|        risk|[, , authors, not...|
|044d1e54d0a62dcd6...|        risk|[contents, lists,...|
|04bf954dd55e2ee7e...|    vacci

#### Etapa de Lematización

In [15]:
# Lematizacion
import nltk
from nltk.stem import WordNetLemmatizer
wordnet_lemmatizer = WordNetLemmatizer()
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, IntegerType
from pyspark.sql.functions import *

lemma_udf = udf(lambda row: [wordnet_lemmatizer.lemmatize(w) for w in row], ArrayType(StringType()))

lemma_df = refined_df2.withColumn('tokens_lemma', lemma_udf(col('refined_tokens')))
refined_df3 = lemma_df.select(['paper_id','tags','refined_tokens','tokens_lemma'])
#refined_df3.select(['tokens_lemma']).show(1,False)
refined_df3.show()

+--------------------+------------+--------------------+--------------------+
|            paper_id|        tags|      refined_tokens|        tokens_lemma|
+--------------------+------------+--------------------+--------------------+
|004f0f8bb66cf4466...|  incubation|[introduction, , ...|[introduction, , ...|
|00c75478b9f6b815f...|        risk|[dear, editor:, ,...|[dear, editor:, ,...|
|00fddd1ce0dae8535...|        risk|[, , n, islamic, ...|[, , n, islamic, ...|
|01686b614a614913b...|        risk|[aerosols, , sali...|[aerosol, , saliv...|
|01a5049f7f6965eac...|transmission|[dear, editor, , ...|[dear, editor, , ...|
|022fbc7bea2e25340...|        risk|[correspondence, ...|[correspondence, ...|
|023782486e5eef9d9...|  infections|[robert, walgate,...|[robert, walgate,...|
|0244a8c0153dcbe7d...|transmission|[, , east, respir...|[, , east, respir...|
|024b30561568979f5...|  infections|[introduction, , ...|[introduction, , ...|
|028a4948d8e10f9ec...|        risk|[jaad, online:, n...|[jaad, o

#### Remoción de caracteres especiales (no alfanumericos) y palabras con longitud menos a 3 caracteres

In [16]:
# Remoción de tokens con menos de 3 caracteres
cleaning_udf = udf(lambda row: [w for w in row in len(w)>2], ArrayType(StringType()))

clean_df = refined_df3.withColumn('cleaned_tokens', lemma_udf(col('tokens_lemma')))
refined_df4 = clean_df.select(['paper_id','tags','refined_tokens','cleaned_tokens'])
#refined_df4.show()

# Remocion de caracteres no alfanumericos
spec_char_udf = udf(lambda row: [re.sub(r'[^A-Za-z0-9]+','',w) for w in row], ArrayType(StringType()))

clean_df2 = refined_df4.withColumn('cleaned2_tokens', spec_char_udf(col('cleaned_tokens')))
refined_df5 = clean_df2.select(['paper_id','tags','cleaned2_tokens'])
refined_df5.show() 

+--------------------+------------+--------------------+
|            paper_id|        tags|     cleaned2_tokens|
+--------------------+------------+--------------------+
|004f0f8bb66cf4466...|  incubation|[introduction, , ...|
|00c75478b9f6b815f...|        risk|[dear, editor, , ...|
|00fddd1ce0dae8535...|        risk|[, , n, islamic, ...|
|01686b614a614913b...|        risk|[aerosol, , saliv...|
|01a5049f7f6965eac...|transmission|[dear, editor, , ...|
|022fbc7bea2e25340...|        risk|[correspondence, ...|
|023782486e5eef9d9...|  infections|[robert, walgate,...|
|0244a8c0153dcbe7d...|transmission|[, , east, respir...|
|024b30561568979f5...|  infections|[introduction, , ...|
|028a4948d8e10f9ec...|        risk|[jaad, online, no...|
|02c2c01e1908658a0...|        risk|[content, list, a...|
|03345c814dbe72221...|transmission|[sir, model, deta...|
|0401e2a525cc6eeb5...|        risk|[, , author, noth...|
|044d1e54d0a62dcd6...|        risk|[content, list, a...|
|04bf954dd55e2ee7e...|    vacci

### Conteo de tokens con CountVectorizer y CountVectorizerModel 

Esta función la utilizalamos para ayudar a convertir una colección de documentos de texto en vectores de **conteos** de tokens.
Cuando un diccionario apriori no está disponible, CountVectorizer puede usarse como un estimator para extraer 
el vocabulario y generar el modelo CountVectorizerModel nos permitirá ver las representaciones dispersas para los documentos 
sobre el vocabulario, que luego se pueden pasar a otros algoritmos, adicionalmente con el parametro minDF en 3 aprovechamos que las palabras que no aparecen ni en un 3% en los textos los descartaremos.

In [17]:
# Count Vectorizer
from pyspark.ml.feature import CountVectorizer

cv = CountVectorizer(inputCol="cleaned2_tokens", outputCol="countVectotizer", minDF = 3)

tokens_dep = refined_df5.select(['cleaned2_tokens','tags'])
modelCountVectorizer = cv.fit(tokens_dep)
result = modelCountVectorizer.transform(tokens_dep)
refined_df6 = result.select(['tags','cleaned2_tokens','countVectotizer'])
result.show(10)

+--------------------+------------+--------------------+
|     cleaned2_tokens|        tags|     countVectotizer|
+--------------------+------------+--------------------+
|[introduction, , ...|  incubation|(7605,[0,3,4,7,11...|
|[dear, editor, , ...|        risk|(7605,[0,1,2,4,6,...|
|[, , n, islamic, ...|        risk|(7605,[0,1,2,4,5,...|
|[aerosol, , saliv...|        risk|(7605,[0,1,2,6,7,...|
|[dear, editor, , ...|transmission|(7605,[0,2,6,8,9,...|
|[correspondence, ...|        risk|(7605,[0,1,2,4,5,...|
|[robert, walgate,...|  infections|(7605,[0,1,4,5,7,...|
|[, , east, respir...|transmission|(7605,[0,2,4,5,6,...|
|[introduction, , ...|  infections|(7605,[0,2,3,4,5,...|
|[jaad, online, no...|        risk|(7605,[0,1,2,3,4,...|
+--------------------+------------+--------------------+
only showing top 10 rows



In [18]:
# TF - IDF
from pyspark.ml.feature import HashingTF, IDF

hashingTF = HashingTF(inputCol="cleaned2_tokens", outputCol="rawFeatures")
featurizedData = hashingTF.transform(result)
#featurizedData.show(10)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
rescaledData.show(10)

+--------------------+------------+--------------------+--------------------+--------------------+
|     cleaned2_tokens|        tags|     countVectotizer|         rawFeatures|            features|
+--------------------+------------+--------------------+--------------------+--------------------+
|[introduction, , ...|  incubation|(7605,[0,3,4,7,11...|(262144,[836,1769...|(262144,[836,1769...|
|[dear, editor, , ...|        risk|(7605,[0,1,2,4,6,...|(262144,[666,1769...|(262144,[666,1769...|
|[, , n, islamic, ...|        risk|(7605,[0,1,2,4,5,...|(262144,[687,2472...|(262144,[687,2472...|
|[aerosol, , saliv...|        risk|(7605,[0,1,2,6,7,...|(262144,[14,666,3...|(262144,[14,666,3...|
|[dear, editor, , ...|transmission|(7605,[0,2,6,8,9,...|(262144,[1731,176...|(262144,[1731,176...|
|[correspondence, ...|        risk|(7605,[0,1,2,4,5,...|(262144,[813,2710...|(262144,[813,2710...|
|[robert, walgate,...|  infections|(7605,[0,1,4,5,7,...|(262144,[666,1156...|(262144,[666,1156...|
|[, , east

### Word2Vec

Word2Vec al ser un estimador que toma secuencias de palabras que representan documentos, con la funcion Word2VecModel entrenamos un modelo que asigna cada palabra a un vector único de tamaño fijo utilizando el promedio de todas las palabras en el documento, este vector se puede usar como características para predicciones o cálculos de similitud de documentos.

In [36]:
# Word2vec (Opcional)
from pyspark.ml.feature import Word2Vec

word2Vec = Word2Vec(vectorSize=3, minCount=0, inputCol="cleaned2_tokens", outputCol="word2Vec")
modelWord2Vec = word2Vec.fit(result)
result2 = modelWord2Vec.transform(result)
result2.show(10)

Vectors = modelWord2Vec.getVectors()
Vectors.show(10)

+--------------------+------------+--------------------+--------------------+
|     cleaned2_tokens|        tags|     countVectotizer|            word2Vec|
+--------------------+------------+--------------------+--------------------+
|[introduction, , ...|  incubation|(7605,[0,3,4,7,11...|[0.22456058674946...|
|[dear, editor, , ...|        risk|(7605,[0,1,2,4,6,...|[0.20205751460577...|
|[, , n, islamic, ...|        risk|(7605,[0,1,2,4,5,...|[0.22655495013870...|
|[aerosol, , saliv...|        risk|(7605,[0,1,2,6,7,...|[0.15916733699970...|
|[dear, editor, , ...|transmission|(7605,[0,2,6,8,9,...|[0.19882603538939...|
|[correspondence, ...|        risk|(7605,[0,1,2,4,5,...|[0.23686730223547...|
|[robert, walgate,...|  infections|(7605,[0,1,4,5,7,...|[0.20553478638240...|
|[, , east, respir...|transmission|(7605,[0,2,4,5,6,...|[0.19253316088884...|
|[introduction, , ...|  infections|(7605,[0,2,3,4,5,...|[0.22448527269232...|
|[jaad, online, no...|        risk|(7605,[0,1,2,3,4,...|[0.20141

### Modelos de Clasificación

Para este problema de clasificación multinomial en donde buscamos identificar la categoria a la que pertenecen los Papers sobre el covid y asi simplificar las busquedas que usuarios esten interesados en encontrar, colocamos a competir dos algoritmos de clasificación, un Nayve Bayer y un Random Forest los cuales tienen una naturaleza muy diferentes pues mientras uno utiliza probabilidades condicionales a través de las transformaciones previas de los textos el otro tiene una arquitectura de arboles los cuales se rigen bajo impureza y criterios como el de Gini para finalmente tomar un promedio de arboles y tener estimaciones de las probabilidades para cada categoria que asignamos en cada Paper para los conjuntos training y test.

Reuniendo el preprocesamiento en un pipeline bajo la libreria Pipeline, unimos los features finales que serviran para la clasificación tomando un muestreo (80%/20%) respectivamente para entrenamiento y evaluacion. Dadas las limitaciones en procesamiento, entrenamos con una muestra de 1478 papers.

In [19]:
# Modelo de clasificación

# Indices

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol = "tags", outputCol = "label")
pipeline = Pipeline(stages=[cv,hashingTF,idf,label_stringIdx])
# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(refined_df5)
refined_df6 = pipelineFit.transform(refined_df5)
refined_df6.show(5)

+--------------------+------------+--------------------+--------------------+--------------------+--------------------+-----+
|            paper_id|        tags|     cleaned2_tokens|     countVectotizer|         rawFeatures|            features|label|
+--------------------+------------+--------------------+--------------------+--------------------+--------------------+-----+
|004f0f8bb66cf4466...|  incubation|[introduction, , ...|(7605,[0,3,4,7,11...|(262144,[836,1769...|(262144,[836,1769...|  3.0|
|00c75478b9f6b815f...|        risk|[dear, editor, , ...|(7605,[0,1,2,4,6,...|(262144,[666,1769...|(262144,[666,1769...|  0.0|
|00fddd1ce0dae8535...|        risk|[, , n, islamic, ...|(7605,[0,1,2,4,5,...|(262144,[687,2472...|(262144,[687,2472...|  0.0|
|01686b614a614913b...|        risk|[aerosol, , saliv...|(7605,[0,1,2,6,7,...|(262144,[14,666,3...|(262144,[14,666,3...|  0.0|
|01a5049f7f6965eac...|transmission|[dear, editor, , ...|(7605,[0,2,6,8,9,...|(262144,[1731,176...|(262144,[1731,176...

In [20]:
# semilla para reproducir
#trainingData = refined_df6
(trainingData, testData) = refined_df6.randomSplit([0.7, 0.3], seed = 1000)
#print("Training Dataset Count: " + str(trainingData.count()))
#print("Test Dataset Count: " + str(testData.count()))

#trainingData.printSchema()

#refined_df6

In [21]:
# Nayve Bayes

from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Modelo de entrenamiento
nb = NaiveBayes(smoothing=1)
model2 = nb.fit(trainingData)
predictions2 = model2.transform(testData)
predictions2.filter(predictions2['prediction'] == 0).select("paper_id","cleaned2_tokens","tags","probability","label","prediction").orderBy("probability", ascending=False).show(n = 10, truncate = 30)

# Evaluador de precisión de modelo
evaluator2 = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator2.evaluate(predictions2)  

+------------------------------+------------------------------+------------+------------------------------+-----+----------+
|                      paper_id|               cleaned2_tokens|        tags|                   probability|label|prediction|
+------------------------------+------------------------------+------------+------------------------------+-----+----------+
|a81191091dd2de0cfa93a7869b6...|[, , address, need, populat...|  infections|[1.0,5.821152268582547E-17,...|  2.0|       0.0|
|9ede8bf72b414ce35b05828c6ed...|[, , infectious, disease, o...|  incubation|[1.0,5.0289789898927E-18,1....|  3.0|       0.0|
|1bed4f8a5412cfa315b0936705a...|[, , virus, positive, sense...|transmission|[1.0,2.269125438030524E-21,...|  1.0|       0.0|
|f4a58859fbef59081909da083e0...|[, , dear, editor, read, gr...|  incubation|[1.0,1.318608689196907E-21,...|  3.0|       0.0|
|10c8214c3c721e2a56dae312e8e...|[inuit, community, beat, co...|        risk|[1.0,1.2184645096461142E-22...|  0.0|       0.0|


0.4247730395079629

In [22]:
# RF

from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="label", featuresCol="rawFeatures", numTrees = 100, maxDepth = 4, maxBins = 32)

# Modelo de entrenamiento
rfModel = rf.fit(trainingData)
predictions = rfModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("paper_id","cleaned2_tokens","tags","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

# Evaluador de precisión de modelo
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions) 

+------------------------------+------------------------------+----+------------------------------+-----+----------+
|                      paper_id|               cleaned2_tokens|tags|                   probability|label|prediction|
+------------------------------+------------------------------+----+------------------------------+-----+----------+
|5238852a9879f7f06255ffbeb5a...|[sir, , time, writing, lett...|risk|[0.5455564037754126,0.24373...|  0.0|       0.0|
|e4f808dcd1e97c1941d3709a2b6...|[, , global, pandemic, covi...|risk|[0.5430719666341247,0.24314...|  0.0|       0.0|
|ff4734ba10d0f14431cfc028cbd...|[, , , responsible, covid19...|risk|[0.5385969930716061,0.25131...|  0.0|       0.0|
|94b76eafee4f06be376edc6d432...|[correspondence, , reply, d...|risk|[0.5378213485349316,0.24674...|  0.0|       0.0|
|27e8e8746befaf798daed5fc73a...|[, , covid19, rapidly, expa...|risk|[0.5346771689334063,0.25514...|  0.0|       0.0|
|82945a2379f35c0c5f7b6d6a6a2...|[, , date, coronavirus, sar...|r

0.26035920027109455

Para evaluar los resultados utilizamos el método MulticlassClassificationEvaluator, el cual captura una metrica entre la precisión y el recall pero que no nos muestra valores explicitos para cada valor tenido en cuenta en la metrica, simplemente evalua las salidas del modelo con las etiquetas respectivas en el conjunto de Test, medidas como el ROC son algo mas tediosas de construir en Spark en comparación a R o Python.
Tomando como mejor clasificador el Nayve Bayes, cabe destacar que los modelos de clasificación de textos cuando se trata de libros o papers extensos suelen tener una precision media baja en general y la espectativa de tener precisiones mayores al 70% como en clasificaciones de features menos dispersas es reduce.

### Evaluación de nuevos textos

Tomamos una muestra de 2758 papers completamente nuevos 7 aplicamos el modelo 2 (Nayve Bayes), cabe resaltar un detalle en el preprocesamiento de los nuevos textos, el cual es un limitante de la libreria en la que se realizó el clasificador pues al comparar otras librerias como Caret de R o SciKit Learn de Python, estos ya tienen en cuenta las funciones de preprocesamiento realizadas y se aplican directamente a los textos, procedemos a realizar el preprocesamiento:

In [23]:
## Path y lectura de archivos

dir_new = 'C:/Users/LENOVO/Documents/Maestría/Semestre I/Almacenamiento y Recuperacion de Informacion/Taller3/Script/papers_nuevos/'
files_news = load_files(dir_new)
df_news = generate_clean_df(files_news)

HBox(children=(FloatProgress(value=0.0, max=2759.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=2759.0), HTML(value='')))




In [27]:
spark_df_new = sql.createDataFrame(df_news)


In [29]:
tokenization_new=Tokenizer(inputCol='text',outputCol='tokens')
tokenized_df_new=tokenization_new.transform(spark_df_new)

stopword_removal_new=StopWordsRemover(inputCol='tokens',outputCol='refined_tokens')
refined_df2_new=stopword_removal_new.transform(tokenized_df_new)

lemma_df_new = refined_df2_new.withColumn('tokens_lemma', lemma_udf(col('refined_tokens')))
refined_df3_new = lemma_df_new.select(['paper_id','refined_tokens','tokens_lemma'])

clean_df_new = refined_df3_new.withColumn('cleaned_tokens', lemma_udf(col('tokens_lemma')))
refined_df4_new = clean_df_new.select(['paper_id','refined_tokens','cleaned_tokens'])

clean_df2_new = refined_df4.withColumn('cleaned2_tokens', spec_char_udf(col('cleaned_tokens')))
refined_df5_new = clean_df2_new.select(['paper_id','cleaned2_tokens'])

In [30]:
cv_new = CountVectorizer(inputCol="cleaned2_tokens", outputCol="countVectotizer", minDF = 3)

tokens_dep_new = refined_df5_new.select(['cleaned2_tokens'])
modelCountVectorizer_new = cv_new.fit(tokens_dep_new)
result_new = modelCountVectorizer_new.transform(tokens_dep_new)
refined_df6_new = result_new.select(['cleaned2_tokens','countVectotizer'])

hashingTF_new = HashingTF(inputCol="cleaned2_tokens", outputCol="rawFeatures")
featurizedData_new = hashingTF_new.transform(result_new)

idf_new = IDF(inputCol="rawFeatures", outputCol="features")
idfModel_new = idf_new.fit(featurizedData_new)
rescaledData_new = idfModel_new.transform(featurizedData_new)

In [31]:
pipeline_new = Pipeline(stages=[cv_new,hashingTF_new,idf_new])

pipelineFit_new = pipeline_new.fit(refined_df5_new)
refined_df6_new = pipelineFit_new.transform(refined_df5_new)
refined_df6_new.show(5)

+--------------------+--------------------+--------------------+--------------------+--------------------+
|            paper_id|     cleaned2_tokens|     countVectotizer|         rawFeatures|            features|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|004f0f8bb66cf4466...|[introduction, , ...|(7605,[0,3,4,7,11...|(262144,[836,1769...|(262144,[836,1769...|
|00c75478b9f6b815f...|[dear, editor, , ...|(7605,[0,1,2,4,6,...|(262144,[666,1769...|(262144,[666,1769...|
|00fddd1ce0dae8535...|[, , n, islamic, ...|(7605,[0,1,2,4,5,...|(262144,[687,2472...|(262144,[687,2472...|
|01686b614a614913b...|[aerosol, , saliv...|(7605,[0,1,2,6,7,...|(262144,[14,666,3...|(262144,[14,666,3...|
|01a5049f7f6965eac...|[dear, editor, , ...|(7605,[0,2,6,8,9,...|(262144,[1731,176...|(262144,[1731,176...|
+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [33]:
predictions_new = model2.transform(refined_df6_new)


### Papers nuevos con su etiqueta respectiva basadas en el modelo de clasificación

In [35]:
predictions_new.select('paper_id', 'features', 'probability','prediction').show(100)

+--------------------+--------------------+--------------------+----------+
|            paper_id|            features|         probability|prediction|
+--------------------+--------------------+--------------------+----------+
|004f0f8bb66cf4466...|(262144,[836,1769...|[1.0,3.0644112888...|       0.0|
|00c75478b9f6b815f...|(262144,[666,1769...|[1.0,0.0,0.0,0.0,...|       0.0|
|00fddd1ce0dae8535...|(262144,[687,2472...|[1.0,2.3303642305...|       0.0|
|01686b614a614913b...|(262144,[14,666,3...|[1.0,0.0,0.0,0.0,...|       0.0|
|01a5049f7f6965eac...|(262144,[1731,176...|[1.38434738329597...|       1.0|
|022fbc7bea2e25340...|(262144,[813,2710...|[1.0,8.5831504908...|       0.0|
|023782486e5eef9d9...|(262144,[666,1156...|[2.83591685679528...|       2.0|
|0244a8c0153dcbe7d...|(262144,[2938,302...|[3.48407404034288...|       1.0|
|024b30561568979f5...|(262144,[590,1330...|[1.0,1.1289964771...|       0.0|
|028a4948d8e10f9ec...|(262144,[2326,252...|[1.0,1.1338959728...|       0.0|
|02c2c01e190