# **Preprocesamiento de los datos**    

En esta etapa del desarrollo se evalúa la estructura del dataset, con el fin de definir y aplicar las trasnformaciones que sean requeridas para lograr una representación de características efectiva, que habilite el posterior modelamiento que de respuesta a la pregunta establecida como alcance del presente proyecto:    
\
*¿Cómo agrupar las reseñas escritas por los usuarios, para facilitar la comprensión de la percepción que se tiene sobre diferentes productos?*

In [1]:
!pip install pyspark
!apt install openjdk-11-jdk-headless -qq

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=4a955744d12048f18b6a0896aaffffc9b80806552f66d235649a17ab1ddb49cd
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0
openjdk-11-jdk-headless is already the newest version (11.0.19+7~us1-0ubuntu1~20.04.1).
0 upgraded, 0 newly installed, 0 to remove and 24 not upgraded.


In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

## **Configuración y creación de la sesión de spark**

In [3]:
conf = SparkConf().set("spark.ui.port", "4050")

sc = SparkContext(conf = conf)
spark = SparkSession.builder.config("spark.driver.memory", "8g").getOrCreate()

spark

## **Extracción de Datos:**

In [4]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
PATH_VIDEOS = r"/content/drive/Shareddrives/MSc Ciencia Datos/Mineria/amazon_reviews_us_Video_v1_00.tsv"

In [6]:
raw_data = spark.read.csv(
    PATH_VIDEOS,
    header = True,
    multiLine = True,
    escape = '"',
    sep = "\t"
)

videos_data = raw_data.limit(100000)
print(f'Shape: {videos_data.count()}, {len(videos_data.columns)}')
videos_data.show()

Shape: 100000, 15
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   49033728|R1P1G5KZ05H6RD|6302503213|     748506413|The Night They Sa...|           Video|          5|            0|          0|   N|                Y|    Very satisfied!!|Fast shipping. Pl...| 2015-08-31|
|         US|   17857748|R106N066IUN8ZV|B000059PET|     478710180|Hamlet / Kline, N...|           Vide

