#24. Dado un tamaño de vocabulario parametrizable y una lista de stopwords también parametrizable implemente tf-IDF para los textos de los contenidos de forma distribuida. Debe obtener un vector por cada texto (⭐⭐⭐).

In [None]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

In [None]:
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# create the Spark Session
spark = SparkSession.builder.getOrCreate()

# create the Spark Context
sc = spark.sparkContext

In [None]:
df = spark.read.csv('/content/drive/MyDrive/Datasets/contents_text_sample.csv', header=True, inferSchema=True, multiLine=True, escape='"')

In [None]:
text_rdd = df.rdd

In [None]:
text_rdd

MapPartitionsRDD[8] at javaToPython at NativeMethodAccessorImpl.java:0

In [None]:
text_rdd.getNumPartitions()

1

In [None]:
text_rdd.take(2)

Uso la libreria nltk para importarme un listado de stopwords en español

In [None]:
import nltk
nltk.download('punkt')
nltk.download('wordnet')
nltk.download('stopwords')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


True

In [None]:
from nltk.corpus import stopwords

In [None]:
stopwordsEsp = stopwords.words('spanish')
stopwordsEsp

In [None]:
from nltk.tokenize import word_tokenize

Armo un RDD conformado por tuplas en donde esta el texto y luego un listado de palabras del texto

In [None]:
textos_filtrados_rdd = text_rdd.filter(lambda x: x[2] != None).map(lambda x: (x[2] ,word_tokenize(x[2], language='spanish')))

In [None]:
cant_textos = textos_filtrados_rdd.count()

In [None]:
cant_textos

In [None]:
textos_filtrados_rdd.take(2)

Me quedo con las palabras validas, estas son las que tienen caracteres del abecedario, y las que no integran el listado de stopwords. Además, las convierto a minuscula. Cacheo este RDD porque lo voy a usar varias veces

In [None]:
def palabras_validas (textos_con_palabras_invalidas, stopwords):
  palabras_validas = []
  listado_palabras = textos_con_palabras_invalidas[1]
  texto = textos_con_palabras_invalidas[0]
  for palabra in listado_palabras:
    if palabra.isalpha():
      palabra_minusc = palabra.lower()
      if palabra_minusc not in stopwords:
        palabras_validas.append(palabra_minusc)

  return (texto, palabras_validas)

In [None]:
textos_con_palabras_rdd = textos_filtrados_rdd.map(lambda x: palabras_validas(x, stopwordsEsp)).cache()

Para seleccionar las n features que integraran mi vector cuento cuantas veces aparece cada palabra, y me quedo con las n que más aparezcan en todos mis textos

In [None]:
palabras_que_mas_aparecen = textos_con_palabras_rdd.map(lambda x: list(x[1])).flatMap(lambda x: x).map(lambda x: (x, 1)).countByKey()

In [None]:
def seleccionar_n_palabras_a_usar(palabras, n):
  palabras_a_usar = list(sorted(palabras, key=palabras.get, reverse=True))[:n]
  return palabras_a_usar

palabras_a_usar = seleccionar_n_palabras_a_usar(palabras_que_mas_aparecen, 20)

Armo un RDD que este integrado por (texto, diccionario de cantidad). El diccionario de cantidad indica cuantas veces aparece cada palabra en el texto.

In [None]:
def cant_palabra_por_texto(texto_y_palabras):
  cant_por_palabra = {}
  for palabra in texto_y_palabras[1]:
    if palabra in cant_por_palabra:
      cant_por_palabra[palabra] +=1
    else:
      cant_por_palabra[palabra] = 1

  return(texto_y_palabras[0], cant_por_palabra)

In [None]:
frec_palabra_por_texto_rdd = textos_con_palabras_rdd.map(cant_palabra_por_texto)

In [None]:
frec_palabra_por_texto_rdd.take(2)

Ahora puedo calcular el tf

In [None]:
def calcular_tf(texto_y_palabras, palabras_a_usar):
  tf = {}
  palabras_por_cant = texto_y_palabras[1]
  cantidad_de_palabras = len(palabras_por_cant)
  for palabra in palabras_a_usar: #Lo calculo solo para las palabras que me interesan
    if palabra in palabras_por_cant:
      tf[palabra] = palabras_por_cant[palabra] / cantidad_de_palabras
    else:
      tf[palabra] = 0

  return (texto_y_palabras[0], tf)

In [None]:
tf_por_palabra_rdd = frec_palabra_por_texto_rdd.map(lambda x: calcular_tf(x, palabras_a_usar))