In [7]:
videos_data.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: string (nullable = true)
 |-- total_votes: string (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



El atributo de análisis principal es el body review, pues de acuerdo al alcance del proyecto, el objetivo es lograr una agrupación de las reseñas que permita entender en mejor medida la percepción de los usuarios. Así pues, se seleccionan sólo las columnas ```review_id``` y ```review_body``` para los pasos siguientes.



In [8]:
reviews = videos_data.select('review_id', lower('review_body').alias('review_body'))
reviews.show(10, False)

+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|review_id     |review_body                                                                                                                                                                                                      

## **Procesamiento y Representación de Características:**

### **Tokenización:**

In [9]:
from pyspark.ml.feature import Tokenizer

tokenizer = Tokenizer(inputCol = 'review_body', outputCol = "tokens")

reviews = tokenizer.transform(reviews)
reviews.show(5)

+--------------+--------------------+--------------------+
|     review_id|         review_body|              tokens|
+--------------+--------------------+--------------------+
|R1P1G5KZ05H6RD|fast shipping. pl...|[fast, shipping.,...|
|R106N066IUN8ZV|kevin kline is th...|[kevin, kline, is...|
| R7WTAA1S5O7D9|         great movie|      [great, movie]|
|R32HFMVWLYOYJK|i love the martin...|[i, love, the, ma...|
| RWT3H6HBVAL6G|y'know what this ...|[y'know, what, th...|
+--------------+--------------------+--------------------+
only showing top 5 rows



### **Limpieza de tokens:**
Se remueven los caracteres especiales o signos de puntuación que puedan existir en los tokens.

In [10]:
import re
from pyspark.sql.types import ArrayType

def clean_token(tokens):
    tokens = [re.sub(r'[^A-Za-z0-9]+', '', w) for w in tokens]
    tokens = [w.lower() for w in tokens if len(w) > 0]
    tokens = [w for w in tokens if w.isalpha()]

    return tokens

clean_token_udf = udf(clean_token, ArrayType(StringType()))

In [11]:
reviews = reviews.withColumn('clean_tokens', clean_token_udf('tokens'))
reviews.show(5)

+--------------+--------------------+--------------------+--------------------+
|     review_id|         review_body|              tokens|        clean_tokens|
+--------------+--------------------+--------------------+--------------------+
|R1P1G5KZ05H6RD|fast shipping. pl...|[fast, shipping.,...|[fast, shipping, ...|
|R106N066IUN8ZV|kevin kline is th...|[kevin, kline, is...|[kevin, kline, is...|
| R7WTAA1S5O7D9|         great movie|      [great, movie]|      [great, movie]|
|R32HFMVWLYOYJK|i love the martin...|[i, love, the, ma...|[i, love, the, ma...|
| RWT3H6HBVAL6G|y'know what this ...|[y'know, what, th...|[yknow, what, thi...|
+--------------+--------------------+--------------------+--------------------+
only showing top 5 rows



### **Remoción de Stopwords:**

In [12]:
from pyspark.ml.feature import StopWordsRemover

own_stop_words = [
    'br', 'film', 'movie', 'one',
    'movies', 'films'
]
own_stop_words.extend(StopWordsRemover().getStopWords())
own_stop_words = list(set(own_stop_words))

stopwords_remover = StopWordsRemover(inputCol = 'clean_tokens', outputCol = 'cleaned_tokens', stopWords = own_stop_words)
reviews = stopwords_remover.transform(reviews)
reviews.show(5)

+--------------+--------------------+--------------------+--------------------+--------------------+
|     review_id|         review_body|              tokens|        clean_tokens|      cleaned_tokens|
+--------------+--------------------+--------------------+--------------------+--------------------+
|R1P1G5KZ05H6RD|fast shipping. pl...|[fast, shipping.,...|[fast, shipping, ...|[fast, shipping, ...|
|R106N066IUN8ZV|kevin kline is th...|[kevin, kline, is...|[kevin, kline, is...|[kevin, kline, ve...|
| R7WTAA1S5O7D9|         great movie|      [great, movie]|      [great, movie]|             [great]|
|R32HFMVWLYOYJK|i love the martin...|[i, love, the, ma...|[i, love, the, ma...|[love, martin, la...|
| RWT3H6HBVAL6G|y'know what this ...|[y'know, what, th...|[yknow, what, thi...|[yknow, reminded,...|
+--------------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



### **Aplicando Stemming:**

In [13]:
!pip install nltk

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [14]:
from nltk.stem import PorterStemmer

porter_stemmer = PorterStemmer()
stemmer_udf = udf(lambda x:  [porter_stemmer.stem(w) for w in x], ArrayType(StringType()))

In [15]:
reviews = reviews.withColumn('refined_tokens', stemmer_udf(('cleaned_tokens')))
reviews.show(10)

+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|     review_id|         review_body|              tokens|        clean_tokens|      cleaned_tokens|      refined_tokens|
+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|R1P1G5KZ05H6RD|fast shipping. pl...|[fast, shipping.,...|[fast, shipping, ...|[fast, shipping, ...|[fast, ship, plea...|
|R106N066IUN8ZV|kevin kline is th...|[kevin, kline, is...|[kevin, kline, is...|[kevin, kline, ve...|[kevin, kline, ve...|
| R7WTAA1S5O7D9|         great movie|      [great, movie]|      [great, movie]|             [great]|             [great]|
|R32HFMVWLYOYJK|i love the martin...|[i, love, the, ma...|[i, love, the, ma...|[love, martin, la...|[love, martin, la...|
| RWT3H6HBVAL6G|y'know what this ...|[y'know, what, th...|[yknow, what, thi...|[yknow, reminded,...|[yknow, remind, s...|
|R1S3T3GWUGQTW7|wonderfu

### **Cantidad de tokens:**

In [16]:
from pyspark.sql.functions import size

reviews = reviews.withColumn('num_tokens', size('refined_tokens'))
reviews.show(5)

+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|     review_id|         review_body|              tokens|        clean_tokens|      cleaned_tokens|      refined_tokens|num_tokens|
+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|R1P1G5KZ05H6RD|fast shipping. pl...|[fast, shipping.,...|[fast, shipping, ...|[fast, shipping, ...|[fast, ship, plea...|         6|
|R106N066IUN8ZV|kevin kline is th...|[kevin, kline, is...|[kevin, kline, is...|[kevin, kline, ve...|[kevin, kline, ve...|        17|
| R7WTAA1S5O7D9|         great movie|      [great, movie]|      [great, movie]|             [great]|             [great]|         1|
|R32HFMVWLYOYJK|i love the martin...|[i, love, the, ma...|[i, love, the, ma...|[love, martin, la...|[love, martin, la...|         4|
| RWT3H6HBVAL6G|y'know what this ...|[y'know, what, th...|[yknow, wha

In [17]:
from pyspark.sql.types import IntegerType

reviews = reviews.withColumn('num_tokens', reviews['num_tokens'].cast(IntegerType()))
reviews.select('num_tokens').summary("count", "min", "10%","25%", "75%", "90%", "max").show()

+-------+----------+
|summary|num_tokens|
+-------+----------+
|  count|    100000|
|    min|         0|
|    10%|         4|
|    25%|        10|
|    75%|        53|
|    90%|       129|
|    max|      4222|
+-------+----------+



Se aplica un filtrado de los reviews dependiendo de la cantidad de tokens que lo componen. Esto con el fin de tener documentos dicientes y que puedan ser comparables entre sí.

In [18]:
reviews_more_than_12 = reviews.filter(reviews['num_tokens'] >= 12)
reviews_filtered = reviews_more_than_12.filter(reviews['num_tokens'] <= 200)

print(f'Rows removed: {reviews.count() - reviews_filtered.count()}')

Rows removed: 33936


In [19]:
reviews_filtered.sort('num_tokens').limit(15).show()

+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|     review_id|         review_body|              tokens|        clean_tokens|      cleaned_tokens|      refined_tokens|num_tokens|
+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|R2TX07AYOIKX7W|love lily tomlin ...|[love, lily, toml...|[love, lily, toml...|[love, lily, toml...|[love, lili, toml...|        12|
|R3NK7014K996PF|was soon looking ...|[was, soon, looki...|[was, soon, looki...|[soon, looking, f...|[soon, look, forw...|        12|
| RECPIN4UZI76Z|honestly i didn't...|[honestly, i, did...|[honestly, i, did...|[honestly, didnt,...|[honestli, didnt,...|        12|
|R3FE5KO613PMVI|movie is great - ...|[movie, is, great...|[movie, is, great...|[great, problem, ...|[great, problem, ...|        12|
|R3J2CP92BEA9ZH|loved this pre an...|[loved, this, pre...|[loved, thi

## **Representación de Características:**

In [20]:
from pyspark.ml.feature import HashingTF, IDF

#  Hashing tf to represent tokens in hash values
NUM_HASHING_TF_FEATURES = 200

hashing_tf = HashingTF(
    inputCol = 'refined_tokens',
    outputCol = 'tf_features',
    numFeatures = NUM_HASHING_TF_FEATURES
)

reviews_filtered = hashing_tf.transform(reviews_filtered)

#   IDF to get numerical values from hashes
idf = IDF(inputCol = "tf_features", outputCol = "features")
idfModel = idf.fit(reviews_filtered)
reviews_filtered = idfModel.transform(reviews_filtered)

In [21]:
reviews_filtered.cache()

DataFrame[review_id: string, review_body: string, tokens: array<string>, clean_tokens: array<string>, cleaned_tokens: array<string>, refined_tokens: array<string>, num_tokens: int, tf_features: vector, features: vector]

In [22]:
reviews_filtered.show(10)

+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+
|     review_id|         review_body|              tokens|        clean_tokens|      cleaned_tokens|      refined_tokens|num_tokens|         tf_features|            features|
+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+
|R106N066IUN8ZV|kevin kline is th...|[kevin, kline, is...|[kevin, kline, is...|[kevin, kline, ve...|[kevin, kline, ve...|        17|(200,[2,26,40,60,...|(200,[2,26,40,60,...|
| RWT3H6HBVAL6G|y'know what this ...|[y'know, what, th...|[yknow, what, thi...|[yknow, reminded,...|[yknow, remind, s...|        63|(200,[0,2,6,11,22...|(200,[0,2,6,11,22...|
| RGORN81H45NI7|this is the best ...|[this, is, the, b...|[this, is, the, b...|[best, exercise, ...|[best, exercis, v...|    

In [23]:
from pyspark.sql.functions import monotonically_increasing_id

reviews_filtered = reviews_filtered.withColumn('uid', monotonically_increasing_id())
reviews_filtered.show(10)

+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+---+
|     review_id|         review_body|              tokens|        clean_tokens|      cleaned_tokens|      refined_tokens|num_tokens|         tf_features|            features|uid|
+--------------+--------------------+--------------------+--------------------+--------------------+--------------------+----------+--------------------+--------------------+---+
|R106N066IUN8ZV|kevin kline is th...|[kevin, kline, is...|[kevin, kline, is...|[kevin, kline, ve...|[kevin, kline, ve...|        17|(200,[2,26,40,60,...|(200,[2,26,40,60,...|  0|
| RWT3H6HBVAL6G|y'know what this ...|[y'know, what, th...|[yknow, what, thi...|[yknow, reminded,...|[yknow, remind, s...|        63|(200,[0,2,6,11,22...|(200,[0,2,6,11,22...|  1|
| RGORN81H45NI7|this is the best ...|[this, is, the, b...|[this, is, the, b...|[best, exercise, ...|[best

### **Escritura en disco del resultado:**

In [27]:
PATH_FEATURES_OUT = r"/content/drive/Shareddrives/MSc Ciencia Datos/Mineria/features/"
PATH_FEATURES_FILE = r"amazon_features_reviews_small.parquet"
reviews_filtered.select('review_id', 'review_body', 'refined_tokens', 'num_tokens', 'tf_features', 'features', 'uid')\
    .write.mode('overwrite').parquet(PATH_FEATURES_OUT + PATH_FEATURES_FILE)

print(f'File saved into {PATH_FEATURES_OUT + PATH_FEATURES_FILE}')

File saved into /content/drive/Shareddrives/MSc Ciencia Datos/Mineria/features/amazon_features_reviews_small.parquet


Join

In [25]:
final_reviews_to_analyze = reviews_filtered.select('review_id', 'review_body', 'refined_tokens', 'num_tokens', 'tf_features', 'features', 'uid')\
    .join(
        videos_data.select("review_id", "product_id", "star_rating", "helpful_votes", "total_votes", "vine", "verified_purchase", "review_date"),
        "review_id", how = 'left'
)
final_reviews_to_analyze.show(10)

+--------------+--------------------+--------------------+----------+--------------------+--------------------+---+----------+-----------+-------------+-----------+----+-----------------+-----------+
|     review_id|         review_body|      refined_tokens|num_tokens|         tf_features|            features|uid|product_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|review_date|
+--------------+--------------------+--------------------+----------+--------------------+--------------------+---+----------+-----------+-------------+-----------+----+-----------------+-----------+
|R106N066IUN8ZV|kevin kline is th...|[kevin, kline, ve...|        17|(200,[2,26,40,60,...|(200,[2,26,40,60,...|  0|B000059PET|          5|            0|          0|   N|                Y| 2015-08-31|
|R1STPCBFT9UL3D|this video enable...|[video, enabl, cl...|        13|(200,[7,15,29,74,...|(200,[7,15,29,74,...|  9|B000NPO5QW|          5|            0|          0|   N|                N| 2015-08-31|


In [26]:
PATH_REVIEWS_OUT = r"/content/drive/Shareddrives/MSc Ciencia Datos/Mineria/data/"
PATH_REVIEWS_FILE = r"processed_reviews.parquet"
final_reviews_to_analyze.write.mode('overwrite').parquet(PATH_REVIEWS_OUT + PATH_REVIEWS_FILE)

print(f'File saved into {PATH_REVIEWS_OUT + PATH_REVIEWS_FILE}')

File saved into /content/drive/Shareddrives/MSc Ciencia Datos/Mineria/data/processed_reviews.parquet