In [None]:
tf_por_palabra_rdd.take(2)

Calculo las apariciones de palabras por texto. Es decir, si una palabra aparece dos veces en un texto solo la contabilizo una única vez para ese texto. Solo lo cuento para las n palabras que voy a usar

In [None]:
apariciones_palabras = textos_con_palabras_rdd.map(lambda x: set(x[1])).flatMap(lambda x: x).filter(lambda x: x in palabras_a_usar).map(lambda x: (x, 1)).countByKey()

In [None]:
apariciones_palabras

defaultdict(int,
            {'categoría': 104785,
             'ref': 61378,
             'https': 32430,
             'ficha': 65187,
             'the': 26927,
             'of': 34339,
             'web': 29843,
             'http': 46199,
             'cita': 38310,
             'año': 30495,
             'años': 26861,
             'archivo': 25467,
             'br': 22798,
             'nbsp': 12969,
             'small': 12060,
             'center': 16843,
             'unidos': 18547,
             'background': 6446,
             'club': 9424,
             'usuario': 1916})

Calculo el IDF

In [None]:
import math
def calcular_idf(cant_palabras_en_textos, cant_textos, palabras_a_usar):
  idf_palabra = {}
  for palabra in palabras_a_usar: #Lo calculo solo para las palabras que me interesan
    if palabra in cant_palabras_en_textos:
      apariciones = cant_palabras_en_textos[palabra]
      idf_palabra[palabra] = math.log10((cant_textos + 1) / apariciones)
    else:
      idf_palabra[palabra] = 0

  return idf_palabra

In [None]:
idf_palabras = calcular_idf(apariciones_palabras, cant_textos, palabras_a_usar)

In [None]:
idf_palabras

{'ref': 0.5271196864257847,
 'categoría': 0.2948333015524862,
 'center': 1.088712970632922,
 'cita': 0.7318202672984468,
 'br': 0.9572356699592305,
 'the': 0.8849444489380297,
 'http': 0.6504998441405967,
 'of': 0.7793447755389064,
 'archivo': 0.9091546309799515,
 'background': 1.5058421181052088,
 'nbsp': 1.2022259290282105,
 'small': 1.2337851114773921,
 'usuario': 2.032736914538999,
 'club': 1.3408971418384983,
 'https': 0.8041854706085518,
 'web': 0.8402899403617035,
 'unidos': 1.046858747314909,
 'ficha': 0.5009714246414839,
 'año': 0.8309037815878048,
 'años': 0.8860102424306711}

Ahora puedo calcular el TFIDF. El RDD final queda como (texto, diccionario de TFIDF)

In [None]:
def calcular_tfidf(tf_palabras, idf):
  tfidf_por_palabra = {}
  for palabra in tf_palabras:
    tfidf_por_palabra[palabra] =  tf_palabras[palabra] * idf[palabra]

  return tfidf_por_palabra

In [None]:
tfid_rdd = tf_por_palabra_rdd.map(lambda x: (x[0], calcular_tfidf(x[1], idf_palabras)))

In [None]:
tfid_rdd.take(10)

[("{{Ficha de taxón\n| name = Anacardiáceas\n| image = Gui1 cashewfruit2.jpg\n| image_caption = ''[[Anacardium occidentale]]''\n| regnum = [[Plantae]]\n| divisio  = [[Magnoliophyta]]\n| classis  = [[Magnoliopsida]]\n| ordo = [[Sapindales]]\n| familia = '''Anacardiaceae'''\n| familia_authority = [[Robert Brown|R.Br.]]\n| subdivision_ranks = Géneros\n| subdivision = \nVer texto\n}}\n'''Anacardiaceae''' es una [[familia (biología)|familia]] de [[Plantae|plantas]] esencialmente [[árbol|arbóreas]] y [[arbusto|arbustivas]] perteneciente al [[Orden (biología)|orden]] [[Sapindales]]. La constituyen 77 [[género (biología)|géneros]] con unas 700 especies aceptadas, de las casi 3000 descritas, propias de países tropicales, cálidos y templados.<ref>[http://www.theplantlist.org/1.1/browse/A/Anacardiaceae/ Anacardiaceae en The Plant List, vers. 1.1, 2013]</ref>\n\n== Descripción ==\nSon árboles, arbustos, raramente subarbustos o [[trepadora]]s, frecuentemente con [[savia]] [[veneno]]sa. Son plantas 